SqlAlchemy
1. 理解
之前使用的是 pymysql 插件,类似于 JDBC 的连接方式,通过写大量的 sql 语句来进行关联表、筛选字段等功能。SqlAlchemy 和 Flask-SqlAlchemy(后者是针对于 Flask 框架做了进一步的优化)则是通过类似于 Mybatis 的方式,将模型与表进行匹配,将 sql 中的关键字提取成方法,以此来获取数据。使用下来的话,官方文档给的例子或者说明并不是很清楚,需要多去网上找例子。
Flask-SqlAlchemy 通过配置可以自动管理多个数据源,比如默认数据源和数据源2,数据源3,在model里添加__bind_key__="数据源别名"
来自动在query执行时切换数据源
2. 使用
2.1 数据库连接
通过读取配置文件获取参数,然后创建一个连接引擎,这个引擎会返回一个session对话,通过这个对话来对数据库进行操作
1 2 3 4 5 6 7 8 9
| # max_overflow: # 超过连接池大小外最多创建的连接 # pool_size: # 连接池大小 def getSession(): host, port, user, password, db = Conf.getDbConf() linkStr = "mysql+pymysql://" + user + ":" + password + "@" + host + ":" + str(port) + "/" + db engine = create_engine(linkStr, max_overflow=6, pool_size=6) DBSession = sessionmaker(bind=engine) session = DBSession() return session
|
2.2 模型与表的关联
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| from sqlalchemy import Column, Integer, String from sqlalchemy.ext.declarative import declarative_base
user_base = declarative_base()
# 对应 t_user 表的定义 class User(user_base): __tablename__ = 't_user' # 这里是数据库中对应表名 id = Column(Integer, primary_key=True) itcode = Column(String(50)) username = Column(String(50)) password = Column(String(50)) truename = Column(String(50)) email = Column(String(50)) is_active = Column(String(10))
|
2.3 增
1 2 3 4 5 6 7
| # 如果表里存在自增字段,那么可以通过下面方式插入:创建一个类的实例,并显式写出每个字段的赋值,之后获取到session会话实例,将类的实例将入到会话中,并提交,最后将会话关闭 # 除查询外,其他三种都需要执行 session.commit() 来提交操作 new_user = User(itcode=itcode, username=username, password=password, truename=truename, email=email, is_active=is_active) session = db.getSession() session.add(new_user) session.commit() session.close()
|
2.4 删
1 2 3
| # 简单来看这也算是一种查询:查询 User 类对应的数据表,过滤出 id 等于 del_id 的数据,调用 delete() 方法进行删除 # 这里的 del_res 返回的是受影响的总行数 del_res = session.query(User).filter(User.id == del_id).delete()
|
2.5 query
session.query()
括号内可以是某个类,比如 session.query(User)
返回的结果就是 User 这个类对应的表的所有字段;也可以是类的字段,比如 session.query(User.id, User.username)
返回的结果就是 id 和 username 两个字段
2.6 join
目前用到的表和表之间关联有两种方式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| from sqlalchemy import Column, Integer, String, ForeignKey from sqlalchemy.ext.declarative import declarative_base from model.User import User
domain_base = declarative_base()
# 对应 t_domain 表的定义 class Domain(domain_base): __tablename__ = 't_domain' id = Column(Integer, primary_key=True) name = Column(String(50)) region = Column(String(50)) responser_id = Column(Integer, ForeignKey(User.id)) # 业务域负责人 responser_truename = Column(String(50)) is_active = Column(String(10))
|
这种定义方式存在找不到或者不全的情况,更倾向于第二种
1 2 3 4 5 6 7 8 9 10
| res = session \ .query(UserDomainRole.id, UserDomainRole.user_id, User.truename, UserDomainRole.domain_id, Domain.name.label("domain_name"), UserDomainRole.role_id, Role.name.label("role_name"), UserDomainRole.create_time, UserDomainRole.update_time, UserDomainRole.is_active) \ .join(UserDomainRole, User.id == UserDomainRole.user_id) \ .join(Domain, Domain.id == UserDomainRole.domain_id) \ .join(Role, Role.id == UserDomainRole.role_id) \ .filter(UserDomainRole.id == query_id).all()
|
2.7 别名
如果 query 中存在多个重复字段,那么可以在后面添加 label("别名")
的方式来进行区分
2.8 filter
如果是已确定的条件,那么通过 session.query().filter(User.id == query_id)
这样显式写出来没问题,但是多数情况下,类似于 Mybatis 中 where 1 = 1
后跟着 if 判断语句,并不清楚会有几个查询条件需要放到 filter 中,这时候需要换成下面的写法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| # 需要 filter 中遍历的参数 params = list() if user_id != "": params.append(类.user_id == user_id) if domain_id != "": params.append(类.domain_id == domain_id) if role_id != "": params.append(类.role_id == role_id) if is_active != "": params.append(类.is_active == is_active)
session = db.getSession() res = session.query(字段).join().join().filter(*params).filter().all() # 可以同时写多个 filter 条件来过滤 # 将params传到 filter 中,会自动进行赋值,注意要用 *params # 最后调用 all() 方法返回所有数据,first() 返回第一条数据, one() 返回一条数据
|
2.9 分页
1 2 3 4
| # 通过 limit 和 offset 来实现分页 # limit(10) 表示返回10条数据 # offset(lower) 表示把下标移动到 lower 的位置,然后从下一个位置开始返回 res = session.query().join().filter().limit(10).offset(lower).all()
|
2.10 排序
1 2
| 通过 order_by() 方法来进行数据排序,可以添加多个字段排序规则 order_by(类1.字段1, 类2.字段2.desc())
|
2.11 改
1 2 3 4 5 6 7 8 9 10 11 12
| # 和 filter 中查询条件一样,多数情况下不知道更新哪几个字段,需要动态赋值 # 和 filter 不一样的是,filter(*params) 和 update(params) params = {} if user_id != "": params["user_id"] = user_id if domain_id != "": params["domain_id"] = domain_id if role_id != "": params["role_id"] = role_id if is_active != "": params["is_active"] = is_active update_res = session.query(类).filter(类.id == query_id).update(params)
|
2.12 聚合函数使用
2.12.1 group_concat
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| # 对于分组来说,可以直接调用 group_by() 方法来指定按照哪些字段来分组 # 对于像 group_concat() 这样的函数使用,需要引用 func from sqlalchemy.sql import func
res = session \ .query(User.id, User.itcode, User.username, User.password, User.truename, User.email, func.group_concat(UserDomainRole.domain_id), func.group_concat(Domain.name), func.group_concat(Domain.responser_id), func.group_concat(Domain.responser_truename), func.group_concat(UserDomainRole.role_id), func.group_concat(Role.name)) \ .join(UserDomainRole, User.id == UserDomainRole.user_id) \ .join(Domain, Domain.id == UserDomainRole.domain_id) \ .join(Role, Role.id == UserDomainRole.role_id) \ .filter(User.id == query_id, UserDomainRole.is_active == 'Y', Domain.is_active == 'Y', Role.is_active == 'Y') \ .group_by(User.id, User.itcode, User.username, User.password, User.truename, User.email).all()
|
2.12.2 count
1 2 3 4 5 6 7 8
| # 虽然有直接的 count() 方法,但是在数据量大的时候,会很慢,所以需要换一种方式 first = session.query(UserDomainRole.id, User.id, Domain.id, Role.id) \ .join(UserDomainRole, User.id == UserDomainRole.user_id)\ .join(Domain, Domain.id == UserDomainRole.domain_id)\ .join(Role, Role.id == UserDomainRole.role_id)\ .filter(*params)\ .filter(Domain.is_active == 'Y', Role.is_active == 'Y') totalCount = first.with_entities(func.count(UserDomainRole.id)).scalar()
|