ESP 32上的Rust-如何使用MQTT协议向AWS IoT(Core)发送(和接收)数据?

g6baxovj  于 2023-06-06  发布在  其他
关注(0)|答案(1)|浏览(525)

首先:我知道在ESP32上运行Rust还不是一个很常见的实践,并且可以预料到一些(相当多的)麻烦。但我好像遇到了障碍。
什么工作:

  • 在ESP32上 Flink 并运行代码
  • 沿着src/certificates目录中的证书
  • WiFi连接(简单的WPA个人,没有什么花哨的像WPA企业)
  • 使用MQTT发布和订阅主题

什么不起作用:

  • 使用MQTT发布和订阅AWS IoT(Core)。这需要证书,据我所知,我正在正确地处理这个问题(请参阅下面的代码)。

一些额外的信息(见下面的代码):

  • server.cert.crt从AWS提供的root-CA.crt重命名
  • client.cert.pem从AWS提供的my-thing-rev1.cert.pem重命名
  • client.private.key从AWS提供的my-thing-rev1.private.key重命名
  • 我也收到了my-thing-rev1.public.keymy-thing-rev1-Policy,但我不认为我需要这些...?
  • 我知道这不是实现它的正确方法(我不应该直接提供证书,而是使用服务来获取它们,但这是一个非常基本的POC)
  • 如果我不想连接到AWS,而是使用自己的代理或broker.emqx.io进行测试(即使包含证书),代码也可以正常工作。

这是我目前使用的代码(主要基于ESP32 STD演示应用上的Rust):

use embedded_svc::httpd::Result;
use embedded_svc::mqtt::client::{Connection, MessageImpl, QoS};
use esp_idf_svc::mqtt::client::{EspMqttClient, MqttClientConfiguration};
use esp_idf_svc::tls::X509;
use esp_idf_sys::EspError;
// other needed imports (not relevant here)

extern crate dotenv_codegen;
extern crate core;

const AWS_IOT_ENDPOINT: &str = dotenv!("AWS_IOT_ENDPOINT");
const AWS_IOT_CLIENT_ID: &str = dotenv!("AWS_IOT_CLIENT_ID");
const AWS_IOT_TOPIC: &str = dotenv!("AWS_IOT_TOPIC");

fn main() -> Result<()> {
    esp_idf_sys::link_patches();

    // other code

    let mqtt_client: EspMqttClient<ConnState<MessageImpl, EspError>> = test_mqtt_client()?;

    // more code

    Ok(())
}

fn convert_certificate(mut certificate_bytes: Vec<u8>) -> X509<'static> {
    // append NUL
    certificate_bytes.push(0);

    // convert the certificate
    let certificate_slice: &[u8] = unsafe {
        let ptr: *const u8 = certificate_bytes.as_ptr();
        let len: usize = certificate_bytes.len();
        mem::forget(certificate_bytes);

        slice::from_raw_parts(ptr, len)
    };

    // return the certificate file in the correct format
    X509::pem_until_nul(certificate_slice)
}

fn test_mqtt_client() -> Result<EspMqttClient<ConnState<MessageImpl, EspError>>> {
    info!("About to start MQTT client");

    let server_cert_bytes: Vec<u8> = include_bytes!("certificates/server.cert.crt").to_vec();
    let client_cert_bytes: Vec<u8> = include_bytes!("certificates/client.cert.pem").to_vec();
    let private_key_bytes: Vec<u8> = include_bytes!("certificates/client.private.key").to_vec();

    let server_cert: X509 = convert_certificate(server_cert_bytes);
    let client_cert: X509 = convert_certificate(client_cert_bytes);
    let private_key: X509 = convert_certificate(private_key_bytes);

    // TODO: fix the following error: `E (16903) esp-tls-mbedtls: mbedtls_ssl_handshake returned -0x7280`
    let conf = MqttClientConfiguration {
        client_id: Some(AWS_IOT_CLIENT_ID),
        crt_bundle_attach: Some(esp_idf_sys::esp_crt_bundle_attach),
        server_certificate: Some(server_cert),
        client_certificate: Some(client_cert),
        private_key: Some(private_key),
        ..Default::default()
    };
    let (mut client, mut connection) =
        EspMqttClient::new_with_conn(AWS_IOT_ENDPOINT, &conf)?;

    info!("MQTT client started");

    // Need to immediately start pumping the connection for messages, or else subscribe() and publish() below will not work
    // Note that when using the alternative constructor - `EspMqttClient::new` - you don't need to
    // spawn a new thread, as the messages will be pumped with a backpressure into the callback you provide.
    // Yet, you still need to efficiently process each message in the callback without blocking for too long.
    //
    // Note also that if you go to http://tools.emqx.io/ and then connect and send a message to the specified topic,
    // the client configured here should receive it.
    thread::spawn(move || {
        info!("MQTT Listening for messages");

        while let Some(msg) = connection.next() {
            match msg {
                Err(e) => info!("MQTT Message ERROR: {}", e),
                Ok(msg) => info!("MQTT Message: {:?}", msg),
            }
        }

        info!("MQTT connection loop exit");
    });

    client.subscribe(AWS_IOT_TOPIC, QoS::AtMostOnce)?;

    info!("Subscribed to all topics ({})", AWS_IOT_TOPIC);

    client.publish(
        AWS_IOT_TOPIC,
        QoS::AtMostOnce,
        false,
        format!("Hello from {}!", AWS_IOT_TOPIC).as_bytes(),
    )?;

    info!("Published a hello message to topic \"{}\".", AWS_IOT_TOPIC);

    Ok(client)
}

