As the title says, I'm trying to generate a dictionary from a Spark table to use as a broadcast variable, but being new to this I'm struggling to work out the right approach.
The dictionary structure I want looks like the one below. The main purpose is to build a "lookup" table I can use to find the closest match within a set of data and return that row's parent.
{
    '1603756800': {
        'B000GIPJY8': [
            {'Rank': 43, 'ParentAsin': 'B08LX475MX'},
            {'Rank': 146, 'ParentAsin': 'B08LXSRY8R'},
            {'Rank': 1320, 'ParentAsin': 'B08LL7LDYN'},
            {'Rank': 2471, 'ParentAsin': 'B08LLC2TWN'},
            {'Rank': 4058, 'ParentAsin': 'B08LX4D9CR'}
        ],
        'B000GIQSV6': [
            {'Rank': 37, 'ParentAsin': 'B08LKB3H7L'},
            {'Rank': 1320, 'ParentAsin': 'B08LLBCDT8'}
        ]
    }
}
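With a structure like the one above, the "closest match" lookup is a plain-Python exercise; a minimal sketch, where the `lookup` dictionary and sample ranks are illustrative values only:

```python
# Nested lookup: epoch -> ASIN -> list of {'Rank', 'ParentAsin'} entries
lookup = {
    '1603756800': {
        'B000GIPJY8': [
            {'Rank': 43, 'ParentAsin': 'B08LX475MX'},
            {'Rank': 146, 'ParentAsin': 'B08LXSRY8R'},
            {'Rank': 1320, 'ParentAsin': 'B08LL7LDYN'},
        ]
    }
}

def nearest_parent(lookup, epoch, asin, rank):
    """Return the ParentAsin whose Rank is closest to `rank`, or '' if absent."""
    variations = lookup.get(epoch, {}).get(asin)
    if not variations:
        return ''
    best = min(variations, key=lambda v: abs(v['Rank'] - rank))
    return best['ParentAsin']
```

A rank of 150 sits closest to the entry with Rank 146, so the lookup returns that entry's parent.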
The broadcast DataFrame:
+----------+-------+----------+----------+
| tmp_Asin|US_Rank|ParentAsin| Epoch|
+----------+-------+----------+----------+
|B000GIQSV6| 38.0|B08LKB3H7L|1603324800|
|B000GIQSV6|32841.0|B08LLBCDT8|1603324800|
|B000GIQSV6| 37.0|B08LKB3H7L|1603324800|
|B000GIPJY8|30153.0|B08LLBCDT8|1603324800|
|B000GIPJY8| 37.0|B08LKB3H7L|1603324800|
|B000GIQSV6| 2735.0|B08LLC2TWN|1603324800|
|B000GIPJY8| 38.0|B08LKB3H7L|1603324800|
|B000GIQSV6|30153.0|B08LLBCDT8|1603324800|
|B000GIPJY8| 2735.0|B08LLC2TWN|1603324800|
|B000GIPJY8|32841.0|B08LLBCDT8|1603324800|
+----------+-------+----------+----------+
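From rows shaped like this DataFrame, the nested dictionary can also be assembled on the driver with ordinary Python after a `collect()`; a sketch, assuming the tuples below stand in for the collected `Row` objects (`tmp_Asin`, `US_Rank`, `ParentAsin`, `Epoch`):

```python
from collections import defaultdict

# Stand-ins for broadcastDf.collect(): (tmp_Asin, US_Rank, ParentAsin, Epoch)
rows = [
    ('B000GIQSV6', 38.0, 'B08LKB3H7L', '1603324800'),
    ('B000GIQSV6', 32841.0, 'B08LLBCDT8', '1603324800'),
    ('B000GIPJY8', 37.0, 'B08LKB3H7L', '1603324800'),
]

def build_lookup(rows):
    """Group rows into {epoch: {asin: [{'Rank': ..., 'ParentAsin': ...}, ...]}}."""
    lookup = defaultdict(lambda: defaultdict(list))
    for asin, rank, parent, epoch in rows:
        lookup[epoch][asin].append({'Rank': rank, 'ParentAsin': parent})
    # Convert to plain dicts so the result pickles cleanly for sc.broadcast()
    return {epoch: dict(asins) for epoch, asins in lookup.items()}

result = build_lookup(rows)
```

This keeps `tmp_Asin` as its own nesting level, which is exactly what the map-combining UDF approach further down loses.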
Piecing together several answers, I came up with this:
def findNearest(asin, us_Rank, epoch):
    # Look up the epoch first, then the ASIN, in the broadcast dictionary
    if epoch in broadcastVar.value:
        asins = broadcastVar.value[epoch]
        if asin in asins:
            variations = asins[asin]
            lowest = None
            parent = ''
            # Keep the variation whose Rank is closest to us_Rank
            for variation in variations:
                diff = abs(variation['Rank'] - us_Rank)
                if lowest is None or diff < lowest:
                    lowest = diff
                    parent = variation['ParentAsin']
            return parent
    # Unknown epoch or ASIN: nothing to return
    return ''
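The nearest-match logic can be exercised without a Spark cluster by swapping in a stand-in for the broadcast object; a self-contained check, where `FakeBroadcast` is just a test shim (not a Spark API) that exposes the same `.value` attribute:

```python
class FakeBroadcast:
    """Minimal stand-in for a Spark Broadcast object (exposes .value only)."""
    def __init__(self, value):
        self.value = value

broadcastVar = FakeBroadcast({
    '1603756800': {
        'B000GIPJY8': [
            {'Rank': 43, 'ParentAsin': 'B08LX475MX'},
            {'Rank': 2471, 'ParentAsin': 'B08LLC2TWN'},
        ]
    }
})

def findNearest(asin, us_Rank, epoch):
    if epoch in broadcastVar.value:
        asins = broadcastVar.value[epoch]
        if asin in asins:
            lowest, parent = None, ''
            for variation in asins[asin]:
                diff = abs(variation['Rank'] - us_Rank)
                if lowest is None or diff < lowest:
                    lowest, parent = diff, variation['ParentAsin']
            return parent
    return ''
```

A rank of 50 is nearer to 43 than to 2471, so the first variation's parent comes back; unknown epochs or ASINs fall through to the empty string.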
# Merge a list of single-entry maps into one map (later keys win on collision)
combineMap = F.udf(lambda maps: {key: f[key] for f in maps for key in f},
                   T.MapType(T.DoubleType(), T.StringType()))
combineDeepMap = F.udf(lambda maps: {key: f[key] for f in maps for key in f},
                       T.MapType(T.StringType(), T.MapType(T.DoubleType(), T.StringType())))
mapdf = broadcastDf.filter("tmp_Asin == 'B000GIPJY8' OR tmp_Asin == 'B000GIQSV6'") \
    .groupBy('Epoch', 'tmp_Asin') \
    .agg(F.collect_list(F.create_map('US_Rank', 'ParentAsin')).alias('maps')) \
    .agg(combineDeepMap(F.collect_list(F.create_map('Epoch', combineMap('maps')))))
result_dict = mapdf.collect()[0][0]
broadcastVar = sc.broadcast(result_dict)
print(broadcastVar.value)
{'1603065600': {
161.0: 'B08L65HM35', 40.0: 'B08L6CRR2S', 41.0: 'B08L6CRR2S', 45.0: 'B08L6CRR2S'},
'1603497600': {
40.0: 'B08LKB3H7L', 167.0: 'B08LLBCDT8', 42.0: 'B08LKB3H7L', 2725.0: 'B08LLC2TWN', 45.0: 'B08LKB3H7L'}}
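With the flat `{epoch: {rank: parent}}` shape this actually produces, a nearest-rank lookup reduces to a `min()` over the rank keys; a sketch using the printed values above (note the ASIN level has been lost, which is the sticking point):

```python
lookup = {
    '1603065600': {161.0: 'B08L65HM35', 40.0: 'B08L6CRR2S',
                   41.0: 'B08L6CRR2S', 45.0: 'B08L6CRR2S'},
    '1603497600': {40.0: 'B08LKB3H7L', 167.0: 'B08LLBCDT8',
                   42.0: 'B08LKB3H7L', 2725.0: 'B08LLC2TWN', 45.0: 'B08LKB3H7L'},
}

def nearest(lookup, epoch, us_rank):
    """Return the ParentAsin whose rank key is closest to us_rank, '' if epoch unknown."""
    ranks = lookup.get(epoch)
    if not ranks:
        return ''
    closest = min(ranks, key=lambda r: abs(r - us_rank))
    return ranks[closest]
```

A rank of 170.0 in epoch '1603497600' lands on the 167.0 key, returning 'B08LLBCDT8'.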
udf_FindNearest = F.udf(findNearest, T.StringType())
toLookUp = asinscanpy.withColumn("Epoch", F.unix_timestamp(F.col("ScanDate"))) \
    .select("Asin", "US_Rank", "ParentAsin", "NoBB", "ScanDate", "Timestamp", "Epoch") \
    .where(F.col("ParentAsin").isNull()) \
    .where(F.col("Asin") == 'B000GIPJY8') \
    .where(F.col("ScanDate") >= '2020-10-01') \
    .where(F.col("US_Rank") > 0.0) \
    .withColumn("ParentAsin",
                udf_FindNearest(F.col("Asin"), F.col("US_Rank"),
                                F.col("Epoch").cast(T.StringType()))) \
    .show()
Close, but I'm now stuck on how to bring my tmp_Asin column into the mix. Any help would be much appreciated!
1 Answer
This is my solution.