spring云流-如果条件满足,如何读取specificrecord,否则如何读取genericrecord

uqcuzwp8  于 2021-07-03  发布在  Java
关注(0)|答案(1)|浏览(307)

当从kafka输入主题中读取带有唯一avro模式的消息类型a时,服务a执行操作a;当从同一主题中读取消息类型b时,服务b执行操作b。
我不希望服务a知道b的模式,反之亦然。如果我有两个独立的主题,我会很容易地设置 spring.kafka.properties.specific.avro.readertrue 并在特定记录中使用,但由于我不希望模式b出现在服务a中,反之亦然(解耦原因),所以我理想地寻找的是如下所示的内容。
内部服务a:

@StreamListener(value = Processor.Input, condition = "new String(headers['message_type'])=='A'")
public void consumeInSpecificRecord(TypeACompiledAvroClass a){
// Some logic
}

@StreamListener(value = Processor.Input, condition = "new String(headers['message_type'])=='B'")
public void consumeInGenericRecord(GenericRecord b){
// Log and ignore (leave it for service B to process)
}

考虑到设置 specific.avro.reader 标记为true将导致消息b的反序列化错误,并将其设置为false将强制我使用泛型记录,即使对于我尝试避免的第一个流侦听器也是如此。
如果不可能,这里还可以提出什么其他的解决办法?

Spring cloud stream version: 2.2.0.RELEASE
Spring Kafka: 2.2.5.RELEASE
Confluent version for the serializer: 5.2.1
kx7yvsdv

kx7yvsdv1#

根据gary的评论,我可以设计一个定制的反序列化程序,它可以根据标题中的内容在通用记录和特定记录之间切换。这是它的代码 A .

public class GenericSpecificFlexibleDeserializer implements ExtendedDeserializer<Object> {
  private static final String SPECIFIC_AVRO_READER = "specific.avro.reader";
  private Map<String, ?> config;
  private KafkaAvroDeserializer specificKafkaAvroDeserializer;
  private KafkaAvroDeserializer genericKafkaAvroDeserializer;

  @Override
  public Object deserialize(String topic, Headers headers, byte[] data) {
    List<String> entityTypeHeader = new LinkedList<>();
    if (headers != null) {
      if (new String(headers.lastHeader("message_type").value()).equals("A")){
        return specificKafkaAvroDeserializer.deserialize(topic, data);
      }
    }
      return genericKafkaAvroDeserializer.deserialize(topic, data);
  }

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    this.config = configs;
    HashMap<String, Object> specificConfig = new HashMap<>(config);
    specificConfig.put(SPECIFIC_AVRO_READER, true);

    HashMap<String, Object> genericConfig = new HashMap<>(config);
    genericConfig.put(SPECIFIC_AVRO_READER, false);

    specificKafkaAvroDeserializer = new KafkaAvroDeserializer();
    specificKafkaAvroDeserializer.configure(specificConfig, isKey);
    genericKafkaAvroDeserializer = new KafkaAvroDeserializer();
    genericKafkaAvroDeserializer.configure(genericConfig, isKey);
  }

  @Override
  public Object deserialize(String topic, byte[] data) {
    return deserialize(topic, null, data);
  }

  @Override
  public void close() {
    genericKafkaAvroDeserializer.close();
    specificKafkaAvroDeserializer.close();
  }
}

以及 spring.kafka.cloud.stream.kafka.binder.configuration.value.deserializer 在你的应用程序配置必须设置为 GenericSpecificFlexibleDeserializer 当然。

相关问题