As described in the title, I am using SQLAlchemy and pyodbc in a Python script to insert large CSV files (up to 14 GB) into a locally hosted SQL Server database.
I know that I cannot hold a 14 GB DataFrame in memory, so I am using the chunksize feature in pandas to run batch inserts, and I have experimented with batch sizes as small as 100 rows, which easily fit into memory.
I have a feeling the memory error is related to the SQL side of things. To keep the load lightweight, the tables I am inserting into have no indexes (they are plain heaps). No matter the batch size, I run out of memory at the same point in the loading process.
What am I overlooking? Should I be flushing the memory somehow? Or is SQL Server waiting to commit the transaction until the connection closes?
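One way I can think of to check that last point, sketched below, is to look from a second session for a transaction that stays open for the whole load. This reuses the create_database_engine() helper from the script below and needs VIEW SERVER STATE permission on the server:

```python
# Diagnostic sketch (run from a second process while the load is going):
# list sessions that currently have an open transaction and when it began.
# If the loader's session shows a transaction_begin_time from the start of
# the run, everything really is sitting in one long transaction.
from sqlalchemy import text

engine = create_database_engine()  # same helper as in the script below
with engine.connect() as conn:
    rows = conn.execute(text("""
        SELECT st.session_id,
               act.transaction_id,
               act.transaction_begin_time
        FROM sys.dm_tran_session_transactions AS st
        JOIN sys.dm_tran_active_transactions AS act
          ON act.transaction_id = st.transaction_id
    """)).fetchall()
    for row in rows:
        print(row)
```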
Here is my current code:
```python
import os
import glob
import traceback

import pandas as pd
import pyodbc
from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError
from tqdm import tqdm

# Replace the following variables with your database connection details
DB_USERNAME = '-----'
DB_PASSWORD = '-----'
DB_HOST = '------'
DB_NAME = '------'
DATA_DIRECTORY = "---------"
ERRORS_DIRECTORY = os.path.join(DATA_DIRECTORY, "errors")


def create_database_engine():
    engine = create_engine(
        f"mssql+pyodbc://{DB_USERNAME}:{DB_PASSWORD}@{DB_HOST}/{DB_NAME}?driver=ODBC+Driver+17+for+SQL+Server",
        fast_executemany=True,
    )
    return engine


def batch_insert_to_database(engine, data, table, error_log_file):
    # Insert the data into the database in batches
    # The first batch will DROP ANY EXISTING TABLE IN THE DATABASE WITH THE SAME NAME
    try:
        data.to_sql(table, con=engine, index=False, if_exists='append')
    except SQLAlchemyError as e:
        error_message = f"Error during batch insertion: {e}"
        print("error encountered")
        with open(error_log_file, 'a') as error_log:
            error_log.write(error_message + '\n')
            traceback.print_exc(file=error_log)
        return False
    return True


def load_table_data(csv, table, errors_directory):
    print(f"Beginning load for {table}")
    # Create a database engine
    engine = create_database_engine()
    # Create an empty errors DataFrame to store failed batches
    errors_df = pd.DataFrame()
    # Batch size for insertion
    batch_size = 100
    # Initialize tqdm for progress tracking
    progress_bar = tqdm(total=0, desc="Processing")

    with engine.connect() as connection:
        truncate_command = text(f"TRUNCATE TABLE [dbo].[{table}]")
        connection.execute(truncate_command)

    error_report_file = os.path.join(errors_directory, f"errors_{table}.txt")

    # Read data from the CSV file in batches
    for batch_data in pd.read_csv(csv, chunksize=batch_size):
        # Try to insert the batch into the database
        success = batch_insert_to_database(engine, batch_data, table, error_report_file)
        # If the batch insertion fails, add the batch to the errors DataFrame
        if not success:
            errors_df = pd.concat([errors_df, batch_data])
        # Update the progress bar
        progress_bar.update(len(batch_data))

    # Close the progress bar
    progress_bar.close()

    error_data_file = os.path.join(errors_directory, f"errors_{table}.csv")
    # Save the errors DataFrame to a CSV file
    if not errors_df.empty:
        errors_df.to_csv(error_data_file, index=False)
        print(f"Errors saved to {error_data_file}")


def main():
    pattern = os.path.join(DATA_DIRECTORY, '**', '*.gz')
    gz_files = glob.glob(pattern, recursive=True)
    tables = [[file, file.split(os.path.sep)[-1].split(".")[0]] for file in gz_files]
    for table_data in tables:
        print(table_data)
        load_table_data(table_data[0], table_data[1], ERRORS_DIRECTORY)


if __name__ == "__main__":
    main()
```
Here is the stack trace:
```
Traceback (most recent call last):
File "C:\...\main.py", line 97, in <module>
main()
File "C:\...\main.py", line 94, in main
load_table_data(table_data[0], table_data[1], ERRORS_DIRECTORY)
File "C:\...\main.py", line 66, in load_table_data
success = batch_insert_to_database(engine, batch_data, table, error_report_file)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\...\main.py", line 29, in batch_insert_to_database
data.to_sql(table, con=engine, index=False, if_exists='append')
File "C:\...\pandas\core\generic.py", line 3008, in to_sql
return sql.to_sql(
^^^^^^^^^^^
File "C:\...\pandas\io\sql.py", line 788, in to_sql
return pandas_sql.to_sql(
^^^^^^^^^^^^^^^^^^
File "C:\...\pandas\io\sql.py", line 1958, in to_sql
total_inserted = sql_engine.insert_records(
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\...\pandas\io\sql.py", line 1498, in insert_records
return table.insert(chunksize=chunksize, method=method)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\...\pandas\io\sql.py", line 1059, in insert
num_inserted = exec_insert(conn, keys, chunk_iter)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\...\pandas\io\sql.py", line 951, in _execute_insert
result = conn.execute(self.table.insert(), data)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\...\sqlalchemy\engine\base.py", line 1412, in execute
return meth(
^^^^^
File "C:\...\sqlalchemy\sql\elements.py", line 516, in _execute_on_connection
return connection._execute_clauseelement(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\...\sqlalchemy\engine\base.py", line 1635, in _execute_clauseelement
ret = self._execute_context(
^^^^^^^^^^^^^^^^^^^^^^
File "C:\...\sqlalchemy\engine\base.py", line 1844, in _execute_context
return self._exec_single_context(
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\...\sqlalchemy\engine\base.py", line 1984, in _exec_single_context
self._handle_dbapi_exception(
File "C:\...\sqlalchemy\engine\base.py", line 2342, in _handle_dbapi_exception
raise exc_info[1].with_traceback(exc_info[2])
File "C:\...\sqlalchemy\engine\base.py", line 1934, in _exec_single_context
self.dialect.do_executemany(
File "C:\...\sqlalchemy\dialects\mssql\pyodbc.py", line 716, in do_executemany
super().do_executemany(cursor, statement, parameters, context=context)
File "C:\...\sqlalchemy\engine\default.py", line 918, in do_executemany
cursor.executemany(statement, parameters)
```
And the error message is simply "MemoryError".
Update: further investigation revealed that one specific file causes the error, even if I don't load any of the prior ones. I am starting to think it might be a misleading error message. Thank you for all the suggestions. I don't see how there could be a memory error when reading 100 rows with 6 columns from a CSV. I will post an update if/when I find the cause.
1 Answer
I don't know a lot about SQL Server, but maybe try this to see whether the entire import is piling up in a single transaction. There might be a better way, but it would be informative (this commits every batch):
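A rough sketch of that idea, reusing the function from the question (name and signature assumed from the posted code): engine.begin() gives pandas a connection inside an explicit transaction that is committed as soon as the block exits, so each batch is committed on its own instead of accumulating on one connection.

```python
# Sketch only: commit each batch in its own short-lived transaction.
def batch_insert_to_database(engine, data, table, error_log_file):
    try:
        # engine.begin() commits on successful exit and rolls back on error,
        # so no single transaction spans the whole import.
        with engine.begin() as connection:
            data.to_sql(table, con=connection, index=False, if_exists='append')
    except SQLAlchemyError as e:
        print("error encountered")
        with open(error_log_file, 'a') as error_log:
            error_log.write(f"Error during batch insertion: {e}\n")
            traceback.print_exc(file=error_log)
        return False
    return True
```

If memory still blows up at the same point with this in place, the cause is probably not an ever-growing server-side transaction, which would at least narrow things down.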