我有一个相对巨大的本地表(约15亿行),我正试图使用aws胶水将其以Parquet格式拉入awss3。我使用sparkjdbc读取表并将其写入s3。问题是我不能一次从源表中提取所有数据,因为源数据库会耗尽内存并发出抱怨。为了解决这个问题,我使用了predicates选项来并行下推过滤器,它可以很好地将数据拉入大约2亿块的数据中。但是,当我尝试将这个Dataframe写入s3时,几乎需要半个小时才能完成:
df = spark.read.jdbc(url=host_url,
table="TABLENAME",
predicates=predicates,
properties= {
"user" : username,
"password" : password
}
)
所以我要做的是从db stage顺序读取: Read Part 1 from DB --> Read Part 2 from DB --> Read Part 3 from DB
然后将所有数据并行地写入s3 Write Part 1 || Write Part 2 || Write Part 3
我有两个问题:
我不知道spark什么时候会把这些查询发送到db。我知道当我定义如上所示的dataframe时不是这样的,所以我不知道如何序列化stage1。
我环顾四周,找不到一个选项来并行地将多个Dataframe写入Parquet地板分区。我应该仅仅使用python来并行化Dataframe来编写操作语句吗?这样做明智吗?
1条答案
按热度按时间nwnhqdif1#
spark会在应用某个操作后立即读取数据,因为您只是在对s3进行读写操作,所以在触发写操作时会读取数据。
spark没有优化为从rdbms读取大量数据,因为它只建立到数据库的单个连接。如果您想继续使用spark进行读取,请尝试将fetchsize属性增加到100000,默认值为1000。
对于数据的并行处理,可以尝试利用python多处理并执行并行读写
但第一次尝试执行只是顺序的
我建议的另一种方法是使用dms或sct一次将所有数据发送到s3。
dms可以在s3中以Parquet格式转储数据,而且速度非常快,因为它针对迁移任务本身进行了优化。
如果不想使用dms,可以编写一个sqoop导入作业,该作业可以通过临时emr集群触发。sqoop还可以导入Parquet格式的数据。
glue最适合于转换已经存在的数据和迁移大数据,您应该借助其他服务。