试图通过r的rkafka库返回消息。
遵循相同的rkafka文档@https://cran.r-project.org/web/packages/rkafka/vignettes/rkafka.pdf
输出返回 ""
里面没有真正的信息。Kafka工具确认消息是由生产者发送的。
代码:
prod1=rkafka.createProducer("127.0.0.1:9092")
rkafka.send(prod1,"test","127.0.0.1:9092","Testing once")
rkafka.closeProducer(prod1)
consumer1=rkafka.createConsumer("127.0.0.1:2181","test")
print(rkafka.read(consumer1))
输出:
[1] ""
期望的输出将返回 "Testing once"
.
2条答案
按热度按时间62o28rlo1#
为了读取已写入主题(在使用者启动之前)的主题消息,需要将offset值设置为尽可能小的值(相当于
--from-beginning
). 根据rkafka文件autoOffseetReset
参数默认为largest
自动偏移集最小:自动将偏移重置为最小偏移
最大:自动将偏移量重置为最大偏移量
其他:向消费者抛出异常
required:optional
type:string
default:largest
为了能够使用消息,您需要设置
autoOffsetReset
至"smallest"
.prdp8dxp2#
更新:此代码适用于:
关键是删除kafka生成的日志后重新启动它。