I am trying to compute statistics for the features that fall inside each grid cell. Because the DataFrames are very large, I use multiprocessing to speed things up. When I run the statistics on the full dataset (roughly 4.3 million cells) I get an AssertionError. It appears to be caused by passing more than 2 GB of data between a worker and the main process, which makes sense, since one of my DataFrames is 5 GB on disk. This was supposedly a bug that has since been fixed, so I moved to Python 3.9 in OSGeo4W, but I get the same error. I then tried breaking the data into smaller sets with array_split and a loop, but still no luck. How can I complete this rather large operation?

I tried gp.overlay but got the same assertion error. sjoin returns a count of each intersecting feature, and I was able to filter and sjoin, but some tracks cross a cell multiple times and I need a count for each crossing, which is why I use geopandas overlay and clip. I also tried pandarallel, but it kept returning an error that clip was not defined.
from pandas import concat
from pandarallel import pandarallel
from geopandas import read_file, clip
from time import time as ttime, strftime, gmtime
from datetime import timedelta
from calendar import month_name
from functools import partial
from os import chdir, path, remove
from multiprocessing import Pool
from numpy import array_split
def cell_calc(x, y):
    df = clip(y.to_crs(x.crs), x, keep_geom_type=True).explode()
    cell = x
    TList = ['CARGO', 'FISHING', 'OTHER', 'PASSENGER', 'TANKER']
    MList = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
    cell['ALL'] = len(df)
    for type in TList:
        cell[type] = len(df[df['MYTYPE'] == type])
    for month in MList:
        mdf = df[df['MONTH'] == month]
        cell[month_name[month] + '_ALL'] = len(mdf)
        for type in TList:
            cell[month_name[month] + '_' + type] = len(mdf[mdf['MYTYPE'] == type])
    return cell
if __name__ == '__main__':
    GRIDDB = r'...\GRID.gdb'
    GRID = 'GRID102001'
    TRACKSDB = r'...\ShipTracks.gdb'
    TRACKS = 'ShipTracks_clip'
    OGDB = r'...\OutDir'
    ODB = '2020.gpkg'
    stime = ttime()
    print('Loading datasets at ' + str(strftime('%H:%M:%S', gmtime(stime - 14400))))
    GRIDGDF = read_file(GRIDDB, layer=GRID)
    GRIDGDF = GRIDGDF[['grid_id', 'geometry']]
    TRACKSGDF = read_file(TRACKSDB, layer=TRACKS, driver='FileGDB', rows=1000)
    TRACKSGDF = TRACKSGDF[['MYTYPE', 'MONTH', 'geometry']]
    stime = ttime()
    print('Computing cell statistics ' + str(strftime('%H:%M:%S', gmtime(stime - 14400))))
    func = partial(cell_calc, y=TRACKSGDF)
    p = Pool(processes=16)
    split_dfs = array_split(GRIDGDF, 16)
    pool_results = p.map(func, split_dfs)
    p.close()
    p.join()
    grid = concat(pool_results)
    etime = ttime()
    print('Cell statistics completed at ' + str(strftime('%H:%M:%S', gmtime(ttime() - 14400))) + ' in '
          + str(timedelta(seconds=etime - stime)))
    chdir(OGDB)
    grid.to_file(ODB, layer='AIS_GRID', driver='GPKG')
Here is the error:
AssertionError
Process SpawnPoolWorker-31:
Traceback (most recent call last):
File "...\QGIS 3.22.0\apps\Python39\lib\multiprocessing\process.py", line 315, in _bootstrap
self.run()
File "...\QGIS 3.22.0\apps\Python39\lib\multiprocessing\process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "...\QGIS 3.22.0\apps\Python39\lib\multiprocessing\pool.py", line 114, in worker
task = get()
File "...\QGIS 3.22.0\apps\Python39\lib\multiprocessing\queues.py", line 366, in get
res = self._reader.recv_bytes()
File "...\QGIS 3.22.0\apps\Python39\lib\multiprocessing\connection.py", line 221, in recv_bytes
buf = self._recv_bytes(maxlength)
File "...\QGIS 3.22.0\apps\Python39\lib\multiprocessing\connection.py", line 323, in _recv_bytes
return self._get_more_data(ov, maxsize)
File "...\QGIS 3.22.0\apps\Python39\lib\multiprocessing\connection.py", line 342, in _get_more_data
assert left > 0
I added this loop to try to break the data down even further...
count = 0
results = []
for grid in array_split(GRIDGDF, 10):
    count = count + 1
    tracks = clip(TRACKSGDF, grid)
    print('Computing cell statistics for ' + str(count) + ' of 10 at '
          + str(strftime('%H:%M:%S', gmtime(stime - 14400))))
    func = partial(cell_calc, y=tracks)
    p = Pool(processes=16)
    split_dfs = array_split(grid, 16)
    pool_results = p.map(func, split_dfs)
    p.close()
    p.join()
    grid = concat(pool_results, axis=0)
    etime = ttime()
    print('Cell statistics completed for ' + str(count) + ' of 10 at '
          + str(strftime('%H:%M:%S', gmtime(ttime() - 14400))) + ' in '
          + str(timedelta(seconds=etime - stime)))
    results.append(grid)
1 Answer
After some testing, I learned that speeding up the overlay with the multiprocessing module works better when splitting the target (TRACKS) rather than the mask (GRID). So I changed the code to loop over larger chunks of the grid, and inside the grid_calc() function I added a multiprocessed overlay applied to an array_split of the target data. I also switched to pivot_table, which is much faster than the for loops for counting the records per month and type.
I am not sure exactly how this resolves the AssertionError, i.e. whether the rewrite fixed a problem in the code, or whether this approach simply manages the machine's resources better.
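A minimal sketch of the pivot_table counting mentioned above, using a tiny made-up frame (the MYTYPE and MONTH column names follow the question; the data here is invented for illustration). One call replaces the nested for-loops: rows are months, columns are ship types, values are crossing counts.

```python
import pandas as pd

# Stand-in for the clipped/exploded crossings frame from cell_calc.
df = pd.DataFrame({
    'MYTYPE': ['CARGO', 'CARGO', 'TANKER', 'FISHING', 'CARGO'],
    'MONTH':  [1,       1,       1,        2,         2],
})

# aggfunc='size' counts rows per (MONTH, MYTYPE) pair in a single pass;
# fill_value=0 gives explicit zeros for combinations that never occur.
counts = df.pivot_table(index='MONTH', columns='MYTYPE',
                        aggfunc='size', fill_value=0)
print(counts)
```

Each cell of `counts` holds the number of crossings for that month/type pair, which is exactly what the triple for-loop in cell_calc was accumulating one filter at a time.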