spark流kafka-job总是在rdd包含实际消息时退出

lmvvr0a8  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(404)

我正在尝试通过spark streaming 1.3.1处理来自Kafka的消息。作业正在我的电脑上本地运行。
当我开始工作,没有消息消费,这是工作正常。但当我向kafka发送消息时,作业会记录一个nullpointerexception

15/05/20 11:22:00 ERROR TaskContextImpl: Error in TaskCompletionListener
java.lang.NullPointerException
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.close(KafkaRDD.scala:158)
at org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:63)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:101)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:101)
at org.apache.spark.TaskContextImpl$$anon$1.onTaskCompletion(TaskContextImpl.scala:54)
at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:73)
at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:71)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:71)
at org.apache.spark.scheduler.Task.run(Task.scala:66)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

15/05/20 11:22:00 ERROR Executor: Exception in task 1.0 in stage 4.0 (TID 9)
org.apache.spark.util.TaskCompletionListenerException
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:81)
at org.apache.spark.scheduler.Task.run(Task.scala:66)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

15/05/20 11:22:00 WARN TaskSetManager: Lost task 1.0 in stage 4.0 (TID 9, localhost): org.apache.spark.util.TaskCompletionListenerException
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:81)
at org.apache.spark.scheduler.Task.run(Task.scala:66)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

15/05/20 11:22:00 ERROR TaskSetManager: Task 1 in stage 4.0 failed 1 times; aborting job
15/05/20 11:22:00 INFO TaskSchedulerImpl: Removed TaskSet 4.0, whose tasks have all completed, from pool 
15/05/20 11:22:00 INFO TaskSchedulerImpl: Cancelling stage 4
15/05/20 11:22:00 INFO DAGScheduler: Stage 4 (foreachRDD at SimpleApp.java:55) failed in 0,018 s
15/05/20 11:22:00 INFO DAGScheduler: Job 4 failed: foreachRDD at SimpleApp.java:55, took 0,026982 s
15/05/20 11:22:00 ERROR JobScheduler: Error running job streaming job 1432113720000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 4.0 failed 1 times, most recent failure: Lost task 1.0 in stage 4.0 (TID 9, localhost): org.apache.spark.util.TaskCompletionListenerException
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:81)
at org.apache.spark.scheduler.Task.run(Task.scala:66)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

这是消费者工作的代码。自定义解码器“traceeventdecoder”只是将kafka消息中的字节数组反序列化为traceevent类型的对象。

public static void main(final String[] args) throws InterruptedException, IOException {
    System.out.println("Job started");
    SparkConf conf = new SparkConf().setAppName("HandlingTimeFilter");
    conf.setMaster("local[10]");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));

    Map<String, String> kafkaParams = new HashMap<>();
    kafkaParams.put("zookeeper.connect", "cer:2181");
    kafkaParams.put("bootstrap.servers", "cer:9099,lanthan:9099,uran:9099,protactinium:9099");
    kafkaParams.put("consumer.id", "SimpleSparkConsumer");
    kafkaParams.put("group.id", "consumers");

    Set<String> topics = new HashSet<>();
    topics.add("traceEvents");

    JavaPairInputDStream<String, TraceEvent> stream = 
                               KafkaUtils.createDirectStream(
                               jssc, String.class, TraceEvent.class,
                               StringDecoder.class, TraceEventDecoder.class,
                               kafkaParams, topics);

    stream.foreachRDD((Function<JavaPairRDD<String, TraceEvent>, Void>) rdd -> {
        Map<String, TraceEvent> events = rdd.collectAsMap();
        System.out.println("New RDD call, size=" + events.size());

        return null;
    });

    jssc.start();

    Thread.sleep(60000);
    jssc.stop();
}
8iwquhpp

8iwquhpp1#

您的traceeventdecoder构造函数是公共的吗?是否有traceeventdecoder(verifiableproperties)版本?当kafkadirectstream的初始化参数出错时,可能会发生此错误。我已经向apache提交了一个与此案例相关的问题:https://issues.apache.org/jira/browse/spark-9780

vof42yt1

vof42yt12#

我在spark 1.2.0中遇到了这个问题,但是升级到了1.3.1,现在可以工作了。我认为这个问题可能与缺少依赖性有关。如果您使用的是maven,那么可以调整pom.xml以构建适当的uberjar。

<project>
<groupId>com.databricks.apps.logs</groupId>
<artifactId>log-analyzer</artifactId>
<modelVersion>4.0.0</modelVersion>
<name>Databricks Spark Logs Analyzer</name>
<packaging>jar</packaging>
<version>1.0</version>
<repositories>
    <repository>
        <id>Akka repository</id>
        <url>http://repo.akka.io/releases</url>
    </repository>
</repositories>
<dependencies>
    <dependency> <!-- Spark -->
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.3.1</version>
        <scope>provided</scope>
    </dependency>
    <dependency> <!-- Spark SQL -->
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>1.3.1</version>
        <scope>provided</scope>
    </dependency>
    <dependency> <!-- Spark Streaming -->
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>1.3.1</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka_2.10</artifactId>
        <version>1.3.1</version>
    </dependency>
    <dependency> <!-- Command Line Parsing -->
        <groupId>commons-cli</groupId>
        <artifactId>commons-cli</artifactId>
        <version>1.2</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                </execution>
            </executions>
            <configuration>
                <filters>
                    <filter>
                        <artifact>*:*</artifact>
                        <excludes>
                            <exclude>META-INF/*.SF</exclude>
                            <exclude>META-INF/*.DSA</exclude>
                            <exclude>META-INF/*.RSA</exclude>
                        </excludes>
                    </filter>
                </filters>
                <finalName>uber-${project.artifactId}-${project.version}</finalName>
            </configuration>
        </plugin>
    </plugins>
</build>

资料来源:http://databricks.gitbooks.io/databricks-spark-knowledge-base/content/troubleshooting/missing_dependencies_in_jar_files.html (我更新了上面的版本以使用spark 1.3.1并包含kafka依赖项)

相关问题