我有一个基于Expo React Native构建的前端;后端使用Node.js连接我的Azure IoT Hub,并使用Express.js进行路由。前端通过REST API与后端通信。
我的应用程序有多个页面,在其中一个页面上,我需要能够从IoT Hub获取实时数据。我在网上读到REST API不适合实时数据,WebSocket是最好的选择。
我正在使用Azure示例中的代码连接到Azure上的端点,但我不确定如何将其合并到server.js或前端中。我仍然希望应用的其余页面继续使用REST API。
iothubConnectionStringWebsockets.js --改编自link和this
const crypto = require("crypto");
const { Buffer } = require("buffer");
const { Connection, ReceiverEvents, parseConnectionString } = require("rhea-promise");
const rheaPromise = require("rhea-promise");
const { EventHubConsumerClient, earliestEventPosition } = require("@azure/event-hubs");
const WebSocket = require("ws");
// Load the .env file if it exists
require("dotenv").config();
/**
 * Reports whether a value is an AMQP error, delegating the decision
 * entirely to rhea-promise.
 * @param err - Any value, typically one delivered by an error event.
 * @returns Whatever `rheaPromise.isAmqpError` returns for `err`.
 */
function isAmqpError(err) {
  const verdict = rheaPromise.isAmqpError(err);
  return verdict;
}
/**
 * Builds a shared access signature (SAS) token string.
 *
 * This code is modified from
 * https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-devguide-security#security-tokens.
 *
 * @param resourceUri - Resource the token grants access to; it is URI-encoded here.
 * @param signingKey - Base64-encoded key used to sign the token.
 * @param policyName - Optional shared access policy name. When falsy, the
 *   `skn` field is left out instead of being emitted as `skn=undefined`.
 * @param expiresInMins - Token lifetime in minutes, counted from now.
 * @returns A string of the form
 *   `SharedAccessSignature sr=<uri>&sig=<signature>&se=<expiry>[&skn=<policy>]`.
 */
function generateSasToken(resourceUri, signingKey, policyName, expiresInMins) {
  // Keep the parameter untouched; work on an encoded copy.
  const encodedUri = encodeURIComponent(resourceUri);
  // Expiry is an absolute Unix timestamp in whole seconds.
  const expiresInSeconds = Math.ceil(Date.now() / 1000 + expiresInMins * 60);
  const toSign = `${encodedUri}\n${expiresInSeconds}`;

  // HMAC-SHA256 over "<encoded uri>\n<expiry>", keyed with the decoded signing key.
  const hmac = crypto.createHmac("sha256", Buffer.from(signingKey, "base64"));
  hmac.update(toSign);
  const signature = encodeURIComponent(hmac.digest("base64"));

  const token = `SharedAccessSignature sr=${encodedUri}&sig=${signature}&se=${expiresInSeconds}`;
  return policyName ? `${token}&skn=${policyName}` : token;
}
/**
 * Converts an IotHub Connection string into an Event Hubs-compatible connection string.
 *
 * It opens an AMQP-over-WebSocket connection to the hub, creates a receiver on
 * the `$management` address, and waits for the receiver error event. When that
 * error is an `amqp:link:redirect`, the redirect's hostname and address are
 * used to build the Event Hubs connection string.
 *
 * @param connectionString - An IotHub connection string in the format:
 * `"HostName=<your-iot-hub>.azure-devices.net;SharedAccessKeyName=<KeyName>;SharedAccessKey=<Key>"`
 * @returns An Event Hubs-compatible connection string in the format:
 * `"Endpoint=sb://<hostname>;EntityPath=<your-iot-hub>;SharedAccessKeyName=<KeyName>;SharedAccessKey=<Key>"`
 * @throws Error when the connection string lacks a required field or the hub
 *   name cannot be extracted; rejects with the receiver error (or a parse
 *   error) when the redirect information is missing or malformed; rethrows
 *   any failure from opening the connection or creating the receiver.
 */
