paho mqtt消息可靠性

dnph8jn4  于 2021-06-06  发布在  Kafka
关注(0)|答案(0)|浏览(394)

我正在研究mqtt-kafka源连接器。消息来自mqtt发布者,然后连接器将订阅mqtt主题(/iot/sensor/#),这些消息将发布到kafka代理。但同时,如果我停止连接器并再次重新启动连接器,那么连接器正在使用来自mqtt代理的消息,但某些mqtt消息丢失(如果我从mqtt发布服务器发送1000条消息,而在kafka中没有获得1000条消息),我将mqtt clientid指定为唯一名称。我使用setcleansession=false、qos=1设置mqtt paho客户机属性,并使用mqttdefaultfilepersistence(dir)。rabbitmq版本为3.6.10
伪码

Public class MyTask extends SourceTask implements MqttCallback{
//Initialzied the queue
@Override
    public void start(Map<String, String> map) {
    //set necessary properties and configuration
    //set the required mqttConnect options and mqtt broker
     Paho mClient = new MqttClient(mqttURl,Unique ClientID, new MqttDefaultFilePersistence(DIR))
     mClient.connect(RequiredConnectProperties)
    }
    @Override
    public List<SourceRecord> poll() throws InterruptedException {
    //poll the queue
    // publish messages to kafka
    }
    @Override
    public void stop() {
    //disconnect the mqtt paho clent
    }
    @Override
    public void messageArrived(String mqtttopic, MqttMessage message) throws Exception {
    //add the messages to queue

    }
}

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题