How to efficiently iterate over consecutive chunks of a Pandas DataFrame

uidvcgyl · posted 2023-09-29

I have a fairly large DataFrame (a few million rows).
I want to be able to perform a groupby operation on it, but grouping by arbitrary consecutive (preferably equal-sized) subsets of rows, rather than using any particular property of the individual rows to decide which group they go to.
The use case: I want to apply a function to each row via a parallel map in IPython. It doesn't matter which rows go to which backend engine, as the function calculates a result based on one row at a time. (Conceptually, at least; in reality it's vectorized.)
I came up with something like this:

import numpy as np

# Generate a number from 0-9 for each row, indicating which tenth of the DF it belongs to
max_idx = dataframe.index.max()
tenths = ((10 * dataframe.index) / (1 + max_idx)).astype(np.uint32)

# Use this value to perform a groupby, yielding 10 consecutive chunks
groups = [g[1] for g in dataframe.groupby(tenths)]

# Process chunks in parallel
results = dview.map_sync(my_function, groups)

But this seems very long-winded, and it doesn't guarantee equal-sized chunks, especially if the index is sparse or non-integer; the sketch below illustrates the problem.
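For example, here is a minimal sketch (with a made-up sparse integer index) of how the "tenths" formula above degenerates:

import numpy as np
import pandas as pd

# Hypothetical sparse index: most rows land in one group, one row in another
df = pd.DataFrame({'x': range(4)}, index=[0, 1, 2, 100])
tenths = ((10 * df.index) / (1 + df.index.max())).astype(np.uint32)
print(tenths.tolist())  # [0, 0, 0, 9] -- three rows in group 0, one in group 9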
Is there a better way?
Thanks!


2vuwiymt1#

Use numpy's array_split():

import numpy as np
import pandas as pd

data = pd.DataFrame(np.random.rand(10, 3))
for chunk in np.array_split(data, 5):
    assert len(chunk) == len(data) / 5, "This assert may fail for the last chunk if data length isn't divisible by 5"
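Note that when the length is not evenly divisible, np.array_split spreads the remainder across the leading chunks instead of failing. A quick illustration (the row counts here are made up):

import numpy as np
import pandas as pd

data = pd.DataFrame(np.random.rand(11, 3))
# 11 rows split 5 ways: the first chunk absorbs the extra row
print([len(c) for c in np.array_split(data, 5)])  # [3, 2, 2, 2, 2]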

slsn1g292#

I'm not sure if this is exactly what you want, but I found these grouper functions on another SO thread fairly useful for doing a multiprocessor pool.
Here's a short example from that thread, which might do something like what you want:

import numpy as np
import pandas as pd

df = pd.DataFrame(np.random.rand(14, 4), columns=['a', 'b', 'c', 'd'])

def chunker(seq, size):
    # Yield successive positional slices of length `size` (the last may be shorter)
    return (seq[pos:pos + size] for pos in range(0, len(seq), size))

for i in chunker(df, 5):
    print(i)

It gives you something like this:

a         b         c         d
0  0.860574  0.059326  0.339192  0.786399
1  0.029196  0.395613  0.524240  0.380265
2  0.235759  0.164282  0.350042  0.877004
3  0.545394  0.881960  0.994079  0.721279
4  0.584504  0.648308  0.655147  0.511390
          a         b         c         d
5  0.276160  0.982803  0.451825  0.845363
6  0.728453  0.246870  0.515770  0.343479
7  0.971947  0.278430  0.006910  0.888512
8  0.044888  0.875791  0.842361  0.890675
9  0.200563  0.246080  0.333202  0.574488
           a         b         c         d
10  0.971125  0.106790  0.274001  0.960579
11  0.722224  0.575325  0.465267  0.258976
12  0.574039  0.258625  0.469209  0.886768
13  0.915423  0.713076  0.073338  0.622967

Hope that helps.
EDIT
In this case, I used this function with a pool of processors in (roughly) the following manner:

from multiprocessing import Pool

nprocs = 4

pool = Pool(nprocs)

for chunk in chunker(df, nprocs):
    data = pool.map(myfunction, chunk)  # myfunction: your per-item function
    data.domorestuff()                  # placeholder for whatever comes next

I assume this should be very similar to using the IPython distributed machinery, but I haven't tried it.
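For what it's worth, a rough sketch of the IPython-parallel equivalent (assuming ipyparallel with a running ipcluster; my_function is the question's own placeholder) might look like this:

import ipyparallel as ipp

rc = ipp.Client()  # connect to a running cluster
dview = rc[:]      # a DirectView over all engines
results = dview.map_sync(my_function, list(chunker(df, 5)))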


6qqygrtg3#

In practice, you can't *guarantee* equal-sized chunks. The number of rows (N) might be prime, in which case the only equal-sized chunks are of size 1 or N. Because of this, real-world chunking typically uses a fixed size and allows a smaller chunk at the end. I tend to pass an array to groupby. Starting from:

>>> df = pd.DataFrame(np.random.rand(15, 5), index=[0]*15)
>>> df[0] = range(15)
>>> df
    0         1         2         3         4
0   0  0.746300  0.346277  0.220362  0.172680
0   1  0.657324  0.687169  0.384196  0.214118
0   2  0.016062  0.858784  0.236364  0.963389
[...]
0  13  0.510273  0.051608  0.230402  0.756921
0  14  0.950544  0.576539  0.642602  0.907850

[15 rows x 5 columns]

I've deliberately made the index uninformative by setting it to 0; we simply decide on our size (here 10) and integer-divide an array by it:

>>> df.groupby(np.arange(len(df))//10)
<pandas.core.groupby.DataFrameGroupBy object at 0xb208492c>
>>> for k,g in df.groupby(np.arange(len(df))//10):
...     print(k,g)
...     
0    0         1         2         3         4
0  0  0.746300  0.346277  0.220362  0.172680
0  1  0.657324  0.687169  0.384196  0.214118
0  2  0.016062  0.858784  0.236364  0.963389
[...]
0  8  0.241049  0.246149  0.241935  0.563428
0  9  0.493819  0.918858  0.193236  0.266257

[10 rows x 5 columns]
1     0         1         2         3         4
0  10  0.037693  0.370789  0.369117  0.401041
0  11  0.721843  0.862295  0.671733  0.605006
[...]
0  14  0.950544  0.576539  0.642602  0.907850

[5 rows x 5 columns]

Methods based on slicing the DataFrame may fail when the index is not compatible with that, although you can always use .iloc[a:b] to ignore the index values and access data by position.
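For instance, on the DataFrame above (whose index is all zeros), a positional slice ignores the labels entirely:

first_ten = df.iloc[0:10]  # always the first 10 rows, regardless of index labels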


yws3nbqq4#

Generator function for iterating over chunks of pandas DataFrames and Series

A generator version of the chunker function is shown below. In addition, this version works with a custom index on the pd.DataFrame or pd.Series (e.g. a float-valued index):

import numpy as np
import pandas as pd

df_sz = 14

df = pd.DataFrame(np.random.rand(df_sz, 4),
                  index=np.linspace(0., 10., num=df_sz),
                  columns=['a', 'b', 'c', 'd']
                  )

def chunker(seq, size):
    for pos in range(0, len(seq), size):
        yield seq.iloc[pos:pos + size]

chunk_size = 6
for i in chunker(df, chunk_size):
    print(i)

chnk = chunker(df, chunk_size)
print('\n', chnk)
print(next(chnk))
print(next(chnk))
print(next(chnk))

The output is:

a         b         c         d
0.000000  0.560627  0.665897  0.683055  0.611884
0.769231  0.241871  0.357080  0.841945  0.340778
1.538462  0.065009  0.234621  0.250644  0.552410
2.307692  0.431394  0.235463  0.755084  0.114852
3.076923  0.173748  0.189739  0.148856  0.031171
3.846154  0.772352  0.697762  0.557806  0.254476
                 a         b         c         d
4.615385  0.901200  0.977844  0.250316  0.957408
5.384615  0.400939  0.520841  0.863015  0.177043
6.153846  0.356927  0.344220  0.863067  0.400573
6.923077  0.375417  0.156420  0.897889  0.810083
7.692308  0.666371  0.152800  0.482446  0.955556
8.461538  0.242711  0.421591  0.005223  0.200596
                  a         b         c         d
9.230769   0.735748  0.402639  0.527825  0.595952
10.000000  0.420209  0.365231  0.966829  0.514409

<generator object chunker at 0x7f503c9d0ba0>

First "next()":
                 a         b         c         d
0.000000  0.560627  0.665897  0.683055  0.611884
0.769231  0.241871  0.357080  0.841945  0.340778
1.538462  0.065009  0.234621  0.250644  0.552410
2.307692  0.431394  0.235463  0.755084  0.114852
3.076923  0.173748  0.189739  0.148856  0.031171
3.846154  0.772352  0.697762  0.557806  0.254476

Second "next()":
                 a         b         c         d
4.615385  0.901200  0.977844  0.250316  0.957408
5.384615  0.400939  0.520841  0.863015  0.177043
6.153846  0.356927  0.344220  0.863067  0.400573
6.923077  0.375417  0.156420  0.897889  0.810083
7.692308  0.666371  0.152800  0.482446  0.955556
8.461538  0.242711  0.421591  0.005223  0.200596

Third "next()":
                  a         b         c         d
9.230769   0.735748  0.402639  0.527825  0.595952
10.000000  0.420209  0.365231  0.966829  0.514409

7xzttuei5#

A sign of a good environment is having many options, so I'll add this one, from Anaconda Blaze, really using Odo:

import blaze as bz
import pandas as pd

df = pd.DataFrame({'col1':[1,2,3,4,5], 'col2':[2,4,6,8,10]})

for chunk in bz.odo(df, target=bz.chunks(pd.DataFrame), chunksize=2):
    # do stuff with the chunked dataframe
    print(chunk)

vlurs2pr6#

import pandas as pd

def batch(iterable, batch_number=10):
    """
    split an iterable into mini batch with batch length of batch_number
    supports batch of a pandas dataframe
    usage:
        for i in batch([1,2,3,4,5], batch_number=2):
            print(i)
        
        for idx, mini_data in enumerate(batch(df, batch_number=10)):
            print(idx)
            print(mini_data)
    """
    l = len(iterable)

    for idx in range(0, l, batch_number):
        if isinstance(iterable, pd.DataFrame):
            # slicing a DataFrame by label can misbehave with custom indexes;
            # iterate by position with iloc instead
            yield iterable.iloc[idx:min(idx + batch_number, l)]
        else:
            yield iterable[idx:min(idx+batch_number, l)]
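A quick sanity check (the sizes here are illustrative):

import numpy as np

df = pd.DataFrame(np.random.rand(7, 2))
print([len(b) for b in batch(df, batch_number=3)])  # [3, 3, 1]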

ycggw6v27#

Your suggestion to use groupby is quite good, but you should use np.arange(len(dataframe)) // batch_size rather than dataframe.index, since the index may be non-integer and non-consecutive.
I ran some benchmarks on the answers given. The most upvoted one is horribly slow. Consider using the accepted solution:

data.groupby(np.arange(len(data)) // batch_size)

Benchmark code:

import numpy as np
import pandas as pd
import time
from tqdm.auto import tqdm

#@markdown # Create a properly funky `pd.DataFrame`
data = pd.DataFrame([
  {
      'x': np.random.randint(23515243),
      'y': 364274*np.random.rand()-134562,
      'z': ''.join(np.random.choice(list('`1234567890-=qwertyuiop[]\\asdfghjkl;\'zxcvbnm,./~!@#$%^&*()_+QWERTYUIOP{}|ASDFGHJKL:"ZXCVBNM<>?'), np.random.randint(54, 89), replace=True)),
  }
  for _ in tqdm(range(22378))
])
data.index = ['a'] * len(data)

data = pd.concat([data] * 100)

batch_size = 64

times = []

t0 = time.time()
for chunk in np.array_split(data, (len(data) + batch_size - 1) // batch_size):
  pass
times.append({'method': 'np.array_split', 'time': -t0 + time.time()})

t0 = time.time()
for _, chunk in data.groupby(np.arange(len(data)) // batch_size):
  pass
times.append({'method': 'groupby', 'time': -t0 + time.time()})

def chunker(seq, size):
    return (seq[pos:pos + size] for pos in range(0, len(seq), size))
  
t0 = time.time()
for chunk in chunker(data, batch_size):
  pass
times.append({'method': '[]-syntax', 'time': -t0 + time.time()})

# t0 = time.time()
# for chunk in bz.odo(data, target=bz.chunks(pd.DataFrame), chunksize=batch_size):
#   pass
# times.append({'method': 'bz.odo', 'time': -t0 + time.time()})

def chunker(seq, size):
    for pos in range(0, len(seq), size):
        yield seq.iloc[pos:pos + size] 

t0 = time.time()
for i in chunker(data, batch_size):
    pass
times.append({'method': '.iloc[]-syntax', 'time': -t0 + time.time()})

pd.DataFrame(times)

wj8zmpe18#

Another approach:

# .. load df ..

CHUNK_SIZE = 100000

for chunk_num in range((len(df) + CHUNK_SIZE - 1) // CHUNK_SIZE):  # ceiling division avoids an empty trailing chunk
    start_index = chunk_num * CHUNK_SIZE
    end_index = min(start_index + CHUNK_SIZE, len(df))
    chunk = df.iloc[start_index:end_index]  # positional slice, safe for any index

    # .. do calculation on chunk here ..
