I'm new to this, can anyone help me?
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

def streamStart(): Unit = {
  val sparkConf = new SparkConf()
    .setAppName("kafkaStreamingNew!!")
    .setMaster("spark://husnain:7077")
    .setJars(Array("/home/husnain/Downloads/ScalaWorkspace/KafkaStreaming/target/KafkaStreaming-1.1.0-jar-with-dependencies.jar"))
  // also tried adding these jars:
  // "/home/husnain/.m2/repository/org/apache/spark/spark-streaming-kafka_2.10/1.4.1/spark-streaming-kafka_2.10-1.4.1.jar"
  // "/home/husnain/.m2/repository/org/apache/spark/spark-streaming_2.10/1.4.1/spark-streaming_2.10-1.4.1.jar"
  // "/home/husnain/.m2/repository/org/apache/spark/spark-core_2.10/1.4.1/spark-core_2.10-1.4.1.jar"
  val ssc = new StreamingContext(sparkConf, Seconds(1))
  val topics = "test"
  ssc.checkpoint("checkpoint")
  val lines = KafkaUtils.createStream(ssc, "localhost:2181", "spark", Map("test" -> 1)).map(_._2)
  lines.print()
  println("*****************************************************************************")
  lines.foreachRDD(
    iter => iter.foreach(
      x => println(x + "\n***-------------------------------------------------------***\n")))
  println("-----------------------------------------------------------------------------")
  ssc.start()
  ssc.awaitTermination()
}
On a Spark standalone cluster this code does not work, but with master local[*] it works fine:
lines.foreachRDD(
  iter => iter.foreach(
    x => println(x + "\n***-------------------------------------------------------***\n")
  )
)
1 Answer
I assume it is described as "working fine" because you see the println output on the console. When you submit the same code to a cluster, the println calls to the console happen locally on each executor, so if everything else is working, the missing output is simply a consequence of distributed execution. Look for the printlns in the executors' stdout logs.
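If the goal is to see records on the driver's console even when running on the cluster, one common pattern is to ship a small sample of each RDD back to the driver before printing. A minimal sketch, assuming the same lines DStream as in the question (the sample size 10 is an arbitrary choice):

```scala
lines.foreachRDD { rdd =>
  // rdd.take(n) collects up to n records from the executors to the driver,
  // so this println runs on the driver, not on the executors
  rdd.take(10).foreach(x => println(x + "\n***-------------------------------------------------------***\n"))
}
```

Note that DStream.print() already does essentially this on the driver, which is why the lines.print() output should appear in the driver's log even in cluster mode; only the println inside foreachRDD/foreach ends up in the executor logs.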