spark:如何通过receiverlauncher.launch创建的dsstream foreachrdd获取数据?

yrdbyhpb  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(333)
// consumer data from kafka

val tmp_stream = ReceiverLauncher.launch(ssc, props, numberOfReceivers,StorageLevel.MEMORY_ONLY)

tmp_stream.foreachRDD(rdd => {
  rdd.collect()
  val count = rdd.count()  // here I can get the count of datas
     // How can I get data here ?
})

你知道如何完成这个代码来获取数据吗 foreachRDD 从创建的流 ReceiverLauncher.launch

xfyts7mz

xfyts7mz1#

你可以打电话给 getPayLoad 去吃生的 byte[] . 像这样的

val stream = tmp_stream.map(x => { val s = new String(x.getPayload); s })

相关问题