kafka connect jdbc接收器连接器嵌套avro

bmp9r5qi  于 2021-06-06  发布在  Kafka
关注(0)|答案(0)|浏览(271)

我正在尝试为oracle数据库运行kafka连接接收器。Kafka中的avro消息是嵌套的。
消息如下所示:

{
      "header" : {
        "id" : "",
        "source" : "",
        "name" : ""
      },
      "somefields" : {
        "something" : "value",
         "other" : "its value",

           }
}

我的问题是:
使用Kafka连接可以实现这一点吗?如果是,怎么办?我记得读过一些不支持嵌套结构的文章。是真的吗?为什么?我得到一个错误,说:

org.apache.kafka.connect.errors.ConnectException:(STRUCT) type doesn't have a mapping to the SQL database column type
    at io.confluent.connect.jdbc.sink.dialect.DbDialect.getSqlType(DbDialect.java:202)
    at io.confluent.connect.jdbc.sink.dialect.OracleDialect.getSqlType(OracleDialect.java:73)
    at io.confluent.connect.jdbc.sink.dialect.DbDialect.writeColumnSpec(DbDialect.java:140)
    at io.confluent.connect.jdbc.sink.dialect.DbDialect$3.apply(DbDialect.java:132)
    at io.confluent.connect.jdbc.sink.dialect.DbDialect$3.apply(DbDialect.java:128)
    at io.confluent.connect.jdbc.sink.dialect.StringBuilderUtil.joinToBuilder(StringBuilderUtil.java:54)
    at io.confluent.connect.jdbc.sink.dialect.StringBuilderUtil.joinToBuilder(StringBuilderUtil.java:37)
    at io.confluent.connect.jdbc.sink.dialect.DbDialect.writeColumnsSpec(DbDialect.java:128)
    at io.confluent.connect.jdbc.sink.dialect.DbDialect.getCreateQuery(DbDialect.java:96)
    at io.confluent.connect.jdbc.sink.DbStructure.create(DbStructure.java:87)
    at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:62)
    at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:66)
    at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:62)

任何帮助或建议都将不胜感激。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题