ORM框架SQLAlchemy

简介

SQLAlchemy是用Python编程语言开发的一个开源项目。它提供了SQL工具包和ORM(对象关系映射)工具,使用MIT许可证发行。

SQLAlchemy采用简单的Python语言,提供高效和高性能的数据库访问,实现了完整的企业级持久模型。它的理念是,SQL数据库的量级和性能比对象集合重要,而对象集合的抽象又重要于表和行。

orm英文全称object relational mapping,就是对象映射关系程序,简单来说我们类似python这种面向对象的程序来说一切皆对象,但是我们使用的数据库却都是关系型的,为了保证一致的使用习惯,通过orm将编程语言的对象模型和数据库的关系模型建立映射关系,这样我们在使用编程语言对数据库进行操作的时候可以直接使用编程语言的对象模型进行操作就可以了,而不用直接使用sql语言。

img

orm的优点:

  1. 隐藏了数据访问细节,“封闭”的通用数据库交互,ORM的核心。他使得我们的通用数据库交互变得简单易行,并且完全不用考虑该死的SQL语句。快速开发,由此而来。
  2. ORM使我们构造固化数据结构变得简单易行。

缺点:

  1. 无可避免的,自动化意味着映射和关联管理,代价是牺牲性能(早期,这是所有不喜欢ORM人的共同点)。现在的各种ORM框架都在尝试使用各种方法来减轻这块(LazyLoad,Cache),效果还是很显著的。

img

Dialect用于和数据API进行交流,根据配置文件的不同调用不同的数据库API,从而实现对数据库的操作,如:

1
2
3
4
5
6
7
8
9
10
11
12
13
MySQL-Python
``mysql+mysqldb:``//``<user>:<password>@<host>[:<port>]/<dbname>

pymysql
``mysql+pymysql:``//``<username>:<password>@<host>/<dbname>[?<options>]

MySQL-Connector
``mysql+mysqlconnector:``//``<user>:<password>@<host>[:<port>]/<dbname>

cx_Oracle
``oracle+cx_oracle:``//user``:pass@host:port``/dbname``[?key=value&key=value...]

更多详见:http:``//docs``.sqlalchemy.org``/en/latest/dialects/index``.html

安装

安装sqlalchemy

1
2
pip3 install sqlalchemy
pip3 install pymysql

本文使用MySQL作为数据库,使用pymysql作为驱动,因此需要安装pymysql

在连接数据库前,需要使用到一些配置信息,然后把它们组合成满足以下条件的字符串:

链接数据库

配置信息

1
dialect+driver://username:password@host:port/database
  • dialect:数据库,如:sqlite、mysql、oracle等
  • driver:数据库驱动,用于连接数据库的,本文使用pymysql
  • username:用户名
  • password:密码
  • host:IP地址
  • port:端口
  • database:数据库
1
2
3
4
5
6
7
8
HOST = 'localhost'
PORT = 3306
USERNAME = 'root'
PASSWORD = '123456'
DB = 'myclass'

## dialect + driver://username:passwor@host:port/database
DB_URI = f'mysql+pymysql://{USERNAME}:{PASSWORD}@{HOST}:{PORT}/{DB}'

建议将配置信息放到你的配置文件中,如config.py

创建引擎并连接数据库

1
2
3
4
5
6
7
8
9
from sqlalchemy import create_engine
from config import DB_URI


engine = create_engine(DB_URI) # 创建引擎
conn = engine.connect() # 连接
result = conn.execute('SELECT 1') # 执行SQL
print(result.fetchone())
conn.close() # 关闭连接

创建ORM模型并映射到数据库中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import sessionmaker
from config import DB_URI

engine = create_engine(DB_URI)
Base = declarative_base(engine) # SQLORM基类
session = sessionmaker(engine)() # 构建session对象


class Student(Base):
__tablename__ = 'student' # 表名
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(50))
age = Column(Integer)
sex = Column(String(10))

class Student(Base):
__tablename__ = 'student' # 表名
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(50))
age = Column(Integer)
sex = Column(String(10))
class School(Base):
__tablename__ = 'School' # 表名
id = Column(Integer, primary_key=True, autoincrement=True)
name=Column(String(50),nullable=False,index=True)
school_name=Column(String(50))
age = Column(Integer)

Base.metadata.create_all() # 将模型映射到数据库中

执行上面代码,将会在数据库中生成对应的映射表student。

新增

创建表后,接下来我们要添加数据,代码如下:

1
2
3
student = Student(name='Tony', age=18, sex='male')  # 创建一个student对象
session.add(student) # 添加到session
session.commit() # 提交到数据库

也可以批量添加数据:

1
2
3
4
5
session.add_all([
Student(name='Jane', age=16, sex='female'),
Student(name='Ben', age=20, sex='male')
])
session.commit()

查询

sqlalchemy提供了query()方法来查询数据

获取所有数据

1
2
3
4
item_list = session.query(Student).all()
print(item_list)
for item in item_list:
print(item.name, item.age)

执行结果如下

1
2
3
4
[<mymodel.Student object at 0x000002A0E6A38088>, <mymodel.Student object at 0x000002A0E6A38208>, <mymodel.Student object at 0x000002A0E6A38288>]
Tony 18
Jane 16
Ben 20

