我正在尝试使用spark合并多个配置单元表,其中一些同名的列具有不同的数据类型,特别是string和bigint。
我的最后一张table( hiveDF
)应该有如下模式-
+--------------------------+------------+----------+--+
| col_name | data_type | comment |
+--------------------------+------------+----------+--+
| announcementtype | bigint | |
| approvalstatus | string | |
| capitalrate | double | |
| cash | double | |
| cashinlieuprice | double | |
| costfactor | double | |
| createdby | string | |
| createddate | string | |
| currencycode | string | |
| declarationdate | string | |
| declarationtype | bigint | |
| divfeerate | double | |
| divonlyrate | double | |
| dividendtype | string | |
| dividendtypeid | bigint | |
| editedby | string | |
| editeddate | string | |
| exdate | string | |
| filerecordid | string | |
| frequency | string | |
| grossdivrate | double | |
| id | bigint | |
| indicatedannualdividend | string | |
| longtermrate | double | |
| netdivrate | double | |
| newname | string | |
| newsymbol | string | |
| note | string | |
| oldname | string | |
| oldsymbol | string | |
| paydate | string | |
| productid | bigint | |
| qualifiedratedollar | double | |
| qualifiedratepercent | double | |
| recorddate | string | |
| sharefactor | double | |
| shorttermrate | double | |
| specialdivrate | double | |
| splitfactor | double | |
| taxstatuscodeid | bigint | |
| lastmodifieddate | timestamp | |
| active_status | boolean | |
+--------------------------+------------+----------+--+
这是最后一张table( hiveDF
)模式可以用下面的json-
{
"id": -2147483647,
"productId": 150816,
"dividendTypeId": 2,
"dividendType": "Dividend/Capital Gain",
"payDate": null,
"exDate": "2009-03-25",
"oldSymbol": "ILAAX",
"newSymbol": "ILAAX",
"oldName": "",
"newName": "",
"grossDivRate": 0.115,
"shortTermRate": 0,
"longTermRate": 0,
"splitFactor": 0,
"shareFactor": 0,
"costFactor": 0,
"cashInLieuPrice": 0,
"cash": 0,
"note": "0",
"createdBy": "Yahoo",
"createdDate": "2009-08-03T06:44:19.677-05:00",
"editedBy": "Yahoo",
"editedDate": "2009-08-03T06:44:19.677-05:00",
"netDivRate": null,
"divFeeRate": null,
"specialDivRate": null,
"approvalStatus": null,
"capitalRate": null,
"qualifiedRateDollar": null,
"qualifiedRatePercent": null,
"declarationDate": null,
"declarationType": null,
"currencyCode": null,
"taxStatusCodeId": null,
"announcementType": null,
"frequency": null,
"recordDate": null,
"divOnlyRate": 0.115,
"fileRecordID": null,
"indicatedAnnualDividend": null
}
我在做下面这样的事情-
var hiveDF = spark.sqlContext.sql("select * from final_destination_tableName")
var newDataDF = spark.sqlContext.sql("select * from incremental_table_1 where id > 866000")
我的增量表( newDataDF
)有些列具有不同的数据类型。我有大约10个增量表 bigint
在其他表中也是字符串,所以不能确定是否进行了类型转换。typecast可能很容易,但是我不确定我应该使用哪种类型,因为有多个表。我正在寻找任何没有类型转换的方法。
例如,增量表如下所示-
+--------------------------+------------+----------+--+
| col_name | data_type | comment |
+--------------------------+------------+----------+--+
| announcementtype | string | |
| approvalstatus | string | |
| capitalrate | string | |
| cash | double | |
| cashinlieuprice | double | |
| costfactor | double | |
| createdby | string | |
| createddate | string | |
| currencycode | string | |
| declarationdate | string | |
| declarationtype | string | |
| divfeerate | string | |
| divonlyrate | double | |
| dividendtype | string | |
| dividendtypeid | bigint | |
| editedby | string | |
| editeddate | string | |
| exdate | string | |
| filerecordid | string | |
| frequency | string | |
| grossdivrate | double | |
| id | bigint | |
| indicatedannualdividend | string | |
| longtermrate | double | |
| netdivrate | string | |
| newname | string | |
| newsymbol | string | |
| note | string | |
| oldname | string | |
| oldsymbol | string | |
| paydate | string | |
| productid | bigint | |
| qualifiedratedollar | string | |
| qualifiedratepercent | string | |
| recorddate | string | |
| sharefactor | double | |
| shorttermrate | double | |
| specialdivrate | string | |
| splitfactor | double | |
| taxstatuscodeid | string | |
| lastmodifieddate | timestamp | |
| active_status | boolean | |
+--------------------------+------------+----------+--+
我在为下面的table做这个联合-
var combinedDF = hiveDF.unionAll(newDataDF)
但运气不好。我试图给出最后的模式如下,但没有运气-
val rows = newDataDF.rdd
val newDataDF2 = spark.sqlContext.createDataFrame(rows, hiveDF.schema)
var combinedDF = hiveDF.unionAll(newDataDF2)
combinedDF.coalesce(1).write.mode(SaveMode.Overwrite).option("orc.compress", "snappy").orc("/apps/hive/warehouse/" + database + "/" + tableLower + "_temp")
根据这一点,我试着在下面-
var combinedDF = sparkSession.read.json(hiveDF.toJSON.union(newDataDF.toJSON).rdd)
最后我试着像上面那样写进表格,但是运气不好,请帮帮我-
1条答案
按热度按时间v9tzhpje1#
在将增量表与现有表合并时,我也遇到了这种情况。一般有两种情况需要处理
1.带额外列的增量数据:
这可以通过您在这里尝试的正常合并过程来解决。
2.具有相同列名但不同架构的增量数据:
这是个棘手的问题。一个简单的解决方案是将bot数据转换为tojson并执行union
hiveDF.toJSON.union(newDataDF.toJSON)
. 但是,这将导致json模式合并并更改现有模式。例如:如果列a:Long
在table上a:String
在增量表中,合并后的最终模式将是:字符串。如果您想实现json联合,就无法避免这种情况。另一种方法是对增量数据进行严格的模式检查。测试增量表是否与配置单元表具有相同的架构,如果架构不同,则不合并。
然而,对于实时数据来说,这有点过于严格,很难实施模式强制。
所以我解决这个问题的方法是在合并之前有一个单独的浓缩过程。该进程实际上会检查架构,如果传入列可以升级/降级到当前配置单元表架构,它就会这样做。
本质上,它遍历传入的增量,为每一行将其转换为正确的模式。这种方法虽然成本不高,但可以很好地保证数据的正确性。以防进程无法转换行。我将行置于一旁并发出警报,以便可以手动验证生成数据的上游系统中的任何错误。
这是我用来验证这两个模式是否可混合的代码。