我必须创建一个Kafka生成器,它生成一个从1到300的数字序列。我写的每条消息都必须包含关于主题、键和值的信息,该值是要写的值的二进制值。
这是我创建的代码:
from kafka import KafkaProducer
import numpy as np
import time
producer = KafkaProducer(bootstrap_servers='Cloudera02:9092')
for i in range(1,300):
value = bytes(str(i), 'utf-8')
key = (str(i), 'utf-8')
producer.send('PEC5', key = key, value = value)
time.sleep(3)
producer.flush()
Kafka消费者应该读取生产者,并且只在控制台中显示值。
from kafka import KafkaConsumer
from codecs import utf_8_decode
consumer = KafkaConsumer('PEC5', bootstrap_servers='Cloudera02:9092', auto_offset_reset='smallest', consumer_timeout_ms=10000)
for message in consumer:
for value in message.values:
print(value)
我正在运行两个终端,一个与生产者,第二个与消费者,但我没有得到任何打印在控制台。任何想法是什么问题?
1条答案
按热度按时间9gm1akwq1#
KafkaConsumer
中的auto_offset_reset
值错误,根据文档,它只有两个有效值latest
和earliest
,任何其他值都将引发异常auto_offset_reset(str)-用于在发生OffsetOutOfRange错误时重置偏移的策略:'earliest'将移动到最早的可用消息,'latest'将移动到最近的消息。任何其他值都将引发异常。默认值:“最新的”
所以用
earliest
构造KafkaConsumer
当您将
auto_offset_reset
更新为earliest
时,还要添加新的group_id
,因为只有在没有为使用者的组找到以前的偏移量时,Kafka才会将offest重置为earliest
。docsgroup_id(str或None)-要加入以进行动态分区分配(如果启用)以及用于提取和提交偏移的使用者组的名称。如果为None,则禁用自动分区分配(通过组协调器)和偏移提交。默认值:无