我创建了一个kafka集群,其中包含3个代理和以下详细信息:
创建了3个主题(“initial1”、“initial2”、“final”),每个主题的复制因子为3,分区为2。
创建了2个生产商,生产商1推送至“initial1”主题,生产商2推送至“initial2”主题。
创建了一个streams应用程序stream1,使用left join从“initial1”和“initial2”主题读取消息,对其进行处理并写入“final”主题。
以下是流配置:
kafka.sourcetopic.one=initial1
kafka.sourcetopic.two=initial2
kafka.desttopic.one=final
kafka.stream.id=Stream11
kafka.stream.state.dir=kafka-streams-11
kafka.stream.server=PLAINTEXT://x1.x2.x3.x4:9092, PLAINTEXT://x1.x2.x3.x4:9093, PLAINTEXT://x1.x2.x3.x4:9094
kafka.auto-commit=true
kafka.auto-commit-interval=10
kafka.auto-offset-reset=latest
kafka.retries=2
kafka.retries.backoff.ms=10
kafka.request.timeout_ms=900000
kafka.timewindow_ms=36000000
kafka.schema.registry.url=http://x1.x2.x3.x4:8081
场景1:如果producer1和stream1都在运行
当producer1将消息(比如1000条消息)推送到“initial1”主题时,stream1能够读取它。
场景2:如果producer1和stream1停止。经过很长的时间间隔(比如说10个小时),producer1首先启动,它推送1000条消息。推送1000条消息后,stream1启动。
stream1不会读取producer1推送的1000条消息。
查询
为什么stream1在长时间间隔后重新启动时无法读取过去的消息?如果流在很小的时间间隔(比如5分钟)内重新启动,那么它就能够读取producer1推送的过去的消息。
暂无答案!
目前还没有任何答案,快来回答吧!