使用:confluent-5.1.0
接收器配置:
curl -X POST \
http://localhost:8083/connectors \
-H 'cache-control: no-cache' \
-H 'content-type: application/json' \
-d '{
"name": "dbz-sink-connector-1",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"type.name": "dbauditt4",
"topic.index.map": "our3.platform.business:plat_index",
"topics.regex":"our3.platform.business",
"key.ignore": "true",
"connection.url": "http://localhost:9200",
"group.id":"plot",
"key.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable":"false",
"value.converter.schemas.enable":"false",
"transforms": "timestamp_convertor",
"transforms.timestamp_convertor.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.timestamp_convertor.target.type": "string",
"transforms.timestamp_convertor.format": "yyyy-MM-dd HH:mm:ss.SSSSSS",
"transforms.timestamp_convertor.field":"data.ts_ms"
}
}';
Kafka主题中的示例消息是:
{
"data": {
"before": null,
"after": {
"Id": 331458,
"business_id": 532334,
"sms_opted": 1
},
"source": {
"version": "0.7.5",
"name": "our3",
"server_id": 810143323,
"ts_sec": 1548661255,
"gtid": null,
"file": "mysql-bin-changelog.001786",
"pos": 1719980,
"row": 0,
"snapshot": false,
"thread": 11674162,
"db": "platform",
"table": "business"
},
"op": "c",
"ts_ms": 1548661255851
}
}
连接器正在引发空指针异常。
java.lang.thread.run(线程。java:748)原因:org.apache.kafka.connect.transforms.timestampconverter.infertimestamptype(timestampconverter)上的java.lang.nullpointerexception。java:422)在
有人能帮帮我吗。我做错什么了?
1条答案
按热度按时间bis0qfac1#
开箱即用的SMT都不支持嵌套字段访问,例如
data.ts
(我想你的意思是data.ts_ms
(取而代之)看起来您正在使用debezium,所以您可以执行cdc事件展平,或者您可以只配置elasticsearch来执行时间字段的动态Map