它是在文档中编写的,可以用它将行写入数据库 foreach
反对 writeStream
```
def process_row(row):
# Write row to storage
pass
query = streamingDF.writeStream.foreach(process_row).start()
没有写的,是里面的 `row` ? 没有这些知识我怎么处理?
现在我写了
df = df... # something
def write_row(row):
mydb = mysql.connector.connect(
host=config["jdbc"]["host"],
port=config["jdbc"]["port"],
user=config["jdbc"]["user"],
password=config["jdbc"]["password"],
database=config["jdbc"]["database"],
)
dbtable = ...
mycursor = mydb.cursor()
sql = f"INSERT INTO {dbtable} (user_id, carName) VALUES (%s, %s)"
val = (352, "Sample")
mycursor.execute(sql, val)
mydb.commit()
query = df
.writeStream
.outputMode('update')
.foreach(write_row)
.start()
query.awaitTermination()
不幸的是,它什么都没写。功能 `write_row` 它自己工作并写入样本行。在里面的时候 `foreach` ,它没有。
如果这个函数出错,我怎么知道呢?
我根据@alexott改写了。
如果我运行这个
def foreach_batch_function(df1, epoch_id):
df1.write.format("jdbc")
.option("url", "jdbc:mariadb://IPADDRESS/database")
.option("dbtable", "pysparktest")
.option("user", config["jdbc"]["user"])
.option("password", config["jdbc"]["password"])
.option("driver", "org.mariadb.jdbc.Driver")
.save()
或选择
.option("createTableColumnTypes", "cnt int, minBestLapTime double, trackName varchar(64), trackVersion varchar(64), carClass varchar(64), carName varchar(64), carVersion varchar(64)") \
虽然表不存在,但由于sql语法错误而失败。如果表已经存在,它就不能说“表存在”。
是否可以填充现有表?
1条答案
按热度按时间wnavrhmk1#
最好改用foreachbatch—在这种情况下,您可以只使用标准的spark jdbc操作,如下所示: