我是 using confluent.connect.cassandra.CassandraSinkConnector
,Kafka连接CassandraFlume。
我想知道是否有可能使用 io.confluent.connect.cassandra.CassandraSinkConnector
作为连接器。
如果可能的话,请您建议设置什么配置来启用此功能。我已经尝试了文档中提到的所有配置,但是没有成功地创建表。
这是我正在使用的配置:
{
"name": "cassandra-test4",
"config": {
"connector.class": "io.confluent.connect.cassandra.CassandraSinkConnector",
"tasks.max": "3",
"topics": "orders-topic2",
"cassandra.contact.points": "my_ip",
"cassandra.keyspace": "test_cas",
"cassandra.write.mode": "Insert",
"cassandra.table.manage.enabled": "true",
"cassandra.sink.route": "test_cas.orders",
"key.converter.schema.registry.url": "http://localhost:8081",
"value.converter.schema.registry.url": "http://localhost:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"flush.size": "1",
"cassandra.keyspace.create.enabled": "true",
"name": "cassandra-test4"
},
"tasks": [
{
"connector": "cassandra-test4",
"task": 0
},
{
"connector": "cassandra-test4",
"task": 1
},
{
"connector": "cassandra-test4",
"task": 2
}
],
"type": null
}
1条答案
按热度按时间7kjnsjlb1#
这应该通过设置
cassandra.keyspace.create.enabled
&cassandra.table.manage.enabled
属性到true
. 参见文档。但是要非常小心—在集群中很容易出现模式不一致,然后需要执行其他步骤来从中恢复。最好在启动连接器之前预先创建表。。。