PySpark DataFrame: dynamic select clause error

t98cgbkg · posted on 2023-05-16 in Spark

I am trying to pass a dynamically built select clause to a PySpark DataFrame, but I keep getting an error saying "cannot resolve ... given input columns: [value]".

from pyspark.sql.functions import split
import csv

split_col = split(df[column_name], delimiter)

o_str = ''
with open(schema_file, 'r') as file:
    data = csv.reader(file)
    for row in data:
        if row[0] == rec_type:
            col_names = row[1:]
            for i in range(len(col_names)):
                o_str += "split_col.getItem(" + str(i) + ").alias('" + col_names[i] + "'),"

df_out = df.select(o_str.rsplit(',', 1)[0])

The above is the code snippet. Here o_str.rsplit(',', 1)[0] resolves to split_col.getItem(0).alias('RCD_TYPE'),split_col.getItem(1).alias('VER'),split_col.getItem(2).alias('ID'),split_col.getItem(3).alias('OL_IND'),split_col.getItem(4).alias('PERS_ID'). When I hard-code these values in the select clause it works fine, but when I generate them dynamically it gives the error below:

An error occurred while calling o44.select.
: org.apache.spark.sql.AnalysisException: cannot resolve '`split_col.getItem(0).alias('RCD_TYPE'),split_col.getItem(1).alias('VER'),split_col.getItem(2).alias('ID'),split_col.getItem(3).alias('OL_IND'),split_col.getItem(4).alias('PERS_ID')`' given input columns: [value];;
'Project ['split_col.getItem(0).alias('RCD_TYPE'),split_col.getItem(1).alias('VER'),split_col.getItem(2).alias('ID'),split_col.getItem(3).alias('OL_IND'),split_col.getItem(4).alias('PERS_ID')']
+- Filter StartsWith(value#2, L)
   +- Relation[value#2] text
df_out = df.select(split_col.getItem(0).alias('RCD_TYPE'),split_col.getItem(1).alias('VER'),split_col.getItem(2).alias('ID'),split_col.getItem(3).alias('OL_IND'),split_col.getItem(4).alias('PERS_ID')) --> This works

df_out = df.select(o_str.rsplit(',', 1)[0]) --> This does not work.
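
The root cause is that select() treats a plain string argument as a single column name, so the entire generated expression string is looked up as one (non-existent) column against the only real column, value. Below is a minimal sketch that keeps the same schema-file loop (reusing the question's column_name, delimiter, schema_file, and rec_type variables, which are assumed to be defined elsewhere) but collects actual Column objects in a Python list instead of concatenating source text:

import csv
from pyspark.sql.functions import split

split_col = split(df[column_name], delimiter)

# Collect Column expressions rather than a string of Python code
select_exprs = []
with open(schema_file, 'r') as schema_in:
    for row in csv.reader(schema_in):
        if row[0] == rec_type:
            col_names = row[1:]
            select_exprs = [
                split_col.getItem(i).alias(name)
                for i, name in enumerate(col_names)
            ]

# select() accepts a list of Column objects directly
df_out = df.select(select_exprs)

Passing Column objects also sidesteps any quoting or escaping of the generated names; if string expressions are preferred, selectExpr() is the API that parses SQL expression strings, whereas select() does not.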
icnyk63a #1

You are trying to do most of the computation in plain Python. I suggest you do it the PySpark way instead. What I understand from your code is that you want to skip specific columns in the select clause. If so, do it in PySpark like this:

from pyspark.sql import functions as f

# Existing dataframe
df.show(5, False)

+----------+----------+----------+----------+
|dummy_col1|dummy_col2|dummy_col3|dummy_col4|
+----------+----------+----------+----------+
|a         |12        |9         |ab        |
|b         |6         |38        |bc        |
|c         |4         |81        |cd        |
|d         |9         |32        |de        |
|e         |5         |19        |ef        |
+----------+----------+----------+----------+

# Mention the columns you want to skip
rejected_cols = ['dummy_col1']

# Select the columns dynamically, skipping the rejected ones
df_out = df.select([f.col(c) for c in df.columns if c not in rejected_cols])

df_out.show(5, False)

+----------+----------+----------+
|dummy_col2|dummy_col3|dummy_col4|
+----------+----------+----------+
|12        |9         |ab        |
|6         |38        |bc        |
|4         |81        |cd        |
|9         |32        |de        |
|5         |19        |ef        |
+----------+----------+----------+
