kafka connect自定义转换无法识别无模式数据

czfnxgou  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(186)

我正在创建一个自定义的Kafka连接转换,用于在将源代码中的某些数据写入Kafka主题之前将其转换。我试着让它在没有模式的情况下工作(也许只工作)。然而,我的代码似乎总是认为有一个模式。
下面是转换中的一些代码:

@Override
  public R apply(R record) {
    if (operatingSchema(record) == null) {
      return applySchemaless(record);
    } else {

//      return applyWithSchema(record);
      throw new IllegalStateException("MyTransform does not currently support schemas" + operatingSchema(record));
    }
  }

  public static class Value<R extends ConnectRecord<R>> extends MyTransform<R> {

    @Override
    protected Schema operatingSchema(R record) {
      return record.valueSchema();
    }

    @Override
    protected Object operatingValue(R record) {
      return record.value();
    }

那个 IllegalStateException 每次都抛出。这是我的连接器配置,我在本地kafka connect上运行它 confluent local start .

"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",

"key.converter": "org.apache.kafka.connect.storage.StringConverter",

"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false,

 "transforms" : "convert",

 "transforms.convert.type" : "com.org.team.kafka.connect.transform.common.MyTransform$Value"

如果我移除转换,让kafka connect消费并将未修改的数据推送到我的主题,它实际上会将它推送到没有模式的主题,它看起来是这样的:

{
  "id": 1,
  "application": "app",
  "event": "{\"id\": 42, \"type\": \"add\", \"eventDate\": {\"hour\": 15, \"nano\": 298633600, \"year\": 2020, \"month\": \"JUNE\", \"minute\": 34, \"offset\": {\"id\": \"-04:00\", \"rules\": {\"fixedOffset\": true, \"transitions\": [], \"transitionRules\": []}, \"totalSeconds\": -14400}, \"second\": 37, \"dayOfWeek\": \"THURSDAY\", \"dayOfYear\": 177, \"dayOfMonth\": 25, \"monthValue\": 6}}",
  "created_ts": 1593099276319,
  "updated_ts": 1593099276319,
  "context": "action"
}

所以,如果它没有模式,只是纯json,为什么我的代码会返回 record.valueSchema . 它是一个 Struct ,顺便说一下。
谢谢!

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题