看了其他答案后,我还是想不出来。
我能够使用kafkaproducer和kafkaconsumer从我的笔记本中发送和接收消息。
producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'],value_serializer=lambda m: json.dumps(m).encode('ascii'))
consumer = KafkaConsumer('hr',bootstrap_servers=['127.0.0.1:9092'],group_id='abc' )
我尝试用spark上下文和spark会话连接到流。
from pyspark.streaming.kafka import KafkaUtils
sc = SparkContext("local[*]", "stream")
ssc = StreamingContext(sc, 1)
这给了我一个错误
Spark Streaming's Kafka libraries not found in class path. Try one
of the following.
1. Include the Kafka library and its dependencies with in the
spark-submit command as
$ bin/spark-submit --packages org.apache.spark:spark-streaming-
kafka-0-8:2.3.2 ...
看来我需要把jar加到我的table上
!/usr/local/bin/spark-submit --master local[*] /usr/local/Cellar/apache-spark/2.3.0/libexec/jars/spark-streaming-kafka-0-8-assembly_2.11-2.3.2.jar pyspark-shell
它回来了
Error: No main class set in JAR; please specify one with --class
Run with --help for usage help or --verbose for debug output
我上什么课?如何让Pypark连接到消费者?
1条答案
按热度按时间jckbn6z71#
您的命令正在尝试运行
spark-streaming-kafka-0-8-assembly_2.11-2.3.2.jar
,并试图找到pyspark-shell
作为其中的一个java类。正如第一个错误所说,您错过了
--packages
之后spark-submit
,也就是说你会如果你只是本地的jupyter,你可能想尝试Kafkapython,例如,而不是Pypark。。。开销更小,而且没有java依赖关系。