import numpy as np
from pyspark.sql import Row, SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, LongType

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

def add_index_to_row(row, index):
    # convert the Row to a dict, attach the positional index, and rebuild the Row
    row_dict = row.asDict()
    row_dict["index"] = index
    return Row(**row_dict)

def add_index_to_df(df):
    # zipWithIndex pairs each record with a 0-based position: (row, index)
    df_with_index = df.rdd.zipWithIndex().map(lambda x: add_index_to_row(x[0], x[1]))
    # zipWithIndex produces 64-bit indices, so use LongType rather than IntegerType
    new_schema = StructType(df.schema.fields + [StructField("index", LongType(), True)])
    return spark.createDataFrame(df_with_index, new_schema)

def shuffle_single_column(df, column_name):
    df_cols = df.columns
    # select the desired column and shuffle it (i.e. order it by a column of random numbers)
    shuffled_col = df.select(column_name).orderBy(F.rand())
    # add an explicit index to the shuffled column
    shuffled_col_index = add_index_to_df(shuffled_col)
    # add an explicit index to the original dataframe
    df_index = add_index_to_df(df)
    # drop the desired column from df, join in the shuffled column on the index, then drop the index
    df_shuffled = df_index.drop(column_name).join(shuffled_col_index, "index").drop("index")
    # restore the original column order so the shuffled column returns to its initial position
    df_shuffled = df_shuffled.select(df_cols)
    return df_shuffled

# initialize a random 10x3 array of integers in [0, 20)
z = np.random.randint(20, size=(10, 3)).tolist()
# create the PySpark dataframe
example_df = sc.parallelize(z).toDF(["a", "b", "c"])
# shuffle one column of the dataframe
example_df_shuffled = shuffle_single_column(df=example_df, column_name="a")
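A quick sanity check of the snippet above (the exact output will vary, since the shuffle is random):

example_df.show()           # original rows
example_df_shuffled.show()  # column "a" permuted, "b" and "c" untouched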
5 Answers

#1

If anyone is looking for a PySpark equivalent of Sascha Vetter's post, the code block at the top of this page is exactly that.
#2
What about selecting the column you want to shuffle, ordering it with orderBy(rand), and zipping it back onto the existing dataframe by index? A sketch follows below.
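A minimal PySpark sketch of this idea, assuming df is an existing DataFrame and "a" is the column to shuffle. RDD.zip itself requires identical partition layouts on both sides, so pairing each side with an explicit zipWithIndex index and joining is the safer route:

from pyspark.sql import functions as F

# order the target column randomly, then key every value by its position
shuffled_vals = (df.select("a").orderBy(F.rand())
                 .rdd.zipWithIndex()
                 .map(lambda pair: (pair[1], pair[0]["a"])))
# key the original rows by their position as well
indexed_rows = df.rdd.zipWithIndex().map(lambda pair: (pair[1], pair[0]))
# join on the index: yields (original Row, shuffled "a" value) pairs
rejoined = indexed_rows.join(shuffled_vals).values()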
#3

If you don't need a global shuffle of the data, you can shuffle within partitions using the mapPartitions method. For a pair RDD (an RDD of type RDD[(K, V)]), if what you want is to shuffle the key-to-value mapping (assigning arbitrary keys to arbitrary values), see the sketch below. The boolean flag at the end of the mapPartitions call signals that the operation preserves the partitioning (the keys are not changed), so downstream operations such as reduceByKey can be optimized (the shuffle is avoided).
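A sketch of this per-partition shuffle in PySpark, assuming pair_rdd is an existing RDD of (key, value) pairs (the name is illustrative):

import random

def shuffle_values_within_partition(iterator):
    # materialize the partition, then split keys from values
    pairs = list(iterator)
    keys = [k for k, _ in pairs]
    values = [v for _, v in pairs]
    # permute only the values, keeping each key in place
    random.shuffle(values)
    return iter(zip(keys, values))

# preservesPartitioning=True: the keys are unchanged, so the partitioner
# still holds and e.g. a downstream reduceByKey avoids a shuffle
shuffled = pair_rdd.mapPartitions(shuffle_values_within_partition,
                                  preservesPartitioning=True)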
#4

While you cannot shuffle a single column directly, it is possible to permute the records of an RDD via RandomRDDs: https://spark.apache.org/docs/latest/api/java/org/apache/spark/mllib/random/RandomRDDs.html

One possible approach to permute only a single column (sketched after this list) is:
* use mapPartitions to perform setup/teardown on each worker task;
* pull each partition's records into memory, i.e. iterator.toList — make sure you have many (small) partitions of data to avoid an OOME;
* return the result from mapPartitions as list.toIterator.
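A hypothetical PySpark rendering of those steps, assuming df is an existing DataFrame and "a" is the column to permute. Each partition is held in memory at once, hence the warning above about many small partitions:

import random
from pyspark.sql import Row

def shuffle_column_in_partition(iterator):
    rows = list(iterator)        # the iterator.toList step
    values = [row["a"] for row in rows]
    random.shuffle(values)       # permute just this column's values
    rebuilt = []
    for row, v in zip(rows, values):
        d = row.asDict()
        d["a"] = v
        rebuilt.append(Row(**d))
    return iter(rebuilt)         # the list.toIterator step

df_permuted = spark.createDataFrame(
    df.rdd.mapPartitions(shuffle_column_in_partition), df.schema)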
#5
You can add a randomly generated column and then sort the records by that column; this randomly shuffles the column(s) you care about. Done this way, you do not need to hold all of the data in memory, which could easily cause an OOM: Spark handles the sort within its memory limits by spilling to disk when necessary. If you don't need the extra column, drop it after sorting, as in the sketch below.
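A minimal sketch of this approach, assuming df is the DataFrame (or single-column projection) you want shuffled:

from pyspark.sql import functions as F

# add a temporary random column, sort on it, then discard it
df_shuffled = df.withColumn("_rand", F.rand()).orderBy("_rand").drop("_rand")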