Converting an RDD to a DataFrame when the rows of the RDD have different sets of fields

s5a0g9ez posted on 2022-10-07 in Spark

I have an RDD:

[{'systemID': '617914',
  'typeID': '1',
  'taxID': '1',
  'workerID': '1011778',
  'workerExID': '70000111',
  'number': '5',
  'shiftNumber': '167',
  'numberInShift': '6',
  'printedDate': '2022-10-03T15:38:09',
  'total': '990.0000',
  'IsEReceipt': 'false',
  'version': '1.05',
  'receiptKISExid': '023442935',
  'attribute_1': '1234567890',
  'attribute_2': '65e2b71b-c2de-4681-9cf1-29701f5ce6bb',
  'city_id': 'BC29FE50'},
 {'systemID': '617915',
  'typeID': '1',
  'taxID': '1',
  'workerID': '1011778',
  'workerExID': '70000111',
  'number': '6',
  'shiftNumber': '167',
  'numberInShift': '7',
  'printedDate': '2022-10-03T16:48:35',
  'total': '640.0000',
  'IsEReceipt': 'false',
  'version': '1.05',
  'receiptKISExid': '0234434052',
  'attribute_3': '00000000-0000-0000-0000-000000000000@@00000000-0000-0000-0000-000000000000@@00000000-0000-0000-0000-000000000000',
  'city_id': 'BC29FE50'},
 {'systemID': '617916',
  'typeID': '1',
  'taxID': '1',
  'workerID': '1011778',
  'workerExID': '70000111',
  'number': '7',
  'shiftNumber': '167',
  'numberInShift': '8',
  'printedDate': '2022-10-03T17:19:46',
  'total': '310.0000',
  'version': '1.05',
  'receiptKISExid': '0234435605',
  'attribute_1': '1234567890',
  'city_id': 'BC29FE50'}]

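It should be converted to a DataFrame with these fields:

systemID, typeID, taxID, workerID, workerExID, number, shiftNumber, numberInShift, printedDate, total, IsEReceipt, version, receiptKISExid, attribute_1, attribute_2, attribute_3, city_id

However, the fields attribute_1, attribute_2 and attribute_3 are not present in every row. After converting to a DataFrame I only get data for some of these fields: for example, I get attribute_1, but attribute_2 and attribute_3 are null.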

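Currently I use:

from pyspark.sql.types import StructType, StructField, StringType, FloatType, DateType

ORDER_COL_NAMES = ['systemID', 'typeID', 'taxID', 'workerID', 'workerExID', 'number', 'shiftNumber', 'numberInShift', 'printedDate', 'total', 'IsEReceipt', 'version', 'receiptKISExid', 'attribute_1', 'attribute_2', 'attribute_3', 'city_id']

def set_schema():
    # One nullable StructField per column, in ORDER_COL_NAMES order
    schema_list = []
    for c in ORDER_COL_NAMES:
        if c == 'total':
            schema_list.append(StructField(c, FloatType(), True))
        # Note: no column is named 'print_date' (the field is 'printedDate'),
        # so this branch never matches
        elif c == 'print_date':
            schema_list.append(StructField(c, DateType(), True))
        else:
            schema_list.append(StructField(c, StringType(), True))
    return StructType(schema_list)

df_schema = set_schema()
order_df = orders.toDF(df_schema)  # orders is the RDD of dicts shown above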

Thanks in advance!
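A minimal sketch of one way to keep the explicit schema, assuming the goal is the column order in ORDER_COL_NAMES: map every dict to a tuple in that fixed order, filling keys a row lacks with None, and convert total to a float because the schema declares it as FloatType. The helper name to_row is hypothetical and the code is plain Python, so it can be checked without a cluster.

```python
from typing import Any, Dict, Tuple

# Same column order as the question's ORDER_COL_NAMES
ORDER_COL_NAMES = ['systemID', 'typeID', 'taxID', 'workerID', 'workerExID',
                   'number', 'shiftNumber', 'numberInShift', 'printedDate',
                   'total', 'IsEReceipt', 'version', 'receiptKISExid',
                   'attribute_1', 'attribute_2', 'attribute_3', 'city_id']


def to_row(record: Dict[str, Any]) -> Tuple[Any, ...]:
    """Hypothetical helper: values in ORDER_COL_NAMES order, None for absent keys."""
    values = []
    for col in ORDER_COL_NAMES:
        value = record.get(col)  # absent key -> None (null in the DataFrame)
        if col == 'total' and value is not None:
            # The schema declares total as FloatType, so pass a float, not '990.0000'
            value = float(value)
        values.append(value)
    return tuple(values)
```

With Spark, the helper would be applied per element before attaching the schema, for example order_df = orders.map(to_row).toDF(df_schema). The tuples are matched to the StructFields by position, so every row has all 17 columns and the missing attributes end up as null.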

Answer 1 (zujrkrfu):

I hope the code below solves your schema-handling problem:

data = [{'systemID': '617914',
  'typeID': '1',
  'taxID': '1',
  'workerID': '1011778',
  'workerExID': '70000111',
  'number': '5',
  'shiftNumber': '167',
  'numberInShift': '6',
  'printedDate': '2022-10-03T15:38:09',
  'total': '990.0000',
  'IsEReceipt': 'false',
  'version': '1.05',
  'receiptKISExid': '023442935',
  'attribute_1': '1234567890',
  'attribute_2': '65e2b71b-c2de-4681-9cf1-29701f5ce6bb',
  'city_id': 'BC29FE50'},
 {'systemID': '617915',
  'typeID': '1',
  'taxID': '1',
  'workerID': '1011778',
  'workerExID': '70000111',
  'number': '6',
  'shiftNumber': '167',
  'numberInShift': '7',
  'printedDate': '2022-10-03T16:48:35',
  'total': '640.0000',
  'IsEReceipt': 'false',
  'version': '1.05',
  'receiptKISExid': '0234434052',
  'attribute_3': '00000000-0000-0000-0000-000000000000@@00000000-0000-0000-0000-000000000000@@00000000-0000-0000-0000-000000000000',
  'city_id': 'BC29FE50'},
 {'systemID': '617916',
  'typeID': '1',
  'taxID': '1',
  'workerID': '1011778',
  'workerExID': '70000111',
  'number': '7',
  'shiftNumber': '167',
  'numberInShift': '8',
  'printedDate': '2022-10-03T17:19:46',
  'total': '310.0000',
  'version': '1.05',
  'receiptKISExid': '0234435605',
  'attribute_1': '1234567890',
  'city_id': 'BC29FE50'}]

# Spark infers one schema from all records; keys missing from a record become null
df = spark.read.option("multiline", "true").json(sc.parallelize(data))
df.show(truncate=False)
+----------+-----------+------------------------------------+----------------------------------------------------------------------------------------------------------------+--------+------+-------------+-------------------+--------------+-----------+--------+-----+--------+------+-------+----------+--------+
|IsEReceipt|attribute_1|attribute_2                         |attribute_3                                                                                                     |city_id |number|numberInShift|printedDate        |receiptKISExid|shiftNumber|systemID|taxID|total   |typeID|version|workerExID|workerID|
+----------+-----------+------------------------------------+----------------------------------------------------------------------------------------------------------------+--------+------+-------------+-------------------+--------------+-----------+--------+-----+--------+------+-------+----------+--------+
|false     |1234567890 |65e2b71b-c2de-4681-9cf1-29701f5ce6bb|null                                                                                                            |BC29FE50|5     |6            |2022-10-03T15:38:09|023442935     |167        |617914  |1    |990.0000|1     |1.05   |70000111  |1011778 |
|false     |null       |null                                |00000000-0000-0000-0000-000000000000@@00000000-0000-0000-0000-000000000000@@00000000-0000-0000-0000-000000000000|BC29FE50|6     |7            |2022-10-03T16:48:35|0234434052    |167        |617915  |1    |640.0000|1     |1.05   |70000111  |1011778 |
|null      |1234567890 |null                                |null                                                                                                            |BC29FE50|7     |8            |2022-10-03T17:19:46|0234435605    |167        |617916  |1    |310.0000|1     |1.05   |70000111  |1011778 |
+----------+-----------+------------------------------------+----------------------------------------------------------------------------------------------------------------+--------+------+-------------+-------------------+--------------+-----------+--------+-----+--------+------+-------+----------+--------+
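This works because the JSON reader infers a single schema from all records: every field that appears in any record becomes a column, rows without it get null, and the inferred fields are sorted by name, which is why the output columns are alphabetical (IsEReceipt first) rather than in the question's order. As far as I can tell, passing dicts directly relies on PySpark converting each element with str(), which produces single-quoted text that Spark's JSON parser accepts by default; serializing with json.dumps avoids depending on that. The sketch below is plain Python with hypothetical helper names (to_json_lines, merged_columns) and only models the merge-and-sort behavior; it is not Spark's implementation.

```python
import json
from typing import Any, Dict, Iterable, List


def to_json_lines(records: Iterable[Dict[str, Any]]) -> List[str]:
    """Hypothetical helper: one standard (double-quoted) JSON document per record."""
    return [json.dumps(r) for r in records]


def merged_columns(records: Iterable[Dict[str, Any]]) -> List[str]:
    """Hypothetical helper: union of keys over all records, sorted by name,
    mirroring the column set and order seen in the inferred schema above."""
    keys = set()
    for r in records:
        keys.update(r)  # a field present in any record becomes a column
    return sorted(keys)  # code-point order, so 'IsEReceipt' sorts before 'attribute_1'
```

Under these assumptions, the Spark side would be df = spark.read.json(sc.parallelize(to_json_lines(data))).select(*ORDER_COL_NAMES), where the select puts the columns back into the order listed in the question.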
