我有很多新行分隔的json,存储在google云存储上。这些json文件的一个对象如下所示:
{"col1":"val1", "col2":"val2", "col3":"val3", "values": { "id1_col1": "aaa","id1_col2": "bbb", "id2_col1": "ccc", "id2_col2": "ddd", "id3_col1": "eee", "id3_col2": "fff"} }
下划线后面的“col”部分可以是任何内容。我可以得到“id1_zfuhu”或“id3_ue8az”。两个ID之间的列名之间没有链接。
我的目标是在bigquery中创建3个表,找到的每个id对应1个表,其模式如下:
table id1: col1, col2, col3, id1_col1, id1_col2
table id2: col1, col2, col3, id2_col1, id2_col2
table id3: col1, col2, col3, id3_col1, id3_col2
如果我举上面的例子,它给出了以下几行:
table id1: val1, val2, val3, aaa, bbb
table id2: val1, val2, val3, ccc, ddd
table id3: val1, val2, val3, eee, fff
我确切地知道哪些“固定”列(col1、col2和col3)将出现在这些文件中。但是,我不知道每行上会出现哪些id,这些文件中有多少不同的id,每个id有多少列。所以在一行,我可以找到一个“值”dict,它看起来像这样:
{ "id1_col1": "zzz", "id3_col2": "ccc"}
或者有时像这样:
{"id5_col8": "abg", "id11_col4: "klm"}
所以我没有关于这些文件包含多少id,它们有多少列,或者哪一行包含任何id中的一列的信息。
我有一个解决这个问题的办法,但我想改进它。目前大约需要1h50,对于50gb的json文件(解压后),集群使用12核和70gb内存。
我开始将数据从google存储加载到Dataframe中:
data = spark.read.format("json").load(source)
然后我将数据展平,用其中的每个键替换“值”:
flat_data = data.select("*","values.*").drop("values").persist(StorageLevel.MEMORY_AND_DISK)
然后我阅读了这个模式,试图找出这些文件中有哪些ID,以及每个ID的每一列。
schema = [s.name for s in flat_data.schema]
columns_list = detect_column(schema)
flat_data.createOrReplaceTempView("full_data")
列列表是一个dict。在上面的示例中,它将如下所示:
{"id1": ["id1_col1", "id1_col2"], "id2": ["id2_col1", "id2_col2"], "id3": ["id3_col1", "id3_col2"]}
然后我运行一个循环,为找到的每个id生成一个查询。对于id1,查询将如下所示:
select col1, col2, col3, id1_col1, id2_col2 from full_data where id1_col1 is not null or id1_col2 is not null
我对每个id运行一个查询,然后使用spark到bigquery连接器将结果加载到bigquery中的一个表中:
tmp_df = spark.sql(query)
tmp_df.write.format('bigquery').option("temporaryGcsBucket","bucketname").option('table', '{0}.{1}'.format(dataset_name, table_name)).mode("append").save()
我的第一个直觉是,为每个id运行一个查询是浪费时间,因为我必须为每个id分析整个Dataframe。
暂无答案!
目前还没有任何答案,快来回答吧!