为什么在集成spark流媒体和twitter api时总是将空rdd发送给spark?

h4cxqtbf  于 2021-05-27  发布在  Spark
关注(0)|答案(0)|浏览(260)

我是新的Spark流,我正在做一个小的个人项目,研究技术。我想使用twitterapi获取实时tweets,然后使用spark streaming转换流数据以可视化流行的标记。
我写了一个python脚本 twitter_api.py 从twitter api获取tweets,然后通过tcp连接将数据发送到spark。我认为这一步没有问题,因为我可以打印出获得的推文。但是,在另一个脚本中 spark.py ,我总是 'ValueError' 在处理rdd时。上面说我的rdd是空的。
我想 pyspark 在我的笔记本电脑上配置得很好,因为我可以运行静态示例。
这个 spark.py 脚本如下:


# !/usr/bin/env python3

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession, Row
from twitter_app import TCP_IP, TCP_PORT, KEYWORD
import seaborn as sns
import matplotlib.pyplot as plt
import time
import sys

def spark(TCP_IP, TCP_PORT, KEYWORD):
   sc = SparkContext(appName='TwitterStreamingApp')
   sc.setLogLevel('ERROR')

   ssc = StreamingContext(sc, 5)

   ssc.checkpoint("checkpoint_TwitterApp")

   data_stream = ssc.socketTextStream(TCP_IP, TCP_PORT)
   lines = data_stream.window(20)

   words = lines.flatMap(lambda x: x.split(' '))
   hashtags = words.filter(lambda x: '#' in x)  

   def process_rdd(rdd):
       try:
           # initialization of SparkSession
           spark = SparkSession.builder.config(conf=rdd.context.getConf()).getOrCreate()

           # map each rdd to each row
           rdd_row = rdd.map(lambda x: Row(tag=x))

           # create the dataframe
           hashtags_df = spark.createDataFrame(rdd_row)

           hashtags_df.createOrReplaceTempView('tags')
           hashtags_count_df = spark.sql(
            'SELECT tag, count(tag) FROM tags GROUP BY tag ORDER BY COUNT(tag) DESC LIMIT 10')

           pd_df = hashtags_count_df.toPandas()

           plt.figure(figsize=(10, 8))
           sns.barplot(x="total", y="word", data=pd_df.head(20))
           plt.show()

       except:
           e = sys.exc_info()[0]
           print("Error: %s" % e)

   hashtags.foreachRDD(process_rdd)
   ssc.start()

   # wait for the streaming to finish
   ssc.awaitTermination()

if __name__ == "__main__":
    spark(TCP_IP, TCP_PORT, KEYWORD)

我犯的错误
我真的不知道出了什么问题!
我可以使用 twitter_api.py 我收到的推文
pyspark应该配置得很好,因为我可以在笔记本电脑上运行静态spark。例如,即使我收到警告,我也可以使用spark获得正确的输出
p、 我正在使用MacBookPro 2018和catalina 10.15.5,2.3 ghz四核,intel core i5

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题