spark/hive-将数据分组为“透视表”格式

20jt8wwn  于 2021-06-26  发布在  Hive
关注(0)|答案(2)|浏览(358)

我有一组非常烦人的文件,结构如下:

userId string,
eventType string,
source string,
errorCode string,
startDate timestamp,
endDate timestamp

每个文件可能包含每个eventid的任意数量的记录,具有不同的事件类型和源,每个文件的代码和开始/结束日期也不同。
在hive或spark中,有没有一种方法可以将所有这些内容在userid上分组,有点像键值,其中的值是与userid相关联的所有字段的列表?具体来说,我希望它是由事件类型和源键。基本上我想用table的长度来交换宽度,有点像一张透视表。我的目标是最终存储为apacheparquet或avro文件格式,以便将来进行更快速的分析。
举个例子:
源数据:

userId, eventType, source, errorCode, startDate, endDate
552113, 'ACK', 'PROVIDER', 0, '2017-09-01 12:01:45.432', '2017-09-01 12:01:45.452'
284723, 'ACK', 'PROVIDER', 0, '2017-09-01 12:01:45.675', '2017-09-01 12:01:45.775'
552113, 'TRADE', 'MERCH', 0, '2017-09-01 12:01:47.221', '2017-09-01 12:01:46.229'
552113, 'CHARGE', 'MERCH', 0, '2017-09-01 12:01:48.123', '2017-09-01 12:01:48.976'
284723, 'REFUND', 'MERCH', 1, '2017-09-01 12:01:48.275', '2017-09-01 12:01:48.947'
552113, 'CLOSE', 'PROVIDER', 0, '2017-09-01 12:01:49.908', '2017-09-01 12:01:50.623'
284723, 'CLOSE', 'PROVIDER', 0, '2017-09-01 12:01:50.112', '2017-09-01 12:01:50.777'

目标:

userId, eventTypeAckProvider, sourceAckProvider, errorCodeAckProvider, startDateAckProvider, endDateAckProvider, eventTypeTradeMerch, sourceTradeMerch, errorCodeTradeMerch, startDateTradeMerch, endDateTradeMerch, eventTypeChargeMerch, sourceChargeMerch, errorCodeChargeMerch, startDateChargeMerch, endDateChargeMerch, eventTypeCloseProvider, sourceCloseProvider, errorCodeCloseProvider, startDateCloseProvider, endDateCloseProvider, eventTypeRefundMerch, sourceRefundMerch, errorCodeRefundMerch, startDateRefundMerch, endDateRefundMerch
552113, 'ACK', 'PROVIDER', 0, '2017-09-01 12:01:45.432', '2017-09-01 12:01:45.452', 'TRADE', 'MERCH', 0, '2017-09-01 12:01:47.221', '2017-09-01 12:01:46.229', 'CHARGE', 'MERCH', 0, '2017-09-01 12:01:48.123', '2017-09-01 12:01:48.976', 'CLOSE', 'PROVIDER', 0, '2017-09-01 12:01:49.908', '2017-09-01 12:01:50.623', NULL, NULL, NULL, NULL, NULL
284723, 'ACK', 'PROVIDER', 0, '2017-09-01 12:01:45.675', '2017-09-01 12:01:45.775', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, 'CLOSE', 'PROVIDER', 0, '2017-09-01 12:01:50.112', '2017-09-01 12:01:50.777', 'REFUND', 'MERCH', 1, '2017-09-01 12:01:48.275', '2017-09-01 12:01:48.947'

字段名或顺序无关紧要,只要我能分辨出来。
我已经尝试了两种方法来实现这一点:
从表中手动选择每个组合并连接到主数据集。这工作得很好,并行性也很好,但是不允许为键字段提供任意数量的值,并且需要预定义模式。
使用spark创建key:value records 其中每个值都是一个字典。基本上,循环遍历数据集,如果字典不存在,则向其中添加一个新键;对于该条目,如果值字典不存在,则向其添加一个新字段。它工作得很漂亮,但是速度非常慢,而且并行性也不好。我也不确定这是否是一个avro/Parquet兼容的格式。
除了这两种方法,还有别的选择吗?或者比我的目标更好的结构?

mwngjboj

mwngjboj1#

你能试试这个并给出你的意见吗,

>>> from pyspark.sql import SparkSession
>>> from pyspark.sql import functions as F
>>> from pyspark.sql.types import *

>>> spark = SparkSession.builder.getOrCreate()

