我对spark流媒体和elasticsearch是新手,我正在尝试使用spark从kafka主题读取数据,并将数据存储为rdd。在rdd中,我想在新数据到来时添加时间戳,然后推送到elasticsearch。
lines.foreachRDD(rdd -> {
if(!rdd.isEmpty()){
// rdd.collect().forEach(System.out::println);
String timeStamp = new
SimpleDateFormat("yyyy::MM::dd::HH::mm::ss").format(new Date());
List<String> myList = new ArrayList<String>(Arrays.asList(timeStamp.split("\\s+")));
List<String> f = rdd.collect();
Map<List<String>, ?> rddMaps = ImmutableMap.of(f, 1);
Map<List<String>, ?> myListrdd = ImmutableMap.of(myList, 1);
JavaRDD<Map<List<String>, ?>> javaRDD = sc.parallelize(ImmutableList.of(rddMaps));
JavaEsSpark.saveToEs(javaRDD, "sample/docs");
}
});
2条答案
按热度按时间50pmv0ei1#
Spark?
据我所知,spark streaming是用于实时流数据计算的,比如
map
,reduce
,join
以及window
. 似乎没有必要使用如此强大的工具,因为我们需要的只是为事件添加一个时间戳。logstash?
如果是这种情况,logstash可能更适合我们的情况。
logstash将记录事件发生时的时间戳,它还具有持久队列和死信队列,以确保数据的弹性。它支持将数据推送到es(毕竟它们属于一系列产品),这使得将数据推送到es非常容易。
更多
关于logstash的更多信息,这里是介绍。
下面是一个示例logstash配置文件。
希望这有帮助。
参考
部署和扩展日志存储
7kqas0il2#
如果您使用spark streaming的目的只是将数据从kafka传输到elasticsearch,那么使用kafka connect将是一种更整洁的方式,而且不需要任何编码。
有一个ElasticSearchKafka连接Flume。根据您想对时间戳执行的操作(例如,对于索引路由或添加为字段),您可以使用单个消息转换(这里有一个示例)。