I am trying to read multiple files from a data lake into Databricks with Spark. I am currently using a for loop, which is very slow, so I modified the code to filter the file listing instead, but now I get an error. Is there another way to solve this?
File locations
abfss://xxxxxx/abc/year=2021/month=1/dayofmonth=1/hour=1/*.csv
abfss://xxxxxx/abc/year=2021/month=1/dayofmonth=1/hour=2/*.csv
...
The CSV files are generated once per hour.
Working code
import pandas as pd
from pyspark.sql.utils import AnalysisException

df_list = []
for day in range(1, int(getArgument("NUMBER_OF_DAYS")) + 1):
    for hour in range(0, 24):
        file_location = ("**dummy**/year=" + getArgument("YEAR")
                         + "/month=" + getArgument("MONTH")
                         + "/dayofmonth=" + str(day)
                         + "/hour=" + str(hour) + "/*.csv")
        print(file_location)
        try:
            # One Spark read (and one toPandas conversion) per hour directory
            batch_df = spark.read.format("csv").option("header", "true").load(file_location)
            pandas_df = batch_df.toPandas()
            print(pandas_df.shape)
            df_list.append(pandas_df)
        except AnalysisException as e:
            # Raised when an hour directory is missing
            print(e)

final_batch_pandas_df = pd.concat(df_list)
print(final_batch_pandas_df.shape)
Modification (Hadoop FileSystem API)
import re

data_path = sc._gateway.jvm.org.apache.hadoop.fs.Path(
    "**dummy**/year=" + getArgument("YEAR") + "/month=" + getArgument("MONTH")
)
files = data_path.getFileSystem(sc._jsc.hadoopConfiguration()).listFiles(data_path, True)

filtered_files = []
# filter files that have dayofmonth in [1, NUMBER_OF_DAYS]
while files.hasNext():
    file_path = files.next().getPath().toString()
    dayofmonth = int(re.search(r".*/dayofmonth=(\d+)/.*", file_path).group(1))
    if dayofmonth <= int(getArgument("NUMBER_OF_DAYS")):
        filtered_files.append(file_path)

# load() accepts a list of paths; unpacking it with * would bind the extra
# paths to the format/schema parameters
batch_df = spark.read.format("csv").option("header", "true").load(filtered_files)
final_pandas_df = batch_df.toPandas()
Error
Failure to initialize configuration
----> 2 files = data_path.getFileSystem(sc._jsc.hadoopConfiguration()).listFiles(data_path, True)
3
4 filtered_files = []
5
/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
1303 answer = self.gateway_client.send_command(command)
1304 return_value = get_return_value(
-> 1305 answer, self.gateway_client, self.target_id, self.name)
1306
1307 for temp_arg in temp_args:
/databricks/spark/python/pyspark/sql/utils.py in deco(*a,**kw)
125 def deco(*a,**kw):
126 try:
--> 127 return f(*a,**kw)
128 except py4j.protocol.Py4JJavaError as e:
129 converted = convert_exception(e.java_exception)
/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
1 Answer
7hiiyaii1#
If you define the path so that the days and hours appear as comma-separated values inside braces (a Hadoop-style glob), Spark should load every file that matches that pattern; see the sketch below.
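A minimal sketch of that idea, assuming the same getArgument widgets as in the question; **dummy** again stands for the redacted abfss prefix, and the brace lists are glob alternations, not regexes:

# Enumerate the wanted days and hours in a single glob so that one
# spark.read call replaces the per-hour loop.
days = ",".join(str(d) for d in range(1, int(getArgument("NUMBER_OF_DAYS")) + 1))
hours = ",".join(str(h) for h in range(24))
file_location = ("**dummy**/year=" + getArgument("YEAR")
                 + "/month=" + getArgument("MONTH")
                 + "/dayofmonth={" + days + "}/hour={" + hours + "}/*.csv")
batch_df = spark.read.format("csv").option("header", "true").load(file_location)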
So you can keep the path computation inside the for loop, but move the Spark read and processing outside the loop. As a general solution: you can use such a pattern (Spark's file reader treats it as a glob, not a true regex) or a list of paths as the reader's input, as sketched below.
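For completeness, a sketch of the loop variant: the loop only builds strings, and a single load() call (which accepts a list of paths) does all the I/O. This assumes every hour directory exists, since a path that matches no files raises the AnalysisException the question already guards against:

paths = []
for day in range(1, int(getArgument("NUMBER_OF_DAYS")) + 1):
    for hour in range(24):
        # Only string concatenation inside the loop; no Spark I/O here.
        paths.append("**dummy**/year=" + getArgument("YEAR")
                     + "/month=" + getArgument("MONTH")
                     + "/dayofmonth=" + str(day)
                     + "/hour=" + str(hour) + "/*.csv")
# One read for all hours instead of 24 * NUMBER_OF_DAYS separate reads.
batch_df = spark.read.format("csv").option("header", "true").load(paths)
final_pandas_df = batch_df.toPandas()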