我正在编程处理从apachekafka到elasticsearch的数据。为此,我使用apachespark。我已经浏览了许多链接,但找不到将apachespark中的javadstream中的数据写入elasticsearch的示例。
下面是spark的示例代码,它从kafka获取数据并打印出来。
import org.apache.log4j.Logger;
import org.apache.log4j.Level;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import scala.Tuple2;
import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.Durations;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
import com.google.common.collect.ImmutableMap;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.util.List;
public class SparkStream {
public static JavaSparkContext sc;
public static List<Map<String, ?>> alldocs;
public static void main(String args[])
{
if(args.length != 2)
{
System.out.println("SparkStream <broker1-host:port,broker2-host:port><topic1,topic2,...>");
System.exit(1);
}
Logger.getLogger("org").setLevel(Level.OFF);
Logger.getLogger("akka").setLevel(Level.OFF);
SparkConf sparkConf=new SparkConf().setAppName("Data Streaming");
sparkConf.setMaster("local[2]");
sparkConf.set("es.index.auto.create", "true");
sparkConf.set("es.nodes","localhost");
sparkConf.set("es.port","9200");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
Set<String> topicsSet=new HashSet<>(Arrays.asList(args[1].split(",")));
Map<String,String> kafkaParams=new HashMap<>();
String brokers=args[0];
kafkaParams.put("metadata.broker.list",brokers);
kafkaParams.put("auto.offset.reset", "largest");
kafkaParams.put("offsets.storage", "zookeeper");
JavaPairDStream<String, String> messages=KafkaUtils.createDirectStream(
jssc,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParams,
topicsSet
);
JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
@Override
public String call(Tuple2<String, String> tuple2) {
return tuple2._2();
}
});
lines.print();
jssc.start();
jssc.awaitTermination();
}
}
`
2条答案
按热度按时间s71maibg1#
保存到ElasticSearch的一种方法是使用
saveToEs
方法内部foreachRDD
功能。您希望使用的任何其他方法仍然需要foreachRDD
呼叫您的数据流。例如:
更多信息请参见此处
cotxawn72#
在这个代码块中,“dstream”是从服务器(如kafka)观察数据的数据流。在“todf()”的括号内,必须使用标题。在“savetoes()”中,必须使用elasticsearch索引。在此之前,您需要创建sqlcontext。
如果您使用kafka发送数据,则必须添加下面提到的依赖项
获取依赖关系
要查看完整示例,请参阅
在本例中,首先您必须创建kafka producer“test”,然后在运行程序后启动elasticsearch。您可以使用上面的url查看完整的sbt和代码。