将python脚本转换为能够在spark/hadoop中运行

yk9xbfzb  于 2021-06-01  发布在  Hadoop
关注(0)|答案(1)|浏览(312)

我有一个python脚本,目前在我的桌面上运行。它需要一个包含大约2500万行(可能是15列左右)的csv文件,并执行逐行操作。
对于每一行输入,产生多个输出行。然后将结果逐行输出到一个csv文件中,输出结果约为1亿行。
代码如下所示:

with open(outputfile,"a") as outputcsv:
    with open(inputfile,"r") as input csv:
        headerlist=next(csv.reader(csvfile)
        for row in csv.reader(csvfile):
            variable1 = row[headerlist.index("VAR1")]
            variableN = row[headerlist.index("VARN")]
            while calculations not complete:
                do stuff #Some complex calculations are done at this point
                outputcsv.write(stuff)

我们现在尝试使用pyspark将脚本转换为通过hadoop运行。我都不知道怎么开始。我正在尝试解决如何迭代rdd对象,但我认为这是不可能的。
这样的逐行计算是否适合分布式处理?

qrjkbowd

qrjkbowd1#

如果您直接想运行脚本,可以通过spark submit执行:

spark-submit master local[*]/yarn other_parameters path_to_your_script.py

但是我建议使用sparkapi,因为它们很容易使用。它将降低编码开销。
首先,您必须创建一个spark会话变量,以便可以访问所有spark函数:

spark = SparkSession
   .builder()
   .appName("SparkSessionZipsExample")
   .config("parameters", "value")
   .getOrCreate()

接下来,如果要加载csv文件:

file = spark.read.csv("path to file")

您可以指定可选参数,如headers、inferschema等:

file=spark.read.option("header","true").csv("path to your file")

“file”现在将是pysparkDataframe。
现在可以这样编写结束输出:

file.write.csv("output_path")

有关转换和其他信息,请参阅文档:spark文档。

相关问题