我有一个producer类使用github的定制jsonserializer发送到一个主题
public class JsonSerializer<T> implements Serializer<T> {
...
@Override
public byte[] serialize(String topic, T data) {
try {
return this.objectMapper.writeValueAsBytes(data);
} catch (JsonProcessingException e) {
throw new SerializationException(e);
}
}
...
}
我使用以下配置运行datastax kafka连接器:
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
连接器尝试使用主题时出现以下错误:
[2020-01-12 13:57:53,324] WARN Error inserting/updating row for Kafka record SinkRecord{kafkaOffset=416, timestampType=CreateTime} ConnectRecord{topic='test-3', kafkaPartition=17, key=null, keySchema=Schema{STRING}, value={}, valueSchema=null, timestamp=1578811437723, headers=ConnectHeaders(headers=)}: Primary key column(s) mmsi, ts cannot be left unmapped. Check that your mapping setting matches your dataset contents. (com.datastax.kafkaconnector.DseSinkTask:286)
根据这个错误,我认为连接器无法检索json数据。我做错什么了?
更新
我试过Kafkajsonserializer。
我尝试了stringserializer,正如connector所说,它也受支持。
我发现有些数据实际上是写入数据库的,但与Kafka主题发送的数据总量相比,它的数量总是相对较少的。大约5到10个数据。
我试图保持连接器运行,我发现在它写失败后,它就不会再写了。
1条答案
按热度按时间tpgth1q71#
实际上是配置相关的问题。正如我在update中提到的,它不再写数据以防出错。
这是因为数据集有配置
ignoreErrors
它们有默认值false
. 这意味着如果连接器在消息中发现错误,它将无限期地重试。我把它设为真,问题就解决了。