0%

Python之SqlAlchemy

SqlAlchemy

1. 理解

之前使用的是 pymysql 插件,类似于 JDBC 的连接方式,通过写大量的 sql 语句来进行关联表、筛选字段等功能。SqlAlchemy 和 Flask-SqlAlchemy(后者是针对于 Flask 框架做了进一步的优化)则是通过类似于 Mybatis 的方式,将模型与表进行匹配,将 sql 中的关键字提取成方法,以此来获取数据。使用下来的话,官方文档给的例子或者说明并不是很清楚,需要多去网上找例子。

Flask-SqlAlchemy 通过配置可以自动管理多个数据源,比如默认数据源和数据源2,数据源3,在model里添加__bind_key__="数据源别名"来自动在query执行时切换数据源

2. 使用

2.1 数据库连接

通过读取配置文件获取参数,然后创建一个连接引擎,这个引擎会返回一个session对话,通过这个对话来对数据库进行操作

1
2
3
4
5
6
7
8
9
# max_overflow: # 超过连接池大小外最多创建的连接
# pool_size: # 连接池大小
def getSession():
host, port, user, password, db = Conf.getDbConf()
linkStr = "mysql+pymysql://" + user + ":" + password + "@" + host + ":" + str(port) + "/" + db
engine = create_engine(linkStr, max_overflow=6, pool_size=6)
DBSession = sessionmaker(bind=engine)
session = DBSession()
return session

2.2 模型与表的关联

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from sqlalchemy import Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base

user_base = declarative_base()


# 对应 t_user 表的定义
class User(user_base):
__tablename__ = 't_user' # 这里是数据库中对应表名
id = Column(Integer, primary_key=True)
itcode = Column(String(50))
username = Column(String(50))
password = Column(String(50))
truename = Column(String(50))
email = Column(String(50))
is_active = Column(String(10))

2.3 增

1
2
3
4
5
6
7
# 如果表里存在自增字段,那么可以通过下面方式插入:创建一个类的实例,并显式写出每个字段的赋值,之后获取到session会话实例,将类的实例将入到会话中,并提交,最后将会话关闭
# 除查询外,其他三种都需要执行 session.commit() 来提交操作
new_user = User(itcode=itcode, username=username, password=password, truename=truename, email=email, is_active=is_active)
session = db.getSession()
session.add(new_user)
session.commit()
session.close()

2.4 删

1
2
3
# 简单来看这也算是一种查询:查询 User 类对应的数据表,过滤出 id 等于 del_id 的数据,调用 delete() 方法进行删除
# 这里的 del_res 返回的是受影响的总行数
del_res = session.query(User).filter(User.id == del_id).delete()

2.5 query

session.query() 括号内可以是某个类,比如 session.query(User) 返回的结果就是 User 这个类对应的表的所有字段;也可以是类的字段,比如 session.query(User.id, User.username) 返回的结果就是 id 和 username 两个字段

2.6 join

目前用到的表和表之间关联有两种方式

  • 直接在类中添加外键定义
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from sqlalchemy import Column, Integer, String, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from model.User import User

domain_base = declarative_base()


# 对应 t_domain 表的定义
class Domain(domain_base):
__tablename__ = 't_domain'
id = Column(Integer, primary_key=True)
name = Column(String(50))
region = Column(String(50))
responser_id = Column(Integer, ForeignKey(User.id)) # 业务域负责人
responser_truename = Column(String(50))
is_active = Column(String(10))

这种定义方式存在找不到或者不全的情况,更倾向于第二种

  • 在 join 时显示写出
1
2
3
4
5
6
7
8
9
10
res = session \
.query(UserDomainRole.id, UserDomainRole.user_id, User.truename,
UserDomainRole.domain_id, Domain.name.label("domain_name"),
UserDomainRole.role_id, Role.name.label("role_name"),
UserDomainRole.create_time, UserDomainRole.update_time,
UserDomainRole.is_active) \
.join(UserDomainRole, User.id == UserDomainRole.user_id) \
.join(Domain, Domain.id == UserDomainRole.domain_id) \
.join(Role, Role.id == UserDomainRole.role_id) \
.filter(UserDomainRole.id == query_id).all()

