我想把数据保存到MySQL中,覆盖某个字段中重复的行,把挂起的数据不包含的数据保存在MySQL中,我试了Mode.Overwrite/Mode.append,还是不能满足我的需要,所以我试着从MySQL中加载已有的数据,找到行,但是在保存数据到MySQL中的时候,得到的DataFrame变成了空的。
在这个过程中,我尝试了两种方法:
1.找到挂起数据中不存在的数据,然后使用UNION将两部分连接起来,最后使用Mode.Overwrite保存。
1.找到挂起数据中不存在的数据,使用Mode.Overwrite保存挂起的DataFrame,使用Mode.append保存得到的DF。
两种方法都不可用,方法1保存时或方法2保存后,得到的DF一直为空。
代码如下:
var mysql_table = spark.sqlContext.read.format("jdbc").options(jdbc_options).load()
val list = pre_res.select("clientMacAddr").rdd.map(x => x.toString.substring(1,18)).collect()
val rec_diff = mysql_table.filter(x => !(list.contains(x.apply(0).toString)))
pre_res.write.mode("overwrite").format("jdbc").options(jdbc_options).save()
rec_diff.show()
rec_diff.write.mode("append").format("jdbc").options(jdbc_options).save()
结果是这样的:
+-————————————————-+-——-+
|客户端MAC地址|变量1|
+-————————————————-+-——-+
+-————————————————-+-——-+
谢谢。
1条答案
按热度按时间czq61nw11#
你的结果是空的,因为spark是懒惰的。它不执行任何操作,直到你收集数据到驱动程序(reduce,count,collect,show...)或者把数据写到磁盘(write,保存...)。
因此,只有当你调用
rec_diff.show()
时,你的mysql表才被读取并与pre_res
进行比较,这时你已经将pre_res
写入mysql表,所以pre_res
包含了与mysql表相同的数据,这导致差异为空。在覆盖mysql表之前(把代码的第4行和第5行颠倒),试着显示(或者收集或者写)你的不同,你会看到不同。
后续行动:
这意味着用spark覆盖你的输入是一个坏主意,原因很简单,spark是懒惰的(永远记住这一点),并且在你写东西之前不会读取任何东西,这时spark会删除文件,用你的数据替换它,然后开始阅读...你刚刚删除的文件。这一切背后的真实的原因是spark要处理的数据集比任何内存都大,因此它被设计成以小批量的方式读取和处理数据(执行器任务)并渐进地写入结果,这与重写输入不兼容。
你需要做的是把你的数据写在一个临时文件中(比如hdfs parquet会非常有效).注意有一个类似的线程here .你尝试做的事情将被编码如下: