我们计划使用jms源连接器将数据流传输到kafka集群中。activemq中的数据是xml格式的。jms源连接器使用内部messageid(message.getjmsmessageid())作为键。
需要从(xml)有效负载中检索作为键的字段(在连接器流到的Kafka主题上)。
要实现这一点,需要在连接器中执行几个步骤。
为了将xml转换为内部kafka connect结构,我们使用了一个定制的转换插件(https://github.com/jcustenborder/kafka-connect-transform-xml)
然后valuetokey和extractfield变压器设置作为有效负载一部分的密钥。
现在这个键值对可以发送到我们的kafka主题了。
我们正在处理金融交易,需要保证信息的顺序。我们的吞吐量很高,据我所知,configuring tasks.max通过在kafka connect worker之间分配任务来实现并行性。
第一个问题:并行如何与单消息转换器结合使用?“'(source)connector-transformer-converter'是通过设置tasks.max形成一个分布在一起的管道,还是tasks.max设置以某种方式仅适用于连接器?
后者似乎有点奇怪,所以假设前者是正确的,我还有另一个疑问。
我们的Kafka主题密钥-保证Kafka主题的顺序-在连接器的任务中确定。选择tasks.max>1,传入的消息将分布在正在运行的任务中。
由于分布在多个任务中,两个(或多个)消息(在有效负载中包含相同的密钥)以特定顺序从activemq到达,并且可以发送到不同的kafka connect任务。
理论上,当最终进入Kafka主题时,顺序可以颠倒(在同一个分区上,因为它们现在有相同的键)。
我这样推理是对的吗?有没有办法绕过这个问题?或者只有在使用一个任务的用例中才可能有订购保证。
1条答案
按热度按时间ibrsph3r1#
tasks.max设置是否仅适用于连接器?
这个
我们的Kafka主题密钥-这将保证Kafka主题的秩序
不,没有。它只保证分区,仅此而已
只有在这个用例中,使用一个任务才能保证排序。
这取决于来源。我不知道amq,但是如果读取消息会将其从队列中删除,那么多个任务就不可能获得消息