我有一个@transform_pandas代码,它加载输入文件进行计算。
在计算函数中,我有一个for循环,它必须读取完整的输入数据,并对每次迭代进行相应的过滤。
@transform_pandas(
Output("/FCA_Foundry/dataset1"),
source_df=Input(sample),
)
我有下面的代码,我试图在for循环中的每次迭代中读取source_df数据集,并根据年份和家族过滤数据集,然后进行计算。
def compute(source_df):
for entire_row in vhcl_df.itertuples():
modyr = entire_row[1]
fam = str(entire_row[2])
/* source_df should be read again here.
source_df = source_df.loc[source_df['i_yr']==modyr]
source_df = source_df.loc[source_df['fam']==fam]
...
有没有办法做到这一点。谢谢大家的支持。
1条答案
按热度按时间7fyelxc51#
正如@nicornk在注解中所建议的,应该在声明转换之后立即为
source_df
创建一个新的.copy()
项。这两个过滤步骤(如果你不需要只处理“modyr filtered”source_df的话,也可以合并为一个)。
请注意,
modyr
、fam
是vhcl_df
的实际列名,实际上,以一种更简洁明了的方式,它实际上可以写为
PS:记住,如果
source_df
很大,你应该继续使用PySpark(参见铸造文档)请注意,
transform_pandas
只能用于内存容量足够大的数据集。如果您有更大的数据集,希望在转换为Pandas之前先过滤掉,则应使用transform_df()
装饰器和pyspark.sql.SparkSession.createDataFrame()
方法编写转换。