如何使用转换自定义json

ncgqoxb0  于 2021-06-06  发布在  Kafka
关注(0)|答案(0)|浏览(198)

我在努力实现

{
"source": "NEWS",
"metadata": {
"publishTime": "02/06/2019 09:56:24.317",
"channel": "paper",
"status":"active"
},
"Data": {
"NAME": 67,
"GENDER": "MALE",
    ...
}
}

但我被这个困住了

{
    "Data": {
        "NAME": 67,
        "GENDER": "MALE",
    ...

    },
    "source": "NEWS",
    "metadata": "{\"channel\":'paper'}"
}

下面是我的connector.properties

name=source-sqlserver-user
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:sqlserver://localhost:1433;database=testing;username=xyz;password=xyz;
table.whitelist=Tickets
mode=incrementing 
incrementing.column.name=Ticket_id
validate.non.null=false
topic.prefix=my-mssql-
transforms=MakeMap123,value,extract,InsertSourceDetails,xyz
transforms.MakeMap123.type=org.apache.kafka.connect.transforms.HoistField$Value
transforms.MakeMap123.field=Data
transforms.value.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.value.fields=Ticket_id
transforms.extract.type=org.apache.kafka.connect.transforms.ExtractField$Value
transforms.extract.field=Ticket_id

# transforms.InsertTopic.type=org.apache.kafka.connect.transforms.InsertField$Value

# transforms.InsertTopic.topic.field=messagetopic

eetransforms.InsertSourceDetails.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.InsertSourceDetails.static.field=source
transforms.InsertSourceDetails.static.value=NEWS
transforms.xyz.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.xyz.static.field=metadata
transforms.xyz.static.value={"channel":'paper'}

在这里source:news is 静态字段并作为例外工作,但是有没有可能我们提取同一个表的少数列(所有列都属于一个表)并将其作为另一个json键(在我的例子中是元数据)放置,如上所示。是的,我试过了 ValueToKey 以及 ExtractField$Value 但是valuetokey抛出了npe
这是否可能实现与Kafka连接的转变?还是我错过了什么??或者我应该使用自定义avro模式?任何自定义avro模式的例子都非常感谢。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题