我做Kafka在斯卡拉(参考)工作Spark流使用
public static <K,V,U extends kafka.serializer.Decoder<?>,T extends kafka.serializer.Decoder<?>> ReceiverInputDStream<scala.Tuple2<K,V>> createStream(StreamingContext ssc, scala.collection.immutable.Map<String,String> kafkaParams, scala.collection.immutable.Map<String,Object> topics, StorageLevel storageLevel, scala.reflect.ClassTag<K> evidence$1, scala.reflect.ClassTag<V> evidence$2, scala.reflect.ClassTag<U> evidence$3, scala.reflect.ClassTag<T> evidence$4)
我想在同一个系统中接收不同类型的消息(需要不同的解码器) DStream
以及潜在的 RDD
每个批次间隔。我将听多个主题,每个主题将对应于一个信息类型,因此需要自己的 Decoder
. 目前看来,似乎还没有提供一个 kafka.serializer.Decoder<?>
每个主题(有吗?)。人们很可能会在每个主题上发送不同类型的信息( protobuf
序列化字节。还有其他人遇到过这个问题吗?
谢谢。
c。
它似乎是 topic
至 valueDecoder
这里的某个地方能帮上忙。
2条答案
按热度按时间ttp71kqs1#
我想,你需要两个数据流,每个主题一个。然后,您将能够执行join或union以获得包含所有元素的单个数据流。
pgpifvop2#
使用CreateDirectStreamAPI,它允许您通过hasoffsetranges按分区访问主题。对于kafka解码器,使用defaultdecoder为每条消息获取字节数组。
然后在mappartitions中进行实际解码,在mappartitions中匹配主题名称,以了解如何解释字节数组。
http://spark.apache.org/docs/latest/streaming-kafka-integration.html