FileStream的Java实现

vof42yt1  于 2023-06-04  发布在  Java
关注(0)|答案(2)|浏览(202)

我有一些麻烦的编程与Spark流。因为我想创建一个输入流,并使用自定义的输入格式读取它们。定义如下:

def fileStream[K, V, F <: NewInputFormat[K, V]](
      directory: String): JavaPairInputDStream[K, V] = {
    implicit val cmk: ClassTag[K] =
      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
    implicit val cmv: ClassTag[V] =
      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
    implicit val cmf: ClassTag[F] =
      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[F]]
    ssc.fileStream[K, V, F](directory)
}

如果我使用的是scala,那么我将编写如下代码:

val lines = ssc.fileStream[LongWritable, Text, TextInputFormat](dataDirectory)

但是,当我像这样使用java时:

ClassTag<LongWritable> k = scala.reflect.ClassTag$.MODULE$.apply(LongWritable.class);
ClassTag<Text> v = scala.reflect.ClassTag$.MODULE$.apply(Text.class);
ClassTag<InputFormat<LongWritable, Text>> f = scala.reflect.ClassTag$.MODULE$.apply(TextInputFormat.class);
JavaPairInputDStream<LongWritable, Text> inputLines = ssc.fileStream<k, v, f>("dataDirectory);

我会遇到“fileStream无法解析或不是字段”的错误。那么,如何使用JavaStreamingContext.fileStream呢?
我用下面的代码创建了ssc:

JavaStreamingContext ssc = new JavaStreamingContext(new SparkConf().setAppName("Spark Streaming Demo"), new Duration(3000));

谢谢!

bfnvny8b

bfnvny8b1#

fileStream无法解析或不是字段问题是fileStream使用不当导致的。当使用fileStream时,如下所示:

JavaPairInputDStream<LongWritable, Text> inputLines = ssc.<LongWritable, Text, TestInputFormat>fileStream(dataDirectory);

TestInputFormat必须扩展OutputFormat<LongWritable, Text>

public interface TestOutputFormat extends OutputFormat<LongWritable, Text>

这样使用是没有问题的,但是需要使用**旧API(org.apache.hadoop.mapred.*)**实现TestInputFormat的Class。我还没试过这个^_^

nfs0ujit

nfs0ujit2#

你需要加上

import java.io.File;

import java.io.*;

相关问题