**问题:**我在一个流中接收到多个表/架构数据。现在,在隔离数据之后,我为每个表打开一个并行写流。
我在forEachBatch中使用的函数是:
def writeToAurora(df, batch_id, tableName):
df = df.persist()
stagingTable = f'{str(tableName.lower())}_delta'
df.write \
.mode("overwrite") \
.format("jdbc") \
.option("truncate", "true") \
.option("driver", DB_conf['DRIVER']) \
.option("batchsize", 1000) \
.option("url", DB_conf['URL']) \
.option("dbtable", stagingTable) \
.option("user", DB_conf['USER_ID']) \
.option("password", DB_conf['PASSWORD']) \
.save()
df.unpersist()
字符串
打开多个writestreams的逻辑是
data_df = spark.readStream.format("kinesis") \
.option("streamName", stream_name) \
.option("startingPosition", initial_position) \
.load()
#Distinguishing table wise df
distinctTables = ['Table1', 'Table2', 'Table3']
tablesDF = {table: data_df.filter(f"TableName = '{table}'") for table in distinctTables}
#Processing Each Table
for table, tableDF in tablesDF.items():
df = tableDF.withColumn('csvData', F.from_csv('finalData', schema=tableSchema[table], options={'sep': '|','quote': '"'}))\
.select('csvData.*')
vars()[table+'_query'] = df.writeStream\
.trigger(processingTime='120 seconds') \
.foreachBatch(lambda fdf, batch_id: writeToAurora(fdf, batch_id, table)) \
.option("checkpointLocation", f"s3://{bucket}/temporary/checkpoint/{table}")\
.start()
for table in tablesDF.keys():
eval(table+'_query').awaitTermination()
型
**问题:**现在,当运行上述代码时,有时table 1会加载到table 2中,并且每次代码运行时的顺序都不同。在 Dataframe 和它应该被加载到的表之间不保持顺序。
需要帮助了解为什么会发生这种情况。
2条答案
按热度按时间ffscu2ro1#
这是由
foreachBatch
方法中lambda函数的late binding引起的。举个例子这将尝试将所有表写入“t2”,并且失败(实际上只写入“t2”表,但写入“t0”数据:
字符串
要解决这个问题,有几个选择。使用
partial
:型
在代码中,将
writeStream
重写为:型
m1m5dgzv2#
字符串
通过此更改,传递给writeToAurora函数的DataFramedf现在将具有一个名为“TableName”的附加列,其中包含数据所属的表的名称。writeToAurora函数然后将使用此信息将数据写入Aurora中适当的staging表。