我有一个python脚本,目前在我的桌面上运行。它需要一个包含大约2500万行(可能是15列左右)的csv文件,并执行逐行操作。
对于每一行输入,产生多个输出行。然后将结果逐行输出到一个csv文件中,输出结果约为1亿行。
代码如下所示:
with open(outputfile,"a") as outputcsv:
with open(inputfile,"r") as input csv:
headerlist=next(csv.reader(csvfile)
for row in csv.reader(csvfile):
variable1 = row[headerlist.index("VAR1")]
variableN = row[headerlist.index("VARN")]
while calculations not complete:
do stuff #Some complex calculations are done at this point
outputcsv.write(stuff)
我们现在尝试使用pyspark将脚本转换为通过hadoop运行。我都不知道怎么开始。我正在尝试解决如何迭代rdd对象,但我认为这是不可能的。
这样的逐行计算是否适合分布式处理?
1条答案
按热度按时间qrjkbowd1#
如果您直接想运行脚本,可以通过spark submit执行:
但是我建议使用sparkapi,因为它们很容易使用。它将降低编码开销。
首先,您必须创建一个spark会话变量,以便可以访问所有spark函数:
接下来,如果要加载csv文件:
您可以指定可选参数,如headers、inferschema等:
“file”现在将是pysparkDataframe。
现在可以这样编写结束输出:
有关转换和其他信息,请参阅文档:spark文档。