python kafkaconsumer未连接

pkwftd7m  于 2021-06-05  发布在  Kafka
关注(0)|答案(1)|浏览(300)

设置:
我有3个集装箱

1) For Kafka
2) For Zookeeper
3) For JupyterLab

我在这些容器之间建立了网络,我看到kafka producer能够运行并生成数据。
Kafka制作人.ipynb

KAFKA_BROKER = ['172.20.0.2:9093']
from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(bootstrap_servers=KAFKA_BROKER)

for _ in range(100):
    print("sending")
    producer.send('my-topic', key=b'foo', value=b'bar')
    print("success")

这里send()发送消息100次。
Kafka消费者.ipynb

KAFKA_BROKER = ['172.20.0.2:9093']
from kafka import KafkaConsumer

consumer = KafkaConsumer('my-topic',group_id='my-group',bootstrap_servers=KAFKA_BROKER)

print("Comm success")

for message in consumer:
    # message value and key are raw bytes -- decode if necessary!
    # e.g., for unicode: `message.value.decode('utf-8')`
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                          message.offset, message.key,
                                          message.value))

在上述消费者代码中 print("Comm success") 永远不会被处决。基于生产者代码的执行,网络是开放的,jupyter能够与kafka经纪人交谈。但是,客户机不能连接到同一个代理进行数据使用。如何开始调试?

q0qdq0h2

q0qdq0h21#

默认情况下 auto.offset.reset 价值是 latest ,所以设置为 earliest 使用新的group.id

consumer = KafkaConsumer('my-topic',group_id='new-group',auto_offset_reset = 'earliest',bootstrap_servers=KAFKA_BROKER)

相关问题