我正在研究mqtt-kafka源连接器。消息来自mqtt发布者,然后连接器将订阅mqtt主题(/iot/sensor/#),这些消息将发布到kafka代理。但同时,如果我停止连接器并再次重新启动连接器,那么连接器正在使用来自mqtt代理的消息,但某些mqtt消息丢失(如果我从mqtt发布服务器发送1000条消息,而在kafka中没有获得1000条消息),我将mqtt clientid指定为唯一名称。我使用setcleansession=false、qos=1设置mqtt paho客户机属性,并使用mqttdefaultfilepersistence(dir)。rabbitmq版本为3.6.10
伪码
Public class MyTask extends SourceTask implements MqttCallback{
//Initialzied the queue
@Override
public void start(Map<String, String> map) {
//set necessary properties and configuration
//set the required mqttConnect options and mqtt broker
Paho mClient = new MqttClient(mqttURl,Unique ClientID, new MqttDefaultFilePersistence(DIR))
mClient.connect(RequiredConnectProperties)
}
@Override
public List<SourceRecord> poll() throws InterruptedException {
//poll the queue
// publish messages to kafka
}
@Override
public void stop() {
//disconnect the mqtt paho clent
}
@Override
public void messageArrived(String mqtttopic, MqttMessage message) throws Exception {
//add the messages to queue
}
}
暂无答案!
目前还没有任何答案,快来回答吧!