Error when using PySpark with Kafka

h6my8fg2, posted on 2021-06-07 in Kafka

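I need to retrieve data from Kafka using PySpark, but I keep getting "py4j.protocol.Py4JError: An error occurred while calling o26.createStream". I am completely new to Kafka and PySpark. Any help would be greatly appreciated. My code is as follows: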

import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import os

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.10:2.2.1 pyspark-shell'

if __name__ == "__main__":

    sc = SparkContext(appName="PythonStreamingKafkaWordCount")
    ssc = StreamingContext(sc, 60)
    print("spark cotext set")

    zkQuorum, topic = 'localhost:2181','near_line'
    kvs = KafkaUtils.createStream(ssc, zkQuorum, "console-consumer-68081", {topic: 1})
    print("connection set")
    lines = kvs.map(lambda x: x[1])
    counts = lines.flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a + b)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()

I get the following error:

Exception in thread "Thread-2" java.lang.NoClassDefFoundError: kafka/common/TopicAndPartition
    at java.lang.Class.getDeclaredMethods0(Native Method)
    at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
    at java.lang.Class.privateGetPublicMethods(Class.java:2902)
    at java.lang.Class.getMethods(Class.java:1615)
    at py4j.reflection.ReflectionEngine.getMethodsByNameAndLength(ReflectionEngine.java:345)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:305)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
    at py4j.Gateway.invoke(Gateway.java:272)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: kafka.common.TopicAndPartition
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 12 more
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/home/nayanam/anaconda3/lib/python3.5/site-packages/py4j/java_gateway.py", line 1035, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/nayanam/anaconda3/lib/python3.5/site-packages/py4j/java_gateway.py", line 883, in send_command
    response = connection.send_command(command)
  File "/home/nayanam/anaconda3/lib/python3.5/site-packages/py4j/java_gateway.py", line 1040, in send_command
    "Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving
Traceback (most recent call last):
  File "/home/nayanam/PycharmProjects/recommendation_engine/derivation/kafka_consumer_test.py", line 37, in <module>
    kvs = KafkaUtils.createStream(ssc, zkQuorum, "console-consumer-68081", {topic: 1})
  File "/home/nayanam/anaconda3/lib/python3.5/site-packages/pyspark/streaming/kafka.py", line 70, in createStream
    jstream = helper.createStream(ssc._jssc, kafkaParams, topics, jlevel)
  File "/home/nayanam/anaconda3/lib/python3.5/site-packages/py4j/java_gateway.py", line 1133, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/home/nayanam/anaconda3/lib/python3.5/site-packages/py4j/protocol.py", line 327, in get_return_value
    format(target_id, ".", name))
py4j.protocol.Py4JError: An error occurred while calling o26.createStream
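
The `NoClassDefFoundError: kafka/common/TopicAndPartition` at the top of the trace means the JVM could not load a class from the Kafka client library that the spark-streaming-kafka-0-8 integration depends on. One possible cause, which the post does not confirm, is that the `--packages` coordinate names a Scala build (`_2.10`) different from the one the installed Spark was built with. The sketch below builds the `PYSPARK_SUBMIT_ARGS` value from explicit Spark and Scala versions so the two stay in step. The helper name `kafka_submit_args` is hypothetical, and the Scala version `2.11` is an assumption; `spark-submit --version` reports the actual one.

```python
import os


def kafka_submit_args(spark_version, scala_version):
    """Build a PYSPARK_SUBMIT_ARGS value for the Kafka 0.8 streaming package.

    Hypothetical helper: both versions must match the Spark installation.
    """
    coordinate = "org.apache.spark:spark-streaming-kafka-0-8_{}:{}".format(
        scala_version, spark_version
    )
    return "--packages {} pyspark-shell".format(coordinate)


# Assumed versions: Spark 2.2.1 (from the post) built against Scala 2.11 (a guess).
os.environ["PYSPARK_SUBMIT_ARGS"] = kafka_submit_args("2.2.1", "2.11")
print(os.environ["PYSPARK_SUBMIT_ARGS"])
```

The variable is only read when the JVM gateway is launched, so it has to be set before `SparkContext` is created.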
