有没有一种方法可以在kafka消费者中指定多个解码器(或者每个主题一个)?还有人需要这个吗?

b1zrtrql  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(367)

我做Kafka在斯卡拉(参考)工作Spark流使用

public static <K,V,U extends kafka.serializer.Decoder<?>,T extends kafka.serializer.Decoder<?>> ReceiverInputDStream<scala.Tuple2<K,V>> createStream(StreamingContext ssc, scala.collection.immutable.Map<String,String> kafkaParams, scala.collection.immutable.Map<String,Object> topics, StorageLevel storageLevel, scala.reflect.ClassTag<K> evidence$1, scala.reflect.ClassTag<V> evidence$2, scala.reflect.ClassTag<U> evidence$3, scala.reflect.ClassTag<T> evidence$4)

我想在同一个系统中接收不同类型的消息(需要不同的解码器) DStream 以及潜在的 RDD 每个批次间隔。我将听多个主题,每个主题将对应于一个信息类型,因此需要自己的 Decoder . 目前看来,似乎还没有提供一个 kafka.serializer.Decoder<?> 每个主题(有吗?)。人们很可能会在每个主题上发送不同类型的信息( protobuf 序列化字节。还有其他人遇到过这个问题吗?
谢谢。
c。
它似乎是 topicvalueDecoder 这里的某个地方能帮上忙。

ttp71kqs

ttp71kqs1#

我想,你需要两个数据流,每个主题一个。然后,您将能够执行join或union以获得包含所有元素的单个数据流。

pgpifvop

pgpifvop2#

使用CreateDirectStreamAPI,它允许您通过hasoffsetranges按分区访问主题。对于kafka解码器,使用defaultdecoder为每条消息获取字节数组。
然后在mappartitions中进行实际解码,在mappartitions中匹配主题名称,以了解如何解释字节数组。
http://spark.apache.org/docs/latest/streaming-kafka-integration.html

相关问题