重试失败的sqlalchemy查询

pu82cl6c  于 2021-06-18  发布在  Mysql
关注(0)|答案(4)|浏览(666)

每次我重新开始 mysql 服务,我的应用程序在任何查询中收到以下错误:

  1. result = self._query(query)
  2. File "/usr/local/lib/python3.6/site-packages/pymysql/cursors.py", line 328, in _query
  3. conn.query(q)
  4. File "/usr/local/lib/python3.6/site-packages/pymysql/connections.py", line 516, in query
  5. self._affected_rows = self._read_query_result(unbuffered=unbuffered)
  6. File "/usr/local/lib/python3.6/site-packages/pymysql/connections.py", line 727, in _read_query_result
  7. result.read()
  8. File "/usr/local/lib/python3.6/site-packages/pymysql/connections.py", line 1066, in read
  9. first_packet = self.connection._read_packet()
  10. File "/usr/local/lib/python3.6/site-packages/pymysql/connections.py", line 656, in _read_packet
  11. packet_header = self._read_bytes(4)
  12. File "/usr/local/lib/python3.6/site-packages/pymysql/connections.py", line 702, in _read_bytes
  13. CR.CR_SERVER_LOST, "Lost connection to MySQL server during query")
  14. sqlalchemy.exc.OperationalError: (pymysql.err.OperationalError) (2013, 'Lost connection to MySQL server during query') [SQL: ...] [parameters: {...}] (Background on this error at: http://sqlalche.me/e/e3q8)

之后的任何查询都将照常成功。
这只是一个常见的用例,例如,一般来说,我可能希望根据错误重试任何查询。
有没有什么方法可以在较低级别上捕获并重试查询 sqlalchemy 应用程序编程接口?做尝试或习惯 query 方法在我的代码中是不合理的,因为我使用它太多次,它不可维护。

nkhmeac6

nkhmeac61#

非常感谢这个片段,我不得不对它进行一些修改,以便直接使用sqlalchemy.orm:如果它对任何人都有用的话。。

  1. from sqlalchemy.exc import OperationalError, StatementError
  2. from sqlalchemy.orm.query import Query as _Query
  3. from time import sleep
  4. class RetryingQuery(_Query):
  5. __max_retry_count__ = 3
  6. def __init__(self, *args,**kwargs):
  7. super().__init__(*args,**kwargs)
  8. def __iter__(self):
  9. attempts = 0
  10. while True:
  11. attempts += 1
  12. try:
  13. return super().__iter__()
  14. except OperationalError as ex:
  15. if "server closed the connection unexpectedly" not in str(ex):
  16. raise
  17. if attempts <= self.__max_retry_count__:
  18. sleep_for = 2**(attempts - 1)
  19. logging.error(
  20. "/!\ Database connection error: retrying Strategy => sleeping for {}s"
  21. " and will retry (attempt #{} of {}) \n Detailed query impacted: {}".format(
  22. sleep_for, attempts, self.__max_retry_count__, ex)
  23. )
  24. sleep(sleep_for)
  25. continue
  26. else:
  27. raise
  28. except StatementError as ex:
  29. if "reconnect until invalid transaction is rolled back" not in str(ex):
  30. raise
  31. self.session.rollback()

对于用法:将选项传递给sessionmaker:

  1. sqlalchemy.orm.sessionmaker(bind=engine, query_cls=RetryingQuery)
展开查看全部
dnph8jn4

dnph8jn42#

我不得不稍微调整它,使之与postgres一起工作,它有一个不同的错误消息。我知道这个问题是有标签的 mysql ,但是通过搜索发现了这个问题(并且有完全相同的问题),所以可能会帮助某人。
我还得接住 StatementError: (sqlalchemy.exc.InvalidRequestError) Can't reconnect until invalid transaction is rolled back 在重试之前 flask 就爆炸了。
最后我让它指数退避,因为为什么不呢?

  1. import logging
  2. from flask_sqlalchemy import BaseQuery
  3. from sqlalchemy.exc import OperationalError, StatementError
  4. from time import sleep
  5. class RetryingQuery(BaseQuery):
  6. __retry_count__ = 3
  7. def __init__(self, *args,**kwargs):
  8. super().__init__(*args,**kwargs)
  9. def __iter__(self):
  10. attempts = 0
  11. while True:
  12. attempts += 1
  13. try:
  14. return super().__iter__()
  15. except OperationalError as ex:
  16. if "server closed the connection unexpectedly" not in str(ex):
  17. raise
  18. if attempts < self.__retry_count__:
  19. sleep_for = 2**(attempts - 1)
  20. logging.error(
  21. "Database connection error: {} - sleeping for {}s"
  22. " and will retry (attempt #{} of {})".format(
  23. ex, sleep_for, attempts, self.__retry_count__
  24. )
  25. )
  26. sleep(sleep_for)
  27. continue
  28. else:
  29. raise
  30. except StatementError as ex:
  31. if "reconnect until invalid transaction is rolled back" not in str(ex):
  32. raise
  33. self.session.rollback()
展开查看全部
xqnpmsa8

xqnpmsa83#

sqlalchemy还允许您聆听 engine_connectconnection 已创建。这使得实现悲观断开处理的自定义逻辑成为可能
下面的代码段实现了指数退避以供重试。它取自apache airflow的sqlalchemy utils:http://airflow.apache.org/docs/1.10.3/_modules/airflow/utils/sqlalchemy.html

  1. @event.listens_for(engine, "engine_connect")
  2. def ping_connection(connection, branch):
  3. """
  4. Pessimistic SQLAlchemy disconnect handling. Ensures that each
  5. connection returned from the pool is properly connected to the database.
  6. http://docs.sqlalchemy.org/en/rel_1_1/core/pooling.html#disconnect-handling-pessimistic
  7. """
  8. if branch:
  9. # "branch" refers to a sub-connection of a connection,
  10. # we don't want to bother pinging on these.
  11. return
  12. start = time.time()
  13. backoff = initial_backoff_seconds
  14. # turn off "close with result". This flag is only used with
  15. # "connectionless" execution, otherwise will be False in any case
  16. save_should_close_with_result = connection.should_close_with_result
  17. while True:
  18. connection.should_close_with_result = False
  19. try:
  20. connection.scalar(select([1]))
  21. # If we made it here then the connection appears to be healthy
  22. break
  23. except exc.DBAPIError as err:
  24. if time.time() - start >= reconnect_timeout_seconds:
  25. log.error(
  26. "Failed to re-establish DB connection within %s secs: %s",
  27. reconnect_timeout_seconds,
  28. err)
  29. raise
  30. if err.connection_invalidated:
  31. log.warning("DB connection invalidated. Reconnecting...")
  32. # Use a truncated binary exponential backoff. Also includes
  33. # a jitter to prevent the thundering herd problem of
  34. # simultaneous client reconnects
  35. backoff += backoff * random.random()
  36. time.sleep(min(backoff, max_backoff_seconds))
  37. # run the same SELECT again - the connection will re-validate
  38. # itself and establish a new connection. The disconnect detection
  39. # here also causes the whole connection pool to be invalidated
  40. # so that all stale connections are discarded.
  41. continue
  42. else:
  43. log.error(
  44. "Unknown database connection error. Not retrying: %s",
  45. err)
  46. raise
  47. finally:
  48. # restore "close with result"
  49. connection.should_close_with_result = save_should_close_with_result
展开查看全部
h9a6wy2h

h9a6wy2h4#

显然地 sqlalchemy 有一个很好的自定义查询类的选项,这正是我所需要的。
类实现:

  1. import logging
  2. from flask_sqlalchemy import BaseQuery
  3. from sqlalchemy.exc import OperationalError
  4. from time import sleep
  5. class RetryingQuery(BaseQuery):
  6. __retry_count__ = 3
  7. __retry_sleep_interval_sec__ = 0.5
  8. def __init__(self, *args,**kwargs):
  9. super().__init__(*args,**kwargs)
  10. def __iter__(self):
  11. attempts = 0
  12. while True:
  13. attempts += 1
  14. try:
  15. return super().__iter__()
  16. except OperationalError as ex:
  17. if "Lost connection to MySQL server during query" not in str(ex):
  18. raise
  19. if attempts < self.__retry_count__:
  20. logging.debug(
  21. "MySQL connection lost - sleeping for %.2f sec and will retry (attempt #%d)",
  22. self.__retry_sleep_interval_sec__, attempts
  23. )
  24. sleep(self.__retry_sleep_interval_sec__)
  25. continue
  26. else:
  27. raise

用法:

  1. class BaseModel(Model):
  2. ...
  3. query_class = RetryingQuery
  4. ...
  5. db = SQLAlchemy(model_class=BaseModel, query_class=RetryingQuery)
展开查看全部

相关问题