如何将WebSocket整合到我的Expo应用程序的一个路由中(在我的后端使用Express.js进行路由和REST API)

aor9mmx1  于 2023-08-05  发布在  其他
关注(0)|答案(1)|浏览(72)

我有一个构建在Expo React Native上的前端,我使用Nod.js与我的Azure IoT Hub在我的后端使用Express.js进行路由。我从前端使用REST API。
我的应用程序有多个页面,在其中一个页面上,我需要能够从IoT Hub获取实时数据。我在网上读到REST API不适合实时数据,WebSocket是最好的选择。
我正在使用Azure示例中的code连接到Azure上的端点,但我不确定如何将其合并到server.js或前端中。我仍然希望在我的页面的其余部分使用REST API。
iothubConnectionStringWebsockets.js --改编自link和this

const crypto = require("crypto");
const { Buffer } = require("buffer");
const { Connection, ReceiverEvents, parseConnectionString } = require("rhea-promise");
const rheaPromise = require("rhea-promise");
const { EventHubConsumerClient, earliestEventPosition } = require("@azure/event-hubs");
const WebSocket = require("ws");

// Load the .env file if it exists
require("dotenv").config();

/**
 * Type guard for AmqpError.
 * @param err - An unknown error.
 */
function isAmqpError(err) {
  return rheaPromise.isAmqpError(err);
}

// This code is modified from https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-devguide-security#security-tokens.
function generateSasToken(resourceUri, signingKey, policyName, expiresInMins) {
  resourceUri = encodeURIComponent(resourceUri);

  const expiresInSeconds = Math.ceil(Date.now() / 1000 + expiresInMins * 60);
  const toSign = resourceUri + "\n" + expiresInSeconds;

  // Use the crypto module to create the hmac.
  const hmac = crypto.createHmac("sha256", Buffer.from(signingKey, "base64"));
  hmac.update(toSign);
  const base64UriEncoded = encodeURIComponent(hmac.digest("base64"));

  // Construct authorization string.
  return `SharedAccessSignature sr=${resourceUri}&sig=${base64UriEncoded}&se=${expiresInSeconds}&skn=${policyName}`;
}

/**
 * Converts an IotHub Connection string into an Event Hubs-compatible connection string.
 * @param connectionString - An IotHub connection string in the format:
 * `"HostName=<your-iot-hub>.azure-devices.net;SharedAccessKeyName=<KeyName>;SharedAccessKey=<Key>"`
 * @returns An Event Hubs-compatible connection string in the format:
 * `"Endpoint=sb://<hostname>;EntityPath=<your-iot-hub>;SharedAccessKeyName=<KeyName>;SharedAccessKey=<Key>"`
 */
async function convertIotHubToEventHubsConnectionString(connectionString) {
  const { HostName, SharedAccessKeyName, SharedAccessKey } =
    parseConnectionString(connectionString);

  // Verify that the required info is in the connection string.
  if (!HostName || !SharedAccessKey || !SharedAccessKeyName) {
    throw new Error(`Invalid IotHub connection string.`);
  }

  //Extract the IotHub name from the hostname.
  const [iotHubName] = HostName.split(".");

  if (!iotHubName) {
    throw new Error(`Unable to extract the IotHub name from the connection string.`);
  }

  // Generate a token to authenticate to the service.
  // The code for generateSasToken can be found at https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-devguide-security#security-tokens
  const token = generateSasToken(
    `${HostName}/messages/events`,
    SharedAccessKey,
    SharedAccessKeyName,
    5 // token expires in 5 minutes
  );

  const connection = new Connection({
    transport: "tls",
    host: HostName,
    hostname: HostName,
    username: `${SharedAccessKeyName}@sas.root.${iotHubName}`,
    port: 443,
    reconnect: false,
    password: token,
    webSocketOptions: {
      webSocket: WebSocket,
      protocol: ["AMQPWSB10"],
      url: `wss://${HostName}:${443}/$servicebus/websocket`,
    },
  });
  await connection.open();

  // Create the receiver that will trigger a redirect error.
  const receiver = await connection.createReceiver({
    source: { address: `amqps://${HostName}/messages/events/$management` },
  });

  return new Promise((resolve, reject) => {
    receiver.on(ReceiverEvents.receiverError, (context) => {
      const error = context.receiver && context.receiver.error;
      if (isAmqpError(error) && error.condition === "amqp:link:redirect") {
        const hostname = error.info && error.info.hostname;
        const parsedAddress = error.info.address.match(/5671\/(.*)\/\$management/i);

        if (!hostname) {
          reject(error);
        } 
        else if (parsedAddress == undefined || (parsedAddress && parsedAddress[1] == undefined)) {
          const msg = `Cannot parse the EventHub name from the given address: ${error.info.address} in the error: ` +
            `${error.stack}\n${JSON.stringify(error.info)}.\nThe parsed result is: ${JSON.stringify(parsedAddress)}.`;
          reject(Error(msg));
        } 
        else {
          const entityPath = parsedAddress[1];
          resolve(`Endpoint=sb://${hostname}/;EntityPath=${entityPath};SharedAccessKeyName=${SharedAccessKeyName};SharedAccessKey=${SharedAccessKey}`);
        }
      } else {
        reject(error);
      }
      connection.close().catch(() => {
        /* ignore error */
      });
    });
  });
}

class EventHubReader {
  constructor(iotHubConnectionString, consumerGroup) {
    this.iotHubConnectionString = iotHubConnectionString;
    this.consumerGroup = consumerGroup;
  }

