read->update->write没有主键的大表

soat7uwm  于 2021-07-14  发布在  Spark
关注(0)|答案(1)|浏览(290)

我正在尝试更新一个大型mysql表中每行的几个字段(有接近 500 百万行)。该表没有任何主键(或者没有像uuid这样的字符串主键)。我没有足够的执行器内存来一次读取和保存整个数据。有人能告诉我处理这些表格的方法吗。
下面是模式

CREATE TABLE Persons ( Personid varchar(255) NOT NULL, LastName varchar(255) NOT NULL, FirstName varchar(255) DEFAULT NULL, Email varchar(255) DEFAULT NULL, Age int(11) DEFAULT NULL) ) ENGINE=InnoDB DEFAULT CHARSET=latin1;

Spark代码就像

SparkSession spark = SparkSession.builder().master("spark://localhost:7077").appName("KMASK").getOrCreate();
DataFrame rawDataFrame = spark.read().format("jdbc").load();
rawDataFrame.createOrReplaceTempView("data");
//encrypt is UDF
String sql = "select Personid, LastName, FirstName, encrypt(Email), Age from data";
Dataset newData = spark.sql(sql);
newData.write().mode(SaveMode.Overwrite).format("jdbc").options(options).save();

这张table周围有 150 百万条记录,数据大小 6GB . 我对遗嘱执行人的记忆 2 gb . 我可以使用spark-jdbc处理这个表吗。

s8vozzvw

s8vozzvw1#

理想情况下,您可以更改spark jdbc fetchsize 用于减少/增加每次获取和处理的记录数的选项。
对数据进行分区也有助于减少混乱和额外的开销。既然你有 Age 作为一个数值场。您还可以在由时间决定的分区中处理数据。首先确定最小和最大年龄,并使用spark jdbc选项。
尤其是: partitionColumn :
Age lowerBound :您确定的最小年龄 upperBound :您确定的最大年龄 numPartitions :确实取决于核心和工作节点的数量,但这里有更多提示和链接
您还可以使用自定义查询来仅选择和更新一些可以用 query 选项。注意。当使用 query 不应使用的选项 dbtable 选项。

相关问题