使用pyspark更新mysql表

t5fffqht  于 2021-06-20  发布在  Mysql
关注(0)|答案(0)|浏览(336)

我知道使用spark更新mysql表是不可能的,但是我尝试了一些方法来避免它,但是它不起作用。
假设我有一张table last_modification 其中,我将用户名保存为id,并显示具有不同服务的系统的上次修改日期。每次处理一些数据时,我都必须更新来自该用户的数据被修改的日期,如果新用户进入系统,我必须将其插入表中。
过程是:
从sql表中读取数据:

df_last_mod = sqlContext.read.jdbc(url=url, table="last_modification", properties=properties)

从该Dataframe中提取将被处理的用户( last_mod_actual )剩下的放在rdd里( last_mod_aux ):

last_mod_actual = (df_last_mod
               .rdd
               .filter(lambda x: x[0] == service)
               )

从已处理用户的rdd更新修改日期(现在命名为 last_mod_rdd ),并将其加入未修改用户的rdd:

union_rdd = last_mod_rdd.union(last_mod_aux)

这部分是额外的,以避免丢失数据,但不知道它是否可以忽略。在这里,我创建一个临时表并缓存它:

header = (sqlContext
           .createDataFrame(union_rdd,header_schema)
           .createOrReplaceTempView("union_header")
           )
sqlContext.cacheTable("union_header")

最后,我使用jdbc编写表:

dd = sqlContext.table("union_header")`

dd.write.format('jdbc').options(
       url= url,
       driver="com.mysql.jdbc.Driver",
       dbtable="last_modification",
       user=user,
       password=password).mode('overwrite').save()

这段代码似乎有时可以工作,但其他代码只保存修改过的用户并删除未修改过的用户。插入时 dd.show() 在写入sql表之前,这个程序似乎工作得更好,但不知道真正的原因,它的工作有点随机。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题