PySpark:从随机均匀分布创建 Dataframe

v7pvogib  于 2023-03-22  发布在  Spark
关注(0)|答案(3)|浏览(383)

我试图在Spark中使用随机均匀分布创建一个 Dataframe 。我找不到任何关于如何创建 Dataframe 的内容,但当我阅读文档时,我发现pyspark.mllib.random有一个RandomRDDs对象,该对象有一个uniformRDD方法,可以从随机均匀分布创建rdds。
但问题是它不能创建二维的rdds,有没有一种方法可以创建一个二维的rdd或者(最好是)dataframe?
我可以创建一些rdd并使用它们来创建一个 Dataframe ,但我使用的数据集有许多字段(100多个),创建100个rdd然后压缩它们似乎效率不高。

vhipe2zx

vhipe2zx1#

您可以生成统一的Vectors RDD并将其转换为DataFrame

from pyspark.mllib.linalg import DenseVector
from pyspark.mllib.random import RandomRDDs
data  = RandomRDDs.uniformVectorRDD(sc, 10,10) \ # numpy.ndarray are not supported.
                  .map(lambda a : DenseVector(a)) \
                  .map(lambda a : (a,)) \ 
                  .toDF(['features'])

data.show()
# +--------------------+
# |            features|
# +--------------------+
# |[0.97051622217872...|
# |[0.39165143210012...|
# |[0.70067295066813...|
# |[0.59568555130484...|
# |[0.16572531686478...|
# |[0.92494190257048...|
# |[0.43691499080129...|
# |[0.28320336307013...|
# |[0.85420768678698...|
# |[0.65923297006740...|
# +--------------------+

有关更多信息,您可以随时查看这里的官方文档。

编辑:(检查注解)

如果希望每个值都在单独的列中,则不需要将向量转换为DenseVector,而是转换为列表:

data  = RandomRDDs.uniformVectorRDD(sc, 10,10).map(lambda a : a.tolist()).toDF()
data1.show()
# +-------------------+-------------------+--------------------+-------------------+-------------------+-------------------+-------------------+--------------------+------------------+-------------------+
# |                 _1|                 _2|                  _3|                 _4|                 _5|                 _6|                 _7|                  _8|                _9|                _10|
# +-------------------+-------------------+--------------------+-------------------+-------------------+-------------------+-------------------+--------------------+------------------+-------------------+
# |0.14585743784778926|  0.803498096310468| 0.31605227000909253| 0.8119612820220813|0.41447836235778723| 0.2676439013488928| 0.8524783652359866|  0.5701076199786781|0.6693708605568874| 0.8111256775283068|
# | 0.1827511073189425| 0.3350517687462683|  0.7400032940623857| 0.7869460532004358| 0.6448914199353433| 0.9805601228284964|0.20020913675524243|  0.7922294214683878|0.9374972404332362| 0.6765087842364208|
# |0.38625776221583874|0.04229839224493681|  0.7734933051852422| 0.0274813429089541|  0.311445753826302|0.25698473390480325| 0.9437646814604557|  0.4741747733429049| 0.290710728473321|  0.677912271088622|
# | 0.7896873370148003| 0.1858840420861243|  0.3197437373418126|0.10097010041540833|0.10289933172316801| 0.5449368374946228| 0.4030450125686461| 0.21948568405399982|0.8930079107298496| 0.7519921983394425|
# |  0.815811790931526| 0.3634760983908547| 0.42601575700182837|0.13606388717010864| 0.5861222009300258| 0.3340860113942531| 0.2557956812340677| 0.43528056172400853|0.3922245296661778| 0.8912435252335149|
# |0.30392495415210397| 0.7925870450504611|  0.9030779298622288| 0.8727793109267047| 0.8158542803828924| 0.7931830841520005| 0.6282396202128951|  0.1420886768888291|0.8614276809589785|0.17436606175314684|
# | 0.9382134044434042| 0.6749506191750686|0.015443852959660331|0.12038319457909019|  0.417781126294975|0.07393488977646023|0.31885174813644857|   0.728226037613587|0.9952269580720621|0.07007086773721505|
# |0.13783951066912703| 0.7119354308993141| 0.42197923155036043|0.29716042608097326| 0.9738408655296322| 0.9868052613269893| 0.6935287164137466|0.037473358201903895|0.3495081198619411| 0.8435628173797828|
# | 0.1587632683889939| 0.7360623327266481| 0.42321853435929413| 0.9677124294019807|   0.63138909800576|0.09938015379429832| 0.5399110874035429|  0.7668582384258967|0.7925729040215128| 0.1764801807830343|
# | 0.2588173671258266| 0.5196258205360417| 0.47988935453823345| 0.6699354533063644| 0.8233338127383266| 0.8249394954169588|0.32268906006759734|  0.2768177979947253|0.9951067081655113| 0.5263299321371093|
# +-------------------+-------------------+--------------------+-------------------+-------------------+-------------------+-------------------+--------------------+------------------+-------------------+
qxsslcnc

