如何在scala中用正确的名称划分表

ig9co6j1  于 2021-06-24  发布在  Hive
关注(0)|答案(1)|浏览(388)

我在Scala2.4.0中有一个很大的Dataframe,看起来像这样

+--------------------+--------------------+--------------------+-------------------+--------------+------+                                      
|              cookie|       updated_score|         probability|    date_last_score|partition_date|target|                 
+--------------------+--------------------+--------------------+-------------------+--------------+------+
|00000000000001074780|  0.1110987111481027| 0.27492987342938174|2019-03-29 16:00:00| 2019-04-07_10|     0|                
|00000000000001673799| 0.02621894072693878|  0.2029688362968775|2019-03-19 08:00:00| 2019-04-07_10|     0|                   
|00000000000002147908| 0.18922034021212567|  0.3520678649755828|2019-03-31 19:00:00| 2019-04-09_12|     1|            
|00000000000004028302| 0.06803669083452231| 0.23089047208736854|2019-03-25 17:00:00| 2019-04-07_10|     0|

这个模式是:

root                                                               
 |-- cookie: string (nullable = true)                                       
 |-- updated_score: double (nullable = true)                                      
 |-- probability: double (nullable = true)                                      
 |-- date_last_score: string (nullable = true)                                         
 |-- partition_date: string (nullable = true)                                              
 |-- target: integer (nullable = false)

然后我创建一个分区表并将数据插入database.table\u name。但当我查看配置单元数据库并键入:show partitions database.table\ u name时,我只得到partition\ u date=0和partition\ u date=1,0和1不是partition\ u date列中的值。
我不知道我是否写错了什么,有一些scala的概念我不明白或者dataframe太大了。
我尝试了不同的方法来解决类似的问题:

result_df.write.mode(SaveMode.Overwrite).insertInto("table_name")

或者

result_df.write.mode(SaveMode.Overwrite).saveAsTable("table_name")

如果这有助于我从scala提供一些信息:
看到这条消息,我想我的结果是正确的。

19/07/31 07:53:57 INFO TaskSetManager: Starting task 11.0 in stage 2822.0 (TID 123456, ip-xx-xx-xx.aws.local.somewhere, executor 45, partition 11, PROCESS_LOCAL, 7767 bytes)
19/07/31 07:53:57 INFO TaskSetManager: Starting task 61.0 in stage 2815.0 (TID 123457, ip-xx-xx-xx-xyz.aws.local.somewhere, executor 33, partition 61, NODE_LOCAL, 8095 bytes)

然后,我开始将分区保存为向量(0,1,2…),但我可能只保存0和1?我真的不知道。

19/07/31 07:56:02 INFO DAGScheduler: Submitting 35 missing tasks from ShuffleMapStage 2967 (MapPartitionsRDD[130590] at insertInto at evaluate_decay_factor.scala:165) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14))
19/07/31 07:56:02 INFO YarnScheduler: Adding task set 2967.0 with 35 tasks
19/07/31 07:56:02 INFO DAGScheduler: Submitting ShuffleMapStage 2965 (MapPartitionsRDD[130578] at insertInto at evaluate_decay_factor.scala:165), which has no missing parents

我的代码如下所示:

val createTableSQL = s"""
            CREATE TABLE IF NOT EXISTS table_name (
                cookie              string,
                updated_score       float,  
                probability         float,
                date_last_score     string,
                target               int
            )
            PARTITIONED BY (partition_date string)
            STORED AS PARQUET
            TBLPROPERTIES ('PARQUET.COMPRESSION'='SNAPPY')
        """

spark.sql(createTableSQL)

result_df.write.mode(SaveMode.Overwrite).insertInto("table_name")

给定这样的Dataframe:

val result = Seq(
         (8, "123", 1.2, 0.5, "bat", "2019-04-04_9"),
         (64, "451", 3.2, -0.5, "mouse", "2019-04-04_12"),
         (-27, "613", 8.2, 1.5, "horse", "2019-04-04_10"),
         (-37, "513", 4.33, 2.5, "horse", "2019-04-04_11"),
         (45, "516", -3.3, 3.4, "bat", "2019-04-04_10"),
         (12, "781", 1.2, 5.5, "horse", "2019-04-04_11")

我想在配置单元命令行上运行:show partitions“table\u name”并获取:

partition_date=2019-04-04_9                                         
partition_date=2019-04-04_10                                         
partition_date=2019-04-04_11                                      
partition_date=2019-04-04_12

相反,我的输出是:

partition_date=0                                              
partition_date=1

在这个简单的例子中,它工作得很好,但是对于我的大Dataframe,我得到了前面的输出。

x6h2sr28

x6h2sr281#

要更改分区数,请使用 repartition(numOfPartitions) 要在编写时更改分区所依据的列,请使用 partitionBy("col") 一起使用的示例: final_df.repartition(40).write.partitionBy("txnDate").mode("append").parquet(destination) 两个有用的提示:
使重新分区大小等于工作内核的数量,以便进行最快的写入/重新分区。在这个例子中,我有10个执行器,每个执行器有4个核心(总共40个核心)。因此,我把它设为40。
当您向一个目的地写入数据时,除了子bucket之外,不要指定任何东西——让spark来处理索引。
好目的地: "s3a://prod/subbucket/" 错误的目的地: s"s3a://prod/subbucket/txndate=$txndate"

相关问题