我已经从confluent部署了一个样本https://github.com/confluentinc/kafka-connect-insert-uuid 添加简单的uuid字段,但我得到一个错误,它需要struct。我在debezium mysqlconnector中应用这个
Only Struct objects supported for [adding UUID to record], found:
java.lang.String\n\tat org.apache.kafka.connect.transforms.util.Requirements.requireStruct(Requirements.java:52)
什么是只按原样返回记录的极简applywithschema方法?我正在尝试调试,需要一个HelloWorldSMT没有任何错误,必须应用包括applywithschema在内的方法
我认为这对于应用程序来说可能是最简单的,但是需要applywithschema
Override
public R apply(R record) {
return record.newRecord(
record.topic(), record.kafkaPartition(),
record.keySchema(), record.key(),
record.valueSchema(), record.value(),
record.timestamp()
);
}
Override
public R applyWithSchema(R record) {
// what is minimal transform here??
}
我现在只需要运行这些函数而不出错,因为我只需要更改record.headers().add()。
以下是给出错误的applywithschema方法:
private R applyWithSchema(R record) {
// FAILS HERE!
final Struct value = requireStruct(operatingValue(record), PURPOSE);
Schema updatedSchema = schemaUpdateCache.get(value.schema());
if(updatedSchema == null) {
updatedSchema = makeUpdatedSchema(value.schema());
final Struct updatedValue = new Struct(updatedSchema);
for (Field field : value.schema().fields()) {
// updatedValue.put(field.name(), value.get(field));
}
//updatedValue.put(fieldName, getRandomUuid());
return newRecord(record, updatedSchema, updatedValue);
}
暂无答案!
目前还没有任何答案,快来回答吧!