如何使用`ssc.filestream()`读取Parquet文件?传递给“ssc.filestream()”的类型是什么?

wmomyfyw  于 2021-05-29  发布在  Hadoop
关注(0)|答案(2)|浏览(458)

我对spark的理解 fileStream() 方法是将三种类型作为参数: Key , Value ,和 Format . 对于文本文件,适当的类型为: LongWritable , Text ,和 TextInputFormat .
首先,我想了解这些类型的本质。凭直觉,我猜 Key 在本例中,是文件的行号 Value 是那一行的文字。因此,在下面的文本文件示例中:

Hello
Test
Another Test

第一排 DStream 会有一个 Key1 ( 0 ?)和 ValueHello .
是这样吗?
问题的第二部分:我查看了 ParquetInputFormat 我注意到一些奇怪的事情:

public class ParquetInputFormat<T>
       extends FileInputFormat<Void, T> {
//...

public class TextInputFormat
       extends FileInputFormat<LongWritable, Text>
       implements JobConfigurable {
//...
``` `TextInputFormat` 延伸 `FileInputFormat` 类型 `LongWritable` 以及 `Text` ,鉴于 `ParquetInputFormat` 扩展相同类型的类 `Void` 以及 `T` .
这是否意味着我必须创建一个 `Value` 类来保存我的Parquet地板数据的整行,然后传递类型 `<Void, MyClass, ParquetInputFormat<MyClass>>` 至 `ssc.fileStream()` ?
如果是,我应该如何实施 `MyClass` ?
编辑1:我注意到一个 `readSupportClass` 要传递给 `ParquetInputFormat` 物体。这是什么类型的类,如何使用它来解析parquet文件?是否有相关文件?
编辑2:据我所知,这是不可能的。如果有人知道如何在Parquet文件流Spark然后请随时分享。。。
azpvetkf

azpvetkf1#

您可以通过添加一些 parquet 具体 hadoop 设置:

val ssc = new StreamingContext(conf, Seconds(5))
var schema =StructType(Seq(
      StructField("a", StringType, nullable = false),
      ........

     ))
val schemaJson=schema.json

val fileDir="/tmp/fileDir"
ssc.sparkContext.hadoopConfiguration.set("parquet.read.support.class", "org.apache.spark.sql.execution.datasources.parquet.ParquetReadSupport")  ssc.sparkContext.hadoopConfiguration.set("org.apache.spark.sql.parquet.row.requested_schema", schemaJson)
ssc.sparkContext.hadoopConfiguration.set(SQLConf.PARQUET_BINARY_AS_STRING.key, "false")
ssc.sparkContext.hadoopConfiguration.set(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, "false")
ssc.sparkContext.hadoopConfiguration.set(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key, "false")
ssc.sparkContext.hadoopConfiguration.set(SQLConf.PARQUET_BINARY_AS_STRING.key, "false")

val streamRdd = ssc.fileStream[Void, UnsafeRow, ParquetInputFormat[UnsafeRow]](fileDir,(t: org.apache.hadoop.fs.Path) => true, false)

streamRdd.count().print()

ssc.start()
ssc.awaitTermination()

此代码是用 Spark 2.1.0 .

vatpfxk5

vatpfxk52#

下面是我在spark流媒体中读取Parquet文件的示例。

val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.sparkContext.hadoopConfiguration.set("parquet.read.support.class", "parquet.avro.AvroReadSupport")
val stream = ssc.fileStream[Void, GenericRecord, ParquetInputFormat[GenericRecord]](
  directory, { path: Path => path.toString.endsWith("parquet") }, true, ssc.sparkContext.hadoopConfiguration)

val lines = stream.map(row => {
  println("row:" + row.toString())
  row
})

有些观点是。。。
记录类型为genericrecord
readsupportclass是avroreadsupport
将配置传递到filestream
将parquet.read.support.class设置为配置
我参考了下面的源代码来创建示例。
我也找不到好的例子。
我想等更好的。
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/streamingcontext.scala
https://github.com/parquet/parquet-mr/blob/master/parquet-hadoop/src/main/java/parquet/hadoop/parquetinputformat.java
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/fileinputdstream.scala

相关问题