Avoiding a "for loop" by listing the files under a location in Spark

emeijp43  posted on 2021-07-12  in  Spark

I am trying to read multiple files from a data lake into Databricks with Spark. I currently use a for loop, which is very slow, so I modified the code to list and filter the files instead, but I get an error. Is there another way to solve this?
File locations

abfss://xxxxxx/abc/year=2021/month=1/dayofmonth=1/hour=1/*.csv
 abfss://xxxxxx/abc/year=2021/month=1/dayofmonth=1/hour=2/*.csv
...

The CSV files are generated once per hour.
Working code

from pyspark.sql.utils import AnalysisException
import pandas as pd

df_list = []
for day in range(1,int(getArgument("NUMBER_OF_DAYS"))+1,1):
  for hour in range(0,24,1):

file_location"**dummy**/year="+getArgument("YEAR")+"/month="+getArgument("MONTH")+"/dayofmonth="+str(day)+"/hour="+str(hour)+"/*.csv"

    print(file_location)

    try:
        batch_df = spark.read.format("csv").option("header", "true").load(file_location)

        pandas_df = batch_df.toPandas()

        print(pandas_df.shape)

        df_list.append(pandas_df)

    except AnalysisException as e:
        print(e)

final_batch_pandas_df = pd.concat(df_list)

print(final_batch_pandas_df.shape)

Modification (Hadoop FileSystem API)

import re

data_path = sc._gateway.jvm.org.apache.hadoop.fs.Path(
    "**dummy**/year=" + getArgument("YEAR") + "/month=" + getArgument("MONTH")
)
files = data_path.getFileSystem(sc._jsc.hadoopConfiguration()).listFiles(data_path, True)

filtered_files = []

# filter files that have dayofmonth in [1, NUMBER_OF_DAYS]

while files.hasNext():
    file_path = files.next().getPath().toString()
    dayofmonth = int(re.search(r".*/dayofmonth=(\d+)/.*", file_path).group(1))
    if dayofmonth <= getArgument("NUMBER_OF_DAYS"):
        filtered_files.append(file_path)

batch_df = spark.read.format("csv").option("header", "true").load(filtered_files)
final_pandas_df = batch_df.toPandas()

Error

Failure to initialize configuration

   ----> 2 files = data_path.getFileSystem(sc._jsc.hadoopConfiguration()).listFiles(data_path, True)
      3 
      4 filtered_files = []
      5 

/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1303         answer = self.gateway_client.send_command(command)
   1304         return_value = get_return_value(
-> 1305             answer, self.gateway_client, self.target_id, self.name)
   1306 
   1307         for temp_arg in temp_args:

/databricks/spark/python/pyspark/sql/utils.py in deco(*a,**kw)
    125     def deco(*a,**kw):
    126         try:
--> 127             return f(*a,**kw)
    128         except py4j.protocol.Py4JJavaError as e:
    129             converted = convert_exception(e.java_exception)

/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

7hiiyaii1#

If you define the path as:

file_location"xxxxx/year="+getArgument("YEAR")+"/month="+getArgument("MONTH")+"/dayofmonth={"+days+"}/hour={"+hours+"}/*.csv"

where days and hours are comma-separated values, then every file matching that pattern should be loaded.
So you can do the path construction inside the for loop, but do the Spark read and processing outside the loop. As a general solution: you can pass such patterns as input to the Spark file reader, as in the sketch below.
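For example, a minimal sketch of that approach, assuming the same Databricks getArgument() widgets as in the question and the same placeholder base path ("xxxxx"):

# Build comma-separated day and hour lists for Hadoop path globbing.
days = ",".join(str(d) for d in range(1, int(getArgument("NUMBER_OF_DAYS")) + 1))
hours = ",".join(str(h) for h in range(0, 24))

file_location = "xxxxx/year=" + getArgument("YEAR") + "/month=" + getArgument("MONTH") + "/dayofmonth={" + days + "}/hour={" + hours + "}/*.csv"

# A single Spark read over the whole glob replaces the per-day/per-hour loop.
batch_df = spark.read.format("csv").option("header", "true").load(file_location)
final_pandas_df = batch_df.toPandas()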
