Flask 中一般使用 pymysql 或者 flask-sqlalchemy 操作数据库,本案例中使用 flask-sqlalchemy 进行基本的操作

安装

1
2
$ activate
$ pip install flask-sqlalchemy pymysql

需配置数据库驱动,我使用的是 pymysql,默认驱动为 mysqldb,python3 好像并不支持 pip 安装 mysqldbpymysql 效率好像也更高一些

连接

uri 格式 dialect+driver://username:password@host:port/database

实例:mysql+pymysql://root:root@localhost:3306/mydatabase

配置

1
2
3
from flask_sqlalchemy import SQLAlchemy
app.config['SQLALCHEMY_DATABASE_URI'] = mysql+pymysql://root:root@localhost:3306/mydatabase
db = SQLAlchemy(app)

发现会有警告信息

可以设置 SQLALCHEMY_TRACK_MODIFICATIONSFalse

SQLALCHEMY_TRACK_MODIFICATIONS 如果设置成 True (默认情况),Flask-SQLAlchemy 将会追踪对象的修改并且发送信号。这需要额外的内存,如果不必要的可以禁用它。

新建表

支持的字段类型

类型 说明
Integer 一个整数
String (size) 有长度限制的字符串
Text 一些较长的 unicode 文本
DateTime 表示为 Python datetime 对象的时间和日期
Float 存储浮点值
Boolean 存储布尔值
PickleType 存储为一个持久化的 Python 对象
LargeBinary 存储一个任意大的二进制数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class User(db.Model):
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
username = db.Column(db.String(30), nullable=False)
email = db.Column(db.String(30), nullable=False, unique=True)
password = db.Column(db.String(30), nullable=False)
role = db.Column(db.Integer, nullable=False, default=1)
createdDate = db.Column(db.DateTime, default=datetime.now())


class Article(db.Model):
# ...
uid = db.Column(db.Integer, db.ForeignKey('user.id'))


db.create_all()

primary_key 主键

autoincrement 是否自增长

nullable 是否为空

default 默认值,只有在添加数据时如果没给 model 传值,会使用 default

server_default 生成表结构时的 字段默认值,只能为 str 类型,其他类型会报错,可用 db.text() 进行包裹,直接使用单引号或者双引号效果应该也是一样的

增删改查

http://www.pythondoc.com/flask-sqlalchemy/api.html

1
2
3
user = User(username='admin', email='[email protected]', password='aaa', role=1)
db.session.add(user)
db.session.commit()

1
2
3
# user = User.query.filter_by(username='admin').first()
user = User.query.filter(User.username == 'admin').first()
print(user.email) # [email protected]

image-202203221419686

filter_by() 用于简单的查询,不支持比较运算符

filterfilter_by 强大,支持比较运算符 ==、>、< 等,or、in 等语法

image-202203221421606

1
2
3
user = User.query.filter(User.username == 'admin').first()
user.email = '[email protected]'
db.session.commit()

1
2
3
user = User.query.filter(User.username == 'admin').first()
db.session.delete(user)
db.session.commit()

一对多关系

外键 & 关系(ForeignKey & relationship)

如果需要查询文章的作者信息,需要两次查询,先查找文章 uid 再 通过 id 查询作者信息

1
2
3
article = Article.query.filter(Article.title == 'test').first()
print(article.author.username)
user = User.query.filter(User.id == article.uid).first()

可设置 relationship(),映射到 user

1
2
3
4
class Article(db.Model):
# ...
uid = db.Column(db.Integer, db.ForeignKey('user.id'))
author = db.relationship('User', backref=db.backref('articles'))

Article 模型添加一个 author 属性,可以通过个属性访问作者信息

backref 代表反向引用,可以通过 User.articles 查询这个模型所写的所有文章

1
2
article = Article.query.filter(Article.title == 'test').first()
print(article.author.username) # admin

查询作者所有文章可以使用

