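I want to use a side input to pass some configuration to my pipeline, but the driver shuts down right after the PCollectionView is created when running on my local Spark cluster (Spark version 2.4.7, 1 master, 1 worker, running on localhost). The same pipeline works fine on the DirectRunner.
I have tried to strip the code down to its bare essentials (see below). The problem still occurs when running on the Spark cluster, and the DirectRunner still works fine.
The Spark cluster does accept jobs: I successfully ran a "hello world" pipeline, which completed without any problems.
What is happening here?
The logs are pasted below.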
Environment:
------------
Beam: 2.25
SparkRunner: 2.25
Java version: 11.0.9-ea
Maven Compiler Source: 1.8
Maven Compiler Target: 1.8
Spark version: 2.4.7
// Pipeline
private static PipelineResult runPipeline(PipelineOptions options) {
    Pipeline p = Pipeline.create(options);
    PCollectionView<String> schema = p
        .apply("Dummy tabular schema builder", Create.of("This is a string"))
        .apply("Collect", View.asSingleton());
    p
        .apply("Hello world", Create.of("Hello world"))
        .apply("Side input test", ParDo.of(DummyFn.builder().setSchemaView(schema).build()).withSideInput("schema", schema))
        .apply(ConsoleIO.create());
    return p.run();
}
// Simple DoFn that logs the side input and upper-cases each element
@AutoValue
public abstract class DummyFn extends DoFn<String, String> {
    private static final Logger LOG = LoggerFactory.getLogger(DummyFn.class);

    public static Builder builder() {
        return new org.odp.beam.io.fn.AutoValue_DummyFn.Builder();
    }

    public abstract PCollectionView<String> getSchemaView();

    @ProcessElement
    public void processElement(@Element String element,
                               OutputReceiver<String> out,
                               ProcessContext context) {
        // Read the singleton side input through the view held by this DoFn
        String schema = context.sideInput(getSchemaView());
        LOG.warn(schema);
        out.output(element.toUpperCase());
    }

    @AutoValue.Builder
    public abstract static class Builder {
        public abstract Builder setSchemaView(PCollectionView<String> value);
        public abstract DummyFn build();
    }
}
// Simple PTransform that prints the output of each element's toString method
public class ConsoleIO<T> extends PTransform<PCollection<T>, PDone> {
    public static <T> ConsoleIO<T> create() {
        // Diamond operator avoids the raw-type warning of `new ConsoleIO()`
        return new ConsoleIO<>();
    }

    @Override
    public PDone expand(PCollection<T> input) {
        input
            .apply("Print elements", ParDo.of(new PrintElementFn<T>()));
        return PDone.in(input.getPipeline());
    }

    static class PrintElementFn<T> extends DoFn<T, Void> {
        @DoFn.ProcessElement
        public void processElement(@Element T element, ProcessContext context) throws Exception {
            System.out.println(element.toString());
        }
    }
}
$ spark-submit \
--class org.odp.beam.extractors.CsvToCdfRawExtractor \
--verbose \
--driver-memory 4G \
--executor-memory 4G \
--total-executor-cores 4 \
--deploy-mode client \
--supervise \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.network.timeout=420000 \
--master spark://192.168.10.172:7077 \
target/beam-poc-0.1-shaded.jar \
--runner=SparkRunner
Using properties file: null
20/11/10 15:46:44 WARN Utils: Your hostname, localhost.localdomain resolves to a loopback address: 127.0.0.1; using 192.168.10.172 instead (on interface enp7s0)
20/11/10 15:46:44 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/home/tom/app/spark/spark-2.4.7-bin-hadoop2.7/jars/spark-unsafe_2.11-2.4.7.jar) to method java.nio.Bits.unaligned()
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
Parsed arguments:
master spark://192.168.10.172:7077
deployMode client
executorMemory 4G
executorCores null
totalExecutorCores 4
propertiesFile null
driverMemory 4G
driverCores null
driverExtraClassPath null
driverExtraLibraryPath null
driverExtraJavaOptions null
supervise true
queue null
numExecutors null
files null
pyFiles null
archives null
mainClass org.odp.beam.extractors.CsvToCdfRawExtractor
primaryResource file:/home/tom/project/odf/beam-poc/target/beam-poc-0.1-shaded.jar
name org.odp.beam.extractors.CsvToCdfRawExtractor
childArgs [--runner=SparkRunner]
jars null
packages null
packagesExclusions null
repositories null
verbose true
Spark properties used, including those specified through
--conf and those from the properties file null:
(spark.network.timeout,420000)
(spark.driver.memory,4G)
(spark.dynamicAllocation.enabled,false)
20/11/10 15:46:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Main class:
org.odp.beam.extractors.CsvToCdfRawExtractor
Arguments:
--runner=SparkRunner
Spark config:
(spark.jars,file:/home/tom/project/odf/beam-poc/target/beam-poc-0.1-shaded.jar)
(spark.app.name,org.odp.beam.extractors.CsvToCdfRawExtractor)
(spark.cores.max,4)
(spark.network.timeout,420000)
(spark.driver.memory,4G)
(spark.submit.deployMode,client)
(spark.master,spark://192.168.10.172:7077)
(spark.executor.memory,4G)
(spark.dynamicAllocation.enabled,false)
Classpath elements:
file:/home/tom/project/odf/beam-poc/target/beam-poc-0.1-shaded.jar
log4j:WARN No appenders could be found for logger (org.apache.beam.sdk.options.PipelineOptionsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
20/11/10 15:46:46 INFO SparkContext: Running Spark version 2.4.7
20/11/10 15:46:47 INFO SparkContext: Submitted application: CsvToCdfRawExtractor
20/11/10 15:46:47 INFO SecurityManager: Changing view acls to: tom
20/11/10 15:46:47 INFO SecurityManager: Changing modify acls to: tom
20/11/10 15:46:47 INFO SecurityManager: Changing view acls groups to:
20/11/10 15:46:47 INFO SecurityManager: Changing modify acls groups to:
20/11/10 15:46:47 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(tom); groups with view permissions: Set(); users with modify permissions: Set(tom); groups with modify permissions: Set()
20/11/10 15:46:47 INFO Utils: Successfully started service 'sparkDriver' on port 35103.
20/11/10 15:46:47 INFO SparkEnv: Registering MapOutputTracker
20/11/10 15:46:47 INFO SparkEnv: Registering BlockManagerMaster
20/11/10 15:46:47 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
20/11/10 15:46:47 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
20/11/10 15:46:47 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-58419068-d0ad-45c9-b90b-92b659dee1c3
20/11/10 15:46:47 INFO MemoryStore: MemoryStore started with capacity 2.2 GB
20/11/10 15:46:47 INFO SparkEnv: Registering OutputCommitCoordinator
20/11/10 15:46:47 INFO Utils: Successfully started service 'SparkUI' on port 4040.
20/11/10 15:46:47 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://fedora:4040
20/11/10 15:46:47 INFO SparkContext: Added JAR file:/home/tom/project/odf/beam-poc/target/beam-poc-0.1-shaded.jar at spark://fedora:35103/jars/beam-poc-0.1-shaded.jar with timestamp 1605019607514
20/11/10 15:46:47 INFO StandaloneAppClient$ClientEndpoint: Connecting to master spark://192.168.10.172:7077...
20/11/10 15:46:47 INFO TransportClientFactory: Successfully created connection to /192.168.10.172:7077 after 25 ms (0 ms spent in bootstraps)
20/11/10 15:46:47 INFO StandaloneSchedulerBackend: Connected to Spark cluster with app ID app-20201110154647-0020
20/11/10 15:46:47 INFO StandaloneAppClient$ClientEndpoint: Executor added: app-20201110154647-0020/0 on worker-20201109144752-192.168.10.172-45535 (192.168.10.172:45535) with 4 core(s)
20/11/10 15:46:47 INFO StandaloneSchedulerBackend: Granted executor ID app-20201110154647-0020/0 on hostPort 192.168.10.172:45535 with 4 core(s), 4.0 GB RAM
20/11/10 15:46:47 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 33169.
20/11/10 15:46:47 INFO NettyBlockTransferService: Server created on fedora:33169
20/11/10 15:46:47 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
20/11/10 15:46:47 INFO StandaloneAppClient$ClientEndpoint: Executor updated: app-20201110154647-0020/0 is now RUNNING
20/11/10 15:46:47 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, fedora, 33169, None)
20/11/10 15:46:47 INFO BlockManagerMasterEndpoint: Registering block manager fedora:33169 with 2.2 GB RAM, BlockManagerId(driver, fedora, 33169, None)
20/11/10 15:46:47 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, fedora, 33169, None)
20/11/10 15:46:47 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, fedora, 33169, None)
20/11/10 15:46:47 INFO StandaloneSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
20/11/10 15:46:48 INFO SparkRunner$Evaluator: Entering directly-translatable composite transform: 'Collect/Combine.GloballyAsSingletonView/Combine.globally(Singleton)'
20/11/10 15:46:48 INFO MetricsAccumulator: Instantiated metrics accumulator: MetricQueryResults()
20/11/10 15:46:48 INFO AggregatorsAccumulator: Instantiated aggregators accumulator:
20/11/10 15:46:48 INFO SparkRunner$Evaluator: Evaluating Read(CreateSource)
20/11/10 15:46:48 INFO SparkRunner$Evaluator: Entering directly-translatable composite transform: 'Collect/Combine.GloballyAsSingletonView/Combine.globally(Singleton)'
20/11/10 15:46:48 INFO SparkRunner$Evaluator: Evaluating Combine.globally(Singleton)
20/11/10 15:46:48 INFO SparkContext: Starting job: aggregate at GroupCombineFunctions.java:107
20/11/10 15:46:48 INFO DAGScheduler: Got job 0 (aggregate at GroupCombineFunctions.java:107) with 1 output partitions
20/11/10 15:46:48 INFO DAGScheduler: Final stage: ResultStage 0 (aggregate at GroupCombineFunctions.java:107)
20/11/10 15:46:48 INFO DAGScheduler: Parents of final stage: List()
20/11/10 15:46:48 INFO DAGScheduler: Missing parents: List()
20/11/10 15:46:48 INFO DAGScheduler: Submitting ResultStage 0 (Dummy tabular schema builder/Read(CreateSource).out Bounded[0] at RDD at SourceRDD.java:80), which has no missing parents
20/11/10 15:46:48 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 16.2 KB, free 2.2 GB)
20/11/10 15:46:48 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 6.8 KB, free 2.2 GB)
20/11/10 15:46:48 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on fedora:33169 (size: 6.8 KB, free: 2.2 GB)
20/11/10 15:46:48 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1184
20/11/10 15:46:48 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (Dummy tabular schema builder/Read(CreateSource).out Bounded[0] at RDD at SourceRDD.java:80) (first 15 tasks are for partitions Vector(0))
20/11/10 15:46:48 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
20/11/10 15:46:49 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (192.168.10.172:48382) with ID 0
20/11/10 15:46:49 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 192.168.10.172, executor 0, partition 0, PROCESS_LOCAL, 8546 bytes)
20/11/10 15:46:49 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.10.172:43781 with 2.2 GB RAM, BlockManagerId(0, 192.168.10.172, 43781, None)
20/11/10 15:46:50 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.10.172:43781 (size: 6.8 KB, free: 2.2 GB)
20/11/10 15:46:51 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 2056 ms on 192.168.10.172 (executor 0) (1/1)
20/11/10 15:46:51 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
20/11/10 15:46:51 INFO DAGScheduler: ResultStage 0 (aggregate at GroupCombineFunctions.java:107) finished in 3.091 s
20/11/10 15:46:51 INFO DAGScheduler: Job 0 finished: aggregate at GroupCombineFunctions.java:107, took 3.132405 s
20/11/10 15:46:51 INFO SparkRunner$Evaluator: Evaluating org.apache.beam.sdk.transforms.View$VoidKeyToMultimapMaterialization$VoidKeyToMultimapMaterializationDoFn@14924f41
20/11/10 15:46:51 INFO SparkRunner$Evaluator: Evaluating View.CreatePCollectionView
20/11/10 15:46:51 INFO SparkContext: Invoking stop() from shutdown hook
20/11/10 15:46:51 INFO SparkUI: Stopped Spark web UI at http://fedora:4040
20/11/10 15:46:51 INFO StandaloneSchedulerBackend: Shutting down all executors
20/11/10 15:46:51 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Asking each executor to shut down
20/11/10 15:46:51 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
20/11/10 15:46:51 INFO MemoryStore: MemoryStore cleared
20/11/10 15:46:51 INFO BlockManager: BlockManager stopped
20/11/10 15:46:51 INFO BlockManagerMaster: BlockManagerMaster stopped
20/11/10 15:46:51 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
20/11/10 15:46:51 INFO SparkContext: Successfully stopped SparkContext
20/11/10 15:46:51 INFO ShutdownHookManager: Shutdown hook called
20/11/10 15:46:51 INFO ShutdownHookManager: Deleting directory /tmp/spark-665a903f-22db-497e-989f-a5ca3e0635e2
20/11/10 15:46:51 INFO ShutdownHookManager: Deleting directory /tmp/spark-d4b5a04f-f6a3-48ff-b229-4eb966151d86
Spark Executor Command: "/usr/lib/jvm/java-11-openjdk-11.0.9.6-0.0.ea.fc33.x86_64/bin/java" "-cp" "/home/tom/app/spark/spark/conf/:/home/tom/app/spark/spark/jars/*" "-Xmx4096M" "-Dspark.driver.port=35103" "-Dspark.network.timeout=420000" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" "spark://CoarseGrainedScheduler@fedora:35103" "--executor-id" "0" "--hostname" "192.168.10.172" "--cores" "4" "--app-id" "app-20201110154647-0020" "--worker-url" "spark://Worker@192.168.10.172:45535"
========================================
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
20/11/10 15:46:48 INFO CoarseGrainedExecutorBackend: Started daemon with process name: 360353@localhost.localdomain
20/11/10 15:46:48 INFO SignalUtils: Registered signal handler for TERM
20/11/10 15:46:48 INFO SignalUtils: Registered signal handler for HUP
20/11/10 15:46:48 INFO SignalUtils: Registered signal handler for INT
20/11/10 15:46:48 WARN Utils: Your hostname, localhost.localdomain resolves to a loopback address: 127.0.0.1; using 192.168.10.172 instead (on interface enp7s0)
20/11/10 15:46:48 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/home/tom/app/spark/spark-2.4.7-bin-hadoop2.7/jars/spark-unsafe_2.11-2.4.7.jar) to method java.nio.Bits.unaligned()
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
20/11/10 15:46:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
20/11/10 15:46:48 INFO SecurityManager: Changing view acls to: tom
20/11/10 15:46:48 INFO SecurityManager: Changing modify acls to: tom
20/11/10 15:46:48 INFO SecurityManager: Changing view acls groups to:
20/11/10 15:46:48 INFO SecurityManager: Changing modify acls groups to:
20/11/10 15:46:48 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(tom); groups with view permissions: Set(); users with modify permissions: Set(tom); groups with modify permissions: Set()
20/11/10 15:46:49 INFO TransportClientFactory: Successfully created connection to fedora/192.168.10.172:35103 after 54 ms (0 ms spent in bootstraps)
20/11/10 15:46:49 INFO SecurityManager: Changing view acls to: tom
20/11/10 15:46:49 INFO SecurityManager: Changing modify acls to: tom
20/11/10 15:46:49 INFO SecurityManager: Changing view acls groups to:
20/11/10 15:46:49 INFO SecurityManager: Changing modify acls groups to:
20/11/10 15:46:49 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(tom); groups with view permissions: Set(); users with modify permissions: Set(tom); groups with modify permissions: Set()
20/11/10 15:46:49 INFO TransportClientFactory: Successfully created connection to fedora/192.168.10.172:35103 after 4 ms (0 ms spent in bootstraps)
20/11/10 15:46:49 INFO DiskBlockManager: Created local directory at /tmp/spark-0e47fa97-8714-4e8e-950e-b1032fe36995/executor-e7667d04-198d-4144-8897-ddada0bfd1de/blockmgr-019262b3-4d3e-4158-b984-ff85c0846191
20/11/10 15:46:49 INFO MemoryStore: MemoryStore started with capacity 2.2 GB
20/11/10 15:46:49 INFO CoarseGrainedExecutorBackend: Connecting to driver: spark://CoarseGrainedScheduler@fedora:35103
20/11/10 15:46:49 INFO WorkerWatcher: Connecting to worker spark://Worker@192.168.10.172:45535
20/11/10 15:46:49 INFO TransportClientFactory: Successfully created connection to /192.168.10.172:45535 after 2 ms (0 ms spent in bootstraps)
20/11/10 15:46:49 INFO WorkerWatcher: Successfully connected to spark://Worker@192.168.10.172:45535
20/11/10 15:46:49 INFO CoarseGrainedExecutorBackend: Successfully registered with driver
20/11/10 15:46:49 INFO Executor: Starting executor ID 0 on host 192.168.10.172
20/11/10 15:46:49 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 43781.
20/11/10 15:46:49 INFO NettyBlockTransferService: Server created on 192.168.10.172:43781
20/11/10 15:46:49 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
20/11/10 15:46:49 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(0, 192.168.10.172, 43781, None)
20/11/10 15:46:49 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(0, 192.168.10.172, 43781, None)
20/11/10 15:46:49 INFO BlockManager: Initialized BlockManager: BlockManagerId(0, 192.168.10.172, 43781, None)
20/11/10 15:46:49 INFO CoarseGrainedExecutorBackend: Got assigned task 0
20/11/10 15:46:49 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
20/11/10 15:46:49 INFO Executor: Fetching spark://fedora:35103/jars/beam-poc-0.1-shaded.jar with timestamp 1605019607514
20/11/10 15:46:49 INFO TransportClientFactory: Successfully created connection to fedora/192.168.10.172:35103 after 2 ms (0 ms spent in bootstraps)
20/11/10 15:46:49 INFO Utils: Fetching spark://fedora:35103/jars/beam-poc-0.1-shaded.jar to /tmp/spark-0e47fa97-8714-4e8e-950e-b1032fe36995/executor-e7667d04-198d-4144-8897-ddada0bfd1de/spark-62556d02-a044-4c2c-8f97-c7f25ef3e337/fetchFileTemp6325880319900581024.tmp
20/11/10 15:46:49 INFO Utils: Copying /tmp/spark-0e47fa97-8714-4e8e-950e-b1032fe36995/executor-e7667d04-198d-4144-8897-ddada0bfd1de/spark-62556d02-a044-4c2c-8f97-c7f25ef3e337/2058038551605019607514_cache to /home/tom/app/spark/spark-2.4.7-bin-hadoop2.7/work/app-20201110154647-0020/0/./beam-poc-0.1-shaded.jar
20/11/10 15:46:50 INFO Executor: Adding file:/home/tom/app/spark/spark-2.4.7-bin-hadoop2.7/work/app-20201110154647-0020/0/./beam-poc-0.1-shaded.jar to class loader
20/11/10 15:46:50 INFO TorrentBroadcast: Started reading broadcast variable 0
20/11/10 15:46:50 INFO TransportClientFactory: Successfully created connection to fedora/192.168.10.172:33169 after 2 ms (0 ms spent in bootstraps)
20/11/10 15:46:50 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 6.8 KB, free 2.2 GB)
20/11/10 15:46:50 INFO TorrentBroadcast: Reading broadcast variable 0 took 112 ms
20/11/10 15:46:50 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 16.2 KB, free 2.2 GB)
20/11/10 15:46:51 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 6312 bytes result sent to driver
20/11/10 15:46:51 INFO CoarseGrainedExecutorBackend: Driver commanded a shutdown
20/11/10 15:46:51 ERROR CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM