从spark表创建字典

eiee3dmh  于 2021-05-17  发布在  Spark
关注(0)|答案(1)|浏览(518)

正如标题所述,我正试图从spark表中生成一个字典,用作广播变量,但作为这方面的新手,我正在努力找出正确的方法。
我想要的词典结构是这样的。主要目的是创建一个“查找”表,我可以用它在一组数据中查找最近的匹配项,并返回该列的父级。

{
  '1603756800': {
    'B000GIPJY8': [
      {
        'Rank': 43,
        'ParentAsin':'B08LX475MX'
      },      
      {
        'Rank': 146,
        'ParentAsin':'B08LXSRY8R'
      },
      {
        'Rank': 1320,
        'ParentAsin':'B08LL7LDYN'
      },
      {
        'Rank': 2471,
        'ParentAsin':'B08LLC2TWN'
      },
      {
        'Rank': 4058,
        'ParentAsin':'B08LX4D9CR'
      }
    ]
 }},
{
  '1603756800': {
    'B000GIQSV6': [
      {
        'Rank': 37,
        'ParentAsin':'B08LKB3H7L'
      },      
      {
        'Rank': 1320,
        'ParentAsin':'B08LLBCDT8'
      }
    ]
 }}

广播数据框

+----------+-------+----------+----------+
|  tmp_Asin|US_Rank|ParentAsin|     Epoch|
+----------+-------+----------+----------+
|B000GIQSV6|   38.0|B08LKB3H7L|1603324800|
|B000GIQSV6|32841.0|B08LLBCDT8|1603324800|
|B000GIQSV6|   37.0|B08LKB3H7L|1603324800|
|B000GIPJY8|30153.0|B08LLBCDT8|1603324800|
|B000GIPJY8|   37.0|B08LKB3H7L|1603324800|
|B000GIQSV6| 2735.0|B08LLC2TWN|1603324800|
|B000GIPJY8|   38.0|B08LKB3H7L|1603324800|
|B000GIQSV6|30153.0|B08LLBCDT8|1603324800|
|B000GIPJY8| 2735.0|B08LLC2TWN|1603324800|
|B000GIPJY8|32841.0|B08LLBCDT8|1603324800|
+----------+-------+----------+----------+

把几个答案拼凑在一起,我就想到了这个。

def findNearest(asin, us_Rank, epoch):
  """Return the ParentAsin whose Rank is closest to ``us_Rank``.

  Reads ``broadcastVar.value[epoch][asin]``, which is iterated as a list of
  ``{'Rank': ..., 'ParentAsin': ...}`` dicts, and picks the entry with the
  smallest absolute rank difference (the first such entry on ties).

  Args:
    asin: key of the second lookup level.
    us_Rank: rank compared against each variation's ``'Rank'``.
    epoch: key of the first lookup level; it must be the same type as the
      keys of the broadcast dict for the lookup to hit.

  Returns:
    The ``'ParentAsin'`` of the nearest variation, or ``''`` when the epoch
    or asin is missing, the variation list is empty, or ``us_Rank`` is None.
  """
  asins = broadcastVar.value.get(epoch) or {}
  variations = asins.get(asin)
  if not variations or us_Rank is None:
    # Every "no match" path returns '' instead of falling through to None.
    return ''

  # min() keeps an exact match (difference 0) as the winner; a running
  # "lowest == 0" sentinel would let the next entry overwrite it.
  nearest = min(variations, key=lambda variation: abs(variation['Rank'] - us_Rank))
  return nearest['ParentAsin']

# Merge a list of maps into a single map (DoubleType keys, StringType values);
# when the same key appears more than once, the later map wins.
combineMap = F.udf(lambda maps: {key: f[key] for f in maps for key in f},
             T.MapType(T.DoubleType(), T.StringType()))

# Same merge one level up: a list of maps whose values are themselves
# Double -> String maps. Uses F.udf like combineMap, because a bare ``udf``
# only exists if it was imported separately.
combineDeepMap = F.udf(lambda maps: {key: f[key] for f in maps for key in f},
              T.MapType(T.StringType(), T.MapType(T.DoubleType(), T.StringType())))

# Step 1: keep only the two ASINs of interest and collect, per (Epoch, tmp_Asin)
# group, one {US_Rank: ParentAsin} map for every row.
selected_rows = broadcastDf.filter("tmp_Asin == 'B000GIPJY8' OR tmp_Asin == 'B000GIQSV6' ")
rank_maps = F.collect_list(F.create_map('US_Rank', 'ParentAsin')).alias('maps')
grouped_maps = selected_rows.groupBy('Epoch', 'tmp_Asin').agg(rank_maps)

# Step 2: merge each group's maps, key them by Epoch, and fold all groups into
# one nested map with a global (ungrouped) aggregation.
epoch_maps = F.collect_list(F.create_map('Epoch', combineMap('maps')))
mapdf = grouped_maps.agg(combineDeepMap(epoch_maps))

# The global aggregation yields a single row with a single column.
first_row = mapdf.collect()[0]
result_dict = first_row[0]

broadcastVar = sc.broadcast(result_dict)
print(broadcastVar.value)
{'1603065600': {
161.0: 'B08L65HM35', 40.0: 'B08L6CRR2S', 41.0: 'B08L6CRR2S', 45.0: 'B08L6CRR2S'},
 '1603497600': {
40.0: 'B08LKB3H7L', 167.0: 'B08LLBCDT8', 42.0: 'B08LKB3H7L', 2725.0: 'B08LLC2TWN', 45.0: 'B08LKB3H7L'}}
# Wrap the Python lookup so it can be applied column-wise; it returns a string.
udf_FindNearest = F.udf(findNearest, T.StringType())