qxsslcnc2#

要生成具有n行和n列的随机 Dataframe ,可以使用以下函数

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

def generate_random_uniform_df(nrows, ncols, seed=1):
    df = spark.range(nrows).select(F.col("id"))
    df = df.select('*', *(F.rand(seed).alias("_"+str(target)) for target in range(ncols)))
    return df.drop("id")

以及

def generate_random_normal_df(nrows, ncols, seed=1):
    df = spark.range(nrows).select(F.col("id"))
    df = df.select('*', *(F.randn(seed).alias("_"+str(target)) for target in range(ncols)))
    return df.drop("id")

对于标准正态分布。然而,Eliasah建议的

def generate_random_uniform_df(nrows, ncols):
    df  = RandomRDDs.uniformVectorRDD(spark.sparkContext, nrows,ncols).map(lambda a : a.tolist()).toDF()
    return df

似乎要快得多。

swvgeqrz

swvgeqrz3#

下面的解决方案忽略了我在问题中提到的我自己的担忧
我可以创建一些rdd并使用它们来创建一个 Dataframe ,但我使用的数据集有许多字段(100多个),创建100个rdd然后压缩它们似乎效率不高。

def create_uniform_rdd(nrow, ncol, schema = None):
    random_rdd = RandomRDDs()
    rdds = []
    for each_col in range(ncol):
        rdds.append(random_rdd.uniformRDD(sc, nrow).collect())
    rdds = list(zip(*rdds))
    if schema is None:
        schema = StructType([StructField(str(i), FloatType(), False) for i in range(ncol)])
    df = sqlContext.createDataFrame(rdds, schema)
    return df

我必须处理zip位,因为Spark Dataframe 是面向行的。我可以在for循环中将ncolnrow交换,但由于行数远远大于列数

编辑

添加了eliasah的方法和我的方法的时间比较

def create_uniform_rdd_vector(nrow, ncol, schema = None):
    data  = RandomRDDs.uniformVectorRDD(sc, nrow,ncol).map(lambda a : a.tolist()).toDF()
    return data

def create_uniform_rdd(nrow, ncol, schema = None):
    random_rdd = RandomRDDs()
    rdds = []
    for each_col in range(ncol):
        rdds.append(random_rdd.uniformRDD(sc, nrow).collect())
    rdds = list(zip(*rdds))
    if schema is None:
        schema = StructType([StructField(str(i), FloatType(), False) for i in range(ncol)])
    df = sqlContext.createDataFrame(rdds, schema)
    return df

def timer_func(func, niter = 10):
    tic = time()
    for i in range(1,niter+1):
        nrow = i*1000
        ncol = i*10
        _ = func(nrow, ncol, schema = None)
    tac = time()
    return tac - tic

niter = 5
create_uniform_rdd_time = timer_func(create_uniform_rdd, niter) # 4.27 secs
create_uniform_rdd_vector_time = timer_func(create_uniform_rdd_vector, niter) # 1.31 secs

相关问题