现在我有一个MongoDB接收器,它可以正确地保存传入的AVRO消息的值。我需要它来保存文档中的Kafka消息键。我已经尝试了org.apache.kafka.connect.transforms.HoistField$Key
来将键添加到正在保存的值中,但这没有任何效果。使用ProvidedInKeyStrategy
时它确实工作,但我不希望my _id成为Kafka消息键。
我的配置:
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
"connection.uri": "mongodb://mongo1",
"database": "mongodb",
"collection": "sink",
"topics": "topics.foo",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"transforms": "hoistKey",
"transforms.hoistKey.type":"org.apache.kafka.connect.transforms.HoistField$Key",
"transforms.hoistKey.field":"kafkaKey"
}
Kafka的信息图式:
{
"type": "record",
"name": "Smoketest",
"namespace": "some_namespace",
"fields": [
{
"name": "timestamp",
"type": "int",
"logicalType": "timestamp-millis"
}
]
}
Kafka密钥模式:
[
{
"type": "enum",
"name": "EnvironmentType",
"namespace": "some_namespace",
"doc": "DEV",
"symbols": [
"Dev",
"Test",
"Accept",
"Sandbox",
"Prod"
]
},
{
"type": "record",
"name": "Key",
"namespace": "some_namespace",
"doc": "The standard Key type that is used as key",
"fields": [
{
"name": "conversation_id",
"doc": "The first system producing an event sets this field",
"type": "string"
},
{
"name": "broker_key",
"doc": "The key of the broker",
"type": "string"
},
{
"name": "user_id",
"doc": "User identification",
"type": [
"null",
"string"
]
},
{
"name": "application",
"doc": "The name of the application",
"type": [
"null",
"string"
]
},
{
"name": "environment",
"doc": "The type of environment",
"type": "type.EnvironmentType"
}
]
}
]
1条答案
按热度按时间qyswt5oh1#
使用https://github.com/f0xdx/kafka-connect-wrap-smt,我现在可以将Kafka邮件中的所有数据 Package 成一个文档,保存在mongodb接收器中。