# Rows of one ASIN that still lack a ParentAsin, scanned on/after 2020-10-01
# and with a positive rank; ParentAsin is then filled from the lookup.
# Epoch is cast to string before it is passed to the UDF as the lookup key.
toLookUp = (
    asinscanpy.withColumn("Epoch", F.unix_timestamp(F.col("ScanDate")))
    .select("Asin", "US_Rank", "ParentAsin", "NoBB", "ScanDate", "Timestamp", "Epoch")
    .where(F.col("ParentAsin").isNull())
    .where(F.col("Asin") == 'B000GIPJY8')
    .where(F.col("ScanDate") >= '2020-10-01')
    .where(F.col("US_Rank") > 0.0)
    .withColumn("ParentAsin", udf_FindNearest(F.col("Asin"), F.col("US_Rank"), F.col("Epoch").cast(T.StringType())))
)

# show() returns None, so call it separately instead of at the end of the
# chain; otherwise toLookUp would be bound to None rather than the DataFrame.
toLookUp.show()

很接近了,但我现在卡在如何把 tmp_Asin 列也纳入这个字典结构中。任何帮助都将不胜感激!

2nc8po8w

2nc8po8w1#

这就是我的解决方案

def findNearest(asin, us_Rank, epoch):
  """Return the parent ASIN whose rank key is closest to ``us_Rank``.

  Reads ``broadcastVar.value[epoch][asin]``, which is iterated as a dict
  mapping a numeric rank to a parent ASIN, and picks the value whose key has
  the smallest absolute difference from ``us_Rank`` (the first such key in
  iteration order on ties).

  Args:
    asin: key of the second lookup level.
    us_Rank: rank compared against every rank key.
    epoch: key of the first lookup level; it must be the same type as the
      keys of the broadcast dict for the lookup to hit.

  Returns:
    The parent ASIN stored under the nearest rank, or ``''`` when the epoch
    or asin is missing, the rank map is empty, or ``us_Rank`` is None.
  """
  asins = broadcastVar.value.get(epoch) or {}
  variations = asins.get(asin)
  if not variations or us_Rank is None:
    # Every "no match" path returns '' instead of falling through to None.
    return ''

  # min() keeps an exact match (difference 0) as the winner; a running
  # "lowest == 0" sentinel would let the next entry overwrite it.
  nearest_rank = min(variations, key=lambda rank: abs(rank - us_Rank))
  return variations[nearest_rank]

# Merge a list of maps into a single map (DoubleType keys, StringType values);
# when the same key appears more than once, the later map wins.
combineMap = F.udf(lambda maps: {key: f[key] for f in maps for key in f},
             T.MapType(T.DoubleType(), T.StringType()))

# Same merge for maps nested two levels deep (String -> String -> Double ->
# String). Uses F.udf like combineMap, because a bare ``udf`` only exists if
# it was imported separately. The merge is shallow: if two input maps share an
# outer key, the later inner map replaces the earlier one.
combineDeepMap = F.udf(lambda maps: {key: f[key] for f in maps for key in f},
              T.MapType(T.StringType(), T.MapType(T.StringType(), T.MapType(T.DoubleType(), T.StringType()))))

# Per (Epoch, tmp_Asin) group, collect one {US_Rank: ParentAsin} map per row.
grouped_maps = broadcastDf.filter("ScanDate >= '2020-10-01'").groupBy('Epoch','tmp_Asin') \
                     .agg(F.collect_list(F.create_map('US_Rank','ParentAsin')).alias('maps'))

# Wrap each group's merged rank map as {Epoch-as-string: {tmp_Asin: ranks}}.
nested_maps = F.create_map(F.col('Epoch').cast(T.StringType()), F.create_map('tmp_Asin', combineMap('maps')))

# Global aggregation into one nested map. The alias is applied to the
# aggregated column; placed after agg(...) it would alias the DataFrame and
# leave the column with an auto-generated name.
mapdf = grouped_maps.agg(combineDeepMap(F.collect_list(nested_maps)).alias('Dict'))

# The global aggregation yields a single row with a single column.
result_dict = mapdf.collect()[0][0]
broadcastVar = sc.broadcast(result_dict)

# Column-wise wrapper around the Python lookup; it returns a string.
udf_FindNearest = F.udf(findNearest, T.StringType())

# Lookup key and rank are passed to the UDF as (Asin, US_Rank, Epoch-as-string).
nearest_parent = udf_FindNearest(
    F.col("Asin"),
    F.col("US_Rank"),
    F.col("Epoch").cast(T.StringType()),
)

# Rows still lacking a ParentAsin, scanned on/after 2020-10-01 and with a
# positive rank, get ParentAsin filled from the broadcast lookup.
toLookUp = (
    asinscanpy
    .withColumn("Epoch", F.unix_timestamp(F.col("ScanDate")))
    .select("Asin", "US_Rank", "ParentAsin", "NoBB", "ScanDate", "Timestamp", "Epoch")
    .where(F.col("ParentAsin").isNull())
    .where(F.col("ScanDate") >= '2020-10-01')
    .where(F.col("US_Rank") > 0.0)
    .withColumn("ParentAsin", nearest_parent)
)

相关问题