Kafka with MQTT source and JDBC sink

toe95027 · posted 2023-06-21 in Apache

I have a Kafka setup and use an MQTT broker as the source, defined with this JSON:

{
  "name": "mqtt-source",
  "config": {
    "connector.class": "io.confluent.connect.mqtt.MqttSourceConnector",
    "tasks.max": 1,
    "mqtt.server.uri": "tcp://localhost:1883",
    "mqtt.topics": "shellies/+/emeter/+/power",
    "mqtt.username": "test",
    "mqtt.password": "testPass",
    "kafka.topic": "mqtt_messages",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "confluent.topic.bootstrap.servers": "kafka:9092",
    "confluent.topic.replication.factor": 1
  }
}
When I run the command

docker exec -it kafka ./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic mqtt_messages --from-beginning --property print.key=true

I can see messages like these:

shellies/shellyem-M/emeter/0/power  51.70
shellies/shellyem-M/emeter/1/power  0.00
shellies/shellyem-M/emeter/0/power  54.20
shellies/shellyem-M/emeter/1/power  0.00

Then I want to use a JDBC sink to store these messages in a power_metrics table in Postgres. The table has three columns: topic (string), which should hold the message key; time (timestamp), the message time; and value (float), the message value. I use this JSON:

{
  "name": "jdbc-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": 1,
    "topics": "mqtt_messages",
    "connection.url": "jdbc:postgresql://timescaledb:5432/metrics",
    "connection.user": "postgres",
    "connection.password": "password",
    "auto.create": true,
    "auto.evolve": true,
    "insert.mode": "upsert",
    "pk.mode": "record_key",
    "pk.fields": "topic",
    "table.name.format": "power_metrics",
    "fields.whitelist": "topic,value",
    "transforms": "InsertField,ConvertTimestamp",
    "transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.InsertField.static.field": "topic",
    "transforms.InsertField.static.value": "sample_topic_value",
    "transforms.ConvertTimestamp.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.ConvertTimestamp.field": "time",
    "transforms.ConvertTimestamp.format": "yyyy-MM-dd HH:mm:ss",
    "transforms.ConvertTimestamp.target.type": "Timestamp",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }
}

and I get this error for the sink connector:

org.apache.kafka.connect.errors.ConnectException: org.apache.kafka.common.config.ConfigException: Invalid value null for configuration static.value: No value specified for static field: org.apache.kafka.connect.transforms.InsertField$InsertionSpec@1108a1c5
    at org.apache.kafka.connect.runtime.ConnectorConfig.transformations(ConnectorConfig.java:264)
    at org.apache.kafka.connect.runtime.Worker.buildWorkerTask(Worker.java:528)
    at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:465)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:1147)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1600(DistributedHerder.java:126)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder$12.call(DistributedHerder.java:1162)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder$12.call(DistributedHerder.java:1158)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.config.ConfigException: Invalid value null for configuration static.value: No value specified for static field: org.apache.kafka.connect.transforms.InsertField$InsertionSpec@1108a1c5
    at org.apache.kafka.connect.transforms.InsertField.configure(InsertField.java:122)
    at org.apache.kafka.connect.runtime.ConnectorConfig.transformations(ConnectorConfig.java:261)
    ... 10 more

Do you know how I can fix this?

gev0vcfq #1

First, the JDBC Sink requires a schema.
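For illustration, with the JsonConverter and schemas.enable=true, the sink expects each record value to be a schema-plus-payload envelope along these lines (a hand-written sketch, not actual topic output):

{
  "schema": {
    "type": "struct",
    "fields": [
      { "field": "value", "type": "float", "optional": false }
    ]
  },
  "payload": { "value": 51.70 }
}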
If you want to work with the data and store the JSON as TEXT in the database rather than a BLOB, you need to use the JsonConverter. With the ByteArrayConverter, there are no fields for the transforms to operate on.
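A minimal sketch of the converter change in the sink config (whether schemas.enable should be true depends on the payloads actually carrying envelopes as above):

"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": true,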
In order to use transforms that modify/insert fields in the value, you need to add a HoistField transform for the value, as sketched below.
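A sketch of such a HoistField transform, assuming the raw payload should be wrapped into a field named value (the alias HoistValue is arbitrary):

"transforms": "HoistValue",
"transforms.HoistValue.type": "org.apache.kafka.connect.transforms.HoistField$Value",
"transforms.HoistValue.field": "value",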
You also need two InsertField transforms to add the record timestamp and the topic name: one using timestamp.field and the other using topic.field, rather than static.value.
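A sketch of those two InsertField transforms, chained after the hoist (the aliases InsertTopic and InsertTime are arbitrary):

"transforms": "HoistValue,InsertTopic,InsertTime",
"transforms.InsertTopic.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertTopic.topic.field": "topic",
"transforms.InsertTime.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertTime.timestamp.field": "time",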
I suggest you first debug the output using a FileStreamSink connector.
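A minimal sketch of such a debug sink (the connector ships with Apache Kafka; the name and output path here are assumptions):

{
  "name": "debug-file-sink",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "tasks.max": 1,
    "topics": "mqtt_messages",
    "file": "/tmp/mqtt_messages.out"
  }
}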