  async startReadMessage(startReadMessageCallback) {
    try {
      const eventHubConnectionString = await convertIotHubToEventHubsConnectionString(this.iotHubConnectionString);
      const consumerClient = new EventHubConsumerClient(this.consumerGroup, eventHubConnectionString);
      console.log('Successfully created the EventHubConsumerClient from IoT Hub event hub-compatible connection string.');

      const partitionIds = await consumerClient.getPartitionIds();
      console.log('The partition ids are: ', partitionIds);

      consumerClient.subscribe({
        processEvents: (events, context) => {
          for (let i = 0; i < events.length; ++i) {
            startReadMessageCallback(
              events[i].body,
              events[i].enqueuedTimeUtc,
              events[i].systemProperties["iothub-connection-device-id"]);
          }
        },
        processError: (err, context) => {
          console.error(err.message || err);
        }
      });
    } catch (ex) {
      console.error(ex.message || ex);
    }
  }

  // Close connection to Event Hub.
  async stopReadMessage() {
    const disposeHandlers = [];
    this.receiveHandlers.forEach((receiveHandler) => {
      disposeHandlers.push(receiveHandler.stop());
    });
    await Promise.all(disposeHandlers);

    this.consumerClient.close();
  }
}

module.exports = EventHubReader;

字符串
server.js

const express = require('express');
const cors = require('cors');

const PORT = 3000;
const app = express();

app.use(express.json());
app.use(cors());

app.get('/home', (req, res) => {
  // I only want to get real time data on this page
});

app.get('/login', (req, res) => {

});

app.post('/control', (req, res) => {

});

app.post('/configure', (req, res) => {

});

app.post('/settings', (req, res) => {

});

app.listen(PORT, () => {
  console.log("Server Listening on PORT:", PORT);
});
const server = http.createServer(app);
const wss = new WebSocket.Server({ server });

wss.broadcast = (data) => {
  wss.clients.forEach((client) => {
    if (client.readyState === WebSocket.OPEN) {
      try {
        console.log(`Broadcasting data ${data}`);
        client.send(data);
      } catch (e) {
        console.error(e);
      }
    }
  });
};

server.listen(PORT, () => {
  console.log('Listening on %d.', server.address().port);
});

const eventHubReader = new EventHubReader(iotHubConnectionString, eventHubConsumerGroup);

(async () => {
  await eventHubReader.startReadMessage((message, date, deviceId) => {
    try {
      const payload = {
        IotData: message,
        MessageDate: date || Date.now().toISOString(),
        DeviceId: deviceId,
      };

      wss.broadcast(JSON.stringify(payload));
    } catch (err) {
      console.error('Error broadcasting: [%s] from [%s].', err, message);
    }
  });
})().catch();


home.js

// how to connect with backend to use the websocket?


我不完全确定这个路由的WebSocket应该如何与我的其他路由和页面相关。它是否会影响服务器对其余页面的工作方式?我该怎么把这个写进去?

ars1skjm

ars1skjm1#

它是否会影响服务器对其余页面的工作方式?我该怎么把这个写进去?

  • 将WebSocket服务器和Azure IoT Hub通信集成到现有的server.js文件中。
  • 尝试将WebSocket服务器和Express服务器组合到一个HTTP服务器中,以处理WebSocket和REST API请求。检查下面我创建的WebSocket服务器和HTTP服务器。
// Incorrect: Separate creation of WebSocket server and HTTP server
const server = http.createServer(app);
const wss = new WebSocket.Server({ server });
server.listen(PORT, () => {
  console.log('Listening on %d.', server.address().port);
});

// Correct: Combined creation of WebSocket server and HTTP server
const server = http.createServer(app);
const wss = new WebSocket.Server({ server });

// Continue with the WebSocket and IoT Hub logic...

字符串

  • 建立WebSocket连接以从IoT Hub接收实时数据。您将需要使用React Native(Expo)提供的WebSocket API。
    home.js:
import React, { useEffect } from 'react';
import { View, Text } from 'react-native';

const HomeScreen = () => {
  // State to hold the real-time data received from the WebSocket
  const [realTimeData, setRealTimeData] = React.useState('');

  useEffect(() => {
    // WebSocket connection URL
    const webSocketURL = 'ws://YOUR_SERVER_IP_ADDRESS:3000';

    // Create a new WebSocket instance
    const socket = new WebSocket(webSocketURL);

    // Event handler for when the WebSocket connection is established
    socket.onopen = () => {
      console.log('WebSocket connected.');
    };

    // Event handler for when the WebSocket receives a message
    socket.onmessage = (event) => {
      // Parse the received message
      const data = JSON.parse(event.data);
      // Update the state with the received data
      setRealTimeData(data.IotData);
    };

    // Event handler for WebSocket errors
    socket.onerror = (error) => {
      console.error('WebSocket error:', error);
    };

    // Clean up the WebSocket connection when the component is unmounted
    return () => {
      socket.close();
    };
  }, []); // Empty dependency array to run the effect only once

  return (
    <View>
      <Text>Real-time IoT Data:</Text>
      <Text>{realTimeData}</Text>
      {/* Add other components and UI elements for your Home screen */}
    </View>
  );
};

export default HomeScreen;

  • WebSocket URL应该设置为服务器的地址和端口,您的后端正在运行。
  • WebSocket将正常工作,前端将通过WebSocket连接从IoT Hub接收实时数据,同时仍将REST API用于其他页面和功能。

有关更多详细信息,请参阅此WebSocket与React-RouterSO的连接

相关问题