I am trying to stream logs/topics from Kafka into a PySpark script.
I submitted this job using a Docker Spark cluster.
So far I have done the following configuration:
# Used port 2182 for ZooKeeper in both the ZooKeeper and Kafka configs, since 2181 was already in use
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
bin/kafka-topics.sh --create --topic event --bootstrap-server localhost:9092
bin/kafka-console-producer.sh --topic event --bootstrap-server localhost:9092
bin/kafka-console-consumer.sh --topic event --from-beginning --bootstrap-server localhost:9092
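Here the producer and consumer work fine, and I can see the messages being exchanged.
So in the script I used port 9092, the same port the producer and consumer use.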
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
from pprint import pprint
sc = SparkContext("spark://<mymaster>:7077",appName="Kafka-logs")
sc.setLogLevel("WARN")
ssc = StreamingContext(sc,5)
print("Pyspark Launched")
kafkaStream = KafkaUtils.createStream(ssc, 'localhost:9092', 'event', {'imagetext':1})
# print('contexts =================== {} {}');
lines = kafkaStream.map(lambda x: x[1])
lines.pprint()
ssc.start()
ssc.awaitTermination()
## Below is the spark-submit command I used
docker run --add-host="localhost: <myec2ip>" --rm -it --link master:master --volumes-from pyspark_vol spark-submit_update spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8-assembly_2.11:2.1.1 --jars /home/spark/spark-2.1.1-bin-hadoop2.6/jars/spark-streaming-kafka-0-8-assembly_2.11-2.1.1.jar --master spark://mymaster:7077 /data/spark_kafka.py localhost 9092 --topic event
But I get the following error:
TimeoutException: Unable to connect to zookeeper server within timeout: 10000
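This timeout says the Kafka receiver could not reach a ZooKeeper server at the address passed as the second argument of `createStream` (`localhost:9092` in the call above, which is the Kafka broker's port rather than ZooKeeper's). One quick way to see which ports are actually reachable from inside the spark-submit container is a plain TCP probe using only the Python standard library. This is a minimal sketch: `port_is_open` is a hypothetical helper name, and the host `localhost` and ports 2182/9092 are simply taken from the setup above.

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        # create_connection raises OSError (refused, timeout, unreachable...) on failure
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    # 2182: ZooKeeper port from the setup above; 9092: Kafka broker port
    for port in (2182, 9092):
        print(f"localhost:{port} reachable: {port_is_open('localhost', port)}")
```

Note that a successful probe only shows that something is listening on the port; it does not confirm that the listener is ZooKeeper rather than a Kafka broker.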
So I changed the port in the script to 2182:
kafkaStream = KafkaUtils.createStream(ssc, 'localhost:2182', 'event', {'imagetext':1})
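For reference, in the Spark 2.x Kafka 0-8 Python integration the receiver-based call is `KafkaUtils.createStream(ssc, zkQuorum, groupId, topics, kafkaParams=None, ...)`, where `topics` is a dict mapping topic names to the number of receiver threads. The sketch below does not import PySpark; it uses a hypothetical stand-in, `create_stream_stub`, that only mirrors those leading parameter names, so `inspect.signature(...).bind` can show how the positional arguments in the call above get assigned.

```python
import inspect


# Hypothetical stand-in: mirrors only the leading parameters of
# pyspark.streaming.kafka.KafkaUtils.createStream (Spark 2.x, Kafka 0-8).
# It does nothing; it exists only so we can inspect argument binding.
def create_stream_stub(ssc, zkQuorum, groupId, topics, kafkaParams=None):
    return None


def show_binding(*args):
    """Bind positional args to the stub's parameter names and return the mapping."""
    bound = inspect.signature(create_stream_stub).bind(*args)
    return dict(bound.arguments)


# Same positional arguments as the call in the question (ssc replaced by None).
binding = show_binding(None, "localhost:2182", "event", {"imagetext": 1})
for name in ("zkQuorum", "groupId", "topics"):
    print(f"{name:9} -> {binding[name]!r}")
```

Under this binding, `'event'` is used as the consumer group id and the only subscribed topic is `imagetext`, which may be worth comparing with the topic the console producer writes to.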
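After that, the timeout error went away. However, the script just keeps listening to the stream and prints nothing, even when I send messages to the topic from the producer: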
Time: 2020-09-08 05:35:05
-------------------------------------------
-------------------------------------------
Time: 2020-09-08 05:35:10
-------------------------------------------
-------------------------------------------
Time: 2020-09-08 05:35:15
-------------------------------------------
-------------------------------------------
Time: 2020-09-08 05:35:20
-------------------------------------------
-------------------------------------------
Time: 2020-09-08 05:35:25
-------------------------------------------
I don't understand what causes this or how to fix it.
Any help would be appreciated. Thanks!