pyspark 从数组列内的列表中计数单词,而不调用shuffle

stszievb  于 2023-06-28  发布在  Spark
关注(0)|答案(3)|浏览(130)

我来自这篇文章:pyspark: count number of occurrences of distinct elements in lists,其中OP询问从数组列中获取不同项的计数。如果我事先已经知道词汇表,并希望计算出预设长度的向量,该怎么办?
假设我的词汇量是

vocab = ['A', 'B', 'C', 'D', 'E']

我的数据看起来像这样(从另一个帖子修改)

data = {'date': ['2014-01-01', '2014-01-02', '2014-01-03'],
     'flat': ['A;A;B', 'D;B;E;B;B', 'B;A']}

data['date'] = pd.to_datetime(data['date'])

data = pd.DataFrame(data)
data['date'] = pd.to_datetime(data['date'])
spark = SparkSession.builder \
    .master('local[*]') \
    .config("spark.driver.memory", "500g") \
    .appName('my-pandasToSparkDF-app') \
    .getOrCreate()
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
spark.sparkContext.setLogLevel("OFF")

df=spark.createDataFrame(data)
new_frame = df.withColumn("list", F.split("flat", "\;"))

最终这就是我想要的

+-------------------+-----------+---------------------+
|               date| flat      | counts              |
+-------------------+-----------+---------------------+
|2014-01-01 00:00:00|A;A;B      |[2, 1, 0, 0, 0]      |
|2014-01-02 00:00:00|D;B;E;B;B  |[0, 3, 0, 1, 1]      |
|2014-01-03 00:00:00|B;A        |[1, 1, 0, 0, 0]      |
+-------------------+-----------+---------------------+

这里有一个工作解决方案,似乎效率低下,改编自前一篇文章的解决方案:

from pyspark.sql import functions as F
df=spark.createDataFrame(data)
df.withColumn("list", F.split("flat","\;"))\
  .withColumn("distinct_items", F.array_distinct("list") \
  .withColumn("occurrences", F.expr("""array_sort(transform(distinct_items, x-> aggregate(list, 0,(acc,t)->acc+IF(t=x,1,0))))"""))\
  .withColumn("count_map", F.map_from_arrays("distinct_items", "occurrences"))\
  .withColumn(
      "counts",
      F.array(
          [
              F.when(
                  F.col("count_map")
                      .getItem(v)
                      .isNull(),
                      0,
                  )
                  .otherwise(
                      F.col("count_map").getItem(v)
                  )
              for v in vocab
           ]
      ).drop("occurrences", "distinct_items").show()

我可以这样做而不必创建Map,然后从Map创建数组吗?实际上,我需要在一个包含大量列的大表上执行此过程,因此我希望避免执行groupByagg类型的操作。

2skhul33

2skhul331#

问得好你的直觉是完全正确的:在这种情况下可以避免混洗。

from pyspark.sql import functions as F
vocab = ['A', 'B', 'C', 'D', 'E']
df = spark.createDataFrame([('A;A;B',), ('D;B;E;B;B',), ('B;A',),], ['flat'])

voc_arr = F.array([F.lit(x) for x in vocab])
df = df.withColumn('count', F.transform(voc_arr, lambda v: F.size(F.array_remove(F.transform(F.split('flat', ';'), lambda f: f == v), False))))

df.show()
# +---------+---------------+
# |     flat|          count|
# +---------+---------------+
# |    A;A;B|[2, 1, 0, 0, 0]|
# |D;B;E;B;B|[0, 3, 0, 1, 1]|
# |      B;A|[1, 1, 0, 0, 0]|
# +---------+---------------+
7kjnsjlb

7kjnsjlb2#

非常有趣的问题。
虽然它可以在没有shuffle的情况下使用高阶函数,但我无法计算出比VocabularySize * flatSize更低的复杂度。
我想还是比shuffles好。

vocabulary = ["A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K", "L", "M", "N", "O", "P",
              "Q", "R", "S", "U", "V", "W", "X", "Y", "Z"]
vocabulary_df = spark.createDataFrame(
    [
     [{k:0 for k in vocabulary}]
    ],
     ["vocab"]
)

df \
.crossJoin(vocabulary_df) \
.withColumn("count_distinct", aggregate(
    "flat",
    initValue="vocab",
    merge=lambda acc, flat_value: transform_values(
        acc,
        lambda vocab_key, vocab_value: when(
            flat_value == vocab_key,
            vocab_value + 1
        ).otherwise(vocab_value)
    )
)) \
.select("flat", "count_distinct") \
.show(truncate=0)

+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|flat           |count_distinct                                                                                                                                                                                          |
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[A, A, B]      |{A -> 2, B -> 1, C -> 0, D -> 0, E -> 0, F -> 0, G -> 0, H -> 0, I -> 0, J -> 0, K -> 0, L -> 0, M -> 0, N -> 0, O -> 0, P -> 0, Q -> 0, R -> 0, S -> 0, U -> 0, V -> 0, W -> 0, X -> 0, Y -> 0, Z -> 0}|
|[D, B, E, B, B]|{A -> 0, B -> 3, C -> 0, D -> 1, E -> 1, F -> 0, G -> 0, H -> 0, I -> 0, J -> 0, K -> 0, L -> 0, M -> 0, N -> 0, O -> 0, P -> 0, Q -> 0, R -> 0, S -> 0, U -> 0, V -> 0, W -> 0, X -> 0, Y -> 0, Z -> 0}|
|[B, A]         |{A -> 1, B -> 1, C -> 0, D -> 0, E -> 0, F -> 0, G -> 0, H -> 0, I -> 0, J -> 0, K -> 0, L -> 0, M -> 0, N -> 0, O -> 0, P -> 0, Q -> 0, R -> 0, S -> 0, U -> 0, V -> 0, W -> 0, X -> 0, Y -> 0, Z -> 0}|
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
n8ghc7c1

n8ghc7c13#

我有第一种方法的另一个变体,把整个vocab放进每个数组的前面。。我不知道相对的优点是什么

from pyspark.sql import functions as F

vocab_arr = F.array([F.lit(v) for v in vocab])
df=spark.createDataFrame(data)
df.withColumn("list", F.split("flat","\;"))\
  .withColumn("list_", F.concat(vocab_arr, "list")) \
  .withColumn(
     "counts", 
     F.expr("""transform(list_, x-> aggregate(list_, -1,(acc,t)->acc+IF(t=x,1,0)))""")) \
  .withColumn("counts", F.slice("counts", 1, len(vocab))) \
  .drop("list_").show()

相关问题