Flask 中一般使用 pymysql 或者 flask-sqlalchemy 操作数据库,本案例中使用 flask-sqlalchemy 进行基本的操作

安装

shell
1
2
$ activate
$ pip install flask-sqlalchemy pymysql

需配置数据库驱动,我使用的是 pymysql,默认驱动为 mysqldb,python3 好像并不支持 pip 安装 mysqldbpymysql 效率好像也更高一些

连接

uri 格式 dialect+driver://username:password@host:port/database

实例:mysql+pymysql://root:root@localhost:3306/mydatabase

配置

python
1
2
3
from flask_sqlalchemy import SQLAlchemy
app.config['SQLALCHEMY_DATABASE_URI'] = mysql+pymysql://root:root@localhost:3306/mydatabase
db = SQLAlchemy(app)

发现会有警告信息

可以设置 SQLALCHEMY_TRACK_MODIFICATIONSFalse

SQLALCHEMY_TRACK_MODIFICATIONS 如果设置成 True (默认情况),Flask-SQLAlchemy 将会追踪对象的修改并且发送信号。这需要额外的内存,如果不必要的可以禁用它。

新建表

支持的字段类型

类型 说明
Integer 一个整数
String (size) 有长度限制的字符串
Text 一些较长的 unicode 文本
DateTime 表示为 Python datetime 对象的时间和日期
Float 存储浮点值
Boolean 存储布尔值
PickleType 存储为一个持久化的 Python 对象
LargeBinary 存储一个任意大的二进制数据
python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class User(db.Model):
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
username = db.Column(db.String(30), nullable=False)
email = db.Column(db.String(30), nullable=False, unique=True)
password = db.Column(db.String(30), nullable=False)
role = db.Column(db.Integer, nullable=False, default=1)
createdDate = db.Column(db.DateTime, default=datetime.now())


class Article(db.Model):
# ...
uid = db.Column(db.Integer, db.ForeignKey('user.id'))


db.create_all()

primary_key 主键

autoincrement 是否自增长

nullable 是否为空

default 默认值,只有在添加数据时如果没给 model 传值,会使用 default

server_default 生成表结构时的 字段默认值,只能为 str 类型,其他类型会报错,可用 db.text() 进行包裹,直接使用单引号或者双引号效果应该也是一样的

增删改查

http://www.pythondoc.com/flask-sqlalchemy/api.html

python
1
2
3
user = User(username='admin', email='admin@oyal.ml', password='aaa', role=1)
db.session.add(user)
db.session.commit()

python
1
2
3
# user = User.query.filter_by(username='admin').first()
user = User.query.filter(User.username == 'admin').first()
print(user.email) # admin@oyal.ml

image-202203221419686

filter_by() 用于简单的查询,不支持比较运算符

filterfilter_by 强大,支持比较运算符 ==、>、< 等,or、in 等语法

image-202203221421606

python
1
2
3
user = User.query.filter(User.username == 'admin').first()
user.email = 'oyal@qq.com'
db.session.commit()

plaintext
1
2
3
user = User.query.filter(User.username == 'admin').first()
db.session.delete(user)
db.session.commit()

一对多关系

外键 & 关系(ForeignKey & relationship)

如果需要查询文章的作者信息,需要两次查询,先查找文章 uid 再 通过 id 查询作者信息

python
1
2
3
article = Article.query.filter(Article.title == 'test').first()
print(article.author.username)
user = User.query.filter(User.id == article.uid).first()

可设置 relationship(),映射到 user

python
1
2
3
4
class Article(db.Model):
# ...
uid = db.Column(db.Integer, db.ForeignKey('user.id'))
author = db.relationship('User', backref=db.backref('articles'))

Article 模型添加一个 author 属性,可以通过个属性访问作者信息

backref 代表反向引用,可以通过 User.articles 查询这个模型所写的所有文章

python
1
2
article = Article.query.filter(Article.title == 'test').first()
print(article.author.username) # admin

查询作者所有文章可以使用

