python中的hadoop mapreduce | sma

xwmevbvl  于 2021-06-02  发布在  Hadoop
关注(0)|答案(1)|浏览(324)

我对python和mapreduce比较陌生。我尝试使用python中的ta lib库计算简单移动平均(sma)。我有这样一个数据框:

AA     BB  
2008-03-05  36.60  36.60  
2008-03-04  38.37  38.37  
2008-03-03  38.71  38.71  
2008-03-02  38.00  38.00
2008-03-01  38.32  38.32
2008-02-29  37.14  37.14

aa和bb是股票符号,显示了它们在6天内的值。
有人能帮帮我吗?map应该执行什么,应该减少什么输入?
最终输出应打印库存a和b的SMA。

l7wslrjt

l7wslrjt1#

什么是sma(简单移动平均线)?一种简单的或算术的移动平均数,通过将证券在若干时间段内的收盘价相加,然后除以这些时间段的总数来计算。
例如,在上例中,收盘价为:37.14(2008-02-29)、38.32(2008-03-01)、38.00(2008-03-02)、38.71(2008-03-03)、38.37(2008-03-04)、36.60(2008-03-05)。
因此,2008-03-02的3天sma为(37.14+38.32+38.00)/3=37.82 2008-02-29没有3天sma(因为只有1天的数据:2008-02-29),2008-03-01没有3天sma(只有2天的数据:2008-02-29,2008-03-01)。
以下是针对数据的3天sma的解决方案(您可以轻松地将其更改为n天sma)。
Map器(m.py):

import sys
for line in sys.stdin:
    val = line.strip()
    vals = val.split('\t')
    print "%s\t%s:%s" % (vals[0], vals[1], vals[2])

Map器逻辑:它只读取行中以制表符分隔的值并输出“{key}\t{val1}:{val2}”。
例如,对于第一行(制表符分隔值):

2008-03-05    36.60    36.60

它输出:

2008-03-05    36.60:36.60

异径管(r.py):

import sys

lValueA = list()
lValueB = list()

smaInterval = 3

for line in sys.stdin:
    (key, val) = line.strip().split('\t')

    vals = val.split(':')
    lValueA.append(float(vals[0]))
    lValueB.append(float(vals[1]))
    if len(lValueA) == smaInterval:     

        sumA = 0;
        sumB = 0;

        for a in lValueA:
            sumA += a
        for b in lValueB:
            sumB += b

        sumA = sumA / smaInterval;
        sumB = sumB / smaInterval;

        print "%s\t%.2f\t%.2f" % (key, sumA, sumB);
        del lValueA[0]
        del lValueB[0]

减速器逻辑:
它使用两个列表。一个用于a股,一个用于b股。
假设sma间隔为3( smaInterval = 3 )
当输入行进入时,它解析该行并将值a和值b附加到它们各自的列表中
当任何列表的大小达到3(即sma间隔)时,它计算移动平均值并输出(键,sma表示股票a,sma表示股票b),并从每个列表中删除第0个元素。
我执行这个是为了你的意见。
我在没有使用hadoop的情况下执行了它,如下所示(input.txt包含问题中提到的输入,用制表符分隔值):

cat input.txt | python m.py | sort | python r.py

我得到了以下输出(经验证是正确的):

2008-03-02      37.82   37.82
2008-03-03      38.34   38.34
2008-03-04      38.36   38.36
2008-03-05      37.89   37.89

您应该能够使用hadoop框架执行相同的操作,如下所示:

hadoop jar hadoop-streaming-2.7.1.jar -input {Input directory in HDFS} -output {Output directory in HDFS} -mapper {Path to the m.py} -reducer {Path to the r.py}

注:此代码可以优化,可能是,你不需要减速器在所有。如果您的数据很小,则可以在Map器本身读取所有值,对它们进行排序,然后计算sma。我刚刚编写了这段代码,以说明使用hadoop流计算sma。

相关问题