>>> l=[(552113, 'ACK', 'PROVIDER', 0, '2017-09-01 12:01:45.432', '2017-09-01 12:01:45.452'),(284723, 'ACK', 'PROVIDER', 0, '2017-09-01 12:01:45.675', '2017-09-01 12:01:45.775'),(552113, 'TRADE', 'MERCH', 0, '2017-09-01 12:01:47.221', '2017-09-01 12:01:46.229'),(552113, 'CHARGE', 'MERCH', 0, '2017-09-01 12:01:48.123', '2017-09-01 12:01:48.976'),(284723, 'REFUND', 'MERCH', 1, '2017-09-01 12:01:48.275', '2017-09-01 12:01:48.947'),(552113, 'CLOSE', 'PROVIDER', 0, '2017-09-01 12:01:49.908', '2017-09-01 12:01:50.623'),(284723, 'CLOSE', 'PROVIDER', 0, '2017-09-01 12:01:50.112', '2017-09-01 12:01:50.777')]

>>> df = spark.createDataFrame(l,['userId', 'eventType', 'source', 'errorCode', 'startDate','endDate'])
>>> df.show(10,False)
+------+---------+--------+---------+-----------------------+-----------------------+
|userId|eventType|source  |errorCode|startDate              |endDate                |
+------+---------+--------+---------+-----------------------+-----------------------+
|552113|ACK      |PROVIDER|0        |2017-09-01 12:01:45.432|2017-09-01 12:01:45.452|
|284723|ACK      |PROVIDER|0        |2017-09-01 12:01:45.675|2017-09-01 12:01:45.775|
|552113|TRADE    |MERCH   |0        |2017-09-01 12:01:47.221|2017-09-01 12:01:46.229|
|552113|CHARGE   |MERCH   |0        |2017-09-01 12:01:48.123|2017-09-01 12:01:48.976|
|284723|REFUND   |MERCH   |1        |2017-09-01 12:01:48.275|2017-09-01 12:01:48.947|
|552113|CLOSE    |PROVIDER|0        |2017-09-01 12:01:49.908|2017-09-01 12:01:50.623|
|284723|CLOSE    |PROVIDER|0        |2017-09-01 12:01:50.112|2017-09-01 12:01:50.777|
+------+---------+--------+---------+-----------------------+-----------------------+

>>> myudf = F.udf(lambda *cols : cols,ArrayType(StringType())) #composition to create rowwise list
>>> df1 = df.select('userId',myudf('eventType', 'source', 'errorCode','startDate', 'endDate').alias('val_list'))

>>> df2 = df1.groupby('userId').agg(F.collect_list('val_list')) # grouped on userId

>>> eventtypes = ['ACK','TRADE','CHARGE','CLOSE','REFUND'] # eventtypes and the order required in output

>>> def f(Vals):
        aggVals = [typ for x in eventtypes for typ in Vals if typ[0] == x] # to order the grouped data based on eventtypes above
        if len(aggVals) == 5:
            return aggVals
        else:
            missngval = [(idx,val) for idx,val in enumerate(eventtypes)if val not in zip(*aggVals)[0]] # get missing eventtypes with their index to create null
            for idx,val in missngval:
                aggVals.insert(idx,[None]*5)
            return aggVals

>>> myudf2 = F.udf(f,ArrayType(ArrayType(StringType())))
>>> df3 = df2.select('userId',myudf2('agg_list').alias('values'))

>>> df4 = df3.select(['userId']+[df3['values'][i][x] for i in range(5) for x in range(5)]) # to select from Array[Array]

>>> oldnames = df4.columns
>>> destnames = ['userId', 'eventTypeAckProvider', 'sourceAckProvider', 'errorCodeAckProvider', 'startDateAckProvider', 'endDateAckProvider', 'eventTypeTradeMerch', 'sourceTradeMerch', 'errorCodeTradeMerch', 'startDateTradeMerch', 'endDateTradeMerch', 'eventTypeChargeMerch', 'sourceChargeMerch', 'errorCodeChargeMerch', 'startDateChargeMerch', 'endDateChargeMerch', 'eventTypeCloseProvider', 'sourceCloseProvider', 'errorCodeCloseProvider', 'startDateCloseProvider', 'endDateCloseProvider', 'eventTypeRefundMerch', 'sourceRefundMerch', 'errorCodeRefundMerch', 'startDateRefundMerch', 'endDateRefundMerch']

