From the Kafka Connect documentation, my understanding is that this configuration should ignore the message keys for the metricbeat and filebeat topics, but not for the alarms topic. However, Kafka Connect does not ignore any of the keys.
This is the complete JSON config that I push to Kafka Connect over REST:
{
  "auto.create.indices.at.start": false,
  "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
  "connection.url": "http://elasticsearch:9200",
  "connection.timeout.ms": 5000,
  "read.timeout.ms": 5000,
  "tasks.max": "5",
  "topics": "filebeat,metricbeat,alarms",
  "behavior.on.null.values": "delete",
  "behavior.on.malformed.documents": "warn",
  "flush.timeout.ms": 60000,
  "max.retries": 42,
  "retry.backoff.ms": 100,
  "max.in.flight.requests": 5,
  "max.buffered.records": 20000,
  "batch.size": 4096,
  "drop.invalid.message": true,
  "schema.ignore": true,
  "topic.key.ignore": "metricbeat,filebeat",
  "key.ignore": false,
  "name": "elasticsearch-ecs-connector",
  "type.name": "_doc",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "false",
  "transforms": "routeTS",
  "transforms.routeTS.type": "org.apache.kafka.connect.transforms.TimestampRouter",
  "transforms.routeTS.topic.format": "${topic}-${timestamp}",
  "transforms.routeTS.timestamp.format": "YYYY.MM.dd",
  "errors.tolerance": "all",
  "errors.log.enable": false,
  "errors.log.include.messages": false,
  "errors.deadletterqueue.topic.name": "logstream-dlq",
  "errors.deadletterqueue.context.headers.enable": true,
  "errors.deadletterqueue.topic.replication.factor": 1
}
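For reference, a config of this shape can be pushed with the Connect REST interface. A minimal sketch, assuming the JSON above is saved as elasticsearch-ecs-connector.json and the Connect worker listens on the default port 8083:

# PUT the config map for the named connector; Connect creates or updates it.
curl -s -X PUT -H "Content-Type: application/json" \
  --data @elasticsearch-ecs-connector.json \
  http://localhost:8083/connectors/elasticsearch-ecs-connector/config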
This is the log output during connector startup:
[2020-05-01 21:07:49,960] INFO ElasticsearchSinkConnectorConfig values:
auto.create.indices.at.start = false
batch.size = 4096
behavior.on.malformed.documents = warn
behavior.on.null.values = delete
compact.map.entries = true
connection.compression = false
connection.password = null
connection.timeout.ms = 5000
connection.url = [http://elasticsearch:9200]
connection.username = null
drop.invalid.message = true
elastic.https.ssl.cipher.suites = null
elastic.https.ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
elastic.https.ssl.endpoint.identification.algorithm = https
elastic.https.ssl.key.password = null
elastic.https.ssl.keymanager.algorithm = SunX509
elastic.https.ssl.keystore.location = null
elastic.https.ssl.keystore.password = null
elastic.https.ssl.keystore.type = JKS
elastic.https.ssl.protocol = TLS
elastic.https.ssl.provider = null
elastic.https.ssl.secure.random.implementation = null
elastic.https.ssl.trustmanager.algorithm = PKIX
elastic.https.ssl.truststore.location = null
elastic.https.ssl.truststore.password = null
elastic.https.ssl.truststore.type = JKS
elastic.security.protocol = PLAINTEXT
flush.timeout.ms = 60000
key.ignore = false
linger.ms = 1
max.buffered.records = 20000
max.in.flight.requests = 5
max.retries = 42
read.timeout.ms = 5000
retry.backoff.ms = 100
schema.ignore = true
topic.index.map = []
topic.key.ignore = [metricbeat, filebeat]
topic.schema.ignore = []
type.name = _doc
write.method = insert
I am using Confluent Platform 5.5.0.
1 Answer
Let's recap here, because your question and problem statement have been edited several times :)
You want to stream multiple topics to Elasticsearch with a single connector.
For some topics you want to use the message key as the Elasticsearch document ID, while for the others you don't, and instead want to use the Kafka message coordinates (topic + partition + offset).
You want to do this with the key.ignore and topic.key.ignore settings.
Here is my test data for the three topics, test01, test02 and test03:
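The records themselves aren't reproduced here; a hypothetical equivalent, producing one keyed JSON message per topic against a broker on localhost:9092, could look like this:

# -P produces, -t selects the topic, -K: splits each line into key:value.
echo 'key1:{"COL1":1,"COL2":"FOO"}' | kafkacat -b localhost:9092 -t test01 -P -K:
echo 'key2:{"COL1":2,"COL2":"BAR"}' | kafkacat -b localhost:9092 -t test02 -P -K:
echo 'key3:{"COL1":3,"COL2":"BAZ"}' | kafkacat -b localhost:9092 -t test03 -P -K: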
Using this data, I created a connector (I'm using ksqlDB, but this is the same as using the REST API directly):
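The exact statement isn't shown here, but since ksqlDB and the REST API are interchangeable for this, a REST sketch of such a test connector (the name sink-elastic-test is hypothetical) might be:

# Create the test connector: keys ignored only for test02 and test03.
curl -s -X PUT -H "Content-Type: application/json" \
  http://localhost:8083/connectors/sink-elastic-test/config \
  -d '{
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "connection.url": "http://elasticsearch:9200",
    "topics": "test01,test02,test03",
    "key.ignore": "false",
    "topic.key.ignore": "test02,test03",
    "schema.ignore": "true",
    "type.name": "_doc",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
  }'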
The resulting indices are created and populated in Elasticsearch. Here are the index and document ID of the documents:
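One way to inspect them, assuming jq is installed and Elasticsearch listens on localhost:9200:

# Print index name and document ID for every hit in the three test indices.
curl -s "http://localhost:9200/test01,test02,test03/_search?size=100" \
  | jq -r '.hits.hits[] | [._index, ._id] | @tsv'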
So key.ignore=false is the default, and it takes effect for test01, which means the message's key is used as the document ID. Topics test02 and test03 are listed under topic.key.ignore, which means the message key is ignored for them (i.e. key.ignore=true effectively applies), and thus the document ID is the message's topic/partition/offset.
Given that I've demonstrated here that this works, I suggest you start your test again and carefully re-check your work.
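As a first re-check, the Connect status endpoint shows whether the connector and its tasks are actually running, e.g.:

curl -s http://localhost:8083/connectors/elasticsearch-ecs-connector/status | jq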