我如何保存Kafka消息Key in文档用于MongoDB Sink?

0sgqnhkj  于 2022-12-17  发布在  Apache
关注(0)|答案(1)|浏览(108)

现在我有一个MongoDB接收器,它可以正确地保存传入的AVRO消息的值。我需要它来保存文档中的Kafka消息键。我已经尝试了org.apache.kafka.connect.transforms.HoistField$Key来将键添加到正在保存的值中,但这没有任何效果。使用ProvidedInKeyStrategy时它确实工作,但我不希望my _id成为Kafka消息键。
我的配置:

"config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "connection.uri": "mongodb://mongo1",
    "database": "mongodb",
    "collection": "sink",
    "topics": "topics.foo",

    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    
    "transforms": "hoistKey",

    "transforms.hoistKey.type":"org.apache.kafka.connect.transforms.HoistField$Key",
    "transforms.hoistKey.field":"kafkaKey"
  }

Kafka的信息图式:

{
    "type": "record",
    "name": "Smoketest",
    "namespace": "some_namespace",
    "fields": [
        {
            "name": "timestamp",
            "type": "int",
            "logicalType": "timestamp-millis"
        }
    ]
}

Kafka密钥模式:

[
  {
    "type": "enum",
    "name": "EnvironmentType",
    "namespace": "some_namespace",
    "doc": "DEV",
    "symbols": [
      "Dev",
      "Test",
      "Accept",
      "Sandbox",
      "Prod"
    ]
  },
  {
    "type": "record",
    "name": "Key",
    "namespace": "some_namespace",
    "doc": "The standard Key type that is used as key",
    "fields": [
      {
        "name": "conversation_id",
        "doc": "The first system producing an event sets this field",
        "type": "string"
      },
      {
        "name": "broker_key",
        "doc": "The key of the broker",
        "type": "string"
      },
      {
        "name": "user_id",
        "doc": "User identification",
        "type": [
          "null",
          "string"
        ]
      },
      {
        "name": "application",
        "doc": "The name of the application",
        "type": [
          "null",
          "string"
        ]
      },
      {
        "name": "environment",
        "doc": "The type of environment",
        "type": "type.EnvironmentType"        
      }
    ]
  }
]
qyswt5oh

qyswt5oh1#

使用https://github.com/f0xdx/kafka-connect-wrap-smt,我现在可以将Kafka邮件中的所有数据 Package 成一个文档,保存在mongodb接收器中。

相关问题