我在努力实现
{
"source": "NEWS",
"metadata": {
"publishTime": "02/06/2019 09:56:24.317",
"channel": "paper",
"status":"active"
},
"Data": {
"NAME": 67,
"GENDER": "MALE",
...
}
}
但我被这个困住了
{
"Data": {
"NAME": 67,
"GENDER": "MALE",
...
},
"source": "NEWS",
"metadata": "{\"channel\":'paper'}"
}
下面是我的connector.properties
name=source-sqlserver-user
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:sqlserver://localhost:1433;database=testing;username=xyz;password=xyz;
table.whitelist=Tickets
mode=incrementing
incrementing.column.name=Ticket_id
validate.non.null=false
topic.prefix=my-mssql-
transforms=MakeMap123,value,extract,InsertSourceDetails,xyz
transforms.MakeMap123.type=org.apache.kafka.connect.transforms.HoistField$Value
transforms.MakeMap123.field=Data
transforms.value.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.value.fields=Ticket_id
transforms.extract.type=org.apache.kafka.connect.transforms.ExtractField$Value
transforms.extract.field=Ticket_id
# transforms.InsertTopic.type=org.apache.kafka.connect.transforms.InsertField$Value
# transforms.InsertTopic.topic.field=messagetopic
eetransforms.InsertSourceDetails.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.InsertSourceDetails.static.field=source
transforms.InsertSourceDetails.static.value=NEWS
transforms.xyz.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.xyz.static.field=metadata
transforms.xyz.static.value={"channel":'paper'}
在这里source:news is 静态字段并作为例外工作,但是有没有可能我们提取同一个表的少数列(所有列都属于一个表)并将其作为另一个json键(在我的例子中是元数据)放置,如上所示。是的,我试过了 ValueToKey
以及 ExtractField$Value
但是valuetokey抛出了npe
这是否可能实现与Kafka连接的转变?还是我错过了什么??或者我应该使用自定义avro模式?任何自定义avro模式的例子都非常感谢。
暂无答案!
目前还没有任何答案,快来回答吧!