I have a Kafka setup that uses an MQTT broker as a source, defined by this JSON:
{
"name": "mqtt-source",
"config": {
"connector.class": "io.confluent.connect.mqtt.MqttSourceConnector",
"tasks.max": 1,
"mqtt.server.uri": "tcp://localhost:1883",
"mqtt.topics":"shellies/+/emeter/+/power",
"mqtt.username": "test",
"mqtt.password": "testPass",
"kafka.topic": "mqtt_messages",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"confluent.topic.bootstrap.servers": "kafka:9092",
"confluent.topic.replication.factor": 1
}
}
When I run the command
docker exec -it kafka ./bin/kafka-topics --bootstrap-server localhost:9092 --describe --topic mqtt_messages
I can see messages like these:
shellies/shellyem-M/emeter/0/power 51.70
shellies/shellyem-M/emeter/1/power 0.00
shellies/shellyem-M/emeter/0/power 54.20
shellies/shellyem-M/emeter/1/power 0.00
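Then I want to use the JDBC sink to store these messages in a Postgres table named power_metrics, which has three fields: topic (string), where I want to save the message key; time (timestamp), the message time; and value (float), the message value. I am using this JSON: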
{
"name": "jdbc-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": 1,
"topics": "mqtt_messages",
"connection.url": "jdbc:postgresql://timescaledb:5432/metrics",
"connection.user": "postgres",
"connection.password": "password",
"auto.create": true,
"auto.evolve": true,
"insert.mode": "upsert",
"pk.mode": "record_key",
"pk.fields": "topic",
"table.name.format": "power_metrics",
"fields.whitelist": "topic,value",
"transforms": "InsertField,ConvertTimestamp",
"transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertField.static.field": "topic",
"transforms.InsertField.static.value": "sample_topic_value",
"transforms.ConvertTimestamp.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.ConvertTimestamp.field": "time",
"transforms.ConvertTimestamp.format": "yyyy-MM-dd HH:mm:ss",
"transforms.ConvertTimestamp.target.type": "Timestamp",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
}
}
And I get this error for the sink connector:
org.apache.kafka.connect.errors.ConnectException: org.apache.kafka.common.config.ConfigException: Invalid value null for configuration static.value: No value specified for static field: org.apache.kafka.connect.transforms.InsertField$InsertionSpec@1108a1c5
at org.apache.kafka.connect.runtime.ConnectorConfig.transformations(ConnectorConfig.java:264)
at org.apache.kafka.connect.runtime.Worker.buildWorkerTask(Worker.java:528)
at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:465)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:1147)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1600(DistributedHerder.java:126)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$12.call(DistributedHerder.java:1162)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$12.call(DistributedHerder.java:1158)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.config.ConfigException: Invalid value null for configuration static.value: No value specified for static field: org.apache.kafka.connect.transforms.InsertField$InsertionSpec@1108a1c5
at org.apache.kafka.connect.transforms.InsertField.configure(InsertField.java:122)
at org.apache.kafka.connect.runtime.ConnectorConfig.transformations(ConnectorConfig.java:261)
... 10 more
Do you know how I can solve this?
1 answer
First, the JDBC Sink requires a schema.
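If you want to consume JSON and store it in the database as TEXT rather than BLOB, you need to use JsonConverter. With ByteArrayConverter, there are no fields for the transforms to operate on.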
To use transforms that modify or insert value fields, you need to add a HoistField transform for the value.
You also need two InsertField transforms to add the record timestamp and the topic name, one using timestamp.field and the other using topic.field, rather than static.value.
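As a rough sketch of the transform chain described above, the fragment below hoists the raw value into a struct and then adds the topic name and the record timestamp with two InsertField transforms. The transform aliases (HoistValue, InsertTopic, InsertTimestamp) and the field names value, kafka_topic, and time are assumptions chosen for illustration; these keys would replace the existing transforms entries inside the sink's "config" object.

```json
{
  "transforms": "HoistValue,InsertTopic,InsertTimestamp",
  "transforms.HoistValue.type": "org.apache.kafka.connect.transforms.HoistField$Value",
  "transforms.HoistValue.field": "value",
  "transforms.InsertTopic.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.InsertTopic.topic.field": "kafka_topic",
  "transforms.InsertTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.InsertTimestamp.timestamp.field": "time"
}
```

Note that topic.field records the Kafka topic name (mqtt_messages here), not the MQTT topic, which travels in the record key. The hoisted value also keeps whatever type the converter produced, so storing it as a float may need a further conversion step.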
I suggest debugging the output with the FileStreamSink connector first.
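A minimal debugging setup might look like the sketch below. It assumes the FileStreamSinkConnector is available on the Connect worker's plugin path; the connector name file-sink-debug, the output path /tmp/mqtt_messages_debug.txt, and the choice of StringConverter for the value (so the file stays human-readable) are all assumptions for illustration.

```json
{
  "name": "file-sink-debug",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "tasks.max": 1,
    "topics": "mqtt_messages",
    "file": "/tmp/mqtt_messages_debug.txt",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter"
  }
}
```

Any transforms under test can be added to this config, so each record's shape can be inspected in the output file before switching back to the JDBC sink.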