JavaRDD<String> history_ = sc.emptyRDD();
java.util.Queue<JavaRDD<String> > queue = new LinkedList<JavaRDD<String>>();
queue.add(history_);
JavaDStream<String> history_dstream = ssc.queueStream(queue);
JavaPairDStream<String,ArrayList<String>> history = history_dstream.mapToPair(r -> {
return new Tuple2< String,ArrayList<String> >(null,null);
});
JavaPairInputDStream<String, GenericData.Record> stream_1 =
KafkaUtils.createDirectStream(ssc, String.class, GenericData.Record.class, StringDecoder.class,
GenericDataRecordDecoder.class, props, topicsSet_1);
JavaPairInputDStream<String, GenericData.Record> stream_2 =
KafkaUtils.createDirectStream(ssc, String.class, GenericData.Record.class, StringDecoder.class,
GenericDataRecordDecoder.class, props, topicsSet_2);
然后进行一些转换并创建类型为的twp dstream data_1和data_2
JavaPairDStream<String, <ArrayList<String>>
并按如下方式进行连接,然后过滤掉那些没有连接键的记录,并将它们保存在历史记录中,以便在下一批中通过与数据\u 1进行联合来使用它
Data_1 = Data_1.union(history);
JavaPairDStream<String, Tuple2<ArrayList<String>, Optional<ArrayList<String>>>> joined =
Data_1.leftOuterJoin(Data_2).cache();
JavaPairDStream<String, Tuple2<ArrayList<String>, Optional<ArrayList<String>>>> notNULL_join = joined.filter(r -> r._2._2().isPresent());
JavaPairDStream<String, Tuple2<ArrayList<String>, Optional<ArrayList<String>>>> dstream_filtered = joined.filter(r -> !r._2._2().isPresent());
history = dstream_filtered.mapToPair(r -> {
return new Tuple2<>(r._1,r._2._1);
}).persist;
我在上一步之后获得了历史记录(通过将其保存到hdfs进行检查),但是在执行union时,这个历史记录仍然是成批空的。
1条答案
按热度按时间zwghvu4y1#
从概念上讲,不可能“记住”一个单词
DStream
.DStreams
是否有时间限制,在每个时钟周期(称为“批处理间隔”)上DStream
表示在该时间段内流中观察到的数据。因此,我们不能有一个“旧的”
DStream
保存以加入“新”DStream
. 全部DStreams
活在“现在”。的底层数据结构
DStreams
是RDD
:每个批次间隔DStream
将有1个RDD
该时间间隔的数据。RDD
表示数据的分布式集合。RDD
只要我们有对它们的引用,它们是不变的和永久的。我们可以合并
RDD
s和DStream
s来创建此处所需的“历史滚动”。它看起来与这个问题上的方法非常相似,但只使用
history
RDD
.以下是建议更改的高级视图:
这只是一个起点。还有其他关于
checkpoint
惯性导航与制导。否则history
rdd将无限增长,直到发生堆栈溢出。这篇博客非常完整地介绍了这种特殊的技术:http://www.spark.tc/stateful-spark-streaming-using-transform/我还建议您使用scala而不是java。java语法太冗长,无法与spark流一起使用。