Task not serializable when reading serialized input in Spark

jaxagkaj · Posted 2021-06-07 in Kafka

I am working on a Spark-based Kafka consumer that reads data in Avro format. Below is the try/catch code that reads and processes the input.

import java.util.*;
import java.io.*;

import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;
import kafka.serializer.StringDecoder;
import kafka.serializer.DefaultDecoder;
import scala.Tuple2;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;

import kafka.producer.KeyedMessage;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.Durations;

public class myKafkaConsumer{
  private String [] topics;
  private SparkConf sparkConf;
  private JavaStreamingContext jssc;

  public static final String USER_SCHEMA = "{"
           + "\"type\":\"record\","
           + "\"name\":\"myrecord\","
           + "\"fields\":["
           + "  { \"name\":\"str1\", \"type\":\"string\" },"
           + "  { \"name\":\"int1\", \"type\":\"int\" }"
           + "]}";

  /**
  * Main function, entry point to the program.
  * @param args the user-ids passed as parameters, which will be
  * treated as topics in our case.
  */
  public static void main(String [] args){
    if(args.length < 1){
      System.err.println("Usage : myKafkaConsumer <topics/user-id>");
      System.exit(1);
    }
    myKafkaConsumer bKC = new myKafkaConsumer(args);
    bKC.run();
  }

  /**
  * Constructor.
  * @param topics the Kafka topics to subscribe to
  */
  private myKafkaConsumer(String [] topics){
    this.topics = topics;
    sparkConf = new SparkConf().setAppName("JavaDirectKafkaFilterMessages");
    jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
  }

  /**
  * run function, runs the entire program: builds a direct Kafka stream
  * over the configured topics and prints each mapped record.
  */
  private void run(){
    HashSet<String> topicSet = new HashSet<String>();
    for(String topic : topics){
      topicSet.add(topic);
      System.out.println(topic);
    }
    HashMap<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put("metadata.broker.list", "128.208.244.3:9092");
    kafkaParams.put("auto.offset.reset", "smallest");
    try{
      JavaPairInputDStream<String, byte[]> messages = KafkaUtils.createDirectStream(
        jssc,
        String.class,
        byte[].class,
        StringDecoder.class,
        DefaultDecoder.class,
        kafkaParams,
        topicSet
      );

      JavaDStream<String> avroRows = messages.map(new Function<Tuple2<String, byte[]>, String>(){
        public String call(Tuple2<String, byte[]> tuple2){
          return testFunction(tuple2._2().toString());
        }
      });
      avroRows.print();
      jssc.start();
      jssc.awaitTermination();
    }catch(Exception E){
      System.out.println(E.toString());
      E.printStackTrace();
    }
  }

  /**
  * Debug helper: logs the raw input and returns a constant marker.
  */
  private static String testFunction(String str){
    System.out.println("Input String : " + str);
    return "Success";
  }
}

The code compiles correctly; however, when I try to run it on a Spark cluster, I get a Task not serializable error. I tried removing the function and simply printing some text instead, but the error persists.
P.S. I checked the printed messages and they show the data is being read correctly.


dxpyg8gm · Answer 1

The print statement collects records of the RDD to the driver in order to print them on screen, and such a task triggers serialization/deserialization of the data.
For the code to work, the elements of the avroRows DStream must therefore be of a serializable type.
For example, replace the avroRows definition with:

JavaDStream<String> avroRows = messages.map(new Function<Tuple2<String, byte[]>, String>(){
    public String call(Tuple2<String, byte[]> tuple2){
        return tuple2._2().toString();
    }
});

I just added a toString() on your record, because the String type is serializable (of course it is not necessarily what you need, it is just an example).
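Note that calling toString() on a byte[] only yields an object reference like [B@1d2e3f4, not the record contents. If the goal is to see the actual Avro fields, a minimal sketch of the map function using the Bijection classes already imported in the question might look like this (assuming the producer wrote records with the same USER_SCHEMA; the schema and Injection are built inside call() so they never have to be shipped with the task closure):

JavaDStream<String> avroRows = messages.map(new Function<Tuple2<String, byte[]>, String>(){
    public String call(Tuple2<String, byte[]> tuple2){
        // Built per record for simplicity; a static field would avoid
        // re-parsing the schema on every message.
        Schema schema = new Schema.Parser().parse(USER_SCHEMA);
        Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);
        // invert() returns a Try; get() throws if the payload does not match the schema.
        GenericRecord record = recordInjection.invert(tuple2._2()).get();
        return record.toString();  // GenericRecord renders its fields as JSON
    }
});

Since the function still only returns a String, the resulting DStream stays serializable, while the printed output now shows the decoded str1 and int1 fields instead of a byte-array reference.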
