Spring Kafka, Spring Cloud Stream, and Avro compatibility: Unknown magic byte

sqougxex · posted 2021-06-06 in Kafka
Follow (0) | Answers (4) | Views (428)

I have a problem deserializing messages from Kafka topics. The messages have been serialized using Spring Cloud Stream and Apache Avro. I am reading them using Spring Kafka and trying to deserialize them. If I use Spring Cloud Stream to both produce and consume the messages, then I can deserialize them fine. The problem is when I consume them with Spring Kafka and then try to deserialize.
I am using a schema registry (both the Spring Boot schema registry for development and a Confluent schema registry in production), but the deserialization problems seem to occur before the schema registry is even called.
It is hard to post all the relevant code for this question, so I have posted it in a repo on GitHub: https://github.com/robjwilkins/avro-example
The object I am sending over the topic is just a simple POJO:

import lombok.Data;

@Data
public class Request {
  private String message;
}
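The `@Data` annotation here is Lombok's; it generates the getter, setter, `equals`/`hashCode`, and `toString` at compile time. For readers not using Lombok, a plain-Java sketch of roughly the same class (`equals`/`hashCode` omitted for brevity):

```java
public class Request {
    private String message;

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    @Override
    public String toString() {
        return "Request(message=" + message + ")";
    }
}
```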

The code which produces messages on Kafka looks like this:

@EnableBinding(MessageChannels.class)
@Slf4j
@RequiredArgsConstructor
@RestController
public class ProducerController {

  private final MessageChannels messageChannels;

  @GetMapping("/produce")
  public void produceMessage() {
    Request request = new Request();
    request.setMessage("hello world");
    Message<Request> requestMessage = MessageBuilder.withPayload(request).build();
    log.debug("sending message");
    messageChannels.testRequest().send(requestMessage);
  }
}

And the application.yaml:

spring:
  application.name: avro-producer
  kafka:
    bootstrap-servers: localhost:9092
    consumer.group-id: avro-producer
  cloud:
    stream:
      schema-registry-client.endpoint: http://localhost:8071
      schema.avro.dynamic-schema-generation-enabled: true
      kafka:
        binder:
          brokers: ${spring.kafka.bootstrap-servers}
      bindings:
        test-request:
          destination: test-request
          contentType: application/*+avro

Then I have a consumer:

@Slf4j
@Component
public class TopicListener {

    @KafkaListener(topics = {"test-request"})
    public void listenForMessage(ConsumerRecord<String, Request> consumerRecord) {
        log.info("listenForMessage. got a message: {}", consumerRecord);
        consumerRecord.headers().forEach(header -> log.info("header. key: {}, value: {}", header.key(), asString(header.value())));
    }

    private String asString(byte[] byteArray) {
        return new String(byteArray, Charset.defaultCharset());
    }
}

The consuming project has this application.yaml config:

spring:
  application.name: avro-consumer
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: avro-consumer
      value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
#     value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties:
        schema.registry.url: http://localhost:8071

When the consumer receives a message it results in an exception:

2019-01-30 20:01:39.900 ERROR 30876 --- [ntainer#0-0-C-1] o.s.kafka.listener.LoggingErrorHandler   : Error while processing: null

org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition test-request-0 at offset 43. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

I have stepped through the deserialization code to the point where this exception is thrown:

public abstract class AbstractKafkaAvroDeserializer extends AbstractKafkaAvroSerDe {
  // ...
  private ByteBuffer getByteBuffer(byte[] payload) {
    ByteBuffer buffer = ByteBuffer.wrap(payload);
    if (buffer.get() != 0) {
      throw new SerializationException("Unknown magic byte!");
    } else {
      return buffer;
    }
  }
  // ...
}

This happens because the deserializer checks the byte contents of the serialized object (the byte array) and expects its first byte to be 0, but it is not. Hence my question: is the Spring Cloud Stream MessageConverter which serialized the object compatible with the io.confluent classes being used to deserialize it? And if they are not compatible, what do I do?
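For context, the check above is testing for Confluent's wire format: messages written by `KafkaAvroSerializer` start with a single magic byte (0), followed by a 4-byte big-endian schema id, then the Avro binary payload. Spring Cloud Stream's Avro converters instead write the raw Avro bytes and carry the schema reference in the contentType header, so the first byte of the payload is whatever the Avro encoding happens to produce. A minimal, self-contained sketch of the framing (class and method names are illustrative, not from either library):

```java
import java.nio.ByteBuffer;

public class ConfluentFraming {
    static final byte MAGIC_BYTE = 0x0;

    // Prepend the 5-byte Confluent header (magic byte + schema id) to raw Avro bytes.
    static byte[] frame(int schemaId, byte[] avroBytes) {
        return ByteBuffer.allocate(5 + avroBytes.length)
                .put(MAGIC_BYTE)
                .putInt(schemaId)
                .put(avroBytes)
                .array();
    }

    // Read back the schema id, mirroring the check in AbstractKafkaAvroDeserializer.
    static int schemaId(byte[] payload) {
        ByteBuffer buffer = ByteBuffer.wrap(payload);
        if (buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte!");
        }
        return buffer.getInt();
    }
}
```

A payload produced without this header, as by the Spring Cloud Stream converter, fails the magic-byte check immediately, before any schema registry call is made, which matches the behaviour described above.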
Thanks for any help.


kknvjkwl1#

The crux of this problem is that the producer uses Spring Cloud Stream to post messages to Kafka, while the consumer uses Spring Kafka. The reasons for this are:
The existing system is already well established and uses Spring Cloud Stream
A new consumer is required to listen to multiple topics with the same method, binding only on a CSV list of topic names
There is a requirement to consume a collection of messages at once, rather than individually, so their contents can be written in bulk to a database.
Spring Cloud Stream did not allow the consumer to bind a listener to multiple topics, and there was no way to consume a collection of messages at once (unless I'm mistaken).
I have found a solution which doesn't require any changes to the producer code, which uses Spring Cloud Stream to publish messages to Kafka. Spring Cloud Stream uses a MessageConverter to manage serialization and deserialization. In AbstractAvroMessageConverter there are the methods convertFromInternal and convertToInternal which handle the transformation to and from a byte array. My solution was to extend this code (creating a class which extends AvroSchemaRegistryClientMessageConverter), so I could reuse much of the Spring Cloud Stream functionality, but with an interface that can be accessed from my Spring Kafka KafkaListener. I then amended my TopicListener to use this class to do the conversion:
The converter:

@Component
@Slf4j
public class AvroKafkaMessageConverter extends AvroSchemaRegistryClientMessageConverter {

  public AvroKafkaMessageConverter(SchemaRegistryClient schemaRegistryClient) {
    super(schemaRegistryClient, new NoOpCacheManager());
  }

  public <T> T convertFromInternal(ConsumerRecord<?, ?> consumerRecord, Class<T> targetClass,
      Object conversionHint) {
    T result;
    try {
      byte[] payload = (byte[]) consumerRecord.value();

      Map<String, String> headers = new HashMap<>();
      consumerRecord.headers().forEach(header -> headers.put(header.key(), asString(header.value())));

      MimeType mimeType = messageMimeType(conversionHint, headers);
      if (mimeType == null) {
        return null;
      }

      Schema writerSchema = resolveWriterSchemaForDeserialization(mimeType);
      Schema readerSchema = resolveReaderSchemaForDeserialization(targetClass);

      @SuppressWarnings("unchecked")
      DatumReader<Object> reader = getDatumReader((Class<Object>) targetClass, readerSchema, writerSchema);
      Decoder decoder = DecoderFactory.get().binaryDecoder(payload, null);
      result = (T) reader.read(null, decoder);
    }
    catch (IOException e) {
      throw new RuntimeException("Failed to read payload", e);
    }
    return result;
  }

  private MimeType messageMimeType(Object conversionHint, Map<String, String> headers) {
    MimeType mimeType;
    try {
      String contentType = headers.get(MessageHeaders.CONTENT_TYPE);
      log.debug("contentType: {}", contentType);
      mimeType = MimeType.valueOf(contentType);
    } catch (InvalidMimeTypeException e) {
      log.error("Exception getting object MimeType from contentType header", e);
      if (conversionHint instanceof MimeType) {
        mimeType = (MimeType) conversionHint;
      }
      else {
        return null;
      }
    }
    return mimeType;
  }

  private String asString(byte[] byteArray) {
    String theString = new String(byteArray, Charset.defaultCharset());
    return theString.replace("\"", "");
  }
}

The amended TopicListener:

@Slf4j
@Component
@RequiredArgsConstructor
public class TopicListener {

  private final AvroKafkaMessageConverter messageConverter;

  @KafkaListener(topics = {"test-request"})
  public void listenForMessage(ConsumerRecord<?, ?> consumerRecord) {
    log.info("listenForMessage. got a message: {}", consumerRecord);
    Request request = messageConverter.convertFromInternal(
        consumerRecord, Request.class, MimeType.valueOf("application/vnd.*+avr"));
    log.info("request message: {}", request.getMessage());
  }
}

This solution only consumes one message at a time, but can easily be modified to consume batches of messages.
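As a hedged sketch of the batch variant (assuming Spring Boot 2.2 or later, where the `spring.kafka.listener.type` property is available): switch the listener container to batch mode, after which the `@KafkaListener` method can accept a `List<ConsumerRecord<?, ?>>` and run each record through the same converter before the bulk database write.

```yaml
spring:
  kafka:
    listener:
      type: batch  # deliver records to @KafkaListener methods as a List
```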
The full solution is here: https://github.com/robjwilkins/avro-example/tree/develop


yrdbyhpb2#

Thanks, this got it working for me using native encoding and decoding with Spring Cloud Stream:

Generic binding properties:

bindings:
    input:
      consumer:
        use-native-decoding: true
      destination: so54448732
      group: so54448732
    output:
      destination: so54448732
      producer:
        use-native-encoding: true

Kafka-specific binding properties:

kafka:
    bindings:
      input:
        consumer:
          configuration:
            value.deserializer: com.example.FooDeserializer
      output:
        producer:
          configuration:
            value.serializer: com.example.FooSerializer

7d7tgy0s3#

You can configure the binding to use the Kafka serializer/deserializer natively.
Set the producer property useNativeEncoding to true and configure the serializer using the ...producer.configuration Kafka properties.
EDIT
Example:

spring:
  cloud:
    stream:
      # Generic binding properties
      bindings:
        input:
          consumer:
            use-native-decoding: true
          destination: so54448732
          group: so54448732
        output:
          destination: so54448732
          producer:
            use-native-encoding: true
      # Kafka-specific binding properties
      kafka:
        bindings:
          input:
            consumer:
              configuration:
                value.deserializer: com.example.FooDeserializer
          output:
            producer:
              configuration:
                value.serializer: com.example.FooSerializer

dy1byipe4#

You should configure a DefaultKafkaConsumerFactory and your TopicListener bean in a configuration class, something like this:

@Configuration
@EnableKafka
public class TopicListenerConfig {

  @Value("${spring.kafka.bootstrap-servers}")
  private String bootstrapServers;

  @Value("${spring.kafka.consumer.group-id}")
  private String groupId;

  @Bean
  public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.wilkins.avro.consumer");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    return props;
  }

  @Bean
  public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
  }

  @Bean
  public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
  }

  @Bean
  public TopicListener topicListener() {
    return new TopicListener();
  }
}
