我的问题类似于scala中的一个已经回答的问题,用于读取文件。从spark转换函数中动态读取hdfs中的文件我知道累加器使用它们将结果返回到驱动程序并在驱动程序中写入hdfs。在我们的用例中,每个执行器的输出都很大,因此我正在寻找一种在java转换中写入hdfs的方法。谢谢!
pzfprimi1#
终于找到了一个优雅的方法来实现这一点。为hadoop配置创建广播变量
Configuration configuration = JavaSparkContext.toSparkContext(context).hadoopConfiguration(); Broadcast<SerializableWritable<Configuration>> bc = context.broadcast(new SerializableWritable<Configuration>(configuration));
将此广播变量传递给您的转换或操作,并使用以下代码段获取hadoop文件系统:
FileSystem fileSystem = FileSystem.get(bc.getValue().value());
希望这对其他人有帮助。干杯!
nszi6y052#
JavaPairInputDStream<String, byte[]> input = KafkaUtils.createJDQDirectStream(ssc, String.class, byte[].class, StringDecoder.class, DefaultDecoder.class, kafkaParams, Collections.singleton(topicName)); JavaPairDStream<String, byte[]> output = input.transformToPair(new Function<JavaPairRDD<String, byte[]>, JavaPairRDD<String, byte[]>>() { public JavaPairRDD<String, byte[]> call(JavaPairRDD<String, byte[]> stringJavaPairRDD) throws Exception { JavaSparkContext sc = JavaSparkContext.fromSparkContext(stringJavaPairRDD.context()); stringJavaPairRDD.saveAsTextFile("hdfs://"); return stringJavaPairRDD; } });
2条答案
按热度按时间pzfprimi1#
终于找到了一个优雅的方法来实现这一点。为hadoop配置创建广播变量
将此广播变量传递给您的转换或操作,并使用以下代码段获取hadoop文件系统:
希望这对其他人有帮助。
干杯!
nszi6y052#