我实施的是:
val sparkConf = new SparkConf().setAppName(“SharedRDD”).setMaster(“local”)
val sc = SparkContext.getOrCreate(sparkConf)
val sparkRDD = sc.wholeTextFiles("sample.csv", 10)
此rdd由上下文缓存
val igniteContext = new IgniteContext(sc, “example-shared-rdd.xml”, false)
val sharedIgniteRDD = igniteContext.fromCache[String, String](“cachedIgniteRDD”)
if (sharedIgniteRDD.isEmpty())
sharedIgniteRDD.savePairs(sparkRDD)
现在,如果任何spark作业需要访问这个rdd,它不需要创建新的rdd,而是从ignite缓存中检索它。
val RDDfromCache = igniteContext.fromCache[String, String](“CachedIgniteRDD”)
输入文件的示例数据
25/07/13,11599,CLOSED
25/07/13,256,PENDING_PAYMENT
25/07/13,12111,COMPLETE
25/07/13,8827,CLOSED
25/07/13,11318,COMPLETE
25/07/13,7130,COMPLETE
25/07/13,4530,COMPLETE
25/07/13,2911,PROCESSING
25/07/13,5657,PENDING_PAYMENT
25/07/13,5648,PENDING_PAYMENT
我需要调用groupbykey()api对输入文件的status列进行分组。
谢谢你的帮助。
谢谢
2条答案
按热度按时间d7v8vwbk1#
wholeTextFiles
将整个文件放在一个条目中,所以将其保存在ignite中没有多大意义。基本上,您将拥有一个包含一个大条目的分布式缓存。这没有任何好处。您应该首先分割文件,并将每一行保存为一个单独的元组。然后您可以将这些数据保存在ignite中,并使用RDDAPI对其进行处理。
ej83mcc02#
使用
cache
或者persist
应该避免rdd
重新创造。你可以选择保存rdds
in-memory
,memory-disk
,serialized
,deserialized
等。IgniteContext
是上述解决方案的替代方案。这对我来说有点棘手
groupByKey
在你保存的rdd
在IgniteContext
,如您所用wholeTextFile
读取文本文件。WholeTextFile
将生成Tuple2
rdd of(“文本文件的路径”,“文本行”)一旦你读到保存的
rdd
fromCache
的IgniteContext
```val RDDfromCache = igniteContext.fromCacheString, String
输出将是