我目前正在与pyspark一起开发一个数据管道。作为管道的一部分,我使用以下函数将sparkDataframe写入mysql:
def jdbc_insert_overwrite_table(df, mysql_user, mysql_pass, mysql_host, mysql_port, mysql_db, num_executors, table_name,
logger):
mysql_url = "jdbc:mysql://{}:{}/{}?characterEncoding=utf8".format(mysql_host, mysql_port, mysql_db)
logger.warn("JDBC Writing to table " + table_name)
df.write.format('jdbc')\
.options(
url=mysql_url,
driver='com.mysql.cj.jdbc.Driver',
dbtable=table_name,
user=mysql_user,
password=mysql_pass,
truncate=True,
numpartitions=num_executors,
batchsize=100000
).mode('Overwrite').save()
这没有问题。但是,稍后在管道中(在同一pyspark app/spark会话中),此表是另一个转换的依赖项,我尝试使用以下函数读取此表:
def read_mysql_table_in_session_df(spark, mysql_conn, query_str, query_schema):
cursor = mysql_conn.cursor()
cursor.execute(query_str)
records = cursor.fetchall()
df = spark.createDataFrame(records, schema=query_schema)
return df
我得到一个mysql错误: Error 1412: Table definition has changed, please retry transaction.
我已经能够通过关闭并ping(reconnect=true)到数据库来解决这个问题,但是我不喜欢这个解决方案,因为它感觉像创可贴。
你知道我为什么会犯这个错误吗?我已经确认了写入表不会更改表定义(至少在模式方面)。
暂无答案!
目前还没有任何答案,快来回答吧!