是否可以使用PySpark读取单个目录下的特定文件?

fhity93d  于 2022-11-01  发布在  Spark
关注(0)|答案(1)|浏览(174)

我遇到了以下情况,即一个目录下有多个文件,如下所示:

AT_BookingRequests_16102021_05082022.xlsx
AT_Bookings_16102021_05082022.xlsx
SWE_BookingRequests_08042019_05082022.xlsx
SWE_Bookings_06082020_05082021.xlsx
SWE_Bookings_06082021_05082022.xlsx
SWE_Bookings_08042019_05082020.xlsx

现在,我需要将它们加载到两个不同的表中(比如booking_request和bookings)。文件夹中的数据每天都会加载,我只需要读取每天加载的新记录。
到目前为止,我一直在考虑将文件名存储在特定的表中,


# List all files under the base folder

fileList =[]
for x in dbutils.fs.ls(base_source_path):
  fileList.append(x)

booking_requests_files = []
bookings_files = []
for i in fileList: 
  file = i[0].split('/')[-1]
  if 'BookingRequests' in file:
    booking_requests_files.append(file)
  else:
    bookings_files.append(file)

loaded_booking_req_files = spark.sql(f"select distinct filename from {booking_requests_table}").rdd.flatMap(lambda x: x).collect()

loaded_bookings_files = spark.sql(f"select distinct filename from {bookings_table}").rdd.flatMap(lambda x: x).collect()

for file in booking_requests_files:
  filepath = base_source_path + '/' + file
  print(filepath)
  if file not in loaded_booking_req_files:
    df_req_read = spark.read.format("com.crealytics.spark.excel")\
    .option("header", "true")\
    .load(filepath)

for file in bookings_files :
  filepath = base_source_path + '/' + file
  print(filepath)
  if file not in loaded_bookings_files :
    df_req_read = spark.read.format("com.crealytics.spark.excel")\
    .option("header", "true")\
    .load(filepath)

我试图实现的是避免迭代通过文件,可以看到。我已经尝试过传递所有的文件作为列表,但它失败了。
此外,实施这一点的最佳方法是什么?

ktecyv1j

ktecyv1j1#

在我看来,流处理就像是一种流处理方法。有多种方法可以做你需要的事情。我在批处理和流处理中都有类似的事情。我们的案例:
1.批处理:我们使用了一个标记系统。每次处理一个文件时,我们都会用原始文件名创建一个空文件。所以,下次处理时,我们会检查该文件是否已经处理过。
1.结构化流:使用检查点。这里有很多东西可以分享,但是如果你不知道结构化流,你可以阅读文档:https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
希望我能帮到你。

相关问题