Kafka Connect topic.key.ignore not working as expected

uqcuzwp8 posted on 2021-06-05 in Kafka

My understanding from the Kafka Connect documentation is that this configuration should ignore the keys of the metricbeat and filebeat topics, but not of the alarms topic. However, Kafka Connect is not ignoring any keys at all.
This is the complete JSON configuration I push to Kafka Connect over the REST interface:

{
 "auto.create.indices.at.start": false,
 "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
 "connection.url": "http://elasticsearch:9200",
 "connection.timeout.ms": 5000,
 "read.timeout.ms": 5000,
 "tasks.max": "5",
 "topics": "filebeat,metricbeat,alarms",
 "behavior.on.null.values": "delete",
 "behavior.on.malformed.documents": "warn",
 "flush.timeout.ms":60000,
 "max.retries":42,
 "retry.backoff.ms": 100,
 "max.in.flight.requests": 5,
 "max.buffered.records":20000,
 "batch.size":4096,
 "drop.invalid.message": true,
 "schema.ignore": true,
 "topic.key.ignore": "metricbeat,filebeat",
 "key.ignore": false
 "name": "elasticsearch-ecs-connector",
 "type.name": "_doc",
 "value.converter": "org.apache.kafka.connect.json.JsonConverter",
 "value.converter.schemas.enable": "false",
 "transforms":"routeTS",
 "transforms.routeTS.type":"org.apache.kafka.connect.transforms.TimestampRouter",
 "transforms.routeTS.topic.format":"${topic}-${timestamp}",
 "transforms.routeTS.timestamp.format":"YYYY.MM.dd",
 "errors.tolerance": "all" ,
 "errors.log.enable": false ,
 "errors.log.include.messages": false,
 "errors.deadletterqueue.topic.name":"logstream-dlq",
 "errors.deadletterqueue.context.headers.enable":true ,
 "errors.deadletterqueue.topic.replication.factor": 1
}
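
For reference, a config file like this can be pushed to the Connect worker's REST interface with a PUT, which creates the connector or updates it if it already exists. This is a sketch only: it assumes the worker's REST API on its default port 8083 and the JSON above saved as connector-config.json:

# Create or update the connector (Connect REST API assumed at localhost:8083).
curl -s -X PUT http://localhost:8083/connectors/elasticsearch-ecs-connector/config \
  -H 'Content-Type: application/json' \
  -d @connector-config.json | jq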

This is the logging during connector startup:

[2020-05-01 21:07:49,960] INFO ElasticsearchSinkConnectorConfig values:
    auto.create.indices.at.start = false
    batch.size = 4096
    behavior.on.malformed.documents = warn
    behavior.on.null.values = delete
    compact.map.entries = true
    connection.compression = false
    connection.password = null
    connection.timeout.ms = 5000
    connection.url = [http://elasticsearch:9200]
    connection.username = null
    drop.invalid.message = true
    elastic.https.ssl.cipher.suites = null
    elastic.https.ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    elastic.https.ssl.endpoint.identification.algorithm = https
    elastic.https.ssl.key.password = null
    elastic.https.ssl.keymanager.algorithm = SunX509
    elastic.https.ssl.keystore.location = null
    elastic.https.ssl.keystore.password = null
    elastic.https.ssl.keystore.type = JKS
    elastic.https.ssl.protocol = TLS
    elastic.https.ssl.provider = null
    elastic.https.ssl.secure.random.implementation = null
    elastic.https.ssl.trustmanager.algorithm = PKIX
    elastic.https.ssl.truststore.location = null
    elastic.https.ssl.truststore.password = null
    elastic.https.ssl.truststore.type = JKS
    elastic.security.protocol = PLAINTEXT
    flush.timeout.ms = 60000
    key.ignore = false
    linger.ms = 1
    max.buffered.records = 20000
    max.in.flight.requests = 5
    max.retries = 42
    read.timeout.ms = 5000
    retry.backoff.ms = 100
    schema.ignore = true
    topic.index.map = []
    topic.key.ignore = [metricbeat, filebeat]
    topic.schema.ignore = []
    type.name = _doc
    write.method = insert

I am using Confluent Platform 5.5.0.


mznpcxlj #1:

Let's recap here, since your question and problem statement have been edited several times :)
You want to stream multiple topics to Elasticsearch using a single connector.
For some topics you want to use the message key as the Elasticsearch document ID, while for others you don't; there you want the Kafka message coordinates (topic+partition+offset) instead.
You want to do this with the key.ignore and topic.key.ignore settings (see the minimal sketch just below).
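
These two settings work together: key.ignore sets the connector-wide default, and topic.key.ignore is a per-topic override that forces the key to be ignored for the listed topics regardless of that default. A minimal fragment, using the topic names from the connector further down:

{
  "key.ignore": "false",
  "topic.key.ignore": "test02,test03"
}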
Here is my test data in three topics, test01, test02, and test03:

ksql> PRINT test01 from beginning;
Key format: KAFKA_STRING
Value format: AVRO or KAFKA_STRING
rowtime: 2020/05/12 11:08:32.441 Z, key: X, value: {"COL1": 1, "COL2": "FOO"}
rowtime: 2020/05/12 11:08:32.594 Z, key: Y, value: {"COL1": 2, "COL2": "BAR"}

ksql> PRINT test02 from beginning;
Key format: KAFKA_STRING
Value format: AVRO or KAFKA_STRING
rowtime: 2020/05/12 11:08:50.865 Z, key: X, value: {"COL1": 1, "COL2": "FOO"}
rowtime: 2020/05/12 11:08:50.936 Z, key: Y, value: {"COL1": 2, "COL2": "BAR"}

ksql> PRINT test03 from beginning;
Key format: ¯\_(ツ)_/¯ - no data processed
Value format: AVRO or KAFKA_STRING
rowtime: 2020/05/12 11:16:15.166 Z, key: <null>, value: {"COL1": 1, "COL2": "FOO"}
rowtime: 2020/05/12 11:16:46.404 Z, key: <null>, value: {"COL1": 2, "COL2": "BAR"}
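
As an aside, the data above was produced through ksqlDB with AVRO values, but equivalent keyed records can be produced with plain JSON values using kafkacat. A sketch, assuming a broker at localhost:9092 (the tool and broker address are not part of the setup above):

# Produce two keyed records to test01; -K: splits each input line into
# key and value on the first ':' (broker address is an assumption).
printf 'X:{"COL1": 1, "COL2": "FOO"}\nY:{"COL1": 2, "COL2": "BAR"}\n' | \
  kafkacat -b localhost:9092 -t test01 -P -K: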

Using this data, I created a connector (I'm using ksqlDB here, but it's exactly the same as using the REST API directly):

CREATE SINK CONNECTOR SINK_ELASTIC_TEST WITH (
  'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
  'connection.url'  = 'http://elasticsearch:9200',
  'key.converter'   = 'org.apache.kafka.connect.storage.StringConverter',
  'type.name'       = '_doc',
  'topics'          = 'test02,test01,test03',
  'key.ignore'      = 'false',
  'topic.key.ignore'= 'test02,test03',
  'schema.ignore'   = 'false'
);
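
After creating the connector, it is worth confirming that it and its tasks are RUNNING. A sketch against the Connect REST API, assumed at its default localhost:8083:

# Check connector and task state; look for "state": "RUNNING".
curl -s http://localhost:8083/connectors/SINK_ELASTIC_TEST/status | jq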

The resulting indices are created and populated in Elasticsearch. Here are the index and document ID for each document:

➜ curl -s http://localhost:9200/test01/_search \
    -H 'content-type: application/json' \
    -d '{ "size": 5 }' |jq -c '.hits.hits[] | [._index, ._id]'
["test01","Y"]
["test01","X"]

➜ curl -s http://localhost:9200/test02/_search \
    -H 'content-type: application/json' \
    -d '{ "size": 5 }' |jq -c '.hits.hits[] | [._index, ._id]'
["test02","test02+0+0"]
["test02","test02+0+1"]

➜ curl -s http://localhost:9200/test03/_search \
    -H 'content-type: application/json' \
    -d '{ "size": 5 }' |jq -c '.hits.hits[] | [._index, ._id]'
["test03","test03+0+0"]
["test03","test03+0+1"]

So key.ignore is left at its default (false), and for test01 that takes effect: the message key is used as the document ID.
Topics test02 and test03 are listed in topic.key.ignore, which means the message key is ignored for them (in effect, key.ignore=true), and so the document ID is the message's topic+partition+offset.
Given that I've demonstrated this working, I suggest you restart your test and re-check your work; one way to do that is shown below.
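
For example, fetch the connector's live configuration from the Connect worker's REST API and compare it with what you think you deployed. A sketch, assuming the worker's REST interface on its default port 8083 and the connector name from your question:

# Inspect the configuration the worker is actually running
# (Connect REST API assumed at localhost:8083).
curl -s http://localhost:8083/connectors/elasticsearch-ecs-connector/config | jq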
