kafka使用field from(json)消息将持久主题连接到elasticsearch索引

5f0d552i  于 2021-06-06  发布在  Kafka
关注(0)|答案(2)|浏览(386)

我正在尝试在elasticsearch中仅使用kafka的connect api中的smt对消息进行索引。
到目前为止,我只幸运地使用了主题和时间戳路由器功能。但是,现在我想基于消息中的某个字段创建单独的索引。
假设消息的格式如下:

{"productId": 1, "category": "boat", "price": 135000}
{"productId": 1, "category": "helicopter", "price": 300000}
{"productId": 1, "category": "car", "price": 25000}

是否有可能根据产品类别将这些指标编入以下索引?
产品船
产品直升机
产品车
或者我必须为每个类别创建单独的主题(知道它可能会变成成百上千个)?
我是在监督一个可以做到这一点的转换,还是这根本不可能,是否必须构建一个自定义组件?

xzlaal3s

xzlaal3s1#

Kafka连接没有现成的东西能做到这一点。您有几个选择:
elasticsearch接收器连接器将根据主题将消息路由到目标索引,因此您可以编写一个自定义smt来检查消息并相应地将其路由到不同的主题
使用流处理器对消息进行预处理,以便在elasticsearch接收器连接器使用这些消息时,它们已经在不同的主题上了。例如,kafka streams或ksql。
ksql您需要硬编码每个类别( CREATE STREAM product-boat AS SELECT * FROM messages WHERE category='boat' 等)
kafka streams现在有了动态路由(kip-303),这将是一种更灵活的方法
手工编码一个定制的elasticsearch接收器连接器,其逻辑编码用于根据消息内容将消息路由到索引。这感觉是三种方法中最糟糕的一种。

kx5bkwkv

kx5bkwkv2#

如果您正在使用 Confluent Platform 您可以根据消息中的字段值执行某种路由。
要做到这一点,你必须使用 ExtractTopic smt来自汇合。有关smt的更多详细信息,请访问https://docs.confluent.io/current/connect/transforms/extracttopic.html#extracttopic
kafka接收器连接器处理消息,消息由 SinkRecord . 每个 SinkRecord 包含多个字段: topic , partition , value , key 这些字段由kafka connect设置,使用转换可以更改这些值。 ExtractTopic smt更改 topic 基于 value 或者 key 信息的一部分。
转换配置如下:

{
...
    "transforms": "ExtractTopic",
    "transforms.ExtractTopic.type": "io.confluent.connect.transforms.ExtractTopic$Value",
    "transforms.ExtractTopic.field": "name",  <-- name of field, that value will be used as index name
...
}

一个限制是,必须提前创建索引。
我假设你使用的是elasticsearchFlume连接器。ElasticSearchConnector有能力创建索引,但当它的方法为特定分区创建writer时,它就会这样做( ElasticsearchSinkTask::open ). 在您的用例中,此时无法创建所有索引,因为所有消息的值都不可用。
也许这不是最纯粹的方法,因为 ExtractTopic 应该用于源连接器,但在您的情况下,它可能会工作。

相关问题