python
1
2
3
4
user = User.query.filter(User.id == '1').first()
print(user.articles) # [<Article 1>, <Article 2>]
for article in user.articles:
print(article.title) # test、test2

拆分 app 和 model

为了解决循环引用需要创建 db.py 新建数据库实例

  • 目录结构
plaintext
1
2
3
4
. 
├── app.py
├── db.py
├── models.py
  • db.py
python
1
2
3
from flask_sqlalchemy import SQLAlchemy

db = SQLAlchemy()
  • models.py
python
1
2
3
4
5
6
from db import db
from datetime import datetime


class User(db.Model):
# ...
  • app.py
python
1
2
3
4
5
from db import db
from models import User

db.init_app(app)
db.create_all(app=app)

扩展

mysql 中的默认时间值

mysql 中 timestamp 类型可设置两种默认值

CURRENT_TIMESTAMP :当我更新这条记录的时候,这条记录的这个字段不会改变

CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP :当我更新这条记录的时候,这条记录的这个字段将会改变

所以可对以下字段设置

python
1
2
3
4
5
6
from datet

class Article(db.Model):
# ...
createdDate = db.Column(db.DateTime, nullable=False, server_default=db.text('CURRENT_TIMESTAMP'))
updatedDate = db.Column(db.DateTime, nullable=False, server_default=db.text('CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP'))

sqlalchemy 上看到使用 onupdate 设置更新时间

python
1
2
3
4
5
6
7
8
import datetime

t = Table("mytable", metadata_obj,
Column('id', Integer, primary_key=True),

# define 'last_updated' to be populated with datetime.now()
Column('last_updated', DateTime, onupdate=datetime.datetime.now),
)

default & server_default

未使用 server_default 的表结构

Field Type Null Key Default Extra
id int NO PRI NULL auto_increment
username varchar(30) NO NULL
email varchar(30) NO UNI NULL
password varchar(30) NO NULL
role int NO NULL
createdDate datetime NO NULL

使用 server_default 的表结构

Field Type Null Key Default Extra
id int NO PRI NULL auto_increment
username varchar(30) NO NULL
email varchar(30) NO UNI NULL
password varchar(30) NO NULL
role int NO 1
createdDate datetime NO CURRENT_TIMESTAMP DEFAULT_GENERATED

这篇文章给我提供了很大的帮助 sqlalchemy中Column的默认值属性,非常感谢作者

model & json(dict)

JavaScript 中的 JSON(对象),Python 中叫做 dict(字典) {key: value}

JavaScript 中的 Array(数组),Python 中叫做 list(列表) []

flask-sqlalchemy 查询返回的数据类型为 class,在 flask 中无法直接 return,无法直接使用 session 保存

python
1
2
3
article = Article.query.first()
print(article) # <Article 1>
print(type(article)) # <class 'app.Article'>
  • 定义函数 to_dict 进行处理
python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def to_dict(data):
if type(data).__name__ == 'list':
new_list = []
for item in data:
new_dict = {}
for k, v in item.__dict__.items():
if not k.startswith('_sa_instance_state'):
new_dict[k] = v
new_list.append(new_dict)
return new_list
else:
new_dict = {}
for k, v in data.__dict__.items():
if not k.startswith('_sa_instance_state'):
new_dict[k] = v
return new_dict

print(article) # {'content': 'content1', 'title': 'title1', 'aid': 1, 'cid': 1}

如果想要返回 JSON 数据格式需要用 jsonify 处理,否则会报错。jsonify 会将字典或者列表转为 JSON,并且将响应的 Content-Type 设置为 application/json

python
1
2
3
from flask import jsonify

return jsonify(to_dict(articles))
  • 给 model 增加方法

https://stackoverflow.com/questions/5022066/how-to-serialize-sqlalchemy-result-to-json

python
1
2
3
4
5
6
class User:
def as_dict(self):
return {c.name: str(getattr(self, c.name)) for c in self.__table__.columns}

article = Article.query.first()
article = article.to_dict()