PySpark cogroup RDD

omhiaaxx · posted 2021-05-17 in Spark

I am new to PySpark, and after two days of searching I still can't figure out what I am doing wrong with cogroup. Here is what I am trying to do: I have a text file containing many words, each with a value:

Hello 5
  .
  .
  .
Hi    8
Ops   9

I have another file containing sentences, e.g. Hello my name is name, and I want to compute a value for the whole sentence based on the first file. As you can see in the code, I turn the first file into an RDD that looks like [(Hi,8),...,(Ops,9)], and from the second file I build an RDD of word counts that looks like [(Hello,1),...,(Name,2)]. Now, when I try to cogroup the two, I get the following error:

AttributeError                            Traceback (most recent call last)
<ipython-input-3-c424da6be07f> in <module>
      2 lines = ssc.textFileStream(dataDirectory)
      3 
----> 4 counts = lines.flatMap(lambda line: line.split(" ")) \
      5    .map(lambda x: (x, 1)) \
      6    .reduceByKey(lambda a, b: a + b) \

/usr/local/spark/spark/python/pyspark/streaming/dstream.py in cogroup(self, other, numPartitions)
    350         if numPartitions is None:
    351             numPartitions = self._sc.defaultParallelism
--> 352         return self.transformWith(lambda a, b: a.cogroup(b, numPartitions), other)
    353 
    354     def join(self, other, numPartitions=None):

/usr/local/spark/spark/python/pyspark/streaming/dstream.py in transformWith(self, func, other, keepSerializer)
    313         jfunc = TransformFunction(self._sc, func, self._jrdd_deserializer, other._jrdd_deserializer)
    314         dstream = self._sc._jvm.PythonTransformed2DStream(self._jdstream.dstream(),
--> 315                                                           other._jdstream.dstream(), jfunc)
    316         jrdd_serializer = self._jrdd_deserializer if keepSerializer else self._sc.serializer
    317         return DStream(dstream.asJavaDStream(), self._ssc, jrdd_serializer)

AttributeError: 'PipelinedRDD' object has no attribute '_jdstream'
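
For reference, with plain (non-streaming) RDDs the cogroup itself does what I expect. A minimal sketch, assuming a local SparkContext named sc and made-up sample values:

# Minimal non-streaming sketch of the pairing I am after (sample values made up)
dictionary = sc.parallelize([("Hello", 5), ("Hi", 8), ("Ops", 9)])
counts = sc.parallelize([("Hello", 1), ("name", 2)])

# cogroup gives, per word, the counts from the sentence and the values from the dictionary
paired = counts.cogroup(dictionary)
print([(word, (list(c), list(v))) for word, (c, v) in paired.collect()])
# e.g. [('Hello', ([1], [5])), ('Hi', ([], [8])), ('Ops', ([], [9])), ('name', ([2], []))]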

Here is my code:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext 

# Create a StreamingContext with a batch interval of 5 seconds.

ssc = StreamingContext(sc, 5)

# creating rdd for all the words in the dictionary file

text_file = sc.textFile('AFINN-111.txt')
def createPair(line):
    x = line.replace("\t"," ").split(" ")
    return (x[0],int(x[1]))

dictionary = text_file.map(createPair)
print(dictionary.take(20))
dataDirectory = 'FILES'
lines = ssc.textFileStream(dataDirectory) 

counts = lines.flatMap(lambda line: line.split(" ")) \
   .map(lambda x: (x, 1)) \
   .reduceByKey(lambda a, b: a + b) \
   .cogroup(dictionary)

counts.pprint()

# Start the computation

ssc.start() 
ssc.awaitTermination()
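
From the traceback it looks like DStream.cogroup expects the other side to be a DStream as well, not a plain RDD. My guess (untested) is that the static dictionary has to be pulled in through DStream.transform instead; a rough sketch of what I mean, reusing lines and dictionary from the code above:

# Untested sketch (my guess, not a confirmed fix): keep the dictionary as a
# static RDD and join it with each batch inside transform, since
# DStream.cogroup seems to require another DStream
word_counts = lines.flatMap(lambda line: line.split(" ")) \
    .map(lambda x: (x, 1)) \
    .reduceByKey(lambda a, b: a + b)

# (word, (count_in_batch, value_from_dictionary)) for words present in both
scored = word_counts.transform(lambda rdd: rdd.join(dictionary))

# hypothetical sentence value: sum of count * value over the batch
sentence_value = scored.map(lambda kv: kv[1][0] * kv[1][1]) \
    .reduce(lambda a, b: a + b)
sentence_value.pprint()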
