kafka源连接器:有趣的字节字符

noj0wjuj  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(698)

我在跟踪https://github.com/kaiwaehner/kafka-connect-iot-mqtt-connector-example 用于用mqtt源连接器连接mosquitto和kafka。我正在将mosquitto发行商发送的数据发送到mosquitto订户和kafka消费者。但是kafka consumer的consumerrecord对象中的key和value字段有一些前缀字节字符。下面是我得到的代码片段和输出。
mqttpublisher.py版本

while v3 < 3:
             data3 = {
                      "time": str(datetime.datetime.now().time()),
                       "val": v3
                      }
             client.publish("sensor/dist", json.dumps(data3), qos=2)

             v3 += 1
             time.sleep(2)

mqttsubscriber.py文件

def on_message_print(client, userdata, message):
            print(message.topic,message.payload)

subscribe.callback(on_message_print, "sensor/#", hostname="localhost")

Kafka消费者.py

consumer = KafkaConsumer('mqtt.',
                     bootstrap_servers=['localhost:9092'])

for message in consumer:
   print(message)

output:mqttsubscriber.py
传感器/距离b'{“时间”:“12:44:30.817462”,“val”:0}'
传感器/距离b'{“时间”:“12:44:32.820040”,“val”:1}'
传感器/距离b'{“时间”:“12:44:34.822657”,“val”:2}'
输出:kafkaconsumer.py
使用者记录(topic='mqtt',partition=0,offset=225,timestamp=1545117270870,timestamp\u type=0,key=b'\x00\x00\x00\x01\x16sensor/dist',value=b'\x00\x00\x00\x02j{“time”:“12:44:30.817462”,“val”:0},headers=[('mqtt.message.id',b'0'),('mqtt.qos',b'0'),('mqtt.retained',b'false'),('mqtt.duplicate',b'false')],校验和=none,序列化\键\大小=17,序列化\值\大小=43,序列化\头\大小=62)
consumerrecord(topic='mqtt',partition=0,offset=226,timestamp=1545117272821,timestamp\u type=0,key=b'\x00\x00\x00\x01\x16sensor/dist',value=b'\x00\x00\x00\x02j{“time”:“12:44:32.820040”,“val”:1},headers=[('mqtt.message.id',b'0'),('mqtt.qos',b'0'),('mqtt.retained',b'false'),('mqtt.duplicate',b'false')],校验和=none,序列化\键\大小=17,序列化\值\大小=43,序列化\头\大小=62)
consumerrecord(topic='mqtt',partition=0,offset=227,timestamp=1545117274824,timestamp\u type=0,key=b'\x00\x00\x00\x01\x16sensor/dist',value=b'\x00\x00\x00\x02j{“time”:“12:44:34.822657”,“val”:2},headers=[('mqtt.message.id',b'0'),('mqtt.qos',b'0'),('mqtt.retained',b'false'),('mqtt.duplicate',b'false')],校验和=none,序列化\键\大小=17,序列化\值\大小=43,序列化\头\大小=62)
是什么导致Kafka消费程序中出现上述额外字节的前置?提前谢谢。

0kjbasz6

0kjbasz61#

作为演示的一部分,您将启动一个模式注册表
启动kafka connect和依赖项(kafka、zookeeper、schema registry): confluent start connect 如果您查看前5个字节,您将看到它们以0开头,然后是表示整数的4个字节。
请参阅schema registry wire格式并尝试执行 curl localhost:8081/subjects 看看它是否列出了你的主题名 mqtt-key 以及 mqtt-value .
如果不需要avro,则需要配置和编辑kafka connect属性文件以使用不同的转换器,而不是使用 confluent start 除了让Kafka和Zookeeper跑
或者,如果希望python反序列化avro,可以参考github上的合流kafka python repo

相关问题