I am trying to implement Apache Kafka and Spark Streaming integration. This is my Python code:
from __future__ import print_function
import sys
from pyspark.streaming import StreamingContext
from pyspark import SparkContext,SparkConf
from pyspark.streaming.kafka import KafkaUtils
if __name__ == "__main__":
    # conf = SparkConf().setAppName("Kafka-Spark").setMaster("spark://127.0.0.1:7077")
    conf = SparkConf().setAppName("Kafka-Spark")
    # sc = SparkContext(appName="KafkaSpark")
    sc = SparkContext(conf=conf)
    stream = StreamingContext(sc, 1)
    map1 = {'demo': 1}
    kafkaStream = KafkaUtils.createStream(stream, 'localhost:2181', "test-consumer-group", map1)
    # kafkaStream = KafkaUtils.createStream(stream, 'localhost:2181', "name", map1)  # tried with localhost:2181 too
    lines = kafkaStream.map(lambda x: x[1])
    counts = lines.flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a + b)
    counts.pprint()
    stream.start()
    stream.awaitTermination()
When I run the above program, it shows the following output on the terminal:
16/10/24 15:27:20 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.NoClassDefFoundError: scala/collection/GenTraversableOnce$class
    at kafka.utils.Pool.<init>(Pool.scala:28)
    at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:91)
    at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:143)
    at kafka.consumer.Consumer$.create(ConsumerConnector.scala:94)
    at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:100)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:149)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:131)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:597)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:587)
    at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:1993)
    at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:1993)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: scala.collection.GenTraversableOnce$class
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 17 more
16/10/24 15:27:20 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-0,5,main]
java.lang.NoClassDefFoundError: scala/collection/GenTraversableOnce$class
    (same stack trace as above)
    ... 17 more
16/10/24 15:27:20 INFO StreamingContext: Invoking stop(stopGracefully=false) from shutdown hook
16/10/24 15:27:20 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.NoClassDefFoundError: scala/collection/GenTraversableOnce$class
    (same stack trace as above)
    ... 17 more
16/10/24 15:27:20 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times;
1 Answer
fdx2calv1
The collection API differs between Scala 2.10 and Scala 2.11, which is why the executor throws NoClassDefFoundError for scala.collection.GenTraversableOnce$class: the Kafka consumer classes being loaded were compiled against a different Scala version than the one your Spark build uses.
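To make that concrete, here is a minimal sketch of how the job could be submitted so that the Kafka integration classes match the Scala version of the Spark build. The artifact coordinate and versions are assumptions for a Spark 2.0.x installation built with Scala 2.11, and kafka_spark.py is a placeholder name for the script above; check the output of spark-submit --version and adjust the Scala suffix and Spark version accordingly.

# Print the Spark and Scala versions ("Using Scala version 2.11.x ..." for a 2.11 build)
spark-submit --version

# Assumed example: Spark 2.0.1 built with Scala 2.11, so the Kafka 0.8 receiver API used by
# KafkaUtils.createStream needs the _2.11 artifact rather than a _2.10 jar
spark-submit \
  --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.1 \
  kafka_spark.py

The same idea applies if you ship the dependency yourself: whatever jar provides the kafka.consumer classes has to carry the Scala suffix that matches your Spark installation.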