rkafka.read()不返回消息(仅返回双引号)

58wvjzkj  于 2021-06-06  发布在  Kafka
关注(0)|答案(2)|浏览(371)

试图通过r的rkafka库返回消息。
遵循相同的rkafka文档@https://cran.r-project.org/web/packages/rkafka/vignettes/rkafka.pdf
输出返回 "" 里面没有真正的信息。Kafka工具确认消息是由生产者发送的。
代码:

prod1=rkafka.createProducer("127.0.0.1:9092")
rkafka.send(prod1,"test","127.0.0.1:9092","Testing once")
rkafka.closeProducer(prod1)
consumer1=rkafka.createConsumer("127.0.0.1:2181","test")
print(rkafka.read(consumer1))

输出:

[1] ""

期望的输出将返回 "Testing once" .

62o28rlo

62o28rlo1#

为了读取已写入主题(在使用者启动之前)的主题消息,需要将offset值设置为尽可能小的值(相当于 --from-beginning ). 根据rkafka文件 autoOffseetReset 参数默认为 largest 自动偏移集
最小:自动将偏移重置为最小偏移
最大:自动将偏移量重置为最大偏移量
其他:向消费者抛出异常
required:optional
type:string
default:largest
为了能够使用消息,您需要设置 autoOffsetReset"smallest" .

consumer1=rkafka.createConsumer("127.0.0.1:2181","test", autoOffsetReset="smallest")
prdp8dxp

prdp8dxp2#

更新:此代码适用于:

library(rkafka)

prod1=rkafka.createProducer("127.0.0.1:9092")
rkafka.send(prod1,"test","127.0.0.1:9092","Testing once")
rkafka.send(prod1,"test","127.0.0.1:9092","Testing twice")

rkafka.closeProducer(prod1)
consumer1=rkafka.createConsumer("127.0.0.1:2181","test",groupId = "test-consumer- 
group",zookeeperConnectionTimeoutMs = "100000",autoCommitEnable = "NULL", 
autoCommitInterval = "NULL",autoOffsetReset = "NULL")

print(rkafka.read(consumer1))
print(rkafka.readPoll(consumer1))
rkafka.closeConsumer(consumer1)

关键是删除kafka生成的日志后重新启动它。

相关问题