如何识别和合并kafka中不同队列的消息

zazmityj  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(568)

背景:
我们以前使用hibernate搜索、lucene和jbosshornetq队列进行索引。
我们的应用程序是生产者,将元数据(用于标识数据库中记录的唯一数据信息)发送到hornetq。使用者接收此元数据并对数据库进行查询,以获取完整的记录详细信息(包括子对象)。这是一种更加以数据库为中心的方法。
现在我们要消除以数据库为中心的索引方法。我们决定用Kafka而不是大黄蜂。
用户创建数据时没有问题。
我们看到,当用户编辑数据(比如一个父实体有两个子对象)时,有一个潜在的问题。当从数据库中提取数据供用户显示时,
我们把同样的数据推给Kafka主题1。当用户修改数据(比如parenet级数据)并提交时。我们只获取父级数据(不获取子对象数据),我们将更改的数据推送到topic2。现在我们必须将topic1(子对象)中的消息与topic2(父级数据)中的相应消息合并
注意:我们必须采取这一路线,因为您知道索引中没有更新,而是先删除,然后插入。
问题:
如果采用上述方法,如何将主题1中的特定消息Map到主题2中的特定消息。有没有办法在topic1和topic2中提供相同的消息ID?
如果我只使用一个主题,有没有办法解决这个问题?
有没有更好的设计/方法来解决上述问题?
提前谢谢。

b5buobof

b5buobof1#

如果采用上述方法,如何将主题1中的特定消息Map到主题2中的特定消息。有没有办法在topic1和topic2中提供相同的消息ID?
要在同一kafka集群中的主题之间Map或连接特定消息,kafka流和ksql可能是一个很好的方向。你能在这里找到参考资料吗。
有很多方法可以使对象唯一,我建议在向topic1和topic2发送消息时使用父实体id。java代码示例如下:

ProducerRecord<String, ParentEntity> record = new ProducerRecord<>(topic1, 
ParentEntity.getId(), ParentEntity);
ListenableFuture<SendResult<String, ParentEntity>> future = 
kafkaTemplate.send(record);
future.addCallback(new ListenableFutureCallback<SendResult<String, 
ParentEntity>>() {
    @Override
    public void onSuccess(SendResult<String, ParentEntity> result) {}

    @Override
    public void onFailure(Throwable ex) {
        //print out error log 
    }
});

ProducerRecord<String, ChildEntity> record = new ProducerRecord<>(topic2, 
ChildEntity.getParentEntityId(), ChildEntity);
ListenableFuture<SendResult<String, ChildEntity>> future = 
kafkaTemplate.send(record);
future.addCallback(new ListenableFutureCallback<SendResult<String, 
ChildEntity>>() {
    @Override
    public void onSuccess(SendResult<String, ChildEntity> result) {}

    @Override
    public void onFailure(Throwable ex) {
        //print out error log 
    }
});

如果我只使用一个主题,有没有办法解决这个问题?
您可以在数据库中创建一个新表(称为a)来存储要发送用于索引的完整消息。每次用户创建或更新数据时,也会将消息插入/更新到表a中。最后,您的kafka客户机从表a中提取消息对象,并生成kafka集群中的唯一主题。
有没有更好的设计/方法来解决上述问题?
你可以试试Kafka流和ksql,正如我上面提到的。

相关问题