apachespark作业优化

gk7wooem  于 2021-05-27  发布在  Spark
关注(0)|答案(0)|浏览(193)

我有很多新行分隔的json,存储在google云存储上。这些json文件的一个对象如下所示:

{"col1":"val1", "col2":"val2", "col3":"val3", "values": { "id1_col1": "aaa","id1_col2": "bbb", "id2_col1": "ccc", "id2_col2": "ddd", "id3_col1": "eee", "id3_col2": "fff"} }

下划线后面的“col”部分可以是任何内容。我可以得到“id1_zfuhu”或“id3_ue8az”。两个ID之间的列名之间没有链接。
我的目标是在bigquery中创建3个表,找到的每个id对应1个表,其模式如下:

table id1: col1, col2, col3, id1_col1, id1_col2
table id2: col1, col2, col3, id2_col1, id2_col2
table id3: col1, col2, col3, id3_col1, id3_col2

如果我举上面的例子,它给出了以下几行:

table id1: val1, val2, val3, aaa, bbb
table id2: val1, val2, val3, ccc, ddd
table id3: val1, val2, val3, eee, fff

我确切地知道哪些“固定”列(col1、col2和col3)将出现在这些文件中。但是,我不知道每行上会出现哪些id,这些文件中有多少不同的id,每个id有多少列。所以在一行,我可以找到一个“值”dict,它看起来像这样:

{ "id1_col1": "zzz", "id3_col2": "ccc"}

或者有时像这样:

{"id5_col8": "abg", "id11_col4: "klm"}

所以我没有关于这些文件包含多少id,它们有多少列,或者哪一行包含任何id中的一列的信息。
我有一个解决这个问题的办法,但我想改进它。目前大约需要1h50,对于50gb的json文件(解压后),集群使用12核和70gb内存。
我开始将数据从google存储加载到Dataframe中:

data = spark.read.format("json").load(source)

然后我将数据展平,用其中的每个键替换“值”:

flat_data = data.select("*","values.*").drop("values").persist(StorageLevel.MEMORY_AND_DISK)

然后我阅读了这个模式,试图找出这些文件中有哪些ID,以及每个ID的每一列。

schema = [s.name for s in flat_data.schema]
columns_list = detect_column(schema)
flat_data.createOrReplaceTempView("full_data")

列列表是一个dict。在上面的示例中,它将如下所示:

{"id1": ["id1_col1", "id1_col2"], "id2": ["id2_col1", "id2_col2"], "id3": ["id3_col1", "id3_col2"]}

然后我运行一个循环,为找到的每个id生成一个查询。对于id1,查询将如下所示:

select col1, col2, col3, id1_col1, id2_col2 from full_data where id1_col1 is not null or id1_col2 is not null

我对每个id运行一个查询,然后使用spark到bigquery连接器将结果加载到bigquery中的一个表中:

tmp_df = spark.sql(query)
tmp_df.write.format('bigquery').option("temporaryGcsBucket","bucketname").option('table', '{0}.{1}'.format(dataset_name, table_name)).mode("append").save()

我的第一个直觉是,为每个id运行一个查询是浪费时间,因为我必须为每个id分析整个Dataframe。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题