并行地将数据写入parquet格式

pu3pd22g  于 2021-05-29  发布在  Spark
关注(0)|答案(1)|浏览(580)

我有一个相对巨大的本地表(约15亿行),我正试图使用aws胶水将其以Parquet格式拉入awss3。我使用sparkjdbc读取表并将其写入s3。问题是我不能一次从源表中提取所有数据,因为源数据库会耗尽内存并发出抱怨。为了解决这个问题,我使用了predicates选项来并行下推过滤器,它可以很好地将数据拉入大约2亿块的数据中。但是,当我尝试将这个Dataframe写入s3时,几乎需要半个小时才能完成:

df = spark.read.jdbc(url=host_url, 
                        table="TABLENAME",
                        predicates=predicates,
                        properties= {
                            "user" : username,
                            "password" : password
                            }
                        )

所以我要做的是从db stage顺序读取: Read Part 1 from DB --> Read Part 2 from DB --> Read Part 3 from DB 然后将所有数据并行地写入s3 Write Part 1 || Write Part 2 || Write Part 3 我有两个问题:
我不知道spark什么时候会把这些查询发送到db。我知道当我定义如上所示的dataframe时不是这样的,所以我不知道如何序列化stage1。
我环顾四周,找不到一个选项来并行地将多个Dataframe写入Parquet地板分区。我应该仅仅使用python来并行化Dataframe来编写操作语句吗?这样做明智吗?

nwnhqdif

nwnhqdif1#

spark会在应用某个操作后立即读取数据,因为您只是在对s3进行读写操作,所以在触发写操作时会读取数据。
spark没有优化为从rdbms读取大量数据,因为它只建立到数据库的单个连接。如果您想继续使用spark进行读取,请尝试将fetchsize属性增加到100000,默认值为1000。
对于数据的并行处理,可以尝试利用python多处理并执行并行读写

Thread 1
Read 1 -> Write 1
Thread 2
Read 2 -> Write 2

但第一次尝试执行只是顺序的

Read 1 -> Write 1 -> Read 2 -> Write 2

我建议的另一种方法是使用dms或sct一次将所有数据发送到s3。
dms可以在s3中以Parquet格式转储数据,而且速度非常快,因为它针对迁移任务本身进行了优化。
如果不想使用dms,可以编写一个sqoop导入作业,该作业可以通过临时emr集群触发。sqoop还可以导入Parquet格式的数据。
glue最适合于转换已经存在的数据和迁移大数据,您应该借助其他服务。

相关问题