如何在pyspark作业中的回调之间共享变量?

csga3l58  于 2021-05-27  发布在  Spark
关注(0)|答案(0)|浏览(209)

我试过了
全局变量—此方法有效。
使用额外的成员变量对sparkcontext进行子类化,然后通过我的回调中的rdd.context().getmydata()获取额外的成员变量--此方法失败,错误为“我的子类类型不可调用”。
我也是spark的新手,我想知道有没有比全局变量更好的方法?或者如果Spark提供了一种我错过的方法?我共享的数据是一个单独的缓冲区,用于每个rdd中的样本,因为我输入样本的快速傅立叶变换算法需要一个特定的样本计数,所以我缓冲样本,直到缓冲区大小达到这个特定计数。任何意见和建议都将不胜感激!!
我的代码示例:

limit = 64
buffer=[]
def processSamples(rdd):
     global limit
     global buffer
     lines = rdd.collect()
     for (k,v) in lines:
       if len(buffer) == limit:
           FFT(buffer)
           del buffer[:]
       buffer.append(v)
if __name__ == "__main__":
     sc = SparkContext()
     ssc = StreamingContext(sc, 2)
     topic = "topic1"
     ks = KafkaUtils.createDirectStream(ssc, [topic], {"bootstrap.servers": "kafka:9092"})
     ks.foreachRDD(processSamples)
     ssc.start()
     ssc.awaitTermination()

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题