我遇到了以下情况,即一个目录下有多个文件,如下所示:
AT_BookingRequests_16102021_05082022.xlsx
AT_Bookings_16102021_05082022.xlsx
SWE_BookingRequests_08042019_05082022.xlsx
SWE_Bookings_06082020_05082021.xlsx
SWE_Bookings_06082021_05082022.xlsx
SWE_Bookings_08042019_05082020.xlsx
现在,我需要将它们加载到两个不同的表中(比如booking_request和bookings)。文件夹中的数据每天都会加载,我只需要读取每天加载的新记录。
到目前为止,我一直在考虑将文件名存储在特定的表中,
# List all files under the base folder
fileList =[]
for x in dbutils.fs.ls(base_source_path):
fileList.append(x)
booking_requests_files = []
bookings_files = []
for i in fileList:
file = i[0].split('/')[-1]
if 'BookingRequests' in file:
booking_requests_files.append(file)
else:
bookings_files.append(file)
loaded_booking_req_files = spark.sql(f"select distinct filename from {booking_requests_table}").rdd.flatMap(lambda x: x).collect()
loaded_bookings_files = spark.sql(f"select distinct filename from {bookings_table}").rdd.flatMap(lambda x: x).collect()
for file in booking_requests_files:
filepath = base_source_path + '/' + file
print(filepath)
if file not in loaded_booking_req_files:
df_req_read = spark.read.format("com.crealytics.spark.excel")\
.option("header", "true")\
.load(filepath)
for file in bookings_files :
filepath = base_source_path + '/' + file
print(filepath)
if file not in loaded_bookings_files :
df_req_read = spark.read.format("com.crealytics.spark.excel")\
.option("header", "true")\
.load(filepath)
我试图实现的是避免迭代通过文件,可以看到。我已经尝试过传递所有的文件作为列表,但它失败了。
此外,实施这一点的最佳方法是什么?
1条答案
按热度按时间ktecyv1j1#
在我看来,流处理就像是一种流处理方法。有多种方法可以做你需要的事情。我在批处理和流处理中都有类似的事情。我们的案例:
1.批处理:我们使用了一个标记系统。每次处理一个文件时,我们都会用原始文件名创建一个空文件。所以,下次处理时,我们会检查该文件是否已经处理过。
1.结构化流:使用检查点。这里有很多东西可以分享,但是如果你不知道结构化流,你可以阅读文档:https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
希望我能帮到你。