我对Apache·Kafka还相当陌生。
我正在构建的应用程序有一个主题“a”,用于传入作业。作业是一个简单的json{“jobid”:1,“processedds”:[],“ids”:[1,2]}
然后在侦听主题“a”的侦听器中处理每个作业的id
因此,例如对于上面的作业,将处理id 1,并将以下消息发送到主题“a”:{“jobid”1,“processedds”:[1],“ids”:[1,2]}
为了填充processedid,我从kafkastreams store by id获取作业,并将新的processedid附加到现有的集合中。
但是,当我在添加processedid之后从kstream获取作业时,返回的作业是初始作业{“jobid”:1,“processedds”:[],“ids”:[1,2]}
有人知道为什么吗?而且这似乎是偶然发生的,所以有时效果很好。
使用者配置属性:auto.offset.reset-earliest auto.commit.interval.ms-1000
我通过增减间隔尝试了不同的值。我还尝试了auto.offset.reset-最新版本
这是在我运行集成测试时发生的。对于同一应用程序中的另一个测试,我得到一个invalidstatestoreexception。我知道需要捕获错误并重试对存储的访问,因为在重新平衡期间它可能不可用。我还看到,如果在调试时运行测试,它通过了测试。所以我想它也可能是某种时间问题?
Producer<String, Request> p = new KafkaProducer<String, Request>(props,new StringSerializer(),new JsonSerializer<Request>());
//for each id in job
Request r1 = new Request();
r1.setId("1");
r1.setJobId("1");
r1.setId(1L);
future = p.send(new ProducerRecord<String, Request>("ResultsTopic",r1.getId(),r1));
future.get();
Request r2 = new Request();
r2.setId("2");
r2.setJobId("1");
r1.setId(2L);
future = p.send(new ProducerRecord<String, Request>("ResultsTopic",r2.getId(),r2));
future.get();
p.flush();
p.close(500, TimeUnit.MILLISECONDS);
//Kafka listener method for incoming jobs topic - "ResultsTopic"
@Autowired
KafkaStreams all_job_stream;
String jobId = request.getJobId();
ReadOnlyKeyValueStore<String, Job> store = all_job_stream.store("AStore", QueryableStoreTypes.keyValueStore()); //this throws a InvalidStateStoreException
Job job = store.get(jobId); //if I add a sleep before this the test passes
job.getCompletedResponseIds().add(request.getIds());
jobKafkaTemplate.sendDefault(partitionSelectionStrategy.getPartition(job), job.getId(),job);
我想知道这两个问题是否相关。
我还在请求之间添加了一个倒计时锁。任何提示都将不胜感激。
暂无答案!
目前还没有任何答案,快来回答吧!