您的位置 首页 编程知识

为什么Sqlalchemy数据库连接无法正确关闭?如何解决这个问题?

SQLAlchemy数据库连接的正确关闭方法及问题排查 在使用Python的SQLAlchemy库进行数据库操…

为什么Sqlalchemy数据库连接无法正确关闭?如何解决这个问题?

SQLAlchemy数据库连接的正确关闭方法及问题排查

在使用Python的SQLAlchemy库进行数据库操作时,确保数据库连接的正确关闭至关重要,以避免资源泄漏和性能问题。本文将分析一个常见的SQLAlchemy连接关闭问题,并提供解决方案。

以下代码片段展示了一个可能存在连接关闭问题的示例:

from sqlalchemy import create_engine, url, delete, update, select, exists from sqlalchemy.orm import sessionmaker, scoped_session from core.database.base import base  # 假设这是你的数据库基类 from lib.type import type  # 假设这是你的类型定义 from typing import Any from flask import g, current_app  import importlib import re   class Database:  # 类名改为首字母大写,符合Python规范      env = None      def set(self, key: str, value: Any):         """         设置属性值,根据环境变量设置到g.application或g.platform         """         if self.env == "application":             g.application = self.container._replace(**{key: value})         elif self.env == 'platform':             g.platform = self.container._replace(**{key: value})      @property     def container(self):         """         返回g.application或g.platform容器         """         if self.env == "application":             if "application" not in g:                 g.application = type.application(None, None, None)             return g.application         elif self.env == 'platform':             if "platform" not in g:                 g.platform = type.platform(None, None)             return g.platform      @property     def database_conf(self):         """         获取数据库配置         """         return base.setting(current_app.config["database"])      @property     def __database_core(self):         """         创建数据库会话,并缓存到实例属性         """         if not hasattr(self, '_database_core'):             self._database_core = self.__create_session(**self.database_conf)         return self._database_core      @property     def __create_engine(self):         """         获取数据库引擎,并缓存到实例属性         """         return self.__database_core.engine      @property     def __create_database(self):         """         获取数据库会话,并缓存到实例属性         """         return self.__database_core.session      def __create_session(self, **config):         """         创建数据库会话         """         engine = self.create_engine(**config)         session = scoped_session(sessionmaker(bind=engine, autoflush=True))         return type.database(engine=engine, session=session())      @classmethod     def create_engine(cls, **kwargs):         """         创建数据库引擎         """         return create_engine(url.create("mysql+pymysql", **kwargs), echo=True, isolation_level="autocommit")      @staticmethod     def create_all(models: list, engine=None):         """         创建所有模型的表         """         tables = [Database.get_model(model).__table__ for model in models]         base.metadata.create_all(bind=engine, tables=tables)      def create_table(self, tables: list):         """         创建指定模型的表         """         Database.create_all(models=tables, engine=self.__create_engine)      @staticmethod     def get_model(model: str):         """         获取模型对象         """         module = importlib.import_module(f"model.{model.split('_')[0]}.{model}")         class_name = ''.join(re.findall(r"[a-za-z]+", model.split(".")[-1].title()))         return getattr(module, class_name)()      @property     def database(self):         """         获取数据库会话         """         return self.__create_database      def table_data_query_all(self, model: Any, condition: list = None, order: list = None, limit: int = 500,                              fields: list = None) -> list[dict]:         """         查询所有数据         """         query = select(model)         if fields:             query = query.with_only_columns(*fields)         if condition:             query = query.filter(*condition)         if order:             query = query.order_by(*order)         results = [row.dict() for row in self.database.execute(query.limit(limit)).scalars()]         return results      def table_data_query_one(self, model: Any, condition: list = None) -> dict:         """         查询单条数据         """         result = self.database.execute(select(model).filter(*condition).limit(1)).scalar_one_or_none()         return None if result is None else result.dict()      def table_data_query_exists(self, condition: list) -> bool:         """         查询数据是否存在         """         return self.database.query(exists().where(*condition)).scalar()      def table_data_insert_all(self, models: list) -> None:         """         批量插入数据         """         with self.database as db:             db.add_all(models)             db.commit()      def table_data_insert_one(self, model, data: bool = False) -> int | dict:         """         插入单条数据         """         with self.database as db:             db.add(model)             db.commit()             return model.dict() if data else model.id      def table_data_update(self, model: Any, condition: list, data: dict) -> None:         """         更新数据         """         with self.database as db:             db.execute(update(model).where(*condition).values(**data))             db.commit() # 需要显式提交      def table_data_delete(self, model: Any, condition: list) -> None:         """         删除数据         """         with self.database as db:             db.execute(delete(model).where(*condition))             db.commit() # 需要显式提交      def close(self):         """         关闭数据库连接         """         if hasattr(self, '_database_core'):             self._database_core.session.close()             self._database_core.engine.dispose()             del self._database_core      def __del__(self):         """         析构函数,确保连接关闭         """         self.close()
登录后复制

改进说明:

  1. 类名规范: 将 database 改为 Database,符合Python命名规范。
  2. 属性缓存: 使用 @property 和实例属性缓存 _database_core,避免重复创建会话。
  3. 显式提交: 在table_data_update 和 table_data_delete 中添加了 db.commit(),确保事务提交。
  4. 资源释放: close() 方法中显式调用 session.close() 和 engine.dispose() 来释放资源。del self._database_core 删除缓存的会话对象。
  5. 异常处理: 可以考虑添加 try…except 块来处理潜在的异常,例如数据库连接错误。
  6. scoped_session 的使用: scoped_session 在 Flask 应用中通常配合 g 对象使用,确保每个请求使用独立的会话,并在请求结束时自动关闭。 但代码中没有体现Flask请求上下文管理,因此dispose()是必要的。如果使用Flask的上下文管理,dispose()可能不是必需的,但session.close()仍然是必要的。

主要问题在于 scoped_session 的使用和资源释放的时机。scoped_session 本身并不保证连接的自动关闭,它只是管理会话的范围。 self.database.get_bind().dispose() 在某些情况下可能无效,因为它可能无法正确地关闭底层的数据库连接。

因此,需要在合适的地方调用 close() 方法,或者在类的析构函数 __del__ 中调用 close() 方法,确保连接被正确关闭。 但是,依赖 __del__ 并非最佳实践,因为 Python 的垃圾回收机制不可预测。 推荐在使用完 Database 实例后,显式调用 instance.close()。

最佳实践:

  • 使用上下文管理器 (with 语句) 来管理数据库会话:这可以确保会话在代码块执行完毕后自动关闭。
  • 在 Flask 应用中,利用 Flask-SQLAlchemy 等扩展库,可以更方便地管理数据库连接和会话。 这些库通常会自动处理连接的关闭和释放。

通过以上改进,可以有效地解决 SQLAlchemy 数据库连接无法正确关闭的问题,并提高代码的健壮性和可维护性。 记住,显式地关闭连接是最佳实践,避免依赖垃圾回收机制。

以上就是Sqlalchemy数据库连接无法正确关闭?如何解决这个问题?的详细内容,更多请关注php中文网其它相关文章!

本文来自网络,不代表四平甲倪网络网站制作专家立场,转载请注明出处:http://www.elephantgpt.cn/8384.html

作者: nijia

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注

联系我们

联系我们

18844404989

在线咨询: QQ交谈

邮箱: 641522856@qq.com

工作时间:周一至周五,9:00-17:30,节假日休息

关注微信
微信扫一扫关注我们

微信扫一扫关注我们

关注微博
返回顶部