我来自这篇文章:pyspark: count number of occurrences of distinct elements in lists,其中OP询问从数组列中获取不同项的计数。如果我事先已经知道词汇表,并希望计算出预设长度的向量,该怎么办?
假设我的词汇量是
vocab = ['A', 'B', 'C', 'D', 'E']
我的数据看起来像这样(从另一个帖子修改)
data = {'date': ['2014-01-01', '2014-01-02', '2014-01-03'],
'flat': ['A;A;B', 'D;B;E;B;B', 'B;A']}
data['date'] = pd.to_datetime(data['date'])
data = pd.DataFrame(data)
data['date'] = pd.to_datetime(data['date'])
spark = SparkSession.builder \
.master('local[*]') \
.config("spark.driver.memory", "500g") \
.appName('my-pandasToSparkDF-app') \
.getOrCreate()
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
spark.sparkContext.setLogLevel("OFF")
df=spark.createDataFrame(data)
new_frame = df.withColumn("list", F.split("flat", "\;"))
最终这就是我想要的
+-------------------+-----------+---------------------+
| date| flat | counts |
+-------------------+-----------+---------------------+
|2014-01-01 00:00:00|A;A;B |[2, 1, 0, 0, 0] |
|2014-01-02 00:00:00|D;B;E;B;B |[0, 3, 0, 1, 1] |
|2014-01-03 00:00:00|B;A |[1, 1, 0, 0, 0] |
+-------------------+-----------+---------------------+
这里有一个工作解决方案,似乎效率低下,改编自前一篇文章的解决方案:
from pyspark.sql import functions as F
df=spark.createDataFrame(data)
df.withColumn("list", F.split("flat","\;"))\
.withColumn("distinct_items", F.array_distinct("list") \
.withColumn("occurrences", F.expr("""array_sort(transform(distinct_items, x-> aggregate(list, 0,(acc,t)->acc+IF(t=x,1,0))))"""))\
.withColumn("count_map", F.map_from_arrays("distinct_items", "occurrences"))\
.withColumn(
"counts",
F.array(
[
F.when(
F.col("count_map")
.getItem(v)
.isNull(),
0,
)
.otherwise(
F.col("count_map").getItem(v)
)
for v in vocab
]
).drop("occurrences", "distinct_items").show()
我可以这样做而不必创建Map,然后从Map创建数组吗?实际上,我需要在一个包含大量列的大表上执行此过程,因此我希望避免执行groupBy
、agg
类型的操作。
3条答案
按热度按时间2skhul331#
问得好你的直觉是完全正确的:在这种情况下可以避免混洗。
7kjnsjlb2#
非常有趣的问题。
虽然它可以在没有shuffle的情况下使用高阶函数,但我无法计算出比VocabularySize * flatSize更低的复杂度。
我想还是比shuffles好。
n8ghc7c13#
我有第一种方法的另一个变体,把整个vocab放进每个数组的前面。。我不知道相对的优点是什么