mysql表定义在读取pyspark写入的表时出错

uinbv5nw  于 2021-07-13  发布在  Spark
关注(0)|答案(0)|浏览(360)

我目前正在与pyspark一起开发一个数据管道。作为管道的一部分,我使用以下函数将sparkDataframe写入mysql:

def jdbc_insert_overwrite_table(df, mysql_user, mysql_pass, mysql_host, mysql_port, mysql_db, num_executors, table_name,
                                logger):
    mysql_url = "jdbc:mysql://{}:{}/{}?characterEncoding=utf8".format(mysql_host, mysql_port, mysql_db)
    logger.warn("JDBC Writing to table " + table_name)
    df.write.format('jdbc')\
        .options(
        url=mysql_url,
        driver='com.mysql.cj.jdbc.Driver',
        dbtable=table_name,
        user=mysql_user,
        password=mysql_pass,
        truncate=True,
        numpartitions=num_executors,
        batchsize=100000
    ).mode('Overwrite').save()

这没有问题。但是,稍后在管道中(在同一pyspark app/spark会话中),此表是另一个转换的依赖项,我尝试使用以下函数读取此表:

def read_mysql_table_in_session_df(spark, mysql_conn, query_str, query_schema):
    cursor = mysql_conn.cursor()
    cursor.execute(query_str)
    records = cursor.fetchall()
    df = spark.createDataFrame(records, schema=query_schema)
    return df

我得到一个mysql错误: Error 1412: Table definition has changed, please retry transaction. 我已经能够通过关闭并ping(reconnect=true)到数据库来解决这个问题,但是我不喜欢这个解决方案,因为它感觉像创可贴。
你知道我为什么会犯这个错误吗?我已经确认了写入表不会更改表定义(至少在模式方面)。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题