Randomly shuffle a column in a Spark RDD or DataFrame

roejwanj posted on 2022-12-19 in Apache

Is there a way to have one column of an RDD or DataFrame come out in a random order? I'm not sure which APIs could be used to accomplish this.


db2dz4w8 #1

If anyone is looking for the PySpark equivalent of Sascha Vetter's post, you can find it below:

import numpy as np

from pyspark.sql import Row
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, LongType

def add_index_to_row(row, index):
  # copy the Row into a dict, attach the index and rebuild the Row
  row_dict = row.asDict()
  row_dict["index"] = index
  return Row(**row_dict)

def add_index_to_df(df):
  # zipWithIndex yields (row, index) pairs; fold the index into each row
  df_with_index = df.rdd.zipWithIndex().map(lambda x: add_index_to_row(x[0], x[1]))
  # zipWithIndex indices are longs, so use LongType for the extra field
  new_schema = StructType(df.schema.fields + [StructField("index", LongType(), True)])
  return spark.createDataFrame(df_with_index, new_schema)

def shuffle_single_column(df, column_name):
  df_cols = df.columns
  # select the desired column and shuffle it (i.e. order it by a column of random numbers)
  shuffled_col = df.select(column_name).orderBy(F.rand())
  # add an explicit index to the shuffled column
  shuffled_col_index = add_index_to_df(shuffled_col)
  # add an explicit index to the original dataframe
  df_index = add_index_to_df(df)
  # drop the desired column from df, join it with the shuffled column on the created index, then drop the index column
  df_shuffled = df_index.drop(column_name).join(shuffled_col_index, "index").drop("index")
  # reorder columns so that the shuffled column returns to its initial position instead of the last position
  df_shuffled = df_shuffled.select(df_cols)
  return df_shuffled

# initialize a random array
z = np.random.randint(20, size=(10, 3)).tolist()
# create the pyspark dataframe
example_df = sc.parallelize(z).toDF(["a", "b", "c"])
# shuffle one column of the dataframe
example_df_shuffled = shuffle_single_column(df=example_df, column_name="a")

hujrc8aj #2

How about selecting the column you want to shuffle, ordering it with orderBy(rand), and zipping it by index back onto the existing dataframe?

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions.{col, rand}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

def addIndex(df: DataFrame) = spark.createDataFrame(
  // Add index
  df.rdd.zipWithIndex.map{case (r, i) => Row.fromSeq(r.toSeq :+ i)},
  // Create schema
  StructType(df.schema.fields :+ StructField("_index", LongType, false))
)

case class Entry(name: String, salary: Double)

val r1 = Entry("Max", 2001.21)
val r2 = Entry("Zhang", 3111.32)
val r3 = Entry("Bob", 1919.21)
val r4 = Entry("Paul", 3001.5)

val df = addIndex(spark.createDataFrame(Seq(r1, r2, r3, r4)))
val df_shuffled = addIndex(df
  .select(col("salary").as("salary_shuffled"))
  .orderBy(rand))

df.join(df_shuffled, Seq("_index"))
  .drop("_index")
  .show(false) 

+-----+-------+---------------+
|name |salary |salary_shuffled|
+-----+-------+---------------+
|Max  |2001.21|3001.5         |
|Zhang|3111.32|3111.32        |
|Paul |3001.5 |2001.21        |
|Bob  |1919.21|1919.21        |
+-----+-------+---------------+

goqiplq2 #3

If you don't need a global shuffle across your data, you can shuffle within partitions instead, using the mapPartitions method.

import scala.util.Random

// shuffle each partition's records (materialises the partition before shuffling)
rdd.mapPartitions(iterator => Random.shuffle(iterator.toSeq).toIterator)

For a PairRDD (an RDD of type RDD[(K, V)]), if you are interested in shuffling the key-value mappings (mapping arbitrary keys to arbitrary values):

pairRDD.mapPartitions(iterator => {
  // split the partition into keys and values, shuffle only the values
  val (keySequence, valueSequence) = iterator.toSeq.unzip
  val shuffledValueSequence = Random.shuffle(valueSequence)
  // re-pair the original keys with the shuffled values
  keySequence.zip(shuffledValueSequence).toIterator
}, preservesPartitioning = true)

The boolean flag at the end indicates that partitioning is preserved by this operation (the keys are not changed), so that downstream operations such as reduceByKey can be optimized (avoiding a shuffle).
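
For example, here is a minimal sketch of that effect (the HashPartitioner pre-partitioning and the sample data are my own assumptions, added only to make the point observable): because the partitioner survives the value shuffle, the reduceByKey below can reuse it instead of triggering another shuffle.

import org.apache.spark.HashPartitioner
import scala.util.Random

// assumed sample data, pre-partitioned by key so the RDD carries a partitioner
val pairRDD = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3), ("b", 4)))
  .partitionBy(new HashPartitioner(2))

// shuffle values within each partition while keeping keys (and the partitioning) intact
val valueShuffled = pairRDD.mapPartitions(iterator => {
  val (keySequence, valueSequence) = iterator.toSeq.unzip
  keySequence.zip(Random.shuffle(valueSequence)).toIterator
}, preservesPartitioning = true)

// since the partitioner is preserved, this reduceByKey needs no further network shuffle
valueShuffled.reduceByKey(_ + _).collect()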


g52tjvyc #4

Although you can't shuffle a single column directly, you can permute the records in an RDD via RandomRDDs. https://spark.apache.org/docs/latest/api/java/org/apache/spark/mllib/random/RandomRDDs.html
One possible way to permute only a single column would be (see the sketch after the list):

  • use mapPartitions to perform some setup/teardown on each worker task
  • pull all the records into memory, e.g. iterator.toList; make sure you have many (small) partitions of data to avoid an OOME
  • using the Row object, rewrite everything back out as the original except for the given column
  • within the mapPartitions, create an in-memory sorted list
  • for the desired column, drop its values into a separate collection and randomly sample the collection to replace each record's entry
  • return the result as list.toIterator from the mapPartitions
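
A rough sketch of those steps (the helper name shufflePartitionColumn is hypothetical, and scala.util.Random.shuffle stands in for the random-sampling step; each partition is permuted independently, so this is not a global shuffle):

import org.apache.spark.sql.{DataFrame, Row}
import scala.util.Random

// hypothetical helper: permute the values of one column within each partition
def shufflePartitionColumn(df: DataFrame, columnName: String): DataFrame = {
  val columnIndex = df.schema.fieldIndex(columnName)
  val shuffledRdd = df.rdd.mapPartitions { iterator =>
    // pull the whole partition into memory (keep partitions small to avoid an OOME)
    val rows = iterator.toList
    // collect the target column's values and permute them
    val shuffledValues = Random.shuffle(rows.map(_.get(columnIndex)))
    // rewrite each Row unchanged except for the target column
    rows.zip(shuffledValues).map { case (row, value) =>
      Row.fromSeq(row.toSeq.updated(columnIndex, value))
    }.toIterator
  }
  df.sparkSession.createDataFrame(shuffledRdd, df.schema)
}

// usage: permute column "a" of df within each partition
// val shuffled = shufflePartitionColumn(df, "a")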

w1jd8yoj #5

You can add a randomly generated column and then sort the records by that column. That way, you get a random shuffle of the column you designated.
This way you don't need to keep all the data in memory, which can easily lead to an OOM. Spark will handle the sorting and the memory limits by spilling to disk when necessary.
If you don't need the extra column, you can drop it after sorting.
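
A minimal sketch of that idea (the temporary column name rnd and the example DataFrame are assumptions; note that ordering by a random column reorders whole rows, so to shuffle just one column you would apply it to a single-column projection and join it back by index, as in the answers above):

import org.apache.spark.sql.functions.rand

// assumed example DataFrame; "rnd" is an arbitrary name for the helper column
val df = spark.range(10).toDF("value")

val shuffled = df
  .withColumn("rnd", rand())  // add a randomly generated column
  .orderBy("rnd")             // sort the records by it; Spark spills to disk if needed
  .drop("rnd")                // drop the helper column once the order is randomized

shuffled.show()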
