hadoop—从java中的spark转换函数中写入hdfs中的文件

aiazj4mn  于 2021-06-02  发布在  Hadoop
关注(0)|答案(2)|浏览(910)

我的问题类似于scala中的一个已经回答的问题,用于读取文件。
从spark转换函数中动态读取hdfs中的文件
我知道累加器使用它们将结果返回到驱动程序并在驱动程序中写入hdfs。在我们的用例中,每个执行器的输出都很大,因此我正在寻找一种在java转换中写入hdfs的方法。
谢谢!

pzfprimi

pzfprimi1#

终于找到了一个优雅的方法来实现这一点。为hadoop配置创建广播变量

Configuration configuration = JavaSparkContext.toSparkContext(context).hadoopConfiguration();
Broadcast<SerializableWritable<Configuration>> bc = context.broadcast(new SerializableWritable<Configuration>(configuration));

将此广播变量传递给您的转换或操作,并使用以下代码段获取hadoop文件系统:

FileSystem fileSystem = FileSystem.get(bc.getValue().value());

希望这对其他人有帮助。
干杯!

nszi6y05

nszi6y052#

JavaPairInputDStream<String, byte[]> input = KafkaUtils.createJDQDirectStream(ssc, String.class, byte[].class,
        StringDecoder.class, DefaultDecoder.class, kafkaParams, Collections.singleton(topicName));

JavaPairDStream<String, byte[]> output = input.transformToPair(new Function<JavaPairRDD<String, byte[]>, JavaPairRDD<String, byte[]>>() {
    public JavaPairRDD<String, byte[]> call(JavaPairRDD<String, byte[]> stringJavaPairRDD) throws Exception {
        JavaSparkContext sc = JavaSparkContext.fromSparkContext(stringJavaPairRDD.context());
        stringJavaPairRDD.saveAsTextFile("hdfs://");
        return stringJavaPairRDD;
    }
});

相关问题