是否在JAVA中编辑用户定义的对象?

ogsagwnx  于 2023-03-16  发布在  Java
关注(0)|答案(1)|浏览(95)

我在用Java把Kafka主题的数据写入postgres,
我正在使用下面的代码循环Kafka的记录

for (SinkRecord record : records) 
 {  
   log.info("new set records: {}", record); 
   log.info("The value before modified: {}",record.value());
 }

打印记录时得到以下输出

SinkRecord{kafkaOffset=1, timestampType=CreateTime} ConnectRecord{topic='DES.EMPLOYEE_INCOME', kafkaPartition=0, key=Struct{EMPID=2
}, keySchema=Schema{com.attunity.queue.msg.New.DES.EMPLOYEE_INCOME.KeyRecord:STRUCT}, value=Struct{data.EMPID=2,data.NAME=feb,data.SALARY=10000,operation=REFRESH,timestamp=}, valueSchema=Schem
a{DataRecord:STRUCT}, timestamp=1678699184106, headers=ConnectHeaders(headers=)}

当我打印record.value()时得到以下输出

Struct{data.EMPID=2,data.NAME=feb,data.SALARY=10000,operation=REFRESH,timestamp=} (io.confluent.connect.jdbc.sink.JdbcDbW
riter:80)

record.value对象中,我希望过滤掉操作和时间戳,
编辑record.value后,预期输出如下所示,

Struct{data.EMPID=2,data.NAME=feb,data.SALARY=10000} (io.confluent.connect.jdbc.sink.JdbcDbW
    riter:80)

怎么做,

a2mppw5e

a2mppw5e1#

如果我理解你的问题,

final Struct value = record.value();
String operation = (String) value.get("operation");
Long timestamp = (Long) value.get("timestamp");

https://kafka.apache.org/34/javadoc//org/apache/kafka/connect/data/Struct.html
否则,在Java中就不会这样做了,可以使用ReplaceField$Valueblacklist操作和时间戳字段。

相关问题