使用spark2.x和不同的模式/数据类型联合两个Dataframe

ttcibm8c  于 2021-06-26  发布在  Hive
关注(0)|答案(1)|浏览(259)

我正在尝试使用spark合并多个配置单元表,其中一些同名的列具有不同的数据类型,特别是string和bigint。
我的最后一张table( hiveDF )应该有如下模式-

+--------------------------+------------+----------+--+
|         col_name         | data_type  | comment  |
+--------------------------+------------+----------+--+
| announcementtype         | bigint     |          |
| approvalstatus           | string     |          |
| capitalrate              | double     |          |
| cash                     | double     |          |
| cashinlieuprice          | double     |          |
| costfactor               | double     |          |
| createdby                | string     |          |
| createddate              | string     |          |
| currencycode             | string     |          |
| declarationdate          | string     |          |
| declarationtype          | bigint     |          |
| divfeerate               | double     |          |
| divonlyrate              | double     |          |
| dividendtype             | string     |          |
| dividendtypeid           | bigint     |          |
| editedby                 | string     |          |
| editeddate               | string     |          |
| exdate                   | string     |          |
| filerecordid             | string     |          |
| frequency                | string     |          |
| grossdivrate             | double     |          |
| id                       | bigint     |          |
| indicatedannualdividend  | string     |          |
| longtermrate             | double     |          |
| netdivrate               | double     |          |
| newname                  | string     |          |
| newsymbol                | string     |          |
| note                     | string     |          |
| oldname                  | string     |          |
| oldsymbol                | string     |          |
| paydate                  | string     |          |
| productid                | bigint     |          |
| qualifiedratedollar      | double     |          |
| qualifiedratepercent     | double     |          |
| recorddate               | string     |          |
| sharefactor              | double     |          |
| shorttermrate            | double     |          |
| specialdivrate           | double     |          |
| splitfactor              | double     |          |
| taxstatuscodeid          | bigint     |          |
| lastmodifieddate         | timestamp  |          |
| active_status            | boolean    |          |
+--------------------------+------------+----------+--+

这是最后一张table( hiveDF )模式可以用下面的json-

{
"id": -2147483647,
"productId": 150816,
"dividendTypeId": 2,
"dividendType": "Dividend/Capital Gain",
"payDate": null,
"exDate": "2009-03-25",
"oldSymbol": "ILAAX",
"newSymbol": "ILAAX",
"oldName": "",
"newName": "",
"grossDivRate": 0.115,
"shortTermRate": 0,
"longTermRate": 0,
"splitFactor": 0,
"shareFactor": 0,
"costFactor": 0,
"cashInLieuPrice": 0,
"cash": 0,
"note": "0",
"createdBy": "Yahoo",
"createdDate": "2009-08-03T06:44:19.677-05:00",
"editedBy": "Yahoo",
"editedDate": "2009-08-03T06:44:19.677-05:00",
"netDivRate": null,
"divFeeRate": null,
"specialDivRate": null,
"approvalStatus": null,
"capitalRate": null,
"qualifiedRateDollar": null,
"qualifiedRatePercent": null,
"declarationDate": null,
"declarationType": null,
"currencyCode": null,
"taxStatusCodeId": null,
"announcementType": null,
"frequency": null,
"recordDate": null,
"divOnlyRate": 0.115,
"fileRecordID": null,
"indicatedAnnualDividend": null
}

我在做下面这样的事情-

var hiveDF = spark.sqlContext.sql("select * from final_destination_tableName")
var newDataDF = spark.sqlContext.sql("select * from incremental_table_1 where id > 866000")

我的增量表( newDataDF )有些列具有不同的数据类型。我有大约10个增量表 bigint 在其他表中也是字符串,所以不能确定是否进行了类型转换。typecast可能很容易,但是我不确定我应该使用哪种类型,因为有多个表。我正在寻找任何没有类型转换的方法。
例如,增量表如下所示-

+--------------------------+------------+----------+--+
|         col_name         | data_type  | comment  |
+--------------------------+------------+----------+--+
| announcementtype         | string     |          |
| approvalstatus           | string     |          |
| capitalrate              | string     |          |
| cash                     | double     |          |
| cashinlieuprice          | double     |          |
| costfactor               | double     |          |
| createdby                | string     |          |
| createddate              | string     |          |
| currencycode             | string     |          |
| declarationdate          | string     |          |
| declarationtype          | string     |          |
| divfeerate               | string     |          |
| divonlyrate              | double     |          |
| dividendtype             | string     |          |
| dividendtypeid           | bigint     |          |
| editedby                 | string     |          |
| editeddate               | string     |          |
| exdate                   | string     |          |
| filerecordid             | string     |          |
| frequency                | string     |          |
| grossdivrate             | double     |          |
| id                       | bigint     |          |
| indicatedannualdividend  | string     |          |
| longtermrate             | double     |          |
| netdivrate               | string     |          |
| newname                  | string     |          |
| newsymbol                | string     |          |
| note                     | string     |          |
| oldname                  | string     |          |
| oldsymbol                | string     |          |
| paydate                  | string     |          |
| productid                | bigint     |          |
| qualifiedratedollar      | string     |          |
| qualifiedratepercent     | string     |          |
| recorddate               | string     |          |
| sharefactor              | double     |          |
| shorttermrate            | double     |          |
| specialdivrate           | string     |          |
| splitfactor              | double     |          |
| taxstatuscodeid          | string     |          |
| lastmodifieddate         | timestamp  |          |
| active_status            | boolean    |          |
+--------------------------+------------+----------+--+

我在为下面的table做这个联合-

var combinedDF = hiveDF.unionAll(newDataDF)

但运气不好。我试图给出最后的模式如下,但没有运气-

val rows = newDataDF.rdd
val newDataDF2 = spark.sqlContext.createDataFrame(rows, hiveDF.schema)
var combinedDF = hiveDF.unionAll(newDataDF2)
combinedDF.coalesce(1).write.mode(SaveMode.Overwrite).option("orc.compress", "snappy").orc("/apps/hive/warehouse/" + database + "/" + tableLower + "_temp")

根据这一点,我试着在下面-

var combinedDF = sparkSession.read.json(hiveDF.toJSON.union(newDataDF.toJSON).rdd)

最后我试着像上面那样写进表格,但是运气不好,请帮帮我-

v9tzhpje

v9tzhpje1#

在将增量表与现有表合并时,我也遇到了这种情况。一般有两种情况需要处理
1.带额外列的增量数据:
这可以通过您在这里尝试的正常合并过程来解决。
2.具有相同列名但不同架构的增量数据:
这是个棘手的问题。一个简单的解决方案是将bot数据转换为tojson并执行union hiveDF.toJSON.union(newDataDF.toJSON) . 但是,这将导致json模式合并并更改现有模式。例如:如果列 a:Long 在table上 a:String 在增量表中,合并后的最终模式将是:字符串。如果您想实现json联合,就无法避免这种情况。
另一种方法是对增量数据进行严格的模式检查。测试增量表是否与配置单元表具有相同的架构,如果架构不同,则不合并。
然而,对于实时数据来说,这有点过于严格,很难实施模式强制。
所以我解决这个问题的方法是在合并之前有一个单独的浓缩过程。该进程实际上会检查架构,如果传入列可以升级/降级到当前配置单元表架构,它就会这样做。
本质上,它遍历传入的增量,为每一行将其转换为正确的模式。这种方法虽然成本不高,但可以很好地保证数据的正确性。以防进程无法转换行。我将行置于一旁并发出警报,以便可以手动验证生成数据的上游系统中的任何错误。
这是我用来验证这两个模式是否可混合的代码。

相关问题