我有一个包含json的字符串主题。例如,消息可以是:
'{"id":"foo", "datetime":1}'
在本主题中,所有内容都被视为字符串。
我想发信息进来 postgresql
带的表格 kafka-connect
. 我的目标是让 postgresql
理解信息是 json
. 的确, postgresql
操控性不错 json
.
如何告诉kafka connect或postgresql消息实际上是json?
谢谢
编辑:
现在,我用 ./bin/connect-standalone config/connect-standalone.properties config/sink-sql-rules.properties
.
使用:
connect-standalone.properties属性
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
rest.port=8084
plugin.path=share/java
sink-sql-rules.properties属性
name=mysink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
# The topics to consume from - required for sink connectors like this one
topics=mytopic
# Configuration specific to the JDBC sink connector.
connection.url=***
connection.user=***
connection.password=***
mode=timestamp+incremeting
auto.create=true
auto.evolve=true
table.name.format=mytable
batch.size=500
编辑2:
通过这些配置,我得到以下错误: org.apache.kafka.connect.errors.ConnectException: No fields found using key and value schemas for table
暂无答案!
目前还没有任何答案,快来回答吧!