如何在sparkscala中动态读取textfile(字符串类型数据)Map并将数据加载到parquet格式(具有不同数据类型的多列)

eufgjt7s  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(460)

我们使用sqoop作为文本文件格式将数据从源rdbms系统导入hadoop环境。这个文本文件需要加载到Parquet格式的配置单元表中。我们如何在不使用hive支持(之前我们使用beeline insert,现在我们设计不再使用hive)和使用parquet直接写入hdfs的情况下实现这个场景。
例如:-在sqoop导入之后,假设我们在hdfs target dir下有一个文件/数据/loc/mydb/mytable
mytable和all中的数据都是string类型。

-----------------------------------------
10|customer1|10.0|2016-09-07  08:38:00.0
20|customer2|20.0|2016-09-08  10:45:00.0
30|customer3|30.0|2016-09-10  03:26:00.0
------------------------------------------

目标配置单元表架构。

rec_id: int
rec_name: String
rec_value: Decimal(2,1)
rec_created: Timestamp

如何使用spark和动态管理所有列的类型转换,将数据从mytable加载到目标底层配置单元表位置(parquet格式)。
请注意:这里不能使用hivecontext。在此方法中的任何帮助都是非常感谢的。提前谢谢。

2mbi3lxu

2mbi3lxu1#

下面的例子是 .csv 文件的格式与问题中的格式相同。
我想先解释一些细节。
在“表架构”字段中: rec_value: Decimal(2,1) 一定会的 rec_value: Decimal(3,1) 原因如下:
这个 DECIMAL 类型表示具有固定值的数字 precision 以及 scale . 当您创建 DECIMAL 列中,指定 precision 、p和 scale ,s。 Precision 是总位数,与小数点的位置无关。 Scale 小数点后的位数。为了在不损失精度的情况下表示数字10.0,您需要 DECIMAL 键入 precision 至少3个,以及 scale 至少为1。
所以 Hive 表将是:

CREATE TABLE tab_data (
  rec_id INT,
  rec_name STRING,
  rec_value DECIMAL(3,1),
  rec_created TIMESTAMP
) STORED AS PARQUET;

完整的scala代码

import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.types.{DataTypes, IntegerType, StringType, StructField, StructType, TimestampType}

object CsvToParquet {

  val spark = SparkSession
    .builder()
    .appName("CsvToParquet")
    .master("local[*]")
    .config("spark.sql.shuffle.partitions","200") //Change to a more reasonable default number of partitions for our data
    .config("spark.sql.parquet.writeLegacyFormat", true) // To avoid issues with data type between Spark and Hive
                                                         // The convention used by Spark to write Parquet data is configurable.
                                                         // This is determined by the property spark.sql.parquet.writeLegacyFormat
                                                         // The default value is false. If set to "true",
                                                         // Spark will use the same convention as Hive for writing the Parquet data.
    .getOrCreate()

  val sc = spark.sparkContext

  val inputPath = "hdfs://host:port/user/...../..../tab_data.csv"
  val outputPath = "hdfs://host:port/user/hive/warehouse/test.db/tab_data"

  def main(args: Array[String]): Unit = {

    Logger.getRootLogger.setLevel(Level.ERROR)

    try {

      val DecimalType = DataTypes.createDecimalType(3, 1)

      /**
        * the data schema
        */
      val schema = StructType(List(StructField("rec_id", IntegerType, true), StructField("rec_name",StringType, true),
                   StructField("rec_value",DecimalType),StructField("rec_created",TimestampType, true)))

      /**
        * Reading the data from HDFS as .csv text file
        */
      val data = spark
        .read
        .option("sep","|")
        .option("timestampFormat","yyyy-MM-dd HH:mm:ss.S")
        .option("inferSchema",false)
        .schema(schema)
        .csv(inputPath)

       data.show(truncate = false)
       data.schema.printTreeString()

      /**
        * Writing the data as Parquet file
        */
      data
        .write
        .mode(SaveMode.Append)
        .option("compression", "none") // Assuming no data compression
        .parquet(outputPath)

    } finally {
      sc.stop()
      println("SparkContext stopped")
      spark.stop()
      println("SparkSession stopped")
    }
  }
}

输入文件为 .csv 制表符分隔字段

10|customer1|10.0|2016-09-07  08:38:00.0
20|customer2|24.0|2016-09-08  10:45:00.0
30|customer3|35.0|2016-09-10  03:26:00.0
40|customer1|46.0|2016-09-11  08:38:00.0
........

阅读 Spark ```
+------+---------+---------+-------------------+
|rec_id|rec_name |rec_value|rec_created |
+------+---------+---------+-------------------+
|10 |customer1|10.0 |2016-09-07 08:38:00|
|20 |customer2|24.0 |2016-09-08 10:45:00|
|30 |customer3|35.0 |2016-09-10 03:26:00|
|40 |customer1|46.0 |2016-09-11 08:38:00|
......

架构

root
|-- rec_id: integer (nullable = true)
|-- rec_name: string (nullable = true)
|-- rec_value: decimal(3,1) (nullable = true)
|-- rec_created: timestamp (nullable = true)

阅读 `Hive` ```
SELECT *
FROM tab_data;

+------------------+--------------------+---------------------+------------------------+--+
| tab_data.rec_id  | tab_data.rec_name  | tab_data.rec_value  |  tab_data.rec_created  |
+------------------+--------------------+---------------------+------------------------+--+
| 10               | customer1          | 10                  | 2016-09-07 08:38:00.0  |
| 20               | customer2          | 24                  | 2016-09-08 10:45:00.0  |
| 30               | customer3          | 35                  | 2016-09-10 03:26:00.0  |
| 40               | customer1          | 46                  | 2016-09-11 08:38:00.0  |
.....

希望这有帮助。

相关问题