Python: reducing execution time when inserting records with multiple inner joins in PostgreSQL

Posted by 4bbkushb on 2021-07-13 in Java

I have a use case where I must insert records into the DATAVANT_COVID_MATCH table every day while joining 3 other tables.

Number of records in each table:
DATAVANT_COVID_MATCH        = 10k
COVID_PATNT_REGISTRY        = 800k
COVID_PATNT_REGISTRY_DEID   = 800k
MORTALITY_INDEX (total)     = 220 million
MORTALITY_INDEX            ~= 10 million records for each interval
(Note: intervals between 1980 and 2000 could have more than 15M records, and between 1940 and 1980 the count could drop below 5M records.)

Partitioned tables:
DATAVANT_COVID_MATCH
MORTALITY_INDEX
Indexes:
An index is created on every partition of the MORTALITY_INDEX table.

example:
CREATE INDEX mortality_index_1941_dod_idx
    ON datavant_stg_o.mortality_index_1940 USING btree
    (dod ASC NULLS LAST)
    TABLESPACE pg_default;
CREATE INDEX mortality_index_1941_1945_dod_idx
    ON datavant_stg_o.mortality_index_1941_1945 USING btree
    (dod ASC NULLS LAST)
    TABLESPACE pg_default;
etc...
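
The indexes above cover only dod, while the daily query further down joins MORTALITY_INDEX on TOKEN_1, TOKEN_2 and TOKEN_4. One option that might be worth testing is a composite index on those join columns for each partition. The sketch below only generates the DDL text and does not connect to the database. The index names are hypothetical, and whether the planner would actually use such an index (instead of a hash join over a sequential scan) is an assumption that has to be checked against the real plan.

```python
# Partition suffixes, built the same way as in the daily script.
periods = ['1940'] + ['{}_{}'.format(i, i + 4) for i in range(1941, 2031, 5)]

# Join columns taken from the INSERT ... SELECT query.
JOIN_COLUMNS = ('token_1', 'token_2', 'token_4')


def token_index_ddl(period):
    """Return a CREATE INDEX statement for one MORTALITY_INDEX partition.

    The index name is hypothetical; adjust it to local naming rules.
    """
    table = 'datavant_stg_o.mortality_index_{}'.format(period)
    index = 'mortality_index_{}_tokens_idx'.format(period)  # hypothetical name
    return 'CREATE INDEX IF NOT EXISTS {} ON {} USING btree ({});'.format(
        index, table, ', '.join(JOIN_COLUMNS))


if __name__ == '__main__':
    # Print one statement per partition for review before running anything.
    for period in periods:
        print(token_index_ddl(period))
```

Printing the statements first makes it possible to try a single partition and compare timings before touching the rest.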

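So my question is: how can I reduce the execution time? Any comments or suggestions would be greatly appreciated. Thank you.
Below is the code that currently runs every day.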

import os
import logging
import psycopg2
import socket
logging.basicConfig(level=logging.INFO)
cpc_name = socket.gethostname()

periods = ['1940']
periods.extend(['{}_{}'.format(i, i + 4) for i in range(1941, 2031, 5)])

if __name__ == "__main__":
    logging.info("Starting test process")
    logging.info("  cpc = {}".format(cpc_name) + '\n')
    for period in periods:
        # Do your psycopg2 connection here and get your cursor
        connection = psycopg2.connect(user        = os.environ.get("DATABASE_USER", "USR_NAME"),
                                      password    = os.environ.get("DATABASE_PASS", "DB_PASS"),
                                      host        = os.environ.get("DATABASE_HOST", "db_host.internal.com"),
                                      port        = 5432,
                                      dbname      = os.environ.get("DATABASE_NAME", "psql_db"),
                                      options     = "-c search_path=DATAVANT_O")
        with connection.cursor() as cursor:
            logging.info(str(connection.get_dsn_parameters()) + '\n')
            cursor.execute("SELECT version();")
            connection.commit()
            conn = cursor.fetchone()
            logging.info("You are connected to - " + str(conn) + '\n')
            cursor.execute("""
                    INSERT INTO DATAVANT_O.DATAVANT_COVID_MATCH_{}
                    SELECT
                    CUST_LAST_NM,
                    CUST_FRST_NM,
                    CIGNA_DOB,
                    CIGNA_ZIP,
                    DATAVANT_DOD,
                    DATAVANT_DOB,
                    DEATH_VERIFICATION,
                    DATA_SOURCE,
                    INDIV_ENTPR_ID
                    FROM
                    (
                        SELECT
                        CR.PATNT_LAST_NM AS CUST_LAST_NM,
                        CR.PATNT_FRST_NM AS CUST_FRST_NM,
                        CRD.CUST_BRTH_DT AS CIGNA_DOB,
                        CR.PATNT_POSTL_CD AS CIGNA_ZIP,
                        MI.DOD AS DATAVANT_DOD,
                        MI.DOB AS DATAVANT_DOB,
                        MI.DEATH_VERIFICATION,
                        MI.DATA_SOURCE,
                        CRD.INDIV_ENTPR_ID,
                        ROW_NUMBER () OVER (PARTITION BY CRD.INDIV_ENTPR_ID ORDER BY CRD.INDIV_ENTPR_ID DESC)
                        FROM DATAVANT_O.COVID_PATNT_REGISTRY_DEID CRD
                        INNER JOIN DATAVANT_STG_O.MORTALITY_INDEX_{} MI ON
                        CRD.TOKEN_1 = MI.TOKEN_1 AND
                        CRD.TOKEN_2 = MI.TOKEN_2 AND
                        CRD.TOKEN_4 = MI.TOKEN_4
                        INNER JOIN DATAVANT_O.COVID_PATNT_REGISTRY CR ON
                        CR.INDIV_ENTPR_ID = CRD.INDIV_ENTPR_ID

                    ) x
                    WHERE
                    ROW_NUMBER = 1;""".format(period, period)
            )
            # Commit and close your connection here
            connection.commit()
            count = cursor.rowcount
            if count == 0:
                logging.info("There are no matching records to insert in DATAVANT_COVID_MATCH table for Date: {} ".format(period) + '\n')
            else:
                logging.info("No of Record(s) Inserted in DATAVANT_COVID_MATCH table for Date: {} ".format(period) + "--> " + str(count) + '\n')
            connection.close()
            logging.info("PostgreSQL connection is closed" + "\n")

Should I improve the SQL script, or upgrade the tables by adding indexes or more partitions, to help reduce the execution time?
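
To choose between those options, it may help to look at the execution plan of the join for a single partition first. The sketch below builds an EXPLAIN (ANALYZE, BUFFERS) statement for the join part of the query only, so nothing is inserted. The helper name explain_join_sql and the suffix check are illustrative assumptions, not part of the current script. Note that EXPLAIN ANALYZE really executes the SELECT, so it takes about as long as the join itself.

```python
import re

# Accept only suffixes shaped like '1940' or '1941_1945', since the value
# is placed into the SQL text with str.format.
PERIOD_RE = re.compile(r'\d{4}(_\d{4})?')

# Join part of the daily INSERT ... SELECT, reduced to one output column.
JOIN_SQL = """
SELECT CRD.INDIV_ENTPR_ID
FROM DATAVANT_O.COVID_PATNT_REGISTRY_DEID CRD
INNER JOIN DATAVANT_STG_O.MORTALITY_INDEX_{period} MI ON
    CRD.TOKEN_1 = MI.TOKEN_1 AND
    CRD.TOKEN_2 = MI.TOKEN_2 AND
    CRD.TOKEN_4 = MI.TOKEN_4
INNER JOIN DATAVANT_O.COVID_PATNT_REGISTRY CR ON
    CR.INDIV_ENTPR_ID = CRD.INDIV_ENTPR_ID
"""


def explain_join_sql(period):
    """Return an EXPLAIN (ANALYZE, BUFFERS) statement for one partition."""
    if not PERIOD_RE.fullmatch(period):
        raise ValueError('unexpected partition suffix: {!r}'.format(period))
    return 'EXPLAIN (ANALYZE, BUFFERS)' + JOIN_SQL.format(period=period)


if __name__ == '__main__':
    print(explain_join_sql('1940'))
```

With the existing psycopg2 cursor, the statement could be run as cursor.execute(explain_join_sql('1940')), and each row returned by cursor.fetchall() would hold one line of the plan. The plan shows whether the time goes into scanning the MORTALITY_INDEX partition, into the join itself, or elsewhere.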
