我在把 CSV 加载到数据框、添加若干行、然后覆盖源位置的 CSV 文件时遇到了问题。
为了简化问题,我先尝试只加载 CSV 然后原样覆盖它,结果也不行;之后我进一步简化,只加载 CSV 并把它写入一个空位置,同样不起作用。
请帮帮我
我写了一个测试脚本:
# Importing the different packages.
from datetime import datetime, timedelta
from delta import *
# Importing Spark Packages.
from pyspark.sql.functions import col, lit, hash, array, expr, when, regexp_replace, sha2, concat_ws, concat
from pyspark.sql.types import BooleanType, IntegerType, TimestampType
df_target = spark.read.options(header='True') \
.csv ('abfss://sdas@da.dfs.core.windows.net/metadata/Test')
output_path = "abfss://sdas@da.dfs.core.windows.net/metadata/Test/"
df_target = df_target.repartition(1)
df_target.write.option("header", True).mode("overwrite").csv(output_path)
它给出以下错误:
> ---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
Cell In [27], line 4
1 output_path = "abfss://.dfs.core.windows.net/metadata/Test/"
3 df_target = df_target.repartition(1)
----> 4 df_target.write.option("header", True).mode("overwrite").csv(output_path)
5 #df_target.write.format("csv").mode("overwrite").save(output_path/test.csv)
File /opt/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py:1240, in DataFrameWriter.csv(self, path, mode, compression, sep, quote, escape, header, nullValue, escapeQuotes, quoteAll, dateFormat, timestampFormat, ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace, charToEscapeQuoteEscaping, encoding, emptyValue, lineSep)
1221 self.mode(mode)
1222 self._set_opts(
1223 compression=compression,
1224 sep=sep,
(...)
1238 lineSep=lineSep,
1239 )
-> 1240 self._jwrite.csv(path)
File ~/cluster-env/env/lib/python3.10/site-packages/py4j/java_gateway.py:1321, in JavaMember.__call__(self, *args)
1315 command = proto.CALL_COMMAND_NAME +\
1316 self.command_header +\
1317 args_command +\
1318 proto.END_COMMAND_PART
1320 answer = self.gateway_client.send_command(command)
-> 1321 return_value = get_return_value(
1322 answer, self.gateway_client, self.target_id, self.name)
1324 for temp_arg in temp_args:
1325 temp_arg._detach()
File /opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py:190, in capture_sql_exception.<locals>.deco(*a, **kw)
188 def deco(*a: Any, **kw: Any) -> Any:
189 try:
--> 190 return f(*a, **kw)
191 except Py4JJavaError as e:
192 converted = convert_exception(e.java_exception)
File ~/cluster-env/env/lib/python3.10/site-packages/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
325 if answer[1] == REFERENCE_TYPE:
--> 326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
331 "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
332 format(target_id, ".", name, value))
Py4JJavaError: An error occurred while calling o3902.csv.
: org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.errors.QueryExecutionErrors$.jobAbortedError(QueryExecutionErrors.scala:651)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:283)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:187)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:108)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:111)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:183)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:97)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:108)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:104)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:104)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:88)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:82)
at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:136)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:901)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:415)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:382)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:241)
at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:892)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 7.0 failed 4 times, most recent failure: Lost task 0.3 in stage 7.0 (TID 22) (vm-d5418561 executor 1): java.io.FileNotFoundException:
Operation failed: "The specified path does not exist.", 404, HEAD, https://.dfs.core.windows.net/zmsynapsedlhda/metadata/tmas/Test/test.csv?upn=false&action=getStatus&timeout=90
It is possible the underlying files have been updated. You can explicitly invalidate
the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by
recreating the Dataset/DataFrame involved.
at org.apache.spark.sql.errors.QueryExecutionErrors$.readCurrentFileNotFoundError(QueryExecutionErrors.scala:661)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:248)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:308)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:149)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:136)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2682)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2618)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2617)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2617)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1190)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1190)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1190)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2870)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2812)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2801)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: java.io.FileNotFoundException:
Operation failed: "The specified path does not exist.", 404, HEAD, https://.dfs.core.windows.net/zmsynapsedlhda/metadata/tmas/Test/test.csv?upn=false&action=getStatus&timeout=90
It is possible the underlying files have been updated. You can explicitly invalidate
the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by
recreating the Dataset/DataFrame involved.
at org.apache.spark.sql.errors.QueryExecutionErrors$.readCurrentFileNotFoundError(QueryExecutionErrors.scala:661)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:248)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:308)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:149)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:136)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
2条答案
按热度按时间i5desfxk1#
错误消息提示:底层文件可能已被更新。您可以在 SQL 中运行 'REFRESH TABLE tableName' 命令,或重新创建所涉及的 Dataset/DataFrame,来显式地使 Spark 中的缓存失效。
根据错误消息,如果您修改了表或数据框所依赖的底层数据,随后又立即查询该表或数据框,就会遇到此错误,这是预期的行为。
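要解决此问题,您可以尝试以下解决方案:不要在同一个作业中读取并覆盖同一个路径。Spark 是惰性求值的,使用 overwrite 模式写入时,目标目录中的旧文件很可能在真正读取之前就已被删除,从而导致"路径不存在"的错误;可以先把结果写入一个临时目录,再从该临时目录读回数据并覆盖原路径。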
我尝试了类似的代码,它对我来说工作正常。
kyxcudwk2#
你实际运行的代码和你发布的代码并不一致。具体来说,错误回溯中显示的第 1 行是:
output_path = "abfss://.dfs.core.windows.net/metadata/Test/"
posted
from error
将
"abfss://.dfs.core..."
更改为 "abfss://sdas@da.dfs.core..."(即你发布的代码中使用的路径)。