我知道使用spark更新mysql表是不可能的,但是我尝试了一些方法来避免它,但是它不起作用。
假设我有一张table last_modification
其中,我将用户名保存为id,并显示具有不同服务的系统的上次修改日期。每次处理一些数据时,我都必须更新来自该用户的数据被修改的日期,如果新用户进入系统,我必须将其插入表中。
过程是:
从sql表中读取数据:
df_last_mod = sqlContext.read.jdbc(url=url, table="last_modification", properties=properties)
从该Dataframe中提取将被处理的用户( last_mod_actual
)剩下的放在rdd里( last_mod_aux
):
last_mod_actual = (df_last_mod
.rdd
.filter(lambda x: x[0] == service)
)
从已处理用户的rdd更新修改日期(现在命名为 last_mod_rdd
),并将其加入未修改用户的rdd:
union_rdd = last_mod_rdd.union(last_mod_aux)
这部分是额外的,以避免丢失数据,但不知道它是否可以忽略。在这里,我创建一个临时表并缓存它:
header = (sqlContext
.createDataFrame(union_rdd,header_schema)
.createOrReplaceTempView("union_header")
)
sqlContext.cacheTable("union_header")
最后,我使用jdbc编写表:
dd = sqlContext.table("union_header")`
dd.write.format('jdbc').options(
url= url,
driver="com.mysql.jdbc.Driver",
dbtable="last_modification",
user=user,
password=password).mode('overwrite').save()
这段代码似乎有时可以工作,但其他代码只保存修改过的用户并删除未修改过的用户。插入时 dd.show()
在写入sql表之前,这个程序似乎工作得更好,但不知道真正的原因,它的工作有点随机。
暂无答案!
目前还没有任何答案,快来回答吧!