2.7 别名

如果 query 中存在多个重复字段,那么可以在后面添加 label("别名") 的方式来进行区分

2.8 filter

如果是已确定的条件,那么通过 session.query().filter(User.id == query_id) 这样显式写出来没问题,但是多数情况下,类似于 Mybatis 中 where 1 = 1 后跟着 if 判断语句,并不清楚会有几个查询条件需要放到 filter 中,这时候需要换成下面的写法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 需要 filter 中遍历的参数
params = list()
if user_id != "":
params.append(类.user_id == user_id)
if domain_id != "":
params.append(类.domain_id == domain_id)
if role_id != "":
params.append(类.role_id == role_id)
if is_active != "":
params.append(类.is_active == is_active)

session = db.getSession()
res = session.query(字段).join().join().filter(*params).filter().all()
# 可以同时写多个 filter 条件来过滤
# 将params传到 filter 中,会自动进行赋值,注意要用 *params
# 最后调用 all() 方法返回所有数据,first() 返回第一条数据, one() 返回一条数据

2.9 分页

1
2
3
4
# 通过 limit 和 offset 来实现分页
# limit(10) 表示返回10条数据
# offset(lower) 表示把下标移动到 lower 的位置,然后从下一个位置开始返回
res = session.query().join().filter().limit(10).offset(lower).all()

2.10 排序

1
2
通过 order_by() 方法来进行数据排序,可以添加多个字段排序规则
order_by(类1.字段1, 类2.字段2.desc())

2.11 改

1
2
3
4
5
6
7
8
9
10
11
12
# 和 filter 中查询条件一样,多数情况下不知道更新哪几个字段,需要动态赋值
# 和 filter 不一样的是,filter(*params) 和 update(params)
params = {}
if user_id != "":
params["user_id"] = user_id
if domain_id != "":
params["domain_id"] = domain_id
if role_id != "":
params["role_id"] = role_id
if is_active != "":
params["is_active"] = is_active
update_res = session.query(类).filter(类.id == query_id).update(params)

2.12 聚合函数使用

2.12.1 group_concat
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 对于分组来说,可以直接调用 group_by() 方法来指定按照哪些字段来分组
# 对于像 group_concat() 这样的函数使用,需要引用 func
from sqlalchemy.sql import func

res = session \
.query(User.id, User.itcode, User.username, User.password, User.truename, User.email,
func.group_concat(UserDomainRole.domain_id), func.group_concat(Domain.name),
func.group_concat(Domain.responser_id), func.group_concat(Domain.responser_truename),
func.group_concat(UserDomainRole.role_id), func.group_concat(Role.name)) \
.join(UserDomainRole, User.id == UserDomainRole.user_id) \
.join(Domain, Domain.id == UserDomainRole.domain_id) \
.join(Role, Role.id == UserDomainRole.role_id) \
.filter(User.id == query_id, UserDomainRole.is_active == 'Y', Domain.is_active == 'Y',
Role.is_active == 'Y') \
.group_by(User.id, User.itcode, User.username, User.password, User.truename, User.email).all()
2.12.2 count
1
2
3
4
5
6
7
8
# 虽然有直接的 count() 方法,但是在数据量大的时候,会很慢,所以需要换一种方式
first = session.query(UserDomainRole.id, User.id, Domain.id, Role.id) \
.join(UserDomainRole, User.id == UserDomainRole.user_id)\
.join(Domain, Domain.id == UserDomainRole.domain_id)\
.join(Role, Role.id == UserDomainRole.role_id)\
.filter(*params)\
.filter(Domain.is_active == 'Y', Role.is_active == 'Y')
totalCount = first.with_entities(func.count(UserDomainRole.id)).scalar()