I am working on a Spark-based Kafka consumer that reads data in Avro format. Below is the try-catch code that reads and processes the input.
import java.util.*;
import java.io.*;
import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;
import kafka.serializer.StringDecoder;
import kafka.serializer.DefaultDecoder;
import scala.Tuple2;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import kafka.producer.KeyedMessage;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.Durations;
public class myKafkaConsumer{

    private String [] topics;
    private SparkConf sparkConf;
    private JavaStreamingContext jssc;

    public static final String USER_SCHEMA = "{"
            + "\"type\":\"record\","
            + "\"name\":\"myrecord\","
            + "\"fields\":["
            + " { \"name\":\"str1\", \"type\":\"string\" },"
            + " { \"name\":\"int1\", \"type\":\"int\" }"
            + "]}";

    /**
     * Main function, entry point to the program.
     * @param args, takes the user-ids as the parameters, which
     *              will be treated as topics in our case.
     */
    public static void main(String [] args){
        if(args.length < 1){
            System.err.println("Usage : myKafkaConsumber <topics/user-id>");
            System.exit(1);
        }
        myKafkaConsumer bKC = new myKafkaConsumer(args);
        bKC.run();
    }

    /**
     * Constructor
     */
    private myKafkaConsumer(String [] topics){
        this.topics = topics;
        sparkConf = new SparkConf();
        sparkConf = sparkConf.setAppName("JavaDirectKafkaFilterMessages");
        jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
    }

    /**
     * run function, runs the entire program.
     * @param topics, a string array containing the topics to be read from
     * @return void
     */
    private void run(){
        HashSet<String> topicSet = new HashSet<String>();
        for(String topic : topics){
            topicSet.add(topic);
            System.out.println(topic);
        }

        HashMap<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", "128.208.244.3:9092");
        kafkaParams.put("auto.offset.reset", "smallest");

        try{
            JavaPairInputDStream<String, byte[]> messages = KafkaUtils.createDirectStream(
                    jssc,
                    String.class,
                    byte[].class,
                    StringDecoder.class,
                    DefaultDecoder.class,
                    kafkaParams,
                    topicSet
            );

            JavaDStream<String> avroRows = messages.map(new Function<Tuple2<String, byte[]>, String>(){
                public String call(Tuple2<String, byte[]> tuple2){
                    return testFunction(tuple2._2().toString());
                }
            });

            avroRows.print();
            jssc.start();
            jssc.awaitTermination();
        }catch(Exception E){
            System.out.println(E.toString());
            E.printStackTrace();
        }
    }

    private static String testFunction(String str){
        System.out.println("Input String : " + str);
        return "Success";
    }
}
The code compiles fine, but when I try to run it on the Spark cluster I get a Task not serializable error. I tried removing the function and simply printing some text, but the error persists.

P.S. I checked the printed messages and they are being read correctly.
1 Answer
The print statement collects the RDDs to the driver in order to print them on screen, and such a task triggers serialization/deserialization of the data. For the code to work, the avroRows DStream must hold a serializable type, for example by replacing the avroRows definition as sketched below.
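A minimal sketch of such a replacement, assuming the Avro payload is decoded with the Bijection/GenericAvroCodecs classes already imported in the question (the decoding details are illustrative; the relevant part is that the mapper returns a plain String):

JavaDStream<String> avroRows = messages.map(new Function<Tuple2<String, byte[]>, String>(){
    public String call(Tuple2<String, byte[]> tuple2){
        // Build the decoder inside the task so nothing extra has to be
        // serialized into the closure (simple but not the most efficient;
        // a real job would cache the Injection, e.g. in a static field).
        Schema schema = new Schema.Parser().parse(USER_SCHEMA);
        Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);
        GenericRecord record = recordInjection.invert(tuple2._2()).get();
        // record.toString() hands a plain, serializable String downstream
        return record.toString();
    }
});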
I just added a toString on your record, since the String type is serializable (of course it is not necessarily what you need; it is just an example).
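One caveat worth noting: in Java, an anonymous inner Function like the one above also captures a reference to the enclosing myKafkaConsumer instance, so non-serializable fields such as jssc can still trigger Task not serializable; defining the function as a static nested class (or as a lambda on Spark/Java versions that support it) keeps those fields out of the closure.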