pyspark流媒体+kafka字数不打印任何结果

1hdlvixo  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(415)

这是我第一次与Kafka和spark流媒体互动,我正在尝试运行下面给出的wordcount脚本。正如许多在线博客中给出的那样,这个脚本相当标准。但不管出于什么原因,spark streaming并不是在打印单词。它没有抛出任何错误,只是不显示计数。我已经通过console consumer测试了这个主题,并且正确地显示了一些消息。我甚至试着用foreachrdd来查看输入的行,但也没有显示任何内容。
提前谢谢!
版本:Kafka2.11-0.8.2.2、spark2.2.1、spark-streaming-kafka-0-8-assemblyKafka2.11-2.2.1

from __future__ import print_function

import sys
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql.context import SQLContext

sc = SparkContext(appName="PythonStreamingKafkaWordCount")
sc.setCheckpointDir('c:\Playground\spark\logs')
ssc = StreamingContext(sc, 10)
ssc.checkpoint('c:\Playground\spark\logs')

zkQuorum, topic = sys.argv[1:]
print(str(zkQuorum))
print(str(topic))
kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
lines = kvs.map(lambda x: x[1])
print(kvs)

counts = lines.flatMap(lambda line: line.split(" ")) \
                  .map(lambda word: (word, 1)) \
                  .reduceByKey(lambda a, b: a+b)
counts.pprint(num=10)
ssc.start()
ssc.awaitTermination()

生产商代码:

import sys,os
from kafka import KafkaProducer
from kafka.errors import KafkaError
import time

producer = KafkaProducer(bootstrap_servers="localhost:9092")
topic = "KafkaSparkWordCount"

def read_file(fileName):
    with open(fileName) as f:
        print('started reading...')
        contents = f.readlines()
        for content in contents:
            future = producer.send(topic,content.encode('utf-8'))
            try:
                future.get(timeout=10)
            except KafkaError as e:
                print(e)
                break
            print('.',end='',flush=True)
            time.sleep(0.2)

    print('done')       

if __name__== '__main__':
    read_file('C:\\\PlayGround\\spark\\BookText.txt')
rslzwgfq

rslzwgfq1#

你用多少核?
spark流至少需要两个内核,一个用于接收器,一个用于处理器。

相关问题