我的Flume属性:
{
"name": "jdbc-oracle",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "orders",
"connection.url": "jdbc:oracle:thin:@10.1.2.3:1071/orac",
"connection.user": "ersin",
"connection.password": "ersin!",
"auto.create": "true",
"delete.enabled": "true",
"pk.mode": "record_key",
"pk.fields": "id",
"insert.mode": "upsert",
"plugin.path": "/home/ersin/confluent-5.4.1/share/java/",
"name": "jdbc-oracle"
},
"tasks": [
{
"connector": "jdbc-oracle",
"task": 0
}
],
"type": "sink"
}
我的connect-avro-distributed.properties:
bootstrap.servers=10.0.0.0:9092
group.id=connect-cluster
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://10.0.0.0:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://10.0.0.0:8081
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-statuses
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
我发送的数据如下:
./bin/kafka-avro-console-producer \
--broker-list 10.0.0.0:9092 --topic orders \
--property parse.key="true" \
--property key.schema='{"type":"record","name":"key_schema","fields":[{"name":"id","type":"int"}]}' \
--property key.separator="$" \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":["null","int"],"default": null},{"name":"product","type": ["null","string"],"default": null}, {"name":"quantity", "type": ["null","int"],"default": null}, {"name":"price","type": ["null","int"],"default": null}]}' \
--property schema.registry.url=http://10.0.0.0:8081
我可以像这样插入或更新oracle上的数据
{"id":3}${"id": {"int":2}, "product": {"string":"Yağız Gülbahar"}, "quantity": {"int":1071}, "price": {"int":1453}}
但当我把它放到oracle上删除记录时,它不能删除数据,只能将列更新为null
{"id":2}${"id": null, "product": null , "quantity": null , "price": null }
我怎样才能解决这个问题?
提前谢谢
1条答案
按热度按时间jljoyd4f1#
实际上你需要制作一个墓碑记录。在kafka中,使用jdbc接收器连接器进行删除的工作方式如下:
连接器可以在使用tombstone记录时删除数据库表中的行,tombstone记录是具有非null键和null值的kafka记录。默认情况下,此行为是禁用的,这意味着任何逻辑删除记录都将导致连接器故障,从而可以轻松升级jdbc连接器并保留先前的行为。
可以使用启用删除
delete.enabled=true
,但只有当pk.mode
设置为record_key
. 这是因为从表中删除行需要将主键用作条件。启用删除模式不会影响
insert.mode
.还要注意的是,这种记录只有在
delete.retention.ms
毫秒,当前默认为24小时。因此,试着在属性中减少这个配置,看看它是否有效。为此,需要运行以下命令:
现在,配置完成后,您所需要做的就是生成一条带有非空键和空值的消息。注意,kafka控制台使用者不能用于生成空记录,因为用户输入被解析为utf-8。因此,
不是真正的墓碑信息。
但是你可以利用
kafkacat
但它只适用于json消息:但在你的情况下,这不会工作,因为你需要发送avro信息。因此,我建议用你最喜欢的语言写一个非常简单的avro制作人,这样你就可以发送一个墓碑信息了。