使用分隔符的Spark分离不正确(在高阶内)?

bjg7j2ky  于 2023-03-13  发布在  Apache
关注(0)|答案(1)|浏览(108)

我的任务是将这个spark python udf转换为pyspark原生函数...

def parse_access_history_json_table(json_obj):
    '''
    extracts
    list of DBs, Schemas, Tables
    from json_obj field in access history
    '''
    db_list = set([])
    schema_list = set([])
    table_list = set([])
    try:
        for full_table_name in json_obj:
            full_table_name = full_table_name.lower()
            full_table_name_array = full_table_name.split(".")
            if len(full_table_name_array) == 3:
                db_list.add(full_table_name_array[0])
                schema_list.add(full_table_name_array[1])
                table_list.add(full_table_name_array[2])
            elif len(full_table_name_array) == 2:
                schema_list.add(full_table_name_array[0])
                table_list.add(full_table_name_array[1])
            else:
                table_list.add(full_table_name_array[0])
    except Exception as e:
        print(str(e))
    return (list(db_list), list(schema_list), list(table_list))

json_obj是数组类型col,示例值为[db1.s1.t1,s2.t2,t3],最终需要3个col用于db/schema/table,每个都作为db等的数组...['db 1][' s1 ',' s2 '] & tables [' t1 ',' t2 ',' t3 ']
我试图把它转换成一个x,y,z的列表,但是拆分出错了。(不确定是否有一个更快/更好的方法,不使用udf/lambda,即python转换)

from pyspark.sql.types import *

cSchema = StructType([StructField("WordList", ArrayType(StringType()))])

test_list = [['database.schema.table']], [['t3','d1.s1.t1','s2.t2']]

df = spark.createDataFrame(test_list,schema=cSchema)

df.withColumn("Filtered_Col", expr(f"transform(WordList,x -> split(x,'\.')  )")).show()

正在给予

+-----------------+--------------------+
|         WordList|        Filtered_Col|
+-----------------+--------------------+
|[share_db.sc.tbl]|[[, , , , , , , ,...|
|[x a, d.s.t, d.s]|[[, , , ], [, , ,...|
+-----------------+--------------------+

不知道为什么。我有Spark2.4。

xmjla07d

xmjla07d1#

问题出在正则表达式上,它是split函数的第二个参数。
反斜杠也必须转义,所以\\\.将得到正确的结果:

df.withColumn("Filtered_Col", expr("transform(WordList,x -> split(x,'\\\.')  )")).show(truncate=False)
+-----------------------+------------------------------+
|WordList               |Filtered_Col                  |
+-----------------------+------------------------------+
|[database.schema.table]|[[database, schema, table]]   |
|[t3, d1.s1.t1, s2.t2]  |[[t3], [d1, s1, t1], [s2, t2]]|
+-----------------------+------------------------------+

相关问题