我在Kafka迪的下面创作。
val messages = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))
然后将值另存为:
val lines = messages.map(_.value)
然后,当我没有更多的偏移量可供使用时,停止流式处理上下文,如下所示:
lines.foreachRDD(rdd => {
if(rdd.isEmpty()) {
messages.stop()
ssc.stop(false)
} else {
}
})
然后我打印 lines
具体如下:
lines.print()
然后我开始流为:
ssc.start()
它工作得很好。它读取RDD并打印前10条,停止消息流并停止流上下文。但当我执行同一行时 lines.print()
它抛出一个异常,表示在停止streamingcontext之后不能执行新的输入、转换或输出。
我如何实现我的目标?我在spark shell中运行它,而不是作为二进制文件(强制要求)。
以下是我真正想要达到的目标:
1) 使用kafka主题中的所有json记录。
2) 停止获取更多记录(保证消费后,Kafka主题中不会添加新记录,所以不想继续处理no records。)
3) 通过从json字段中提取一些字段来进行一些预处理。
4) 对预处理后的数据做进一步的操作。
5) 完成。
1条答案
按热度按时间5sxhfpxr1#
当您再次调用“lines.print()”时,它将再次尝试调用转换“messages.map(\ u.value)”。当你停止上下文时,它就失败了。
通过在停止上下文之前执行操作来保存lines变量。