我有以下Dataframe:
root
|-- sents: array (nullable = false)
| |-- element: integer (containsNull = true)
|-- metadata: array (nullable = true)
| |-- element: float (containsNull = true)
+----------+---------------------+
|sents |metadata |
+----------+---------------------+
|[1, -1, 0]|[0.4991, 0.5378, 0.0]|
|[-1] |[0.6281] |
|[-1] |[0.463] |
+----------+---------------------+
我想动态地将每个数组项展开到它自己的列中,以便它可以如下所示:
+--------+--------+--------+-----------+-----------+-----------+
|sents[0]|sents[1]|sents[2]|metadata[0]|metadata[1]|metadata[2]|
+--------+--------+--------+-----------+-----------+-----------+
| 1| -1| 0| 0.4991| 0.5378| 0.0|
| -1| null| null| 0.6281| null| null|
| -1| null| null| 0.463| null| null|
+--------+--------+--------+-----------+-----------+-----------+
但在结构化流媒体中,动态执行操作有许多限制:
我尝试了以下方法:
numcol = df.withColumn('phrasesNum', F.size('sents')).agg(F.max('phrasesNum')).head()
df = df.select(*[F.col('sents')[i] for i in range(numcol[0])],*[F.col('metadata')[i] for i in range(numcol[0])])
也:
df_sizes = df.select(F.size('sents').alias('sents'))
df_max = df_sizes.agg(F.max('sents'))
nb_columns = df_max.collect()[0][0]
d = c.select(*[F.map_values(c['metadata'][i]).getItem(0).alias('confidenceIntervals'+"{}".format(j)).cast(DoubleType()) for i,j in enumerate(range(F.size('sents')))],
*[c['sents'][i].alias('phraseSents'+"{}".format(j)).cast(IntegerType()) for i,j in enumerate(range(nb_columns))])
但是我不能在结构化流中使用.head()、.collect()或.take()之类的东西来创建表示要动态创建的列数的数字变量。有什么想法吗??
谢谢大家
1条答案
按热度按时间a5g8bdjr1#
只有这样你才能做到
without collecting to driver
节点(首先,采取,收集等),是如果你知道columns you need
或者max size of each array column.
这里我假设两列都有max size of 3
,需要列0,1,2.
在流媒体中,Dataframe之间不能有不同的模式(列)。