confluentinc连接器一个elasticsearch索引中有多个连接器以特定文档类型发布

yhuiod9q  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(327)

我对confluentinc连接器有问题。
创建连接器时,需要指定主题(elasticsearch index)和类型(es中的文档类型)。

{
  "name": "test-connector",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "test",
    "key.ignore": "false",
    "schema.ignore": "false",
    "connection.url": "http://elastic:9200",
    "type.name": "type1",
    "name": "elasticsearch-sink"
  }
}

我想发表在同一个索引(Kafka主题),但不同的类型,有可能吗?
我尝试创建多个连接器,但问题是,每个连接器都会使用消息,因为它是同一个主题。
我已经尝试用特定类型动态创建连接器,在那里发布,然后删除连接器。但有时它删除得太早,并不是所有的消息都被消费(没有出现在elastic中)。另外,当我删除连接器并创建另一个具有其他文档类型的连接器时,这个新连接器会使用一些旧消息。
有人知道怎么处理吗?

tktrz96b

tktrz96b1#

我找到了一个解决方案,不幸的是它被否决了。如果你知道什么,请告诉我。
来自官方文件:

topic.index.map
This option is now deprecated. A future version may remove it completely. Please use single message transforms, such as RegexRouter, to map topic names to index names.

A map from Kafka topic name to the destination Elasticsearch index, represented as a list of topic:index pairs.

Type: list
Default: ""
Importance: low

所以我创建了这样的连接器:

{
  "name": "test-connector-old",
  "config": {
  .....
    "topics": "old",
    "topic.index.map": "old:test",
....
  }
}

现在我可以推到主题“旧”,它将索引ElasticSearch“测试”索引
然后我创建了更多的连接器,并使用“topic.index.map”:“topic_name:test“,我可以在同一索引上索引不同的类型
在将来的版本中,它将是topic=>index。合流团队,请不要删除topic.index.map,或者为这种情况找到更好的解决方案谢谢!

wf82jlnq

wf82jlnq2#

每个连接器可以将消息路由到一种类型。您可以使用单个消息转换将消息路由到不同的索引,但这不是您想要的。
我建议使用流处理将消息拆分为不同的主题。然后,每个主题由不同的连接器传输到同一索引,但根据需要使用不同的类型。
要进行流处理,您可以使用kafka流、spark流等。还有ksql,它可以让您执行以下操作:

CREATE STREAM FOO_TYPE_A AS SELECT * FROM FOO WHERE TYPE='A';
CREATE STREAM FOO_TYPE_B AS SELECT * FROM FOO WHERE TYPE='B';
CREATE STREAM FOO_TYPE_C AS SELECT * FROM FOO WHERE TYPE='C';

然后你有三个主题( FOO_TYPE_A , FOO_TYPE_B , FOO_TYPE_C )创建三个连接器,流式传输到索引 FOO 但类型不同。
免责声明:我为confluent工作,这是一家支持开源ksql项目的公司。

相关问题