我有一些麻烦的编程与Spark流。因为我想创建一个输入流,并使用自定义的输入格式读取它们。定义如下:
def fileStream[K, V, F <: NewInputFormat[K, V]](
directory: String): JavaPairInputDStream[K, V] = {
implicit val cmk: ClassTag[K] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
implicit val cmv: ClassTag[V] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
implicit val cmf: ClassTag[F] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[F]]
ssc.fileStream[K, V, F](directory)
}
如果我使用的是scala,那么我将编写如下代码:
val lines = ssc.fileStream[LongWritable, Text, TextInputFormat](dataDirectory)
但是,当我像这样使用java时:
ClassTag<LongWritable> k = scala.reflect.ClassTag$.MODULE$.apply(LongWritable.class);
ClassTag<Text> v = scala.reflect.ClassTag$.MODULE$.apply(Text.class);
ClassTag<InputFormat<LongWritable, Text>> f = scala.reflect.ClassTag$.MODULE$.apply(TextInputFormat.class);
JavaPairInputDStream<LongWritable, Text> inputLines = ssc.fileStream<k, v, f>("dataDirectory);
我会遇到“fileStream无法解析或不是字段”的错误。那么,如何使用JavaStreamingContext.fileStream呢?
我用下面的代码创建了ssc:
JavaStreamingContext ssc = new JavaStreamingContext(new SparkConf().setAppName("Spark Streaming Demo"), new Duration(3000));
谢谢!
2条答案
按热度按时间bfnvny8b1#
fileStream无法解析或不是字段问题是
fileStream
使用不当导致的。当使用fileStream
时,如下所示:而
TestInputFormat
必须扩展OutputFormat<LongWritable, Text>
。这样使用是没有问题的,但是需要使用**旧API(org.apache.hadoop.mapred.*)**实现TestInputFormat的Class。我还没试过这个^_^
nfs0ujit2#
你需要加上
或