我在跟踪https://github.com/kaiwaehner/kafka-connect-iot-mqtt-connector-example 用于用mqtt源连接器连接mosquitto和kafka。我正在将mosquitto发行商发送的数据发送到mosquitto订户和kafka消费者。但是kafka consumer的consumerrecord对象中的key和value字段有一些前缀字节字符。下面是我得到的代码片段和输出。
mqttpublisher.py版本
while v3 < 3:
data3 = {
"time": str(datetime.datetime.now().time()),
"val": v3
}
client.publish("sensor/dist", json.dumps(data3), qos=2)
v3 += 1
time.sleep(2)
mqttsubscriber.py文件
def on_message_print(client, userdata, message):
print(message.topic,message.payload)
subscribe.callback(on_message_print, "sensor/#", hostname="localhost")
Kafka消费者.py
consumer = KafkaConsumer('mqtt.',
bootstrap_servers=['localhost:9092'])
for message in consumer:
print(message)
output:mqttsubscriber.py
传感器/距离b'{“时间”:“12:44:30.817462”,“val”:0}'
传感器/距离b'{“时间”:“12:44:32.820040”,“val”:1}'
传感器/距离b'{“时间”:“12:44:34.822657”,“val”:2}'
输出:kafkaconsumer.py
使用者记录(topic='mqtt',partition=0,offset=225,timestamp=1545117270870,timestamp\u type=0,key=b'\x00\x00\x00\x01\x16sensor/dist',value=b'\x00\x00\x00\x02j{“time”:“12:44:30.817462”,“val”:0},headers=[('mqtt.message.id',b'0'),('mqtt.qos',b'0'),('mqtt.retained',b'false'),('mqtt.duplicate',b'false')],校验和=none,序列化\键\大小=17,序列化\值\大小=43,序列化\头\大小=62)
consumerrecord(topic='mqtt',partition=0,offset=226,timestamp=1545117272821,timestamp\u type=0,key=b'\x00\x00\x00\x01\x16sensor/dist',value=b'\x00\x00\x00\x02j{“time”:“12:44:32.820040”,“val”:1},headers=[('mqtt.message.id',b'0'),('mqtt.qos',b'0'),('mqtt.retained',b'false'),('mqtt.duplicate',b'false')],校验和=none,序列化\键\大小=17,序列化\值\大小=43,序列化\头\大小=62)
consumerrecord(topic='mqtt',partition=0,offset=227,timestamp=1545117274824,timestamp\u type=0,key=b'\x00\x00\x00\x01\x16sensor/dist',value=b'\x00\x00\x00\x02j{“time”:“12:44:34.822657”,“val”:2},headers=[('mqtt.message.id',b'0'),('mqtt.qos',b'0'),('mqtt.retained',b'false'),('mqtt.duplicate',b'false')],校验和=none,序列化\键\大小=17,序列化\值\大小=43,序列化\头\大小=62)
是什么导致Kafka消费程序中出现上述额外字节的前置?提前谢谢。
1条答案
按热度按时间0kjbasz61#
作为演示的一部分,您将启动一个模式注册表
启动kafka connect和依赖项(kafka、zookeeper、schema registry):
confluent start connect
如果您查看前5个字节,您将看到它们以0开头,然后是表示整数的4个字节。请参阅schema registry wire格式并尝试执行
curl localhost:8081/subjects
看看它是否列出了你的主题名mqtt-key
以及mqtt-value
.如果不需要avro,则需要配置和编辑kafka connect属性文件以使用不同的转换器,而不是使用
confluent start
除了让Kafka和Zookeeper跑或者,如果希望python反序列化avro,可以参考github上的合流kafka python repo