debezium mongodb源json接收器到cassandra(lenses.io)

zfycwa2u  于 2021-06-13  发布在  Cassandra
关注(0)|答案(1)|浏览(504)

我需要将一个json对象从mongodb接收到cassandra中的一列。我用的是extractnewdocumentstate,avroconverter。但看来我错了。avroconverter用于源还是汇?如果我把它用在源里,那么我也用在Flume里?

{
"name": "mongodb_source_connector",
"config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "tasks.max": "1",
    "mongodb.hosts": "rs0/mongo:27017",
    "mongodb.name": "dbserver1",
    "mongodb.user": "scorpion",
    "mongodb.password": "123123123",
    "database.whitelist": "ladiform",
    "database.history.kafka.bootstrap.servers": "kafka_3:9093",
    "transforms": "route,unwrap",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "$3",
    "transforms.unwrap.type": "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.delete.handling.mode": "drop",
    "transforms.unwrap.operation.header": "true",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
}

}

{
"name": "cassandra_sink_connector",
"config": {
    "connector.class": "com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector",
    "tasks.max": "1",
    "topics": "test34",
    "connect.cassandra.port": "9042",
    "connect.cassandra.key.space": "test",
    "connect.cassandra.contact.points": "cassandra",
    "connect.cassandra.username": "cassandra",
    "connect.cassandra.password": "123123123",
    "connect.cassandra.kcql": "INSERT INTO test33 SELECT id, data FROM test34"
}
s71maibg

s71maibg1#

源端和汇端的转换器必须相同,因为序列化在源端完成,反序列化在汇端完成。

相关问题