如何将spark structured流写入rdms?前排是什么?

zu0ti5jz  于 2021-07-09  发布在  Spark
关注(0)|答案(1)|浏览(277)

它是在文档中编写的,可以用它将行写入数据库 foreach 反对 writeStream ```
def process_row(row):
# Write row to storage
pass

query = streamingDF.writeStream.foreach(process_row).start()

没有写的,是里面的 `row` ? 没有这些知识我怎么处理?
现在我写了

df = df... # something

def write_row(row):

mydb = mysql.connector.connect(
    host=config["jdbc"]["host"],
    port=config["jdbc"]["port"],
    user=config["jdbc"]["user"],
    password=config["jdbc"]["password"],
    database=config["jdbc"]["database"],
)

dbtable = ...

mycursor = mydb.cursor()

sql = f"INSERT INTO {dbtable} (user_id, carName) VALUES (%s, %s)"
val = (352, "Sample")
mycursor.execute(sql, val)

mydb.commit()

query = df
.writeStream
.outputMode('update')
.foreach(write_row)
.start()

query.awaitTermination()

不幸的是,它什么都没写。功能 `write_row` 它自己工作并写入样本行。在里面的时候 `foreach` ,它没有。
如果这个函数出错,我怎么知道呢?
我根据@alexott改写了。
如果我运行这个

def foreach_batch_function(df1, epoch_id):
df1.write.format("jdbc")
.option("url", "jdbc:mariadb://IPADDRESS/database")
.option("dbtable", "pysparktest")
.option("user", config["jdbc"]["user"])
.option("password", config["jdbc"]["password"])
.option("driver", "org.mariadb.jdbc.Driver")
.save()

或选择

.option("createTableColumnTypes", "cnt int, minBestLapTime double, trackName varchar(64), trackVersion varchar(64), carClass varchar(64), carName varchar(64), carVersion varchar(64)") \

虽然表不存在,但由于sql语法错误而失败。如果表已经存在,它就不能说“表存在”。
是否可以填充现有表?
wnavrhmk

wnavrhmk1#

最好改用foreachbatch—在这种情况下,您可以只使用标准的spark jdbc操作,如下所示:

def foreach_batch_function(df, epoch_id):
  df.write.format("jdbc") \
    .option("url", "your_driver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .save()

df.writeStream.foreachBatch(foreach_batch_function).start()

相关问题