我有一个很大的 Dataframe (几百万行)。
我希望能够对它执行groupby操作,但只是按任意连续(最好大小相等)的行子集进行分组,而不是使用单个行的任何特定属性来决定它们进入哪个组。
使用案例:我想通过IPython中的并行Map将一个函数应用于每一行。哪些行转到哪个后端引擎并不重要,因为该函数每次基于一行计算结果。(至少在概念上。实际上它是矢量化的。)
我想到了这样一个方法:
# Generate a number from 0-9 for each row, indicating which tenth of the DF it belongs to
max_idx = dataframe.index.max()
tenths = ((10 * dataframe.index) / (1 + max_idx)).astype(np.uint32)
# Use this value to perform a groupby, yielding 10 consecutive chunks
groups = [g[1] for g in dataframe.groupby(tenths)]
# Process chunks in parallel
results = dview.map_sync(my_function, groups)
但这似乎非常冗长,并不能保证大小相等的块。特别是如果索引是稀疏的或非整数的。
有什么更好的方法吗?
谢谢你,谢谢
8条答案
按热度按时间2vuwiymt1#
使用numpy的array_split():
slsn1g292#
我不确定这是否正是您想要的,但我发现another SO thread上的这些分组器函数对于创建多处理器池非常有用。
下面是该线程的一个简短示例,它可能会执行您想要的操作:
它给你这样的东西:
希望能帮上忙。
编辑
在本例中,我以(大致)以下方式在处理器池中使用此函数:
我认为这应该与使用IPython分布式机器非常相似,但我还没有尝试过。
6qqygrtg3#
在实践中,你不能 * 保证 * 相同大小的块。行数(N)可能是质数,在这种情况下,您只能在1或N处获得相等大小的块。正因为如此,现实世界的分块通常使用固定的大小,并允许在最后使用较小的块。我倾向于传递一个数组给
groupby
。起始日期:我故意将索引设置为0,使其不提供信息,我们只需决定我们的大小(这里是10)并将数组除以它:
当索引与DataFrame不兼容时,基于切片DataFrame的方法可能会失败,尽管您始终可以使用
.iloc[a:b]
忽略索引值并按位置访问数据。yws3nbqq4#
Chunks 生成器函数,用于迭代pandas Dataframes和Series
块函数的生成器版本如下所示。此外,该版本还支持pd.DataFrame或pd.Series的自定义索引(例如:浮点型索引)
输出为
7xzttuei5#
良好环境的一个标志是有很多选择,因此我将从Anaconda Blaze添加此选项,实际上是使用Odo
vlurs2pr6#
ycggw6v27#
您建议使用
groupby
非常好,但您应该使用np.arange(len(dataframe)) // batch_size
而不是dataframe.index
,因为索引可以是非整数和非结果的。我对给出的答案进行了一些基准测试。得票最多的那个慢得可怕。请考虑使用可接受的解决方案:
基准代码:
wj8zmpe18#
另一种方法。