pyspark 在palantir中为for循环中的每次迭代加载输入数据集

f1tvaqid  于 2022-11-01  发布在  Spark
关注(0)|答案(1)|浏览(178)

我有一个@transform_pandas代码,它加载输入文件进行计算。
在计算函数中,我有一个for循环,它必须读取完整的输入数据,并对每次迭代进行相应的过滤。

@transform_pandas(
    Output("/FCA_Foundry/dataset1"),
    source_df=Input(sample),
    )

我有下面的代码,我试图在for循环中的每次迭代中读取source_df数据集,并根据年份和家族过滤数据集,然后进行计算。

def compute(source_df):
        for entire_row in vhcl_df.itertuples():
            modyr = entire_row[1]
            fam = str(entire_row[2])

            /* source_df should be read again here.

            source_df = source_df.loc[source_df['i_yr']==modyr]
            source_df = source_df.loc[source_df['fam']==fam]
            ...

有没有办法做到这一点。谢谢大家的支持。

7fyelxc5

7fyelxc51#

正如@nicornk在注解中所建议的,应该在声明转换之后立即为source_df创建一个新的.copy()项。
这两个过滤步骤(如果你不需要只处理“modyr filtered”source_df的话,也可以合并为一个)。
请注意,modyrfamvhcl_df的实际列名,实际上,

@transform_pandas(
    Output("/FCA_Foundry/dataset1"),
    source_df=Input(sample),
    vhcl_df=Input(path)
)
def compute(source_df, vhcl_df):
    for modyr, fam in vhcl_df.items():
        temp_df = source_df.copy()
        temp_df = source_df.loc[source_df['i_yr']==modyr]
        temp_df = source_df.loc[source_df['fam']==str(fam)]

以一种更简洁明了的方式,它实际上可以写为

def compute(source_df, vhcl_df):
    for modyr, fam in vhcl_df.items():
        temp_df = source_df.copy()
        filtered_temp_df = temp_df[(temp_df.i_yr==modyr) & (temp_df.fam==str(fam))]

PS:记住,如果source_df很大,你应该继续使用PySpark(参见铸造文档)
请注意,transform_pandas只能用于内存容量足够大的数据集。如果您有更大的数据集,希望在转换为Pandas之前先过滤掉,则应使用transform_df()装饰器和pyspark.sql.SparkSession.createDataFrame()方法编写转换。

相关问题