datastax kafka连接器无法解析json主题

o2gm4chl  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(365)

我有一个producer类使用github的定制jsonserializer发送到一个主题

public class JsonSerializer<T> implements Serializer<T> {
    ...
    @Override
    public byte[] serialize(String topic, T data) {
        try {
            return this.objectMapper.writeValueAsBytes(data);
        } catch (JsonProcessingException e) {
            throw new SerializationException(e);
        }
    }
    ...
}

我使用以下配置运行datastax kafka连接器:

value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

连接器尝试使用主题时出现以下错误:

[2020-01-12 13:57:53,324] WARN Error inserting/updating row for Kafka record SinkRecord{kafkaOffset=416, timestampType=CreateTime} ConnectRecord{topic='test-3', kafkaPartition=17, key=null, keySchema=Schema{STRING}, value={}, valueSchema=null, timestamp=1578811437723, headers=ConnectHeaders(headers=)}: Primary key column(s) mmsi, ts cannot be left unmapped. Check that your mapping setting matches your dataset contents. (com.datastax.kafkaconnector.DseSinkTask:286)

根据这个错误,我认为连接器无法检索json数据。我做错什么了?
更新
我试过Kafkajsonserializer。
我尝试了stringserializer,正如connector所说,它也受支持。
我发现有些数据实际上是写入数据库的,但与Kafka主题发送的数据总量相比,它的数量总是相对较少的。大约5到10个数据。
我试图保持连接器运行,我发现在它写失败后,它就不会再写了。

tpgth1q7

tpgth1q71#

实际上是配置相关的问题。正如我在update中提到的,它不再写数据以防出错。
这是因为数据集有配置 ignoreErrors 它们有默认值 false . 这意味着如果连接器在消息中发现错误,它将无限期地重试。我把它设为真,问题就解决了。

相关问题