kafka connect smt applywithschema需要结构错误

aiqt4smr  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(222)

我已经从confluent部署了一个样本https://github.com/confluentinc/kafka-connect-insert-uuid 添加简单的uuid字段,但我得到一个错误,它需要struct。我在debezium mysqlconnector中应用这个

Only Struct objects supported for [adding UUID to record], found: 
  java.lang.String\n\tat org.apache.kafka.connect.transforms.util.Requirements.requireStruct(Requirements.java:52)

什么是只按原样返回记录的极简applywithschema方法?我正在尝试调试,需要一个HelloWorldSMT没有任何错误,必须应用包括applywithschema在内的方法
我认为这对于应用程序来说可能是最简单的,但是需要applywithschema

Override
public R apply(R record) {

    return record.newRecord(
            record.topic(), record.kafkaPartition(),
            record.keySchema(), record.key(),
            record.valueSchema(), record.value(),
            record.timestamp()
    );
}

Override
public R applyWithSchema(R record) {
    // what is minimal transform here??
}

我现在只需要运行这些函数而不出错,因为我只需要更改record.headers().add()。
以下是给出错误的applywithschema方法:

private R applyWithSchema(R record) {
    // FAILS HERE!
    final Struct value = requireStruct(operatingValue(record), PURPOSE);

    Schema updatedSchema = schemaUpdateCache.get(value.schema());
    if(updatedSchema == null) {
        updatedSchema = makeUpdatedSchema(value.schema()); 

        final Struct updatedValue = new Struct(updatedSchema);

        for (Field field : value.schema().fields()) {
         // updatedValue.put(field.name(), value.get(field));
        }

        //updatedValue.put(fieldName, getRandomUuid());

        return newRecord(record, updatedSchema, updatedValue);
    }

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题