import sys
from collections import defaultdict

import psycopg2
import psycopg2.extras
from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search
# Copy every document of an Elasticsearch index into a newly created
# PostgreSQL table, inserting the rows in batches of BATCH_SIZE.

# Elasticsearch (source) connection settings.
ES_HOST = "localhost"
ES_PORT = "9200"
t_dbname_ES = "companydatabase"  # index to copy
ES_USER = "elastic"
ES_PASSWORD = "changeme"
client_ES = Elasticsearch([ES_HOST], http_auth=(ES_USER, ES_PASSWORD), port=ES_PORT)

# PostgreSQL (destination) connection settings.
PG_HOST = "localhost"
PG_PORT = "5999"
PG_DBNAME = "postgres"
PG_USER = "postgres"
PG_PASSWORD = "postgres"
db_conn = psycopg2.connect(
    host=PG_HOST, port=PG_PORT, dbname=PG_DBNAME, user=PG_USER, password=PG_PASSWORD
)
db_cursor = db_conn.cursor()

# (column name, PostgreSQL type) pairs; each name is also the Elasticsearch
# field the column is filled from. Keeping name and type together keeps the
# two in step.
# NOTE(review): Salary is declared as text -- confirm whether integer was meant.
COLUMNS = [
    ("Address", "text not null"),
    ("Age", "integer"),
    ("DateOfJoining", "date"),
    ("Designation", "text"),
    ("FirstName", "text"),
    ("Gender", "text"),
    ("Interests", "text"),
    ("LastName", "text"),
    ("MaritalStatus", "text"),
    ("Salary", "text"),
]
column_name_list = [name for name, _ in COLUMNS]
column_type_list = [col_type for _, col_type in COLUMNS]
table_name = "sample_table2"  # table to create and insert data into
# Table and column names are interpolated unquoted on purpose: they are the
# fixed constants above (not external input), and quoting them would make
# PostgreSQL keep their mixed case instead of folding them to lower case.
column_names = ", ".join(column_name_list)
BATCH_SIZE = 10000  # rows sent per INSERT statement


def _report_db_error(conn, error):
    """Roll back the failed transaction on *conn* and print *error* to stderr.

    After a failed statement psycopg2 rejects further commands on the
    connection until rollback() is called, so without it every later
    statement would fail too. Returns the printed message.
    """
    conn.rollback()
    message = "Database error: " + str(error)
    sys.stderr.write(message + "\n")
    return message


def _hit_to_row(hit, names):
    """Return the source fields *names* of search *hit* as one INSERT row.

    Values are passed as strings and PostgreSQL casts them to each column's
    declared type; a missing or null field becomes SQL NULL.
    """
    document = hit.to_dict()
    return tuple(
        None if document.get(name) is None else str(document[name])
        for name in names
    )


def _insert_batch(cursor, conn, query, rows):
    """Insert *rows* using *query* (with one ``VALUES %s``) and commit.

    Does nothing for an empty batch. On a database error the batch is rolled
    back and reported, and the caller continues with the next batch.
    """
    if not rows:
        return
    try:
        # page_size=len(rows) sends the whole batch as a single statement.
        psycopg2.extras.execute_values(cursor, query, rows, page_size=len(rows))
        conn.commit()
    except psycopg2.Error as e:
        _report_db_error(conn, e)


try:
    # Table creation.
    column_definitions = ", ".join(
        "{} {}".format(name, col_type) for name, col_type in COLUMNS
    )
    create_table_query = "CREATE TABLE {} ({});".format(table_name, column_definitions)
    try:
        db_cursor.execute(create_table_query)
        db_conn.commit()
    except psycopg2.Error as e:
        # E.g. the table already exists: report it and go on inserting.
        _report_db_error(db_conn, e)

    # Data insertion. execute_values escapes every value, so quotes or other
    # special characters in the documents cannot break the statement.
    insert_query = "INSERT INTO {} ({}) VALUES %s".format(table_name, column_names)
    search = Search(index=t_dbname_ES).using(client_ES).query("match_all")
    batch = []
    for hit in search.scan():  # iterates over all matching documents
        batch.append(_hit_to_row(hit, column_name_list))
        if len(batch) >= BATCH_SIZE:
            _insert_batch(db_cursor, db_conn, insert_query, batch)
            batch = []
    # Flush the final, possibly partial, batch.
    _insert_batch(db_cursor, db_conn, insert_query, batch)
finally:
    db_cursor.close()
    db_conn.close()
# 1 answer (sort: by popularity | by time) -- answer s4n0splo1:
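# Never mind, I found the answer. What I did was:
# 1. Create connections to PostgreSQL and Elasticsearch.
# 2. Create the table in PostgreSQL.
# 3. Buffer the documents in memory, 10k at a time.
# 4. Insert each buffered batch into PostgreSQL, then empty the buffer for the
#    next batch.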