使用kafkaavroserializer的额外字节

w7t8yxp5  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(394)

我的设置如下:我从ftp服务器检索xml文件,将其解组到pojo中,将其Map到avro生成的类中,然后将其转发到alpakkas的producer sink中,如下所示:

Ftp.ls("/", ftpSettings)
  .filter(FtpFile::isFile)
  .mapAsyncUnordered(10,
    ftpFile -> {
      CompletionStage<ByteString> fetchFile =
        Ftp.fromPath(ftpFile.path(), ftpSettings).runWith(Sink.reduce((a, b) -> a), materializer);
      return fetchFile;
    })
  .map(b -> b.decodeString(Charsets.ISO_8859_1))
  .map(StringReader::new)
  .map(AlpakkaProducerDemo::unmarshalFile)
  .map(AlpakkaProducerDemo::convertToAvroSerializable)
  .map(a -> new ProducerRecord<>(kafkaTopic, a.id().toString(), a))
  .map(record -> ProducerMessage.single(record))
  .runWith(Producer.committableSink(producerSettings, kafkaProducer), materializer);

问题是序列化显然不能正常工作。e、 g.我希望密钥也被avro序列化,尽管它只是一个字符串(requirement,don't ask)。其配置如下所示:

Map<String, Object> kafkaAvroSerDeConfig = new HashMap<>();
kafkaAvroSerDeConfig.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
final KafkaAvroSerializer keyAvroSerializer = new KafkaAvroSerializer();
keyAvroSerializer.configure(kafkaAvroSerDeConfig, true);
final Serializer<Object> keySerializer = keyAvroSerializer;
final Config config = system.settings().config().getConfig("akka.kafka.producer");
final ProducerSettings producerSettings = ProducerSettings.create(config, keySerializer, valueSerializer)
  .withBootstrapServers(kafkaServer);

在kafka中,这会产生一个具有正确内容的键,但在字符串开头有一些(明显的)额外字节: \u0000\u0000\u0000\u0000\u0001N . 正如你所能想象的,这对价值造成了巨大的破坏。我怀疑avro序列化与alpakka使用的信封api不兼容,因此可能需要序列化为 byte[] 事先和使用的共同点 ByteSerializer . 但是,使用 SchemaRegistry 那么。

eanckbw9

eanckbw91#

前五个字节与模式注册表中的序列化格式版本(字节0)和avro模式版本(字节1-4)有关:https://docs.confluent.io/current/schema-registry/serializer-formatter.html#wire-格式。
另一种选择是使用kafkaconnect,使用ftp源代码和xml转换。

相关问题