>>> finalDF = reduce(lambda d,idx : d.withColumnRenamed(oldnames[idx],destnames[idx]),range(len(oldnames)),df4) # Renaming the columns
>>> finalDF.show()    
+------+--------------------+-----------------+--------------------+-----------------------+-----------------------+-------------------+----------------+-------------------+-----------------------+-----------------------+--------------------+-----------------+--------------------+-----------------------+-----------------------+----------------------+-------------------+----------------------+-----------------------+-----------------------+--------------------+-----------------+--------------------+-----------------------+-----------------------+
|userId|eventTypeAckProvider|sourceAckProvider|errorCodeAckProvider|startDateAckProvider   |endDateAckProvider     |eventTypeTradeMerch|sourceTradeMerch|errorCodeTradeMerch|startDateTradeMerch    |endDateTradeMerch      |eventTypeChargeMerch|sourceChargeMerch|errorCodeChargeMerch|startDateChargeMerch   |endDateChargeMerch     |eventTypeCloseProvider|sourceCloseProvider|errorCodeCloseProvider|startDateCloseProvider |endDateCloseProvider   |eventTypeRefundMerch|sourceRefundMerch|errorCodeRefundMerch|startDateRefundMerch   |endDateRefundMerch     |
+------+--------------------+-----------------+--------------------+-----------------------+-----------------------+-------------------+----------------+-------------------+-----------------------+-----------------------+--------------------+-----------------+--------------------+-----------------------+-----------------------+----------------------+-------------------+----------------------+-----------------------+-----------------------+--------------------+-----------------+--------------------+-----------------------+-----------------------+
|284723|ACK                 |PROVIDER         |0                   |2017-09-01 12:01:45.675|2017-09-01 12:01:45.775|null               |null            |null               |null                   |null                   |null                |null             |null                |null                   |null                   |CLOSE                 |PROVIDER           |0                     |2017-09-01 12:01:50.112|2017-09-01 12:01:50.777|REFUND              |MERCH            |1                   |2017-09-01 12:01:48.275|2017-09-01 12:01:48.947|
|552113|ACK                 |PROVIDER         |0                   |2017-09-01 12:01:45.432|2017-09-01 12:01:45.452|TRADE              |MERCH           |0                  |2017-09-01 12:01:47.221|2017-09-01 12:01:46.229|CHARGE              |MERCH            |0                   |2017-09-01 12:01:48.123|2017-09-01 12:01:48.976|CLOSE                 |PROVIDER           |0                     |2017-09-01 12:01:49.908|2017-09-01 12:01:50.623|null                |null             |null                |null                   |null                   |
+------+--------------------+-----------------+--------------------+-----------------------+-----------------------+-------------------+----------------+-------------------+-----------------------+-----------------------+--------------------+-----------------+--------------------+-----------------------+-----------------------+----------------------+-------------------+----------------------+-----------------------+-----------------------+--------------------+-----------------+--------------------+-----------------------+-----------------------+
jckbn6z7

jckbn6z72#

你想要这样的吗?

from pyspark.sql.functions import struct, col, create_map, collect_list

df = sc.parallelize([
    ['552113', 'ACK', 'PROVIDER', 0, '2017-09-01 12:01:45.432', '2017-09-01 12:01:45.452'],
    ['284723', 'ACK', 'PROVIDER', 0, '2017-09-01 12:01:45.675', '2017-09-01 12:01:45.775'],
    ['552113', 'TRADE', 'MERCH', 0, '2017-09-01 12:01:47.221', '2017-09-01 12:01:46.229'],
    ['552113', 'CHARGE', 'MERCH', 0, '2017-09-01 12:01:48.123', '2017-09-01 12:01:48.976'],
    ['284723', 'REFUND', 'MERCH', 1, '2017-09-01 12:01:48.275', '2017-09-01 12:01:48.947'],
    ['552113', 'CLOSE', 'PROVIDER', 0, '2017-09-01 12:01:49.908', '2017-09-01 12:01:50.623'],
    ['284723', 'CLOSE', 'PROVIDER', 0, '2017-09-01 12:01:50.112', '2017-09-01 12:01:50.777']
]).toDF(('userId', 'eventType', 'source', 'errorCode', 'startDate', 'endDate'))
df.show()

new_df = df.withColumn("eventType_source", struct([col('eventType'), col('source')])).\
    withColumn("errorCode_startEndDate", struct([col('errorCode'), col('startDate'), col('endDate')]))

new_df = new_df.groupBy('userId').agg(collect_list(create_map(col('eventType_source'), col('errorCode_startEndDate'))).alias('event_detail'))
new_df.show()

相关问题