我正在尝试在elasticsearch中仅使用kafka的connect api中的smt对消息进行索引。
到目前为止,我只幸运地使用了主题和时间戳路由器功能。但是,现在我想基于消息中的某个字段创建单独的索引。
假设消息的格式如下:
{"productId": 1, "category": "boat", "price": 135000}
{"productId": 1, "category": "helicopter", "price": 300000}
{"productId": 1, "category": "car", "price": 25000}
是否有可能根据产品类别将这些指标编入以下索引?
产品船
产品直升机
产品车
或者我必须为每个类别创建单独的主题(知道它可能会变成成百上千个)?
我是在监督一个可以做到这一点的转换,还是这根本不可能,是否必须构建一个自定义组件?
2条答案
按热度按时间xzlaal3s1#
Kafka连接没有现成的东西能做到这一点。您有几个选择:
elasticsearch接收器连接器将根据主题将消息路由到目标索引,因此您可以编写一个自定义smt来检查消息并相应地将其路由到不同的主题
使用流处理器对消息进行预处理,以便在elasticsearch接收器连接器使用这些消息时,它们已经在不同的主题上了。例如,kafka streams或ksql。
ksql您需要硬编码每个类别(
CREATE STREAM product-boat AS SELECT * FROM messages WHERE category='boat'
等)kafka streams现在有了动态路由(kip-303),这将是一种更灵活的方法
手工编码一个定制的elasticsearch接收器连接器,其逻辑编码用于根据消息内容将消息路由到索引。这感觉是三种方法中最糟糕的一种。
kx5bkwkv2#
如果您正在使用
Confluent Platform
您可以根据消息中的字段值执行某种路由。要做到这一点,你必须使用
ExtractTopic
smt来自汇合。有关smt的更多详细信息,请访问https://docs.confluent.io/current/connect/transforms/extracttopic.html#extracttopickafka接收器连接器处理消息,消息由
SinkRecord
. 每个SinkRecord
包含多个字段:topic
,partition
,value
,key
这些字段由kafka connect设置,使用转换可以更改这些值。ExtractTopic
smt更改topic
基于value
或者key
信息的一部分。转换配置如下:
一个限制是,必须提前创建索引。
我假设你使用的是elasticsearchFlume连接器。ElasticSearchConnector有能力创建索引,但当它的方法为特定分区创建writer时,它就会这样做(
ElasticsearchSinkTask::open
). 在您的用例中,此时无法创建所有索引,因为所有消息的值都不可用。也许这不是最纯粹的方法,因为
ExtractTopic
应该用于源连接器,但在您的情况下,它可能会工作。