有没有更好的方法使用python处理spark/aws glue中的postgresql存储

9rygscc1  于 2021-05-24  发布在  Spark
关注(0)|答案(0)|浏览(239)

我正在使用aws glue对postgresql中存储的数据执行etl,其中包含许多动态hstore字段。我需要使用hstores中的一些字段执行操作。
让我给一些我是如何做的,这样你可以帮助我与另一个选择或使这更好的上下文。
数据从glue目录加载到daynamicframe中
我将dynamicframe转换为spark dataframe,用于一些类似sql的操作(我不能使用dynamicframe,因为我需要一些分组、聚合和排序)
glue目录不支持hs存储并将列作为字符串加载。
使用spark,我将hstore字符串转换为json字符串,然后使用来自\ujson的spark将列作为Map类型加载。

import pyspark.sql.functions as F
import pyspark.sql.types as T

df = dynamicFrame.toDF()
df = df.withColumn("column_from_hstore", F.concat(F.lit("{"), F.col("column"), F.lit("}")))
df = df.withColumn("column_json", F.regexp_replace(F.col("column_from_hstore"), '=>', ":"))
df = df.withColumn("column_map", F.from_json(F.col("column_json"), T.MapType(T.StringType(), T.StringType())))
df = df.withColumn("column_child", F.col("column_map.child").cast('int'))

最后,我可以使用子字段。
我还有别的选择
我可以直接将spark连接到postgres,使用原始sql加载数据并选择所需的字段,但这需要我在glue上管理jdbc连接证书。我找不到一个简单的方法来做这件事。
问题
因为我是新的Spark,我不知道如何表现这可以或如果有更好的方法。感谢您的帮助

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题