我目前正在编写一个Python函数,这个过程应该在包含我的数据结构的panda Dataframe 上循环(我得到哪个表包含我正在寻找的字段的值的信息),然后循环一个spark Dataframe ,该 Dataframe 从precedent循环加载正确的表,如果遇到该字段的值,我们将其添加到记录列表和 Dataframe 中,该 Dataframe 本身将在处理结束时被返回以被转换成CSV。
df_meta = pd.read_csv("/dbfs/mnt/resources/path/file_meta.csv", sep=';')
liste_t = []
def recursive_process(field, id_p, list_drop):
for row in df_meta.index:
if df_meta['SOURCE_COLUMN_NAME'][row] == field:
df_table = spark.read.table("source1"+"."+df_meta['SOURCE_TABLE_NAME'][row])
data_collect = df_table.collect()
for row2 in data_collect:
if row2(field) == id_p and row2(field) not in list_drop:
list_drop.append(id_p)
#add field + value to final dataframe
return list_drop
在参数中,我给出了目标字段、该字段的值id_p
和一个list
来记录我已经处理过的字段。
问题是:我真的不知道如何处理包含我的数据的spark Dataframe ,我读到了我尝试使用的collect()
方法,但我不确定它在这里是否有效。到目前为止,我希望我的代码编辑我的空列表,并返回将添加到我的最终 Dataframe 中的值。但当我调用我的函数时:
recursive_process("Col_ID","1003729193",liste_t)
列表只返回不正常的任何内容......因此,我想知道如何处理spark Dataframe ?以及如何返回在我的循环内编辑的列表/ Dataframe ?(我担心这些过程只是发生在我的循环中,但在这些循环外保持不变)。
谢谢帮忙!
1条答案
按热度按时间i7uq4tfw1#
您可以使用以下内容过滤 Dataframe :
df_table.filter(f"{field} = {id_p}").filter(f"{field} NOT IN {list_drop}")
那么它就取决于这个滤波器的大小:
df.write
方法)的结果保存在磁盘上,然后用spark读取。df.union()
),然后将这个临时df的最终状态写入磁盘。如果你去Spark,你应该一路去Spark(不是收集而是迭代行)。如果你不太了解Spark Apis,你可以使用Pandas一个具有以下导入:
import pyspark.pandas as pd