This is my first time working with Kafka and Spark Streaming, and I am trying to run the word-count script given below. The script is fairly standard, as shown in many online blogs, but for whatever reason Spark Streaming is not printing the word counts. It throws no errors; it simply displays no counts. I have tested the topic with the console consumer, and messages are displayed correctly there. I even tried using foreachRDD to inspect the incoming lines, but that showed nothing either.
Thanks in advance!
Versions: Kafka 2.11-0.8.2.2, Spark 2.2.1, spark-streaming-kafka-0-8-assembly_2.11-2.2.1
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql.context import SQLContext
sc = SparkContext(appName="PythonStreamingKafkaWordCount")
sc.setCheckpointDir(r'c:\Playground\spark\logs')  # raw string avoids backslash-escape issues on Windows
ssc = StreamingContext(sc, 10)
ssc.checkpoint(r'c:\Playground\spark\logs')
zkQuorum, topic = sys.argv[1:]
print(str(zkQuorum))
print(str(topic))
kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
lines = kvs.map(lambda x: x[1])
print(kvs)
counts = lines.flatMap(lambda line: line.split(" ")) \
              .map(lambda word: (word, 1)) \
              .reduceByKey(lambda a, b: a + b)
counts.pprint(num=10)
ssc.start()
ssc.awaitTermination()
Producer code:
import sys,os
from kafka import KafkaProducer
from kafka.errors import KafkaError
import time
producer = KafkaProducer(bootstrap_servers="localhost:9092")
topic = "KafkaSparkWordCount"
def read_file(fileName):
    with open(fileName) as f:
        print('started reading...')
        contents = f.readlines()
        for content in contents:
            future = producer.send(topic, content.encode('utf-8'))
            try:
                future.get(timeout=10)
            except KafkaError as e:
                print(e)
                break
            print('.', end='', flush=True)
            time.sleep(0.2)
    print('done')

if __name__ == '__main__':
    read_file('C:\\PlayGround\\spark\\BookText.txt')
1 Answer
How many cores are you running with?
Spark Streaming needs at least two cores: one for the receiver and one for processing the received data.
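In concrete terms: if the job was launched with `local` or `local[1]`, the single core is consumed entirely by the Kafka receiver, so no batches are ever processed and nothing is printed. A sketch of a submit command with at least two local cores (the script filename and the assembly jar path here are illustrative placeholders; the ZooKeeper address and topic name match the question):

```shell
# Use local[2] (or local[*]) so one core serves the receiver
# and at least one core remains for batch processing.
spark-submit \
  --master local[2] \
  --jars spark-streaming-kafka-0-8-assembly_2.11-2.2.1.jar \
  kafka_wordcount.py localhost:2181 KafkaSparkWordCount
```

Alternatively, the master can be set inside the script, e.g. `SparkContext("local[2]", appName="PythonStreamingKafkaWordCount")`, which has the same effect.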