async function convertIotHubToEventHubsConnectionString(connectionString) {
  const { HostName, SharedAccessKeyName, SharedAccessKey } =
    parseConnectionString(connectionString);

  // Verify that the required info is in the connection string.
  if (!HostName || !SharedAccessKey || !SharedAccessKeyName) {
    throw new Error(`Invalid IotHub connection string.`);
  }

  // Extract the IotHub name from the hostname.
  const [iotHubName] = HostName.split(".");
  if (!iotHubName) {
    throw new Error(`Unable to extract the IotHub name from the connection string.`);
  }

  // Generate a token to authenticate to the service.
  // The code for generateSasToken can be found at https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-devguide-security#security-tokens
  const token = generateSasToken(
    `${HostName}/messages/events`,
    SharedAccessKey,
    SharedAccessKeyName,
    5 // token expires in 5 minutes
  );

  const connection = new Connection({
    transport: "tls",
    host: HostName,
    hostname: HostName,
    username: `${SharedAccessKeyName}@sas.root.${iotHubName}`,
    port: 443,
    reconnect: false,
    password: token,
    webSocketOptions: {
      webSocket: WebSocket,
      protocol: ["AMQPWSB10"],
      url: `wss://${HostName}:${443}/$servicebus/websocket`,
    },
  });
  await connection.open();

  // Best-effort close: the connection only exists to provoke the redirect.
  const closeConnection = () => {
    connection.close().catch(() => {
      /* ignore error */
    });
  };

  // Create the receiver that will trigger a redirect error.
  let receiver;
  try {
    receiver = await connection.createReceiver({
      source: { address: `amqps://${HostName}/messages/events/$management` },
    });
  } catch (err) {
    // Do not leak the open connection when the link cannot be created.
    closeConnection();
    throw err;
  }

  return new Promise((resolve, reject) => {
    receiver.on(ReceiverEvents.receiverError, (context) => {
      // try/finally guarantees the connection is closed on every path,
      // including an unexpected throw inside this handler.
      try {
        const error = context.receiver && context.receiver.error;
        if (!isAmqpError(error) || error.condition !== "amqp:link:redirect") {
          reject(error);
          return;
        }

        // A redirect error may arrive without `info`; never dereference it blindly.
        const info = error.info || {};
        const hostname = info.hostname;
        if (!hostname) {
          reject(error);
          return;
        }

        const parsedAddress =
          typeof info.address === "string"
            ? info.address.match(/5671\/(.*)\/\$management/i)
            : null;
        if (parsedAddress == null || parsedAddress[1] == null) {
          const msg = `Cannot parse the EventHub name from the given address: ${info.address} in the error: ` +
            `${error.stack}\n${JSON.stringify(error.info)}.\nThe parsed result is: ${JSON.stringify(parsedAddress)}.`;
          reject(new Error(msg));
          return;
        }

        const entityPath = parsedAddress[1];
        resolve(`Endpoint=sb://${hostname}/;EntityPath=${entityPath};SharedAccessKeyName=${SharedAccessKeyName};SharedAccessKey=${SharedAccessKey}`);
      } finally {
        closeConnection();
      }
    });
  });
}
/**
 * Reads device-to-cloud messages from an IoT Hub through its Event
 * Hubs-compatible endpoint and forwards each one to a callback.
 */
class EventHubReader {
  /**
   * @param iotHubConnectionString - IoT Hub connection string; it is converted
   *   to an Event Hubs-compatible one when reading starts.
   * @param consumerGroup - Consumer group passed to the EventHubConsumerClient.
   */
  constructor(iotHubConnectionString, consumerGroup) {
    this.iotHubConnectionString = iotHubConnectionString;
    this.consumerGroup = consumerGroup;
    // Both are populated by startReadMessage and released by stopReadMessage.
    this.consumerClient = undefined;
    this.subscription = undefined;
  }

