如何将时间戳附加到rdd并推送到elasticsearch

gwo2fgha  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(343)

我对spark流媒体和elasticsearch是新手,我正在尝试使用spark从kafka主题读取数据,并将数据存储为rdd。在rdd中,我想在新数据到来时添加时间戳,然后推送到elasticsearch。

lines.foreachRDD(rdd -> {
        if(!rdd.isEmpty()){
        // rdd.collect().forEach(System.out::println);
        String timeStamp = new 
        SimpleDateFormat("yyyy::MM::dd::HH::mm::ss").format(new Date());
        List<String> myList = new ArrayList<String>(Arrays.asList(timeStamp.split("\\s+")));
        List<String> f = rdd.collect();

        Map<List<String>, ?> rddMaps = ImmutableMap.of(f, 1);
        Map<List<String>, ?> myListrdd = ImmutableMap.of(myList, 1);

        JavaRDD<Map<List<String>, ?>> javaRDD = sc.parallelize(ImmutableList.of(rddMaps));

        JavaEsSpark.saveToEs(javaRDD, "sample/docs");
        }
    });
50pmv0ei

50pmv0ei1#

Spark?

据我所知,spark streaming是用于实时流数据计算的,比如 map , reduce , join 以及 window . 似乎没有必要使用如此强大的工具,因为我们需要的只是为事件添加一个时间戳。

logstash?

如果是这种情况,logstash可能更适合我们的情况。

logstash将记录事件发生时的时间戳,它还具有持久队列和死信队列,以确保数据的弹性。它支持将数据推送到es(毕竟它们属于一系列产品),这使得将数据推送到es非常容易。

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "logstash-%{type}-%{+YYYY.MM.dd}"
  }
}

更多

关于logstash的更多信息,这里是介绍。
下面是一个示例logstash配置文件。
希望这有帮助。

参考

部署和扩展日志存储

7kqas0il

7kqas0il2#

如果您使用spark streaming的目的只是将数据从kafka传输到elasticsearch,那么使用kafka connect将是一种更整洁的方式,而且不需要任何编码。
有一个ElasticSearchKafka连接Flume。根据您想对时间戳执行的操作(例如,对于索引路由或添加为字段),您可以使用单个消息转换(这里有一个示例)。

相关问题