我对confluentinc连接器有问题。
创建连接器时,需要指定主题(elasticsearch index)和类型(es中的文档类型)。
{
"name": "test-connector",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "1",
"topics": "test",
"key.ignore": "false",
"schema.ignore": "false",
"connection.url": "http://elastic:9200",
"type.name": "type1",
"name": "elasticsearch-sink"
}
}
我想发表在同一个索引(Kafka主题),但不同的类型,有可能吗?
我尝试创建多个连接器,但问题是,每个连接器都会使用消息,因为它是同一个主题。
我已经尝试用特定类型动态创建连接器,在那里发布,然后删除连接器。但有时它删除得太早,并不是所有的消息都被消费(没有出现在elastic中)。另外,当我删除连接器并创建另一个具有其他文档类型的连接器时,这个新连接器会使用一些旧消息。
有人知道怎么处理吗?
2条答案
按热度按时间tktrz96b1#
我找到了一个解决方案,不幸的是它被否决了。如果你知道什么,请告诉我。
来自官方文件:
所以我创建了这样的连接器:
现在我可以推到主题“旧”,它将索引ElasticSearch“测试”索引
然后我创建了更多的连接器,并使用“topic.index.map”:“topic_name:test“,我可以在同一索引上索引不同的类型
在将来的版本中,它将是topic=>index。合流团队,请不要删除topic.index.map,或者为这种情况找到更好的解决方案谢谢!
wf82jlnq2#
每个连接器可以将消息路由到一种类型。您可以使用单个消息转换将消息路由到不同的索引,但这不是您想要的。
我建议使用流处理将消息拆分为不同的主题。然后,每个主题由不同的连接器传输到同一索引,但根据需要使用不同的类型。
要进行流处理,您可以使用kafka流、spark流等。还有ksql,它可以让您执行以下操作:
然后你有三个主题(
FOO_TYPE_A
,FOO_TYPE_B
,FOO_TYPE_C
)创建三个连接器,流式传输到索引FOO
但类型不同。免责声明:我为confluent工作,这是一家支持开源ksql项目的公司。