Kafka流没有返回正确的消息

gopyfrb3  于 2021-06-07  发布在  Kafka
关注(0)|答案(0)|浏览(156)

我对Apache·Kafka还相当陌生。
我正在构建的应用程序有一个主题“a”,用于传入作业。作业是一个简单的json{“jobid”:1,“processedds”:[],“ids”:[1,2]}
然后在侦听主题“a”的侦听器中处理每个作业的id
因此,例如对于上面的作业,将处理id 1,并将以下消息发送到主题“a”:{“jobid”1,“processedds”:[1],“ids”:[1,2]}
为了填充processedid,我从kafkastreams store by id获取作业,并将新的processedid附加到现有的集合中。
但是,当我在添加processedid之后从kstream获取作业时,返回的作业是初始作业{“jobid”:1,“processedds”:[],“ids”:[1,2]}
有人知道为什么吗?而且这似乎是偶然发生的,所以有时效果很好。
使用者配置属性:auto.offset.reset-earliest auto.commit.interval.ms-1000
我通过增减间隔尝试了不同的值。我还尝试了auto.offset.reset-最新版本
这是在我运行集成测试时发生的。对于同一应用程序中的另一个测试,我得到一个invalidstatestoreexception。我知道需要捕获错误并重试对存储的访问,因为在重新平衡期间它可能不可用。我还看到,如果在调试时运行测试,它通过了测试。所以我想它也可能是某种时间问题?

Producer<String, Request> p = new KafkaProducer<String, Request>(props,new StringSerializer(),new JsonSerializer<Request>());   

//for each id in job    
Request r1 = new Request();
r1.setId("1");
r1.setJobId("1");
r1.setId(1L);
future = p.send(new ProducerRecord<String, Request>("ResultsTopic",r1.getId(),r1));
future.get();

Request r2 = new Request();
r2.setId("2");
r2.setJobId("1");
r1.setId(2L);
future = p.send(new ProducerRecord<String, Request>("ResultsTopic",r2.getId(),r2));
future.get();

p.flush();
p.close(500, TimeUnit.MILLISECONDS);

//Kafka listener method for incoming jobs topic - "ResultsTopic"
@Autowired
KafkaStreams all_job_stream;
String jobId = request.getJobId();
ReadOnlyKeyValueStore<String, Job> store = all_job_stream.store("AStore", QueryableStoreTypes.keyValueStore());   //this throws a InvalidStateStoreException

Job job = store.get(jobId); //if I add a sleep before this the test passes

job.getCompletedResponseIds().add(request.getIds());

jobKafkaTemplate.sendDefault(partitionSelectionStrategy.getPartition(job), job.getId(),job);

我想知道这两个问题是否相关。
我还在请求之间添加了一个倒计时锁。任何提示都将不胜感激。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题