如何覆盖avro命名空间和名称,以在KafkaAvro命名器中实现特定的类

aoyhnmkz  于 12个月前  发布在  Apache
关注(0)|答案(1)|浏览(114)

我试图在Kafka中使用Avro序列化记录。不幸的是,生产者(不是我,并且已经为其他消费者生产了很长一段时间)设置了一个名为“record”的模式。当我使用avro-tool从这个模式生成一个类时,我得到了一个名为“record $”的类,大概是因为avro tool试图避免与java关键字record(?)冲突。
KafkaAvroxializer现在无法将这些记录转换为SpecificRecord。起初,因为它找不到要转换的类。我现在在配置中传递这个类:

SchemaRegistryClient schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 100);
    Map<String, Object> config = new HashMap<>();
    config.put(SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
      config.put(SPECIFIC_AVRO_READER_CONFIG, true);
      config.put(AVRO_REFLECTION_ALLOW_NULL_CONFIG, true);
      config.put(AVRO_USE_LOGICAL_TYPE_CONVERTERS_CONFIG, true);

    if (isKey) {
      config.put(SPECIFIC_AVRO_KEY_TYPE_CONFIG, clazz.getName());
    } else {
      config.put(SPECIFIC_AVRO_VALUE_TYPE_CONFIG, clazz.getName());
    }
      deserializer = new KafkaAvroDeserializer(schemaRegistryClient, config, isKey);

字符串
但是,在运行时:

public T deserialize(byte[] array) throws IOException {
      Object bla = deserializer.deserialize(topic, array);
    return clazz.cast(bla);
  }


我现在得到java.lang.ClassCastException: Cannot cast org.apache.avro.generic.GenericData$Record to COMPANY_NAMESPACE.record$
必须有一种方法来处理生产者设置他们为模式选择的任何有效的命名空间和名称,但是我不知道如何在他们选择了这样的东西的情况下将其转换为SpecificRecord。
现在,我打算回退到使用GenericRecord。
我使用的是7.4.1版本的Confluent库。
我已经逐步调试了调试器,但找不到库回退到GenericRecord而不是我告诉它使用的类型的时刻。

jjjwad0x

jjjwad0x1#

我相当肯定avro库不会求助于GenericRecord作为一个回退时,proximalizing到SpecificRecord不工作,而是抛出一个异常,它无法找到你试图proximalizing到的类。所以一定是你的clazz-class或你的proximalizer配置有问题。

相关问题