查询得到的item_list是一个包含多个Student对象的列表

指定查询列

1
2
3
4
item_list = session.query(Student.name).all()
print(item_list)

## [('Tony',), ('Jane',), ('Ben',)]

获取返回数据的第一行

1
2
3
4
item = session.query(Student.name).first()
print(item)

## ('Tony',)

使用filter()方法进行筛选过滤

1
2
3
4
item_list = session.query(Student.name).filter(Student.age >= 18).all()
print(item_list)

## [('Tony',), ('Ben',)]

使用order_by()进行排序

1
2
3
4
item_list = session.query(Student.name, Student.age).order_by(Student.age.desc()).all() # desc()表示倒序
print(item_list)

## [('Ben', 20), ('Tony', 18), ('Jane', 16)]

多个查询条件(and和or)

1
2
3
4
5
6
7
8
9
10
11
12
13
## 默认为and, 在filter()中用,分隔多个条件表示and
item_list = session.query(Student.name, Student.age, Student.sex).filter(
Student.age >= 10, Student.sex == 'female'
).all()
print(item_list) # [('Jane', 16, 'female')]

from sqlalchemy import or_

## 使用or_连接多个条件
item_list = session.query(Student.name, Student.age, Student.sex).filter(
or_(Student.age >= 20, Student.sex == 'female')
).all()
print(item_list) # [('Jane', 16, 'female'), ('Ben', 20, 'male')]

equal/like/in

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
## 等于
item_list = session.query(Student.name, Student.age, Student.sex).filter(
Student.age == 18
).all()
print(item_list) # [('Tony', 18, 'male')]

## 不等于
item_list = session.query(Student.name, Student.age, Student.sex).filter(
Student.age != 18
).all()
print(item_list) # [('Jane', 16, 'female'), ('Ben', 20, 'male')]

## like
item_list = session.query(Student.name, Student.age, Student.sex).filter(
Student.name.like('%To%')
).all()
print(item_list) # [('Tony', 18, 'male')]

## in
item_list = session.query(Student.name, Student.age, Student.sex).filter(
Student.age.in_([16, 20])
).all()
print(item_list) # [('Jane', 16, 'female'), ('Ben', 20, 'male')]

count计算个数

1
2
count = session.query(Student).count()
print(count) # 3

切片

1
2
item_list = session.query(Student.name).all()[:2]
print(item_list) # [('Tony',), ('Jane',)]

分组

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from sqlalchemy.sql import func
res=session.query(Student.age).group_by(Student.age).all()
res=session.query(
func.max(Student.age),
func.min(Student.age),
func.sum(Student.age),
func.avg(Student.age),
func.count(Student.age),
).group_by(Student.age).all()


res=session.query(
Student.age,
func.count(1),
).group_by(Student.age).having(func.count(1) > 2).all()

连表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
res=session.query(School,Student).all() #select * from School,Student;
##where条件
res=session.query(School,Student).filter(School.age==Student.age).all()
## for row in res:
## School_tb=row[0]
## Student_tb=row[1]
## print(School_tb.id,School_tb.name,Student_tb.id,Student_tb.name)

##内连接
res=session.query(Student).join(School)
##join默认为内连接,SQLAlchemy会自动帮我们通过foreign key字段去找关联关系
##但是上述查询的结果均为Student表的字段,这样链表还有毛线意义,于是我们修改为
res=session.query(Student.id,Student.name,School.name).join(School).all()

##左连接:isouter=True
res=session.query(Student.id,Student.name,School.dname).join(School,isouter=True).all()

##右连接:同左连接,只是把两个表的位置换一下

组合

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
##七、组合
q1=session.query(Emp.id,Emp.ename).filter(Emp.id > 0,Emp.id < 5)
q2=session.query(Emp.id,Emp.ename).filter(
or_(
Emp.ename.like('%海%'),
Emp.ename.like('%昊%'),
)
)
res1=q1.union(q2) #组合+去重
res2=q1.union_all(q2) #组合,不去重

print([i.ename for i in q1.all()]) #['林海峰', '李杰', '武配齐', '元昊']
print([i.ename for i in q2.all()]) #['林海峰', '元昊']
print([i.ename for i in res1.all()]) #['林海峰', '李杰', '武配齐', '元昊']
print([i.ename for i in res2.all()]) #['林海峰', '李杰', '武配齐', '元昊', '元昊', '林海峰']

修改

修改数据可以使用update()方法,update完成后记得执行session.commit()

1
2
3
4
5
6
## 修改Tony的age为22
session.query(Student).filter(Student.name == 'Tony').update({'age': 22})
session.commit()

item = session.query(Student.name, Student.age).filter(Student.name == 'Tony').first()
print(item)

执行结果如下

1
('Tony', 22)

删除

删除数据使用delete()方法,同样也需要执行session.commit()提交事务

1
2
3
4
5
6
## 删除名称为Ben的数据
session.query(Student).filter(Student.name == 'Ben').delete()
session.commit()

item_list = session.query(Student.name, Student.age).all()
print(item_list)

执行结果如下

1
[('Tony', 22), ('Jane', 16)]

原文出自:蓝莓薄荷
有少些补充