我正在尝试为oracle数据库运行kafka连接接收器。Kafka中的avro消息是嵌套的。
消息如下所示:
{
"header" : {
"id" : "",
"source" : "",
"name" : ""
},
"somefields" : {
"something" : "value",
"other" : "its value",
}
}
我的问题是:
使用Kafka连接可以实现这一点吗?如果是,怎么办?我记得读过一些不支持嵌套结构的文章。是真的吗?为什么?我得到一个错误,说:
org.apache.kafka.connect.errors.ConnectException:(STRUCT) type doesn't have a mapping to the SQL database column type
at io.confluent.connect.jdbc.sink.dialect.DbDialect.getSqlType(DbDialect.java:202)
at io.confluent.connect.jdbc.sink.dialect.OracleDialect.getSqlType(OracleDialect.java:73)
at io.confluent.connect.jdbc.sink.dialect.DbDialect.writeColumnSpec(DbDialect.java:140)
at io.confluent.connect.jdbc.sink.dialect.DbDialect$3.apply(DbDialect.java:132)
at io.confluent.connect.jdbc.sink.dialect.DbDialect$3.apply(DbDialect.java:128)
at io.confluent.connect.jdbc.sink.dialect.StringBuilderUtil.joinToBuilder(StringBuilderUtil.java:54)
at io.confluent.connect.jdbc.sink.dialect.StringBuilderUtil.joinToBuilder(StringBuilderUtil.java:37)
at io.confluent.connect.jdbc.sink.dialect.DbDialect.writeColumnsSpec(DbDialect.java:128)
at io.confluent.connect.jdbc.sink.dialect.DbDialect.getCreateQuery(DbDialect.java:96)
at io.confluent.connect.jdbc.sink.DbStructure.create(DbStructure.java:87)
at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:62)
at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:66)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:62)
任何帮助或建议都将不胜感激。
暂无答案!
目前还没有任何答案,快来回答吧!