我有一个工作的解决方案,当使用一个小的数据集,但失败的非常大的一个(>100mio记录)由于内存不足。我是新的Spark,所以不知道如何调试或在哪里寻找。我在不同的港口聚集了30分钟的时间
# TCP ports
ports = ['22', '25', '53', '80', '88', '123', '514', '443', '8080', '8443']
def add_port_column(r_df, port, window):
'''
Input:
r_df: dataframe
port: port
window: pyspark window to be used
Output: pyspark dataframe
'''
return r_df.withColumn('pkts_src_port_{}_30m'.format(port), F.when(F.col('source_port') == port, F.sum('source_packets').over(window)).otherwise(0))\
.withColumn('pkts_dst_port_{}_30m'.format(port), F.when(F.col('destination_port') == port, F.sum('destination_packets').over(window)).otherwise(0))
w = (Window()
.partitionBy("source_ip")
.orderBy(F.col("timestamp"))
.rangeBetween(-window_time, 0))
flows_filtered_v3_df = (reduce(partial(add_port_column,window=w_s),
ports,
flows_filtered_v3_df
))
不确定这是否是确切的错误,但阅读了很多OOM
[17:39:40] [INFO] [dku.utils] - at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:929)
[17:39:40] [INFO] [dku.utils] - at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2111)
[17:39:40] [INFO] [dku.utils] - at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2060)
[17:39:40] [INFO] [dku.utils] - at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2049)
[17:39:40] [INFO] [dku.utils] - at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
[17:39:40] [INFO] [dku.utils] - at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:740)
[17:39:40] [INFO] [dku.utils] - at org.apache.spark.SparkContext.runJob(SparkContext.scala:2081)
[17:39:40] [INFO] [dku.utils] - at org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:167)
[17:39:40] [INFO] [dku.utils] - at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:167)
[17:39:40] [INFO] [dku.utils] - at org.apache.spark.io.ReadAheadInputStream$1.run(ReadAheadInputStream.java:168)
[17:39:40] [INFO] [dku.utils] - at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
[17:39:40] [INFO] [dku.utils] - at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
[17:39:40] [INFO] [dku.utils] - at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
[17:39:40] [INFO] [dku.utils] - at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
[17:39:40] [INFO] [dku.utils] - at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[17:39:40] [INFO] [dku.utils] - at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
[17:39:40] [INFO] [dku.utils] - at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
[17:39:40] [INFO] [dku.utils] - at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
[17:39:40] [INFO] [dku.utils] - at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[17:39:40] [INFO] [dku.utils] - at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
[17:39:40] [INFO] [dku.utils] - at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
[17:39:40] [INFO] [dku.utils] - at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
[17:39:40] [INFO] [dku.utils] - at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
[17:39:40] [INFO] [dku.utils] - at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
[17:39:40] [INFO] [dku.utils] - at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
[17:39:40] [INFO] [dku.utils] - at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
[17:39:40] [INFO] [dku.utils] - at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
[17:39:40] [INFO] [dku.utils] - at java.lang.Thread.run(Thread.java:748)
[17:39:40] [INFO] [dku.utils] - at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
[17:39:40] [INFO] [dku.utils] - 20/09/04 17:39:38 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM
[17:39:40] [INFO] [dku.utils] - at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
[17:39:40] [INFO] [dku.utils] - 20/09/04 17:39:38 ERROR util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[read-ahead,5,main]
[17:39:40] [INFO] [dku.utils] - at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:668)
[17:39:40] [INFO] [dku.utils] - java.lang.OutOfMemoryError: Java heap space
[17:39:40] [INFO] [dku.utils] - at org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:422)
[17:39:40] [INFO] [dku.utils] - at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:276)
[17:39:40] [INFO] [dku.utils] - at org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:167)
[17:39:40] [INFO] [dku.utils] - at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:270)
[17:39:40] [INFO] [dku.utils] - at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:228)
[17:39:40] [INFO] [dku.utils] - at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:557)
[17:39:40] [INFO] [dku.utils] - at com.dataiku.dip.spark.ParquetWriter$.saveParquetDataset(ParquetWriter.scala:63)
[17:39:40] [INFO] [dku.utils] - at com.dataiku.dip.spark.StdDataikuSparkContext.saveHDFSableWithFastPathIfPossible(StdDataikuSparkContext.scala:984)
[17:39:40] [INFO] [dku.utils] - at com.dataiku.dip.spark.StdDataikuSparkContext.internalSave(StdDataikuSparkContext.scala:897)
[17:39:40] [INFO] [dku.utils] - at com.dataiku.dss.spark.DataikuSparkContext$class.save(DataikuSparkContext.scala:83)
[17:39:40] [INFO] [dku.utils] - at org.apache.spark.io.ReadAheadInputStream$1.run(ReadAheadInputStream.java:168)
[17:39:40] [INFO] [dku.utils] - at com.dataiku.dip.spark.StdDataikuSparkContext.save(StdDataikuSparkContext.scala:60)
[17:39:40] [INFO] [dku.utils] - at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[17:39:40] [INFO] [dku.utils] - at com.dataiku.dip.spark.StdDataikuSparkContext.savePyDataFrame(StdDataikuSparkContext.scala:673)
[17:39:40] [INFO] [dku.utils] - at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[17:39:40] [INFO] [dku.utils] - at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
[17:39:40] [INFO] [dku.utils] - at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
[17:39:40] [INFO] [dku.utils] - at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[17:39:40] [INFO] [dku.utils] - at java.lang.reflect.Method.invoke(Method.java:498)
[17:39:40] [INFO] [dku.utils] - at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
[17:39:40] [INFO] [dku.utils] - at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
[17:39:40] [INFO] [dku.utils] - at py4j.Gateway.invoke(Gateway.java:282)
[17:39:40] [INFO] [dku.utils] - at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
[17:39:40] [INFO] [dku.utils] - at py4j.commands.CallCommand.execute(CallCommand.java:79)
[17:39:40] [INFO] [dku.utils] - at py4j.GatewayConnection.run(GatewayConnection.java:238)
[17:39:40] [INFO] [dku.utils] - at java.lang.Thread.run(Thread.java:748)
[17:39:40] [INFO] [dku.utils] - at java.lang.Thread.run(Thread.java:748)
spark提交
org.apache.spark.deploy.SparkSubmit --conf spark.executor.memory=8g --conf spark.local.dir=/pkgs/cdh/tmp/spark --conf spark.yarn.security.tokens.hive.enabled=false --conf spark.yarn.security.credentials.hadoopfs.enabled=false --conf spark.security.credentials.hive.enabled=false --conf spark.app.name=DSS (Py): compute_flows_window_pyspark_2020-04-14 --conf spark.io.compression.codec=snappy --conf spark.sql.shuffle.partitions=80 --conf spark.shuffle.spill.compress=false --conf spark.shuffle.compress=false --conf spark.dku.limitedLogs={"filePartitioner.noMatch":100,"s3.ignoredPath":100,"s3.ignoredFile":100} --conf spark.security.credentials.hadoopfs.enabled=false --conf spark.jars.repositories=https://nexus.bisinfo.org:8443/repository/maven-central --conf spark.yarn.executor.memoryOverhead=600
编辑:增加内存后,它正在抱怨
[09:20:10] [INFO] [dku.utils] - at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
[09:20:10] [INFO] [dku.utils] - at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
[09:20:10] [INFO] [dku.utils] - at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
[09:20:10] [INFO] [dku.utils] - at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
[09:20:10] [INFO] [dku.utils] - at org.apache.spark.scheduler.Task.run(Task.scala:121)
[09:20:10] [INFO] [dku.utils] - at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407)
[09:20:10] [INFO] [dku.utils] - at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1408)
[09:20:10] [INFO] [dku.utils] - at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
[09:20:10] [INFO] [dku.utils] - at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[09:20:10] [INFO] [dku.utils] - at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[09:20:10] [INFO] [dku.utils] - at java.lang.Thread.run(Thread.java:748)
[09:20:10] [INFO] [dku.utils] - Caused by: java.io.IOException: PARSING_ERROR(2)
[09:20:10] [INFO] [dku.utils] - at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:98)
[09:20:10] [INFO] [dku.utils] - at org.xerial.snappy.SnappyNative.uncompressedLength(Native Method)
[09:20:10] [INFO] [dku.utils] - at org.xerial.snappy.Snappy.uncompressedLength(Snappy.java:634)
[09:20:10] [INFO] [dku.utils] - at org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:435)
[09:20:10] [INFO] [dku.utils] - at org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:167)
[09:20:10] [INFO] [dku.utils] - at org.apache.spark.io.ReadAheadInputStream$1.run(ReadAheadInputStream.java:168)
[09:20:10] [INFO] [dku.utils] - ... 3 more
暂无答案!
目前还没有任何答案,快来回答吧!