How to deserialize Avro data using Apache Beam (KafkaIO)

4nkexdtk  posted on 2021-06-04  in Kafka

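I have found only one thread with information on this topic: How to deserialize Kafka Avro messages using Apache Beam.
However, after trying several variations of the Kafka deserializer, I still cannot deserialize the Kafka messages. Here is my code: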

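import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Keys;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Readkafka {
    private static final Logger LOG = LoggerFactory.getLogger(Readkafka.class);

    public static void main(String[] args) throws IOException {
        // Create the Pipeline object with the options we defined above.
        Pipeline p = Pipeline.create(
                PipelineOptionsFactory.fromArgs(args).withValidation().create());
        PTransform<PBegin, PCollection<KV<action_states_pkey, String>>> kafka =
                KafkaIO.<action_states_pkey, String>read()
                    .withBootstrapServers("mybootstrapserver")
                    .withTopic("action_States")
                    .withKeyDeserializer(MyClassKafkaAvroDeserializer.class)
                    .withValueDeserializer(StringDeserializer.class)
                    .updateConsumerProperties(ImmutableMap.of("schema.registry.url", (Object)"schemaregistryurl"))
                    .withMaxNumRecords(5)
                    .withoutMetadata();

        p.apply(kafka)
            .apply(Keys.<action_states_pkey>create());

        p.run().waitUntilFinish();
    }
}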

where MyClassKafkaAvroDeserializer is:

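import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;

public class MyClassKafkaAvroDeserializer extends AbstractKafkaAvroDeserializer
        implements Deserializer<action_states_pkey> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        configure(new KafkaAvroDeserializerConfig(configs));
    }

    @Override
    public action_states_pkey deserialize(String s, byte[] bytes) {
        return (action_states_pkey) this.deserialize(bytes);
    }

    @Override
    public void close() {}
}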

The class action_states_pkey is code generated by avro-tools:

java -jar pathtoavrotools/avro-tools-1.8.1.jar compile schema pathtoschema/action_states_pkey.avsc destination path

where action_states_pkey.avsc is:

{"type":"record","name":"action_states_pkey","namespace":"namespace","fields":[{"name":"ad_id","type":["null","int"]},{"name":"action_id","type":["null","int"]},{"name":"state_id","type":["null","int"]}]}

With this code I get the following error:

Caused by: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to my.mudah.beam.test.action_states_pkey
    at my.mudah.beam.test.MyClassKafkaAvroDeserializer.deserialize(MyClassKafkaAvroDeserializer.java:20)
    at my.mudah.beam.test.MyClassKafkaAvroDeserializer.deserialize(MyClassKafkaAvroDeserializer.java:1)
    at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance(KafkaUnboundedReader.java:221)
    at org.apache.beam.sdk.io.BoundedReadFromUnboundedSource$UnboundedToBoundedSourceAdapter$Reader.advanceWithBackoff(BoundedReadFromUnboundedSource.java:279)
    at org.apache.beam.sdk.io.BoundedReadFromUnboundedSource$UnboundedToBoundedSourceAdapter$Reader.start(BoundedReadFromUnboundedSource.java:256)
    at com.google.cloud.dataflow.worker.WorkerCustomSources$BoundedReaderIterator.start(WorkerCustomSources.java:592)
    ... 14 more

It seems something goes wrong when the Avro data is mapped to my custom class?
Alternatively, I tried the following code:

PTransform<PBegin, PCollection<KV<action_states_pkey, String>>> kafka =
                KafkaIO.<action_states_pkey, String>read()
                    .withBootstrapServers("bootstrapserver")
                    .withTopic("action_states")
                    .withKeyDeserializerAndCoder((Class)KafkaAvroDeserializer.class, AvroCoder.of(action_states_pkey.class))
                    .withValueDeserializer(StringDeserializer.class)
                    .updateConsumerProperties(ImmutableMap.of("schema.registry.url", (Object)"schemaregistry"))
                    .withMaxNumRecords(5)
                    .withoutMetadata();

        p.apply(kafka)
            .apply(Keys.<action_states_pkey>create())
//            .apply("ExtractWords", ParDo.of(new DoFn<action_states_pkey, String>() {
//                @ProcessElement
//                public void processElement(ProcessContext c) {
//                  action_states_pkey key = c.element();
//                    c.output(key.getAdId().toString());
//                }
//            }))
            ; // terminates the chain whether or not the block above is commented out

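This does not give me any error until I try to print the data. I need to verify that I am actually reading the data, so my intention is to log it to the console. If I uncomment the commented-out section, I get the same error again: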

SEVERE: 2019-09-13T07:53:56.168Z: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to my.mudah.beam.test.action_states_pkey
    at my.mudah.beam.test.Readkafka$1.processElement(Readkafka.java:151)

Also note that if I specify:

.updateConsumerProperties(ImmutableMap.of("specific.avro.reader", (Object)"true"))

I always get this error:

Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 443
Caused by: org.apache.kafka.common.errors.SerializationException: Could not find class NAMESPACE.action_states_pkey specified in writer's schema whilst finding reader's schema for a SpecificRecord.

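Is something wrong with my approach? If anyone has experience reading Avro data from a Kafka stream with Apache Beam, please help me. I would really appreciate it.
Below is a snapshot of my package, which also contains the schema and the class: package/working path details
Thank you.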

qhhrdooz1#

public class MyClassKafkaAvroDeserializer extends AbstractKafkaAvroDeserializer
Your class extends AbstractKafkaAvroDeserializer, which returns a GenericRecord.
You need to convert the GenericRecord to your custom object.

Or use SpecificRecord, as described in one of the following answers:

/**
 * Extends deserializer to support ReflectData.
 *
 * @param <V>
 *     value type
 */
public abstract class ReflectKafkaAvroDeserializer<V> extends KafkaAvroDeserializer {

  private Schema readerSchema;
  private DecoderFactory decoderFactory = DecoderFactory.get();

  protected ReflectKafkaAvroDeserializer(Class<V> type) {
    readerSchema = ReflectData.get().getSchema(type);
  }

  @Override
  protected Object deserialize(
      boolean includeSchemaAndVersion,
      String topic,
      Boolean isKey,
      byte[] payload,
      Schema readerSchemaIgnored) throws SerializationException {

    if (payload == null) {
      return null;
    }

    int schemaId = -1;
    try {
      ByteBuffer buffer = ByteBuffer.wrap(payload);
      if (buffer.get() != MAGIC_BYTE) {
        throw new SerializationException("Unknown magic byte!");
      }

      schemaId = buffer.getInt();
      Schema writerSchema = schemaRegistry.getByID(schemaId);

      int start = buffer.position() + buffer.arrayOffset();
      int length = buffer.limit() - 1 - idSize;
      DatumReader<Object> reader = new ReflectDatumReader(writerSchema, readerSchema);
      BinaryDecoder decoder = decoderFactory.binaryDecoder(buffer.array(), start, length, null);
      return reader.read(null, decoder);
    } catch (IOException e) {
      throw new SerializationException("Error deserializing Avro message for id " + schemaId, e);
    } catch (RestClientException e) {
      throw new SerializationException("Error retrieving Avro schema for id " + schemaId, e);
    }
  }
}

The above is copied from https://stackoverflow.com/a/39617120/2534090
https://stackoverflow.com/a/42514352/2534090
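
To make the byte arithmetic in the copied deserializer easier to follow: it relies on the Confluent framing, where each message starts with one magic byte, followed by a 4-byte schema ID, followed by the Avro binary payload. The sketch below reproduces only that header parsing with the plain JDK, without Avro or a schema registry. The class name ConfluentFrame and the sample bytes are hypothetical, and the values of MAGIC_BYTE (0) and ID_SIZE (4) are assumptions standing in for the MAGIC_BYTE and idSize fields the deserializer inherits.

```java
import java.nio.ByteBuffer;

/** Hypothetical helper: splits a Confluent-framed message into schema ID and payload range. */
final class ConfluentFrame {
    static final byte MAGIC_BYTE = 0x0; // assumed value of the inherited MAGIC_BYTE
    static final int ID_SIZE = 4;       // assumed value of the inherited idSize

    final int schemaId;      // ID used to look up the writer schema in the registry
    final int payloadOffset; // index of the first Avro payload byte
    final int payloadLength; // number of Avro payload bytes

    private ConfluentFrame(int schemaId, int payloadOffset, int payloadLength) {
        this.schemaId = schemaId;
        this.payloadOffset = payloadOffset;
        this.payloadLength = payloadLength;
    }

    static ConfluentFrame parse(byte[] message) {
        if (message == null || message.length < 1 + ID_SIZE) {
            throw new IllegalArgumentException("Message too short for a framed header");
        }
        ByteBuffer buffer = ByteBuffer.wrap(message);
        if (buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte!");
        }
        int schemaId = buffer.getInt(); // ByteBuffer reads big-endian by default
        // Same arithmetic as in the deserializer above.
        int start = buffer.position() + buffer.arrayOffset();
        int length = buffer.limit() - 1 - ID_SIZE;
        return new ConfluentFrame(schemaId, start, length);
    }

    public static void main(String[] args) {
        // Header for schema ID 443 (0x000001BB) plus three made-up payload bytes.
        byte[] message = {0, 0, 0, 1, (byte) 0xBB, 2, 4, 6};
        ConfluentFrame frame = parse(message);
        System.out.println(frame.schemaId + " " + frame.payloadOffset + " " + frame.payloadLength);
        // prints "443 5 3"
    }
}
```

In the real deserializer, the offset and length computed this way are what get passed to decoderFactory.binaryDecoder(...), and the schema ID is what gets passed to the schema registry lookup.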
