我对spark的理解 fileStream()
方法是将三种类型作为参数: Key
, Value
,和 Format
. 对于文本文件,适当的类型为: LongWritable
, Text
,和 TextInputFormat
.
首先,我想了解这些类型的本质。凭直觉,我猜 Key
在本例中,是文件的行号 Value
是那一行的文字。因此,在下面的文本文件示例中:
Hello
Test
Another Test
第一排 DStream
会有一个 Key
的 1
( 0
?)和 Value
的 Hello
.
是这样吗?
问题的第二部分:我查看了 ParquetInputFormat
我注意到一些奇怪的事情:
public class ParquetInputFormat<T>
extends FileInputFormat<Void, T> {
//...
public class TextInputFormat
extends FileInputFormat<LongWritable, Text>
implements JobConfigurable {
//...
``` `TextInputFormat` 延伸 `FileInputFormat` 类型 `LongWritable` 以及 `Text` ,鉴于 `ParquetInputFormat` 扩展相同类型的类 `Void` 以及 `T` .
这是否意味着我必须创建一个 `Value` 类来保存我的Parquet地板数据的整行,然后传递类型 `<Void, MyClass, ParquetInputFormat<MyClass>>` 至 `ssc.fileStream()` ?
如果是,我应该如何实施 `MyClass` ?
编辑1:我注意到一个 `readSupportClass` 要传递给 `ParquetInputFormat` 物体。这是什么类型的类,如何使用它来解析parquet文件?是否有相关文件?
编辑2:据我所知,这是不可能的。如果有人知道如何在Parquet文件流Spark然后请随时分享。。。
2条答案
按热度按时间azpvetkf1#
您可以通过添加一些
parquet
具体hadoop
设置:此代码是用
Spark 2.1.0
.vatpfxk52#
下面是我在spark流媒体中读取Parquet文件的示例。
有些观点是。。。
记录类型为genericrecord
readsupportclass是avroreadsupport
将配置传递到filestream
将parquet.read.support.class设置为配置
我参考了下面的源代码来创建示例。
我也找不到好的例子。
我想等更好的。
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/streamingcontext.scala
https://github.com/parquet/parquet-mr/blob/master/parquet-hadoop/src/main/java/parquet/hadoop/parquetinputformat.java
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/fileinputdstream.scala