AssertionError with pandas in Python 3.9

lstz6jyr asked on 2023-03-28 in Python

I'm trying to compute statistics for ship-track features that fall within grid cells. Because the DataFrames are very large, I use multiprocessing to speed things up. When I run the statistics on the full dataset (roughly 4.3 million cells), I get an AssertionError. It appears to be caused by trying to pass more than 2 GB of data between a worker and the main process, which makes sense: one of my DataFrames is 5 GB on disk. Supposedly this is a bug that has already been fixed, so I moved to Python 3.9 in OSGeo4W, but I get the same error. I then tried using array_split and a loop to break the work into smaller datasets, still with no luck. How can I complete this rather large operation?

I tried gp.overlay but got the same AssertionError. sjoin returns one count per intersecting feature, and I was able to filter and sjoin, but some tracks cross a cell multiple times and I need a count per crossing, so I'm using geopandas overlay and clip. I also tried pandarallel, but it kept returning an error that clip was not defined.
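To see why sjoin alone was not enough, here is a toy sketch (my own illustration, not part of the script below): a track that crosses the same cell twice is one sjoin match but two rows after clip and explode.

import geopandas as gpd
from shapely.geometry import LineString, box

# one 10x10 cell, and one track that enters and leaves it twice
cell = gpd.GeoDataFrame({'grid_id': [1]}, geometry=[box(0, 0, 10, 10)])
track = gpd.GeoDataFrame({'MYTYPE': ['CARGO']},
                         geometry=[LineString([(-5, 2), (15, 2), (15, 8), (-5, 8)])])

print(len(gpd.sjoin(track, cell)))                            # 1: matched once
print(len(gpd.clip(track, cell).explode(index_parts=False)))  # 2: one row per crossing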

from pandas import concat
from pandarallel import pandarallel
from geopandas import read_file, clip
from time import time as ttime, strftime, gmtime
from datetime import timedelta
from calendar import month_name
from functools import partial
from os import chdir, path, remove
from multiprocessing import Pool
from numpy import array_split

def cell_calc(x, y):
    # clip the tracks (y) to this chunk of grid cells (x) and split
    # multi-part intersections into one row per crossing
    df = clip(y.to_crs(x.crs), x, keep_geom_type=True).explode()
    cell = x
    TList = ['CARGO', 'FISHING', 'OTHER', 'PASSENGER', 'TANKER']
    MList = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
    cell['ALL'] = len(df)  # note: this is the count for the whole chunk, not per cell
    for vtype in TList:  # renamed from 'type' to avoid shadowing the builtin
        cell[vtype] = len(df[df['MYTYPE'] == vtype])
    for month in MList:
        mdf = df[df['MONTH'] == month]
        cell[month_name[month] + '_ALL'] = len(mdf)
        for vtype in TList:
            cell[month_name[month] + '_' + vtype] = len(mdf[mdf['MYTYPE'] == vtype])
    return cell

if __name__ == '__main__':
    GRIDDB = r'...\GRID.gdb'
    GRID = 'GRID102001'
    TRACKSDB = r'...\ShipTracks.gdb'
    TRACKS = 'ShipTracks_clip'
    OGDB = r'...\OutDir'
    ODB = '2020.gpkg'

    stime = ttime()
    # 14400 s = 4 h: crude UTC-to-local (UTC-4) offset for the printed clock time
    print('Loading datasets at ' + str(strftime('%H:%M:%S', gmtime(stime - 14400))))
    GRIDGDF = read_file(GRIDDB, layer=GRID)  # read the grid layer from its GDB
    GRIDGDF = GRIDGDF[['grid_id', 'geometry']]
    TRACKSGDF = read_file(TRACKSDB, layer=TRACKS, driver='FileGDB', rows=1000)
    TRACKSGDF = TRACKSGDF[['MYTYPE', 'MONTH', 'geometry']]
    stime = ttime()
    print('Computing cell statistics ' + str(strftime('%H:%M:%S', gmtime(stime - 14400))))
    # NOTE: partial() bakes the full TRACKSGDF into every task, so the whole
    # tracks DataFrame is pickled and piped to each worker along with its chunk
    func = partial(cell_calc, y=TRACKSGDF)
    p = Pool(processes=16)
    split_dfs = array_split(GRIDGDF, 16)
    pool_results = p.map(func, split_dfs)
    p.close()
    p.join()

    grid = concat(pool_results)

    etime = ttime()
    print('Cell statistics completed at ' + str(strftime('%H:%M:%S', gmtime(ttime() - 14400))) + ' in ' + str(
        (timedelta(seconds=etime - stime))))

    chdir(OGDB)
    grid.to_file(ODB, layer='AIS_GRID', driver='GPKG')

Here is the error:

AssertionError
Process SpawnPoolWorker-31:
Traceback (most recent call last):
  File "...\QGIS 3.22.0\apps\Python39\lib\multiprocessing\process.py", line 315, in _bootstrap
    self.run()
  File "...\QGIS 3.22.0\apps\Python39\lib\multiprocessing\process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "...\QGIS 3.22.0\apps\Python39\lib\multiprocessing\pool.py", line 114, in worker
    task = get()
  File "...\QGIS 3.22.0\apps\Python39\lib\multiprocessing\queues.py", line 366, in get
    res = self._reader.recv_bytes()
  File "...\QGIS 3.22.0\apps\Python39\lib\multiprocessing\connection.py", line 221, in recv_bytes
    buf = self._recv_bytes(maxlength)
  File "...\QGIS 3.22.0\apps\Python39\lib\multiprocessing\connection.py", line 323, in _recv_bytes
    return self._get_more_data(ov, maxsize)
  File "...\QGIS 3.22.0\apps\Python39\lib\multiprocessing\connection.py", line 342, in _get_more_data
    assert left > 0
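
The failing assertion lives in multiprocessing's Windows pipe reader (connection.py, _get_more_data), and the worker dies while receiving its pickled task, which is consistent with the over-2-GB diagnosis above. A minimal sketch of the same failure mode (my own repro attempt, hypothetical sizes, needs several GB of free RAM):

from multiprocessing import Pool
import numpy as np

def echo(a):
    return a.nbytes  # return something tiny; the send of the task is what fails

if __name__ == '__main__':
    big = np.zeros(300_000_000)       # ~2.4 GB of float64
    with Pool(1) as pool:
        print(pool.map(echo, [big]))  # piping a >2 GB pickled task to the worker
                                      # can raise this AssertionError on Windows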

I added this loop to try to break the data down even further...

count = 0        # fix: initialize the loop counter
results = []     # fix: initialize the list of per-chunk results
for grid in array_split(GRIDGDF, 10):
    count = count + 1
    tracks = clip(TRACKSGDF, grid)  # pre-clip the tracks to this tenth of the grid

    stime = ttime()
    print('Computing cell statistics for ' + str(count) + ' of 10 at ' + str(strftime('%H:%M:%S', gmtime(stime - 14400))))
    func = partial(cell_calc, y=tracks)
    p = Pool(processes=16)
    split_dfs = array_split(grid, 16)
    pool_results = p.map(func, split_dfs)
    p.close()
    p.join()

    # use a new name: reassigning 'grid' would clobber the loop variable
    grid_counts = concat(pool_results, axis=0)

    etime = ttime()
    print('Cell statistics completed for ' + str(count) + ' of 10 at ' + str(strftime('%H:%M:%S', gmtime(etime - 14400))) + ' in ' + str(timedelta(seconds=etime - stime)))
    results.append(grid_counts)
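
(On the pandarallel attempt: a "clip is not defined" error usually means the worker processes cannot see the parent's globals. Importing inside the applied function is a common workaround; the sketch below is my assumption and is untested against this dataset.)

def count_crossings(cell_geom, tracks=None):
    from geopandas import clip  # local import so workers can resolve the name
    return len(clip(tracks, cell_geom).explode(index_parts=False))

# usage sketch: GRIDGDF.geometry.parallel_apply(count_crossings, tracks=TRACKSGDF)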

Answer #1 (t40tm48m)

After some testing, I learned that when using the multiprocessing module to speed up the overlay, it works much better to split the target (TRACKS) than the mask (GRID). So I changed the code to loop over larger chunks of the grid, and inside the grid_calc() function I added a multiprocessed overlay that is applied to array_split chunks of the target data. I also switched to pivot_table, which is much faster than the for loops at counting records per month and type.
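To see the counting pattern in isolation (toy data, my own illustration): one groupby plus one pivot_table replaces the nested month/type loops from the question.

from pandas import DataFrame, pivot_table

toy = DataFrame({'grid_id': [1, 1, 2], 'MYTYPE': ['CARGO', 'TANKER', 'CARGO']})
counts = toy.groupby(['grid_id', 'MYTYPE']).size().reset_index(name='COUNT')
wide = pivot_table(counts, values='COUNT', index='grid_id',
                   columns='MYTYPE', aggfunc='sum', fill_value=0)
print(wide)  # one row per grid_id, one zero-filled column per vessel type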

def grid_calc(grid, tracks):
    stime = ttime()
    print('Starting Overlay at ' + str(strftime('%H:%M:%S', gmtime(stime - 14400))))
    # split the target (tracks) across the workers; the grid mask is baked
    # into every task via partial (overlay defaults to how='intersection')
    func = partial(overlay, df2=grid)
    trackList = []
    with Pool(16) as pool:
        for fresults in pool.map(func, array_split(tracks, 16)):
            trackList.append(fresults)
    # explode so each crossing becomes its own row, matching the
    # single-process version kept below for reference
    tracks = concat(trackList).explode()
    # tracks = overlay(tracks, grid, how='intersection').explode()
    etime = ttime()
    print('Overlay completed at ' + str(strftime('%H:%M:%S', gmtime(etime - 14400))) + ' in ' + str(
        (timedelta(seconds=etime - stime))))
    stime = ttime()
    print('Group and Pivot started at ' + str(strftime('%H:%M:%S', gmtime(stime - 14400))))
    j1 = tracks.groupby(['grid_id']).size().reset_index(name='ALL')
    j1 = pivot_table(j1, values='ALL', index='grid_id', aggfunc=np_sum, fill_value=0)

    # group by type as well, otherwise MYTYPE is not available to pivot on
    j2 = tracks.groupby(['grid_id', 'MYTYPE']).size().reset_index(name='COUNT')
    j2 = pivot_table(j2, values='COUNT', index='grid_id', columns='MYTYPE', aggfunc=np_sum, fill_value=0)

    j3 = tracks.groupby(['grid_id', 'MONTH']).size().reset_index(name='COUNT')
    j3['cols'] = j3.apply(lambda x: str(month_name[x['MONTH']]).upper() + '_'+'ALL', axis=1)
    j3 = pivot_table(j3, values='COUNT', index='grid_id', columns='cols', aggfunc=np_sum, fill_value=0)

    j4 = tracks.groupby(['grid_id', 'MONTH', 'MYTYPE']).size().reset_index(name='COUNT')
    j4['cols'] = j4.apply(lambda x: str(month_name[x['MONTH']]).upper() + '_' + x['MYTYPE'], axis=1)
    j4 = pivot_table(j4, values='COUNT', index='grid_id', columns='cols', aggfunc=np_sum, fill_value=0)
    jtables = [j1, j2, j3, j4]
    fgrid = grid
    for jtable in jtables:
        # pivot_table output is indexed by grid_id; pandas can merge on the
        # index level name, so on='grid_id' still works
        fgrid = merge(fgrid, jtable, how='left', on='grid_id').fillna(0)

    etime = ttime()
    print('Group and Pivot complete at ' + str(strftime('%H:%M:%S', gmtime(etime - 14400))) + ' in ' + str(
        (timedelta(seconds=etime - stime))))
    return fgrid

if __name__ == '__main__':
    print('Importing modules and components...')
    from pandas import concat, pivot_table, merge
    from geopandas import read_file, overlay
    from time import time as ttime, strftime, gmtime
    from datetime import timedelta
    from calendar import month_name
    from functools import partial
    from os import chdir
    from multiprocessing import Pool
    from numpy import array_split, sum as np_sum
    print('Imports complete')

    GRIDDB = r'...\GRID.gdb'
    GRID = 'GRID'
    TRACKSDB = r'...\Track.gdb'
    TRACKS = 'TRACKS'
    OGDB = r'...'  # out dir (path left elided, as in the original)
    ODB = '...'    # out GeoPackage/Geodatabase (name left elided)

    stime = ttime()
    print('Loading datasets at ' + str(strftime('%H:%M:%S', gmtime(stime - 14400))))
    GRIDGDF = read_file(GRIDDB, layer=GRID, driver='FileGDB')
    GRIDGDF = GRIDGDF[['grid_id', 'geometry']]
    TRACKSGDF = read_file(TRACKSDB, layer=TRACKS, driver='FileGDB')
    TRACKSGDF = TRACKSGDF[TRACKSGDF.geom_type == 'MultiLineString']

    print('Breaking up big dataset into smaller data pools...')
    # process the grid in chunks of roughly 360,000 cells, at least one chunk
    x = max(len(GRIDGDF) // 360000, 1)

    fgrids = []
    for grid in array_split(GRIDGDF, x):
        df1 = grid_calc(grid, TRACKSGDF.to_crs(grid.crs))
        fgrids.append(df1)
        del df1

    chdir(OGDB)
    # 'arrange_cols' is assumed to be the desired output column order;
    # using the computed columns as a stand-in
    arrange_cols = list(fgrids[0].columns)
    fgrid = concat(fgrids) if len(fgrids) > 1 else fgrids[0]
    fgrid = fgrid[arrange_cols]
    fgrid.to_file(ODB, layer='FINAL_GRID_COUNTS', driver='GPKG')

I'm not sure exactly how this resolved the AssertionError: whether the rewrite fixed a problem in the code, or whether this approach simply handles the PC's resources better.
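One plausible explanation is that each worker now receives and returns only a sixteenth of the tracks rather than the whole frame, so no single pickled message approaches the pipe limit. If the limit ever resurfaces, a further workaround (my own sketch, not the answer's tested code, with a hypothetical file-name scheme) is to have each worker write its result to disk and return only the path:

from functools import partial
from multiprocessing import Pool
from geopandas import overlay, read_file
from pandas import concat
from numpy import array_split

def overlay_to_disk(args, grid=None):
    i, chunk = args
    out = 'overlay_chunk_%d.gpkg' % i  # hypothetical temp-file name
    overlay(chunk, grid, how='intersection').to_file(out, driver='GPKG')
    return out  # only a short path string crosses the pipe

# usage sketch:
# func = partial(overlay_to_disk, grid=GRIDGDF)
# with Pool(16) as pool:
#     paths = pool.map(func, list(enumerate(array_split(TRACKSGDF, 16))))
# tracks = concat(read_file(p) for p in paths)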