  /**
   * Connects to the hub and subscribes to its events.
   * Failures are logged with console.error rather than thrown.
   * @param startReadMessageCallback - Called once per event as
   *   `(body, enqueuedTimeUtc, deviceId)`; `deviceId` is undefined when the
   *   event carries no `iothub-connection-device-id` system property.
   */
  async startReadMessage(startReadMessageCallback) {
    try {
      const eventHubConnectionString = await convertIotHubToEventHubsConnectionString(this.iotHubConnectionString);
      // Keep the client on the instance so stopReadMessage can close it.
      this.consumerClient = new EventHubConsumerClient(this.consumerGroup, eventHubConnectionString);
      console.log('Successfully created the EventHubConsumerClient from IoT Hub event hub-compatible connection string.');
      const partitionIds = await this.consumerClient.getPartitionIds();
      console.log('The partition ids are: ', partitionIds);
      // Keep the subscription too; closing it is what stops event delivery.
      this.subscription = this.consumerClient.subscribe({
        processEvents: (events, context) => {
          for (const event of events) {
            // systemProperties may be absent on an event; avoid a TypeError.
            const systemProperties = event.systemProperties || {};
            startReadMessageCallback(
              event.body,
              event.enqueuedTimeUtc,
              systemProperties["iothub-connection-device-id"]);
          }
        },
        processError: (err, context) => {
          console.error(err.message || err);
        }
      });
    } catch (ex) {
      console.error(ex.message || ex);
    }
  }

  /**
   * Close connection to Event Hub.
   * Safe to call when reading never started or has already been stopped.
   */
  async stopReadMessage() {
    const { subscription, consumerClient } = this;
    // Clear first so a repeated call does not close the same objects twice.
    this.subscription = undefined;
    this.consumerClient = undefined;
    if (subscription) {
      await subscription.close();
    }
    if (consumerClient) {
      await consumerClient.close();
    }
  }
}
module.exports = EventHubReader;
字符串
server.js
// Entry point: serves the REST routes and a WebSocket endpoint from one HTTP
// server, and pushes every IoT Hub message to all connected WebSocket clients.
const http = require('http');
const express = require('express');
const cors = require('cors');
const WebSocket = require('ws');
// NOTE(review): assumes the reader module sits next to this file under this name — confirm.
const EventHubReader = require('./iothubConnectionStringWebsockets');

const PORT = 3000;
// NOTE(review): environment variable names follow the Azure sample — confirm
// they match the names used in your .env file.
const iotHubConnectionString = process.env.IotHubConnectionString;
const eventHubConsumerGroup = process.env.EventHubConsumerGroup;

const app = express();
app.use(express.json());
app.use(cors());

// NOTE(review): the handlers below are placeholders and send no response yet.
app.get('/home', (req, res) => {
  // I only want to get real time data on this page
});
app.get('/login', (req, res) => {
});
app.post('/control', (req, res) => {
});
app.post('/configure', (req, res) => {
});
app.post('/settings', (req, res) => {
});

// Express and the WebSocket server share a single HTTP server. Only this
// server listens on PORT; calling app.listen(PORT) as well would try to bind
// the same port twice and fail with EADDRINUSE.
const server = http.createServer(app);
const wss = new WebSocket.Server({ server });

// Sends `data` to every client whose socket is currently open.
wss.broadcast = (data) => {
  wss.clients.forEach((client) => {
    if (client.readyState === WebSocket.OPEN) {
      try {
        console.log(`Broadcasting data ${data}`);
        client.send(data);
      } catch (e) {
        console.error(e);
      }
    }
  });
};

server.listen(PORT, () => {
  console.log('Listening on %d.', server.address().port);
});

const eventHubReader = new EventHubReader(iotHubConnectionString, eventHubConsumerGroup);
(async () => {
  await eventHubReader.startReadMessage((message, date, deviceId) => {
    try {
      const payload = {
        IotData: message,
        // Date.now() returns a number, which has no toISOString(); use a Date.
        MessageDate: date || new Date().toISOString(),
        DeviceId: deviceId,
      };
      wss.broadcast(JSON.stringify(payload));
    } catch (err) {
      console.error('Error broadcasting: [%s] from [%s].', err, message);
    }
  });
})().catch((err) => {
  // A bare .catch() handles nothing; log so the failure is visible.
  console.error(err);
});
型
home.js
// how to connect with backend to use the websocket?
型
我不完全确定这个路由的WebSocket应该如何与我的其他路由和页面相关。它是否会影响服务器对其余页面的工作方式?我该怎么把这个写进去?
1条答案
按热度按时间ars1skjm1#
它是否会影响服务器对其余页面的工作方式?我该怎么把这个写进去?
字符串
home.js:
型
有关更多详细信息,请参阅此WebSocket与React-Router和SO的连接