I have a use case where, every day, I must insert records into the DATAVANT_COVID_MATCH
table while joining three other tables.
No. of records in each table:
DATAVANT_COVID_MATCH = 10k
COVID_PATNT_REGISTRY = 800k
COVID_PATNT_REGISTRY_DEID = 800k
MORTALITY_INDEX = 220 million total, roughly 10 million records per interval
(Note: intervals between 1980-2000 can exceed 15M records, while intervals between 1940-1980 can drop below 5M.)
Partitioned Table:
DATAVANT_COVID_MATCH
MORTALITY_INDEX
Indexes:
An index is created on every partition of the MORTALITY_INDEX table, for example:
CREATE INDEX mortality_index_1940_dod_idx
    ON datavant_stg_o.mortality_index_1940 USING btree
    (dod ASC NULLS LAST)
    TABLESPACE pg_default;
CREATE INDEX mortality_index_1941_1945_dod_idx
    ON datavant_stg_o.mortality_index_1941_1945 USING btree
    (dod ASC NULLS LAST)
    TABLESPACE pg_default;
etc...
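To check whether the planner actually uses these per-partition indexes (or falls back to sequential scans on the TOKEN join columns), the join for one partition can be wrapped in EXPLAIN (ANALYZE, BUFFERS). A minimal sketch, assuming the table names from this post; `build_explain_sql` is a hypothetical helper that only assembles the SQL text, with the actual execution sketched in comments:

```python
# Build an EXPLAIN (ANALYZE, BUFFERS) statement for the token join against a
# single MORTALITY_INDEX_<period> partition, so the plan can be inspected for
# sequential scans or unused indexes.

JOIN_SQL_TEMPLATE = """
SELECT COUNT(*)
FROM DATAVANT_O.COVID_PATNT_REGISTRY_DEID CRD
INNER JOIN DATAVANT_STG_O.MORTALITY_INDEX_{period} MI ON
    CRD.TOKEN_1 = MI.TOKEN_1 AND
    CRD.TOKEN_2 = MI.TOKEN_2 AND
    CRD.TOKEN_4 = MI.TOKEN_4
"""


def build_explain_sql(period):
    """Return an EXPLAIN statement for the token join on one partition."""
    return "EXPLAIN (ANALYZE, BUFFERS)" + JOIN_SQL_TEMPLATE.format(period=period)


sql = build_explain_sql('1941_1945')
print(sql)

# Running it requires a live connection, e.g.:
# with connection.cursor() as cursor:
#     cursor.execute(sql)
#     for (line,) in cursor.fetchall():
#         print(line)
```

Each row returned by EXPLAIN ANALYZE is one line of the plan; lines mentioning "Seq Scan" on the partition are a hint that the join columns may need their own index.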
So my question is: how can I reduce the execution time? Any comments/suggestions would be greatly appreciated. Thank you.
Below is the code that currently runs daily:
import os
import logging
import socket

import psycopg2

logging.basicConfig(level=logging.INFO)

cpc_name = socket.gethostname()

# '1940' is a single partition suffix; the rest are 5-year buckets
periods = ['1940']
periods.extend('{}_{}'.format(i, i + 4) for i in range(1941, 2031, 5))

if __name__ == "__main__":
    logging.info("Starting test process")
    logging.info(" cpc = {}\n".format(cpc_name))

    for period in periods:
        # Open the psycopg2 connection and get a cursor
        connection = psycopg2.connect(
            user=os.environ.get("DATABASE_USER", "USR_NAME"),
            password=os.environ.get("DATABASE_PASS", "DB_PASS"),
            host=os.environ.get("DATABASE_HOST", "db_host.internal.com"),
            port=5432,
            dbname=os.environ.get("DATABASE_NAME", "psql_db"),
            options="-c search_path=DATAVANT_O",
        )
        with connection.cursor() as cursor:
            logging.info(str(connection.get_dsn_parameters()) + '\n')
            cursor.execute("SELECT version();")
            version = cursor.fetchone()
            logging.info("You are connected to - " + str(version) + '\n')

            cursor.execute("""
                INSERT INTO DATAVANT_O.DATAVANT_COVID_MATCH_{}
                SELECT
                    CUST_LAST_NM,
                    CUST_FRST_NM,
                    CIGNA_DOB,
                    CIGNA_ZIP,
                    DATAVANT_DOD,
                    DATAVANT_DOB,
                    DEATH_VERIFICATION,
                    DATA_SOURCE,
                    INDIV_ENTPR_ID
                FROM
                (
                    SELECT
                        CR.PATNT_LAST_NM AS CUST_LAST_NM,
                        CR.PATNT_FRST_NM AS CUST_FRST_NM,
                        CRD.CUST_BRTH_DT AS CIGNA_DOB,
                        CR.PATNT_POSTL_CD AS CIGNA_ZIP,
                        MI.DOD AS DATAVANT_DOD,
                        MI.DOB AS DATAVANT_DOB,
                        MI.DEATH_VERIFICATION,
                        MI.DATA_SOURCE,
                        CRD.INDIV_ENTPR_ID,
                        ROW_NUMBER() OVER (PARTITION BY CRD.INDIV_ENTPR_ID ORDER BY CRD.INDIV_ENTPR_ID DESC) AS ROW_NUMBER
                    FROM DATAVANT_O.COVID_PATNT_REGISTRY_DEID CRD
                    INNER JOIN DATAVANT_STG_O.MORTALITY_INDEX_{} MI ON
                        CRD.TOKEN_1 = MI.TOKEN_1 AND
                        CRD.TOKEN_2 = MI.TOKEN_2 AND
                        CRD.TOKEN_4 = MI.TOKEN_4
                    INNER JOIN DATAVANT_O.COVID_PATNT_REGISTRY CR ON
                        CR.INDIV_ENTPR_ID = CRD.INDIV_ENTPR_ID
                ) x
                WHERE
                    ROW_NUMBER = 1;""".format(period, period))

            # Commit the insert and report how many rows went in
            connection.commit()
            count = cursor.rowcount
            if count == 0:
                logging.info("There are no matching records to insert in DATAVANT_COVID_MATCH table for Date: {}\n".format(period))
            else:
                logging.info("No of Record(s) Inserted in DATAVANT_COVID_MATCH table for Date: {} --> {}\n".format(period, count))

        connection.close()
        logging.info("PostgreSQL connection is closed\n")
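For reference, the `periods` list in the script above expands to one `1940` suffix plus 5-year buckets up to `2026_2030`, i.e. one INSERT per MORTALITY_INDEX partition per run. A standalone snippet reproducing just that list:

```python
# Reconstructs the partition-suffix list used by the script: one '1940'
# bucket plus 5-year buckets from 1941_1945 through 2026_2030.
periods = ['1940']
periods.extend('{}_{}'.format(i, i + 4) for i in range(1941, 2031, 5))

print(len(periods))   # 19 suffixes, so 19 connect/insert round trips per run
print(periods[:3])    # ['1940', '1941_1945', '1946_1950']
print(periods[-1])    # '2026_2030'
```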
Should I make improvements in the SQL script itself, or change the tables, e.g. by adding indexes or more partitions, to help reduce the execution time?