我的任务是将这个spark python udf转换为pyspark原生函数...
def parse_access_history_json_table(json_obj):
'''
extracts
list of DBs, Schemas, Tables
from json_obj field in access history
'''
db_list = set([])
schema_list = set([])
table_list = set([])
try:
for full_table_name in json_obj:
full_table_name = full_table_name.lower()
full_table_name_array = full_table_name.split(".")
if len(full_table_name_array) == 3:
db_list.add(full_table_name_array[0])
schema_list.add(full_table_name_array[1])
table_list.add(full_table_name_array[2])
elif len(full_table_name_array) == 2:
schema_list.add(full_table_name_array[0])
table_list.add(full_table_name_array[1])
else:
table_list.add(full_table_name_array[0])
except Exception as e:
print(str(e))
return (list(db_list), list(schema_list), list(table_list))
json_obj是数组类型col,示例值为[db1.s1.t1,s2.t2,t3],最终需要3个col用于db/schema/table,每个都作为db等的数组...['db 1][' s1 ',' s2 '] & tables [' t1 ',' t2 ',' t3 ']
我试图把它转换成一个x,y,z的列表,但是拆分出错了。(不确定是否有一个更快/更好的方法,不使用udf/lambda,即python转换)
from pyspark.sql.types import *
cSchema = StructType([StructField("WordList", ArrayType(StringType()))])
test_list = [['database.schema.table']], [['t3','d1.s1.t1','s2.t2']]
df = spark.createDataFrame(test_list,schema=cSchema)
df.withColumn("Filtered_Col", expr(f"transform(WordList,x -> split(x,'\.') )")).show()
正在给予
+-----------------+--------------------+
| WordList| Filtered_Col|
+-----------------+--------------------+
|[share_db.sc.tbl]|[[, , , , , , , ,...|
|[x a, d.s.t, d.s]|[[, , , ], [, , ,...|
+-----------------+--------------------+
不知道为什么。我有Spark2.4。
1条答案
按热度按时间xmjla07d1#
问题出在正则表达式上,它是split函数的第二个参数。
反斜杠也必须转义,所以
\\\.
将得到正确的结果: