Postgresql +心理学:使用POSTGRESQL函数调用大容量插入大型数据

jaql4c8m  于 2022-11-04  发布在  PostgreSQL
关注(0)|答案(2)|浏览(313)

我正在处理大量非常简单的数据(点云)。我想使用Python将这些数据插入Postgresql数据库中的一个简单表中。
下面是我需要执行的插入语句的一个示例:
INSERT INTO points_postgis (id_scan, scandist, pt) VALUES (1, 32.656,**ST_MakePoint**(1.1, 2.2, 3.3));

请注意INSERT语句中对Postgresql函数ST_MakePoint的调用。

我必须调用它数十亿次(是的,数十亿次),所以显然我必须以更优化的方式将数据插入Postgresql。有很多策略可以批量插入数据,正如本文以非常好的和信息丰富的方式所介绍的(insertmany,copy等)。https://hakibenita.com/fast-load-data-python-postgresql
但是当你需要在服务器端调用一个函数时,没有一个例子展示如何执行这些插入操作。当我需要使用psycopg在Postgresql数据库的服务器端调用函数时,如何批量INSERT数据?
任何帮助都是非常感谢的!谢谢!
请注意,使用CSV没有多大意义,因为我的数据很大。或者,我已经尝试用ST_MakePoint函数的3个输入的简单列填充临时表,然后,在所有数据都进入该临时函数后,调用INSERT/SELECT。问题是,这花费了大量时间,而且我需要的磁盘空间量是无意义的。

bvuwiixz

bvuwiixz1#

为了在合理的时间内以最小的努力完成这一任务,最重要的是将此任务分解为多个组成部分,以便您可以分别利用不同的Postgres特性。
首先,您需要先创建减去几何变换的表。例如:

create table temp_table (
    id_scan bigint, 
    scandist numeric, 
    pt_1 numeric, 
    pt_2 numeric, 
    pt_3 numeric
);

因为我们没有添加任何索引和约束,所以这很可能是将“原始”数据导入RDBMS的最快方法。
最好的方法是使用COPY方法,您可以直接从Postgres使用该方法(如果您有足够的访问权限),也可以通过Python接口使用https://www.psycopg.org/docs/cursor.html#cursor.copy_expert
以下是实现此目的的示例代码:

iconn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(target_host, target_usr, target_db, target_pw, "require")
iconn = psycopg2.connect(iconn_string)
import_cursor = iconn.cursor()
csv_filename = '/path/to/my_file.csv'
copy_sql = "COPY temp_table (id_scan, scandist, pt_1, pt_2, pt_3) FROM STDIN WITH CSV HEADER DELIMITER ',' QUOTE '\"' ESCAPE '\\' NULL AS 'null'"
with open(csv_filename, mode='r', encoding='utf-8', errors='ignore') as csv_file:
    import_cursor.copy_expert(copy_sql, csv_file)
iconn.commit()

下一步将是从现有的原始数据中高效地创建所需的表,然后您将能够使用单个SQL语句创建实际的目标表,并让RDBMS发挥它的魔力。
数据进入RDBMS后,对其进行一些优化并在适用的情况下添加一两个索引(首选主索引或唯一索引,以加快转换速度)是有意义的。
这将取决于您的数据/用例,但类似以下内容应该会有所帮助:

alter table temp_table add primary key (id_scan); --if unique
-- or
create index idx_temp_table_1 on temp_table(id_scan); --if not unique

要将数据从原始表移动到目标表,请执行以下操作:

with temp_t as (
    select id_scan, scandist, ST_MakePoint(pt_1, pt_2, pt_3) as pt from temp_table
)
INSERT INTO points_postgis (id_scan, scandist, pt)
SELECT temp_t.id_scan, temp_t.scandist, temp_t.pt
FROM temp_t;

这将一次从上一个表中选择所有数据并转换它。
第二个选项与此类似。您可以将所有原始数据直接加载到points_postgis,同时将其分隔为3个临时列。然后使用alter table points_postgis add column pt geometry;进行更新,并删除临时列:update points_postgis set pt = ST_MakePoint(pt_1, pt_2, pt_3);alter table points_postgis drop column pt_1, drop column pt_2, drop column pt_3;
主要的收获是最好的选择不是集中在最后的final表状态上,而是将其分解成容易实现的块。Postgres将轻松处理数十亿行的导入,以及随后的转换。

nuypyhwy

nuypyhwy2#

以下是使用生成带校验位的UPC A条形码的函数的一些简单示例:
1.使用execute_batch. execute_batchpage_size参数,允许您使用多行语句批处理插入。默认情况下,此参数设置为100,一次插入100行。您可以增加此参数,以减少到服务器的往返次数。
1.仅使用execute并从另一个表中选择数据。

import psycopg2
from psycopg2.extras import execute_batch

con = psycopg2.connect(dbname='test', host='localhost', user='postgres', 
port=5432)
cur = con.cursor()

cur.execute('create table import_test(id integer, suffix_val varchar, upca_val 
varchar)')
con.commit()

# Input data as a list of tuples. Means some data is duplicated.

input_list = [(1, '12345', '12345'), (2, '45278', '45278'), (3, '61289', 
'61289')]
execute_batch(cur, 'insert into import_test values(%s, %s, 
upc_check_digit(%s))', input_list)
con.commit()

select * from import_test ;
 id | suffix_val |   upca_val   
----+------------+--------------
  1 | 12345      | 744835123458
  2 | 45278      | 744835452787
  3 | 61289      | 744835612891

# Input data as list of dicts and using named parameters to avoid duplicating data.

input_list_dict = [{'id': 50, 'suffix_val': '12345'}, {'id': 51, 'suffix_val': '45278'}, {'id': 52, 'suffix_val': '61289'}]  

execute_batch(cur, 'insert into import_test values(%(id)s, 
%(suffix_val)s, upc_check_digit(%(suffix_val)s))', input_list_dict)
con.commit()

select * from import_test ;
 id | suffix_val |   upca_val   
----+------------+--------------
  1 | 12345      | 744835123458
  2 | 45278      | 744835452787
  3 | 61289      | 744835612891
 50 | 12345      | 744835123458
 51 | 45278      | 744835452787
 52 | 61289      | 744835612891

# Create a table with values to be used for inserting into final table

cur.execute('create table input_vals (id integer, suffix_val varchar)')
con.commit()

execute_batch(cur, 'insert into input_vals values(%s, %s)', [(100, '76234'), 
(101, '92348'), (102, '16235')])
con.commit()
cur.execute('insert into import_test select id, suffix_val, 
upc_check_digit(suffix_val) from input_vals')
con.commit()

 select * from import_test ;
  id   | suffix_val |   upca_val   
-------+------------+--------------
     1 | 12345      | 744835123458
     2 | 45278      | 744835452787
     3 | 61289      | 744835612891
 12345 | 12345      | 744835123458
 45278 | 45278      | 744835452787
 61289 | 61289      | 744835612891
   100 | 76234      | 744835762343
   101 | 92348      | 744835923485
   102 | 16235      | 744835162358

相关问题