以下是我尝试在设备上运行此命令时的最后几行输出(运行cargo run时,它的设置是编译并闪存到设备和监视器(调试模式)):

I (16913) esp32_aws_iot_with_std: About to start MQTT client
I (16923) esp32_aws_iot_with_std: MQTT client started
I (16923) esp32_aws_iot_with_std: MQTT Listening for messages
I (16933) esp32_aws_iot_with_std: MQTT Message: BeforeConnect
I (17473) esp-x509-crt-bundle: Certificate validated
E (19403) MQTT_CLIENT: mqtt_message_receive: transport_read() error: errno=119  # <- This is the actual error
E (19403) MQTT_CLIENT: esp_mqtt_connect: mqtt_message_receive() returned -1
E (19413) MQTT_CLIENT: MQTT connect failed
I (19413) esp32_aws_iot_with_std: MQTT Message ERROR: ESP_FAIL
I (19423) esp32_aws_iot_with_std: MQTT Message: Disconnected
E (19433) MQTT_CLIENT: Client has not connected
I (19433) esp32_aws_iot_with_std: MQTT connection loop exit
I (24423) esp_idf_svc::eventloop: Dropped
I (24423) esp_idf_svc::wifi: Stop requested
I (24423) wifi:state: run -> init (0)
I (24423) wifi:pm stop, total sleep time: 10737262 us / 14862601 us
W (24423) wifi:<ba-del>idx
I (24433) wifi:new:<1,0>, old:<1,1>, ap:<1,1>, sta:<1,0>, prof:1
W (24443) wifi:hmac tx: ifx0 stop, discard
I (24473) wifi:flush txq
I (24473) wifi:stop sw txq
I (24473) wifi:lmac stop hw txq
I (24473) esp_idf_svc::wifi: Stopping
I (24473) esp_idf_svc::wifi: Disconnect requested
I (24473) esp_idf_svc::wifi: Stop requested
I (24483) esp_idf_svc::wifi: Stopping
I (24483) wifi:Deinit lldesc rx mblock:10
I (24503) esp_idf_svc::wifi: Driver deinitialized
I (24503) esp_idf_svc::wifi: Dropped
I (24503) esp_idf_svc::eventloop: Dropped
Error: ESP_FAIL

这个错误似乎表明保存传入数据的缓冲区已满,不能再保存任何数据,但我不确定。我真的不知道该怎么补救。
(我假设实际的证书处理是正确的)
当我运行以下命令时,我确实在AWS IoT(MQTT测试客户端)中获得了消息:mosquitto_pub -h my.amazonawsIoT.com --cafile server.cert.crt --cert client.cert.pem --key client.private.key -i basicPubSub -t my/topic -m 'test'
有没有人在这方面有更多的经验,谁能给我指出正确的方向?这是否实际上是缓冲区错误,如果是:如何减轻此错误?我是否需要以某种方式增加缓冲区大小(它运行在基本的ESP32修订版1,ESP32_Devkitc_v4上,如果有帮助的话)。据我所知,这个版本有一个4MB的闪存大小,所以这可能解释了缓冲区过低,虽然我认为这应该是足够的。使用的总内存低于总存储的35%(App/part. size: 1347344/4128768 bytes, 32.63%

更新1:我已经意识到这些数据存储在RAM中,而不是闪存中(当时我没有想到),但我不完全确定我的特定设备上的RAM有多大(ESP32修订版1,ESP32_Devkitc_v4)。我最好的猜测是320KB,但我不确定。
更新2:我试着像这样改变缓冲区的大小:

let conf = MqttClientConfiguration {
        client_id: Some(AWS_IOT_CLIENT_ID),
        crt_bundle_attach: Some(esp_idf_sys::esp_crt_bundle_attach),
        server_certificate: Some(server_cert),
        client_certificate: Some(client_cert),
        private_key: Some(private_key),
        buffer_size: 50,      // added this (tried various sizes)
        out_buffer_size: 50,  // added this (tried various sizes)
        ..Default::default()
    };

我尝试了各种组合,但似乎没有太大变化:要么我得到完全相同的错误,或者这个(当选择较小的数字时,例如10):

E (18303) MQTT_CLIENT: Connect message cannot be created
E (18303) MQTT_CLIENT: MQTT connect failed
E (18313) MQTT_CLIENT: Client has not connected

我不确定这个缓冲区的大小应该有多大(当发送简单的时间戳到AWS IoT时),也找不到任何关于这个数字代表什么的文档:是比特还是千比特不知道。

3zwjbxry

3zwjbxry1#

我也遇到了同样的问题(证书正在验证,连接立即断开,错误代码为119)。从我设法理解的情况来看,该代码指的是服务器发送FIN,从而导致连接关闭。在我的情况下,这是由证书附带的策略引起的,该策略不允许使用我正在进行测试的主题(github.com/eclipse/paho.mqtt.rust/issues/119#issuecomment-825745402这是我弄清楚的原因)。您可以尝试在AWS控制台中编辑策略,并暂时允许所有资源()上的所有IoT操作()。请确保使用权限部分下方的复选框将新版本设置为活动版本。如果这解决了您的问题,您可以继续在策略中将权限适当限制为所需的IoT操作+主题。

相关问题