Kafka的消费者不消费生成为主题的资料

rur96b6h  于 2023-01-01  发布在  Apache
关注(0)|答案(1)|浏览(185)

我必须创建一个Kafka生成器,它生成一个从1到300的数字序列。我写的每条消息都必须包含关于主题、键和值的信息,该值是要写的值的二进制值。
这是我创建的代码:

from kafka import KafkaProducer
import numpy as np
import time

producer = KafkaProducer(bootstrap_servers='Cloudera02:9092')

for i in range(1,300):
    value = bytes(str(i), 'utf-8')
    key = (str(i), 'utf-8')
    producer.send('PEC5', key = key, value = value)
    time.sleep(3) 
producer.flush()

Kafka消费者应该读取生产者,并且只在控制台中显示值。

from kafka import KafkaConsumer
from codecs import utf_8_decode

consumer = KafkaConsumer('PEC5', bootstrap_servers='Cloudera02:9092', auto_offset_reset='smallest', consumer_timeout_ms=10000)

for message in consumer:
    for value in message.values:
        print(value)

我正在运行两个终端,一个与生产者,第二个与消费者,但我没有得到任何打印在控制台。任何想法是什么问题?

9gm1akwq

9gm1akwq1#

KafkaConsumer中的auto_offset_reset值错误,根据文档,它只有两个有效值latestearliest任何其他值都将引发异常
auto_offset_reset(str)-用于在发生OffsetOutOfRange错误时重置偏移的策略:'earliest'将移动到最早的可用消息,'latest'将移动到最近的消息。任何其他值都将引发异常。默认值:“最新的”
所以用earliest构造KafkaConsumer

consumer = KafkaConsumer('PEC5', bootstrap_servers='Cloudera02:9092', auto_offset_reset='earliest', consumer_timeout_ms=10000,group_id='test-1')

当您将auto_offset_reset更新为earliest时,还要添加新的group_id,因为只有在没有为使用者的组找到以前的偏移量时,Kafka才会将offest重置为earliest。docs
group_id(str或None)-要加入以进行动态分区分配(如果启用)以及用于提取和提交偏移的使用者组的名称。如果为None,则禁用自动分区分配(通过组协调器)和偏移提交。默认值:无

相关问题