为什么元数据被添加到这个kafka连接器的输出中?

e4eetjau  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(326)

我有一个Kafka连接器,代码如下 poll() 方法。

@Override
public List<SourceRecord> poll() throws InterruptedException 
{
    SomeType item = mQueue.take();
    List<SourceRecord> records = new ArrayList<>();
    SourceRecord[] sourceRecords = new SourceRecord[]{
        new SourceRecord(null, null, "data", null,
                         Schema.STRING_SCHEMA, "foo",
                         Schema.STRING_SCHEMA, "bar")
    };
    Collections.addAll(records, sourceRecords);

    return records;
}

如果将使用者附加到数据主题,则会从连接器发送以下消息:

{"schema":{"type":"string","optional":false},"payload":"foo"}   {"schema":{"type":"string","optional":false},"payload":"bar"}

如果我使用以下命令直接向主题发布消息:

echo -e 'foo,bar' > /tmp/test_kafka.txt
cat /tmp/test_kafka.txt | kafka-console-producer.sh --broker-list kafka-host:9092 --topic data --property parse.key=true --property key.separator=,

然后连接同一个消费者,我得到以下信息:

foo bar

这是我期望看到的连接器实现的输出,而不是 {"schema":... 我收到的消息。
如何更改 poll() 这样消息就不会在消息的实际键和值中出现模式元数据了?

p4tfgftt

p4tfgftt1#

好吧,原来只是因为我有以下几行 connect-standalone.properties ```
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

我早该知道的

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

作为替代方案,我还可以将以下设置从true更改为false

value.converter.schemas.enable=false

然后在我的处理器类中,我将代码更改为:

SourceRecord[] sourceRecords = new SourceRecord[]{
new SourceRecord(null, null, "data", null,
Schema.STRING_SCHEMA, "foo",
null, "bar")
};

这有所不同,因为我不再为值指定模式。

相关问题