我在用Java把Kafka主题的数据写入postgres,
我正在使用下面的代码循环Kafka的记录
for (SinkRecord record : records)
{
log.info("new set records: {}", record);
log.info("The value before modified: {}",record.value());
}
打印记录时得到以下输出
SinkRecord{kafkaOffset=1, timestampType=CreateTime} ConnectRecord{topic='DES.EMPLOYEE_INCOME', kafkaPartition=0, key=Struct{EMPID=2
}, keySchema=Schema{com.attunity.queue.msg.New.DES.EMPLOYEE_INCOME.KeyRecord:STRUCT}, value=Struct{data.EMPID=2,data.NAME=feb,data.SALARY=10000,operation=REFRESH,timestamp=}, valueSchema=Schem
a{DataRecord:STRUCT}, timestamp=1678699184106, headers=ConnectHeaders(headers=)}
当我打印record.value()时得到以下输出
Struct{data.EMPID=2,data.NAME=feb,data.SALARY=10000,operation=REFRESH,timestamp=} (io.confluent.connect.jdbc.sink.JdbcDbW
riter:80)
从record.value
对象中,我希望过滤掉操作和时间戳,
编辑record.value
后,预期输出如下所示,
Struct{data.EMPID=2,data.NAME=feb,data.SALARY=10000} (io.confluent.connect.jdbc.sink.JdbcDbW
riter:80)
怎么做,
1条答案
按热度按时间a2mppw5e1#
如果我理解你的问题,
https://kafka.apache.org/34/javadoc//org/apache/kafka/connect/data/Struct.html
否则,在Java中就不会这样做了,可以使用ReplaceField$Value来
blacklist
操作和时间戳字段。