在我的代码中,我首先订阅一个kafka流,处理每个rdd以创建类的一个示例 People
然后,我想发布结果集( Dataset[People]
)Kafka的一个特定主题。需要注意的是,并不是从kafka接收到的所有传入消息都Map到 People
. 此外,人的示例应该按照从Kafka收到的顺序发送给Kafka。
但是,我不确定排序是否真的有必要,或者 People
在执行器上运行相应的代码时保持相同的顺序(我可以直接将数据集发布到kafka)。据我所知,排序是必要的,因为里面的代码 foreachRDD
可以在群集中的不同节点上执行。是这样吗?
这是我的密码:
val myStream = KafkaUtils.createDirectStream[K, V](streamingContext, PreferConsistent, Subscribe[K, V](topics, consumerConfig))
def process(record: (RDD[ConsumerRecord[String, String]], Time)): Unit = record match {
case (rdd, time) if !rdd.isEmpty =>
// More Code...
// In the end, I have: Dataset[People]
case _ =>
}
myStream.foreachRDD((x, y) => process((x, y))) // Do I have to replace this call with map, sort the RDD and then publish it to Kafka?
1条答案
按热度按时间svmlkihl1#
此外,人的示例应该按照从Kafka收到的顺序发送给Kafka。
除非您有一个单独的分区(然后您就不会使用spark了,是吗?),否则接收数据的顺序是不确定的,同样地,发送数据的顺序也不会确定。分类在这里没有任何区别。
如果您需要一个非常特定的处理顺序(这通常是一个设计错误,如果您使用的是数据密集型应用程序),那么您需要一个顺序应用程序,或者一个比spark具有更细粒度控制的系统。