kafka connect未在postgres中输入json

bkhjykvo  于 2021-06-05  发布在  Kafka
关注(0)|答案(0)|浏览(201)

我在c应用程序中将一个对象转换成json。我得到以下json:

{
  "Data": {
    "SomeData": null,
    "OtherData": {
      "Responses": [
        "Response 1",
        "Response 2"
      ]
    }
  }
}

然后,我将数据发布到kafka主题,并使用具有以下描述的kafka连接器尝试将数据输入postgres:

{
    "name": "my-pg-sink-name",
    "config": {
        "connector.class": "org.clojars.yanatan16.kafka.connect.pg.PostgresSinkConnector",
        "tasks.max": 1,
        "topics": "my.topic.name",
        "tuple.spec.json": "{\"Data\":[\"value\",\"Data\"]}",
        "value.converter":"org.apache.kafka.connect.storage.JsonConverter",
        "db.hostname": "1.1.1.1",
        "db.database": "myDB",
        "db.username": "myUser",
        "db.password": "123",
        "db.table": "my_table"
    }
}

这个 Data 字段输入 my_table 是用类型定义的 jsonb .
接收器为 INSERT ,这不起作用:

INSERT INTO my_table (Data)  VALUES ($1) data: ({:SomeData nil, :OtherData {:Responses ["Response 1" "Response 2"]}})

如何插入json?
更新
INSERT 尝试: INFO Postgres result: #error { :cause "OnError while emitting onNext value: com.github.pgasync.impl.PgConnection.class" :via [{:type java.lang.IllegalArgumentException :message "No implementation of method: :to-pg-value of protocol: #'postgres.async/IPgParameter found for class: clojure.lang.PersistentArrayMap"

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题