spark1.2:将一条记录写入多个文件(黑名单)

2izufjch  于 2021-05-29  发布在  Hadoop
关注(0)|答案(1)|浏览(432)

我尝试为rdd到hdfs中的每条记录编写多个文件(每个黑名单一个,按一个键分组),每个文件集上都应用了一个黑名单。
首先,我将multipletextoutputformat与keyby结合使用,按照记录中的一个字段对输出文件进行分组,效果很好。因此,我的输出文件现在由一个键命名,来自记录,记录被分组在这个文件中。
但我现在的问题是,我需要对输出应用黑名单,并分别保存每个输出。我用了一个简单的过滤器。现在发生的情况是,应用这个文件管理器会导致工作被执行x次,针对x个不同的黑名单。对于大量的记录来说,这是不可接受的。即使以前调用Dataframe上的缓存函数。
为了明确我想要输入的是一个巨大的分布式hdfs文件,其结构如下:

ID, Name, .

我的输出目录应该如下所示:

blacklisted1/ID1.file -> Content:(ID, Name, ...)
          /ID2.file
blacklisted2/ID1.file
          /ID2.file

目标是只读取每条记录一次,然后为其写入这些文件集。我目前的方法是使用foreachpartition,并通过迭代记录在hdfs中手动创建一个文件,但对我来说,必须有一个更简单的解决方案。

kxkpmulp

kxkpmulp1#

因为您的密钥不能保证在一个黑名单中,所以我要做的是生成一个flatmap,在这里您将(key,value)rdd更改为(blacklist key,value),其中数据可能重复,然后使用multipletextoutputformat根据“blacklist/key”写入文件路径。类似于此:

// Simple MultipleTextOutputFormat to output filename by key
// Directory separators are handled gracefully
class BlacklistMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  override def generateActualKey(key: Any, value: Any): Any = NullWritable.get()

  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = key.asInstanceOf[String]
}

myKvRdd = ... // your code to generate your RDD of (key, value)
blacklist:Map[String, Set[String]] = ... // your code to generate (key -> Set[Blacklist]) 

// Change it to (blacklist/key, value) where there is one row per key per blacklist that it blongs to
blacklisted = myKvRdd.flatMap(kv => blacklist.get(kv._1).map(b => (b + '/' + kv._1, kv._2)

// 100 partitions is pretty arbitrary, but it should be sufficiently large so that you don't OOM your executors
blacklisted.partitionBy(new HashPartitioner(100))
    .saveAsHadoopFile("absoluteOutputPath", classOf[String], classOf[String],
        classOf[BlacklistMultipleTextOutputFormat])

然后您可以在rdd的一个过程中生成输出。假设这个rdd比原来的要大,这个工作流似乎更适合spark的优化。

相关问题