我试过了
全局变量—此方法有效。
使用额外的成员变量对sparkcontext进行子类化,然后通过我的回调中的rdd.context().getmydata()获取额外的成员变量--此方法失败,错误为“我的子类类型不可调用”。
我也是spark的新手,我想知道有没有比全局变量更好的方法?或者如果Spark提供了一种我错过的方法?我共享的数据是一个单独的缓冲区,用于每个rdd中的样本,因为我输入样本的快速傅立叶变换算法需要一个特定的样本计数,所以我缓冲样本,直到缓冲区大小达到这个特定计数。任何意见和建议都将不胜感激!!
我的代码示例:
limit = 64
buffer=[]
def processSamples(rdd):
global limit
global buffer
lines = rdd.collect()
for (k,v) in lines:
if len(buffer) == limit:
FFT(buffer)
del buffer[:]
buffer.append(v)
if __name__ == "__main__":
sc = SparkContext()
ssc = StreamingContext(sc, 2)
topic = "topic1"
ks = KafkaUtils.createDirectStream(ssc, [topic], {"bootstrap.servers": "kafka:9092"})
ks.foreachRDD(processSamples)
ssc.start()
ssc.awaitTermination()
暂无答案!
目前还没有任何答案,快来回答吧!