如何将数据从ElasticSearch传输到postgres?

ntjbwcob  于 2021-06-09  发布在  ElasticSearch
关注(0)|答案(1)|浏览(516)

我在ElasticSearch中有大量的数据,我想编写一个脚本来创建一个对应于特定索引的表,并将所有数据传输到postgres。

s4n0splo

s4n0splo1#

没关系,我得到了答案。我所做的就是
创建与postgres和elastic search的连接
在postgresql中创建表
以1万条记录为一批,把数据暂存在一个"列名 → 值列表"的字典中。
把字典中暂存的数据批量写入postgresql,然后清空字典以进行下一次迭代。

import psycopg2
from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search
from collections import defaultdict
# Buffer for batched rows: maps each column name -> list of values seen so far.
# NOTE(review): this shadows the builtin `dict`; a rename (e.g. `row_buffer`)
# would require updating every later use as well.
dict = defaultdict(list)

# --- Elasticsearch connection (source) ---
t_dbname_ES = "companydatabase"  # index to export
client_ES = Elasticsearch(
    ["localhost"],
    http_auth=("elastic", "changeme"),
    port="9200",
)

# --- PostgreSQL connection (destination) ---
db_conn = psycopg2.connect(
    host="localhost",
    port="5999",
    dbname="postgres",
    user="postgres",
    password="postgres",
)
db_cursor = db_conn.cursor()

# Target table schema: one Postgres column per Elasticsearch document field.
column_name_list = ["Address","Age","DateOfJoining","Designation","FirstName","Gender","Interests","LastName","MaritalStatus","Salary"]
# One type per column, kept in lockstep with column_name_list.
# BUG FIX: the original list had 13 entries for 10 columns; the trailing
# "text", "text", "integer" were silently ignored. The first 10 (the ones
# actually used) are kept, so the created schema is unchanged.
# NOTE(review): the stray trailing "integer" suggests Salary may have been
# intended as integer rather than text — confirm against the source data.
column_type_list = ["text not null","integer","date","text","text","text","text","text","text","text"]
table_name = 'sample_table2'  # table name to insert data into
column_names = ', '.join(column_name_list)

# --- Table creation ---
# Identifiers cannot be bound as query parameters; they are hard-coded
# constants here, so plain string formatting is acceptable.
column_defs = ", ".join(
    "{} {}".format(name, ctype)
    for name, ctype in zip(column_name_list, column_type_list)
)
create_table_query = "CREATE TABLE {} ({});".format(table_name, column_defs)
try:
    db_cursor.execute(create_table_query)
    db_conn.commit()
except psycopg2.Error as e:
    # BUG FIX: the original did `"Database error: " + e`, which raises
    # TypeError (str + exception), and then discarded the message entirely.
    print("Database error: " + str(e))
    # Roll back so the connection is usable for the inserts that follow;
    # without this, a failed DDL leaves the transaction in an aborted state.
    db_conn.rollback()

# --- Data insertion ---
# Stream every document from the index and insert into Postgres in batches.
# BUG FIXES vs. the original:
#   * values are bound as query parameters instead of being spliced into the
#     SQL text (the original broke on any value containing a single quote,
#     and was SQL-injectable);
#   * a failed batch no longer wedges the loop (the original reset `count`
#     only on success, so after one error the `count==10000` condition was
#     never true again and no further batch was flushed);
#   * the final partial batch is always flushed, independent of the
#     pre-computed document count.
BATCH_SIZE = 10000
placeholders = ", ".join(["%s"] * len(column_name_list))
insert_query = (
    "INSERT INTO " + table_name + " (" + column_names + ") VALUES (" + placeholders + ")"
)

def _flush(rows):
    """Insert the accumulated rows; report and roll back on failure."""
    if not rows:
        return
    try:
        db_cursor.executemany(insert_query, rows)
        db_conn.commit()
    except psycopg2.Error as e:
        # str(e): concatenating the exception object itself raises TypeError.
        print("Database error: " + str(e))
        db_conn.rollback()

s = Search(index=t_dbname_ES).using(client_ES).query("match_all")
batch = []
for hit in s.scan():  # iterate all documents, one at a time
    batch.append(tuple(hit[col] for col in column_name_list))
    if len(batch) >= BATCH_SIZE:
        _flush(batch)
        batch = []  # start the next batch
_flush(batch)  # remaining partial batch (fewer than BATCH_SIZE rows)

db_cursor.close()
db_conn.close()

相关问题