kafka状态存储在使用avro时返回null

9q78igpj  于 2021-06-06  发布在  Kafka
关注(0)|答案(0)|浏览(259)

我们有一个ktable,其中我们使用以下代码Map值并具体化为keyvaluestore:

@Bean
    public KTable<String, CancelEvent> kTable(StreamsBuilder kStreamBuilder,
                                          ValueMapper<CancelEvent, CancelEvent> mapper,
                                          SpecificAvroSerde<CancelEvent> serde) {

      return kStreamBuilder
            .table(properties.getProperty("topics.cancel"), Consumed.with(Serdes.String(), serde))
            .mapValues(mapper, Materialized.as("cancel-event-store"));

    }

和Map器代码:

@Override
    public CancelEvent apply(CancelEvent value) {

      if (value.getData().getCancelled()) {
        return null;
      }

      return value;

    }

我们使用avro进行序列化和反序列化。当我们试图查询 state store 有了钥匙,它总会回来 null .
查询状态存储的代码:

ReadOnlyKeyValueStore<String, CancelEvent> store = streamsFactory.getKafkaStreams()
            .store("cancel-event-store", QueryableStoreTypes.keyValueStore());

    CancelEvent event = store.get(queryEvent.getMeta().getId().toString());
``` `event` 总是 `null` . 调试时,我遍历 `keyValuestore` ,并意识到某个东西出现在键上(可能是魔法字节!)。请参考下图
![](https://i.stack.imgur.com/nTjoM.png)
所以,问题是如何用key查询?
更新1

private Map<String, Object> kStreamsConfigsProperties() {
final Map<String, Object> config = new HashMap<>();

config.put(APPLICATION_ID_CONFIG, "app.name" + new Date().getTime());

config.put(BOOTSTRAP_SERVERS_CONFIG, properties.getProperty("spring.kafka.bootstrap-servers"));
config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, properties.getProperty("spring.kafka.schema-registry"));

config.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
config.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class.getName());
config.put(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());

return config;

}

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题