将apachespark中的javadstream< string>中的数据写入elasticsearch

fivyi3re  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(360)

我正在编程处理从apachekafka到elasticsearch的数据。为此,我使用apachespark。我已经浏览了许多链接,但找不到将apachespark中的javadstream中的数据写入elasticsearch的示例。
下面是spark的示例代码,它从kafka获取数据并打印出来。

import org.apache.log4j.Logger;
import org.apache.log4j.Level;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

import scala.Tuple2;

import kafka.serializer.StringDecoder;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.Durations;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

import com.google.common.collect.ImmutableMap;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.List;
public class SparkStream {

    public static JavaSparkContext sc;
    public static List<Map<String, ?>> alldocs;

    public static void main(String args[])
    {
        if(args.length != 2)
        {
            System.out.println("SparkStream <broker1-host:port,broker2-host:port><topic1,topic2,...>");

            System.exit(1);
        }

        Logger.getLogger("org").setLevel(Level.OFF);
        Logger.getLogger("akka").setLevel(Level.OFF);
        SparkConf sparkConf=new SparkConf().setAppName("Data Streaming");
        sparkConf.setMaster("local[2]");
        sparkConf.set("es.index.auto.create", "true");
        sparkConf.set("es.nodes","localhost");
        sparkConf.set("es.port","9200");
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));

        Set<String> topicsSet=new HashSet<>(Arrays.asList(args[1].split(",")));
        Map<String,String> kafkaParams=new HashMap<>();
        String brokers=args[0];
        kafkaParams.put("metadata.broker.list",brokers);
        kafkaParams.put("auto.offset.reset", "largest");
        kafkaParams.put("offsets.storage", "zookeeper");
        JavaPairDStream<String, String> messages=KafkaUtils.createDirectStream(
                jssc,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                kafkaParams,
                topicsSet
        );
        JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
            @Override
            public String call(Tuple2<String, String> tuple2) {
              return tuple2._2();
            }
        });
        lines.print();
       jssc.start();
       jssc.awaitTermination();
    }
}

`

s71maibg

s71maibg1#

保存到ElasticSearch的一种方法是使用 saveToEs 方法内部 foreachRDD 功能。您希望使用的任何其他方法仍然需要 foreachRDD 呼叫您的数据流。
例如:

lines.foreachRDD(lambda rdd: rdd.saveToEs("ESresource"))

更多信息请参见此处

cotxawn7

cotxawn72#

dstream.foreachRDD{rdd=>
        val es = sqlContext.createDataFrame(rdd).toDF("use headings suitable for your dataset")
        import org.elasticsearch.spark.sql._
        es.saveToEs("wordcount/testing")
  es.show()
}

在这个代码块中,“dstream”是从服务器(如kafka)观察数据的数据流。在“todf()”的括号内,必须使用标题。在“savetoes()”中,必须使用elasticsearch索引。在此之前,您需要创建sqlcontext。

val sqlContext = SQLContext.getOrCreate(SparkContext.getOrCreate())

如果您使用kafka发送数据,则必须添加下面提到的依赖项

libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.10.2.1"

获取依赖关系
要查看完整示例,请参阅
在本例中,首先您必须创建kafka producer“test”,然后在运行程序后启动elasticsearch。您可以使用上面的url查看完整的sbt和代码。

相关问题