1
2
3
4
user = User.query.filter(User.id == '1').first()
print(user.articles) # [<Article 1>, <Article 2>]
for article in user.articles:
print(article.title) # test、test2

拆分 app 和 model

为了解决循环引用需要创建 db.py 新建数据库实例

  • 目录结构
1
2
3
4
. 
├── app.py
├── db.py
├── models.py
  • db.py
1
2
3
from flask_sqlalchemy import SQLAlchemy

db = SQLAlchemy()
  • models.py
1
2
3
4
5
6
from db import db
from datetime import datetime


class User(db.Model):
# ...
  • app.py
1
2
3
4
5
from db import db
from models import User

db.init_app(app)
db.create_all(app=app)

扩展

mysql 中的默认时间值

mysql 中 timestamp 类型可设置两种默认值

CURRENT_TIMESTAMP :当我更新这条记录的时候,这条记录的这个字段不会改变

CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP :当我更新这条记录的时候,这条记录的这个字段将会改变

所以可对以下字段设置

1
2
3
4
5
6
from datet

class Article(db.Model):
# ...
createdDate = db.Column(db.DateTime, nullable=False, server_default=db.text('CURRENT_TIMESTAMP'))
updatedDate = db.Column(db.DateTime, nullable=False, server_default=db.text('CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP'))

sqlalchemy 上看到使用 onupdate 设置更新时间

1
2
3
4
5
6
7
8
import datetime

t = Table("mytable", metadata_obj,
Column('id', Integer, primary_key=True),

# define 'last_updated' to be populated with datetime.now()
Column('last_updated', DateTime, onupdate=datetime.datetime.now),
)

default & server_default

未使用 server_default 的表结构

Field Type Null Key Default Extra
id int NO PRI NULL auto_increment
username varchar(30) NO NULL
email varchar(30) NO UNI NULL
password varchar(30) NO NULL
role int NO NULL
createdDate datetime NO NULL

使用 server_default 的表结构

Field Type Null Key Default Extra
id int NO PRI NULL auto_increment
username varchar(30) NO NULL
email varchar(30) NO UNI NULL
password varchar(30) NO NULL
role int NO 1
createdDate datetime NO CURRENT_TIMESTAMP DEFAULT_GENERATED

这篇文章给我提供了很大的帮助 sqlalchemy中Column的默认值属性,非常感谢作者

model & json(dict)

JavaScript 中的 JSON(对象),Python 中叫做 dict(字典) {key: value}

JavaScript 中的 Array(数组),Python 中叫做 list(列表) []

flask-sqlalchemy 查询返回的数据类型为 class,在 flask 中无法直接 return,无法直接使用 session 保存

1
2
3
article = Article.query.first()
print(article) # <Article 1>
print(type(article)) # <class 'app.Article'>
  • 定义函数 to_dict 进行处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def to_dict(data):
if type(data).__name__ == 'list':
new_list = []
for item in data:
new_dict = {}
for k, v in item.__dict__.items():
if not k.startswith('_sa_instance_state'):
new_dict[k] = v
new_list.append(new_dict)
return new_list
else:
new_dict = {}
for k, v in data.__dict__.items():
if not k.startswith('_sa_instance_state'):
new_dict[k] = v
return new_dict

print(article) # {'content': 'content1', 'title': 'title1', 'aid': 1, 'cid': 1}

如果想要返回 JSON 数据格式需要用 jsonify 处理,否则会报错。jsonify 会将字典或者列表转为 JSON,并且将响应的 Content-Type 设置为 application/json

1
2
3
from flask import jsonify

return jsonify(to_dict(articles))
  • 给 model 增加方法

https://stackoverflow.com/questions/5022066/how-to-serialize-sqlalchemy-result-to-json

1
2
3
4
5
6
class User:
def as_dict(self):
return {c.name: str(getattr(self, c.name)) for c in self.__table__.columns}

article = Article.query.first()
article = article.to_dict()