# Tornado框架async异步调用数据库 -- 潘登同学的Tornado学习笔记
环境搭建
- 把前端代码拿过来
- 数据库连接上
from tornado import web
from tornado import ioloop
# 用来处理请求,并响应结果
class IndexHandler(web.RequestHandler):
async def get(self):
self.render('personal.html') # 注意在app中设置模板路径
async def post(self):
pass
if __name__ == "__main__":
app = web.Application([
('/',IndexHandler),
],debug=True,
template_path = './demo2/',
static_path='./demo2/',
static_url_prefix='/static/',
)
# 设置监听端口
app.listen(5000)
# 通过时间循环来监听访问的端口
ioloop.IOLoop.current().start()
异步操作数据库
pip install aiomysql
aiomysql
,它的底层大量的使用了 pymysql
,只是它通过 asyncio
实现的访问数据库
写一个测试用例吧...
import aiomysql
import asyncio
from tornado import ioloop
async def select_db():
# 获取客户端的连接处
async with aiomysql.create_pool(host="127.0.0.1",port=3306,user='root',password='xxx',db='tornado_db') as pool:
# 获取一个链接,用来获取游标
async with pool.acquire() as con:
# 获取一个用游标,用来操作数据库
async with con.cursor() as cur:
# 执行sql
sql = 'select * from t_user'
await cur.execute(sql)
rs = await cur.fetchone()
print(rs)
if __name__ == '__main__':
# select_db() 直接运行会警告
# 第二种方式 asyncio
# loop = asyncio.get_event_loop()
# loop.run_until_complete(select_db())
# 第三种方式 使用tornado
ioloop.IOLoop.current().run_sync(select_db)
使用aiomysql进行增查改操作
from tornado import web
from tornado import ioloop
import aiomysql
# 用来处理请求,并响应结果
class IndexHandler(web.RequestHandler):
def initialize(self,mysql):
self.mysql = mysql
async def get(self):
# 获取数据库信息
# 获取客户端的连接处
async with aiomysql.create_pool(host=self.mysql['host'],
port=self.mysql['port'],
user=self.mysql['user'],
password=self.mysql['password'],
db=self.mysql['db']) as pool:
# 获取一个链接,用来获取游标
async with pool.acquire() as con:
# 获取一个用游标,用来操作数据库
async with con.cursor() as cur:
# 执行sql
sql = 'select * from t_user'
await cur.execute(sql)
rs = await cur.fetchone()
self.render('personal.html',user=rs) # 注意在app中设置模板路径
async def post(self):
uname = self.get_argument('uname')
nick_name = self.get_argument('nick_name')
email = self.get_argument('email')
password = self.get_argument('password')
phone = self.get_argument('phone')
language = self.get_argument('language')
try:
id = self.get_argument('id')
except:
id = None
args = [uname,nick_name,email,password,phone,language]
async with aiomysql.create_pool(
host=self.mysql['host'],
port=self.mysql['port'],
user=self.mysql['user'],
password=self.mysql['password'],
db=self.mysql['db'],
) as pool:
async with pool.acquire() as con:
async with con.cursor() as cur:
if not id:
sql = 'insert into t_user value(0,%s,%s,%s,%s,%s,%s)'
# 执行增加
await cur.execute(sql,args)
# 提交事务
await con.commit()
# 获取生成的id
id = cur.lastrowid
else:
sql = 'update t_user set uname=%s, nick_name=%s, email=%s, pwd=%s, phone=%s, language=%s where id=%s'
await cur.execute(sql,args+[id])
await con.commit()
args.insert(0, id)
self.render('personal.html',user=args)
if __name__ == "__main__":
settings = {
'template_path': './demo2/',
'static_path': './demo2/',
'static_url_prefix': '/static/',
'debug': True,
'mysql':{
'host': '127.0.0.1',
'port': 3306,
'user': 'root',
'password': 'xxx',
'db': 'tornado_db',
}
}
app = web.Application([
web.URLSpec('/',IndexHandler,{'mysql':settings.get('mysql')}),
],**settings)
# 设置监听端口
app.listen(5000)
# 通过时间循环来监听访问的端口
ioloop.IOLoop.current().start()
ORM--peewee的使用
pip install pymysql
pip install peewee
使用方法
- 创建数据库的实例
- 创建一个指定数据库的模型类
- 继承
peewee.Model
- 重写类
Meta
并指定database属性为数据库实例
- 继承
字段类型
常用字段
- AutoField: 专门用于设置主键且自增长的字段 -- integer
- IntegerField: 整数类型 -- integer
- FloatField: 浮点类型 -- real
- DecimalField: 钱类型 -- numeric
- CharField: 字符串类型 -- varchar
- TextField: 字符串类型(没有长度限制) -- text
- UUIDField: 字符串类型 -- varchar(40)
- TimestampField: 时间戳 -- integer(秒数)
- DateTimeField: 年月日时分秒 -- datetime
- BooleanField: 布尔类型 -- bool
- ForeignKeyField: 外键 -- integer
常用参数
- null=False:允许为空
- index=False:不是索引
- unique=False:不要求唯一
- column_name=None:映射表中的字段名,如果为None,那么采用变量名当做字段名
- default=None:默认值
- primary_key=True:为主键
- constraints=None:约束
- verbose_name:字段注释
from peewee import *
# 创建数据库模型类
db = MySQLDatabase('tornado_db',host='127.0.0.1',
port=3306,user='root',passwd='xxx')
# 创建模型类
class Company(Model):
# 默认会生成ID
name = CharField(verbose_name='公司名称')
full_name = CharField()
year = IntegerField(2050)
class Meta:
database = db
# 设置表名
table_name = 't_company'
# 映射表结构
def init_table():
db.create_tables([Company])
if __name__ == '__main__':
init_table()
增加数据
def add_one_data():
c1 = Company(name = '抖吟', full_name = '抖出强大,吟唱人生', year = 2018)
c1.save()
def add_one_data1():
# 这里的company既可以写对象(比如上面c1类似的) 也可以写他的id
m1 = Music(name='大雨', singer='周深', duration='2:30', _type='pop', company=1)
m1.save()
数据查询
普通查询
def query_data():
m1 = Music.get(Music.id == 2)
m2 = Music.get_by_id(3)
m3 = Music[3] # 与上一句类似
print(m1.singer)
print(m2.singer)
print(m3.singer)
数据筛选
类名.select 方法可用于筛选字段
def query_data2():
temp_rs = Music.select() # 默认是所有列与数据只会生成SQL
print(temp_rs)
temp_iter = Music.select().execute() # 生成的是一个可迭代对象
for m in temp_iter:
print(f'{m.id}=={m.name}--{m.singer}=={m._type}')
temp_iter = Music.select(Music.id,Music.name,Music.singer).execute() # 过滤列
for m in temp_iter:
print(f'{m.id}=={m.name}--{m.singer}=={m._type}')
过滤条件
类名.select.where 方法可用于过滤条件
def query_data3():
m1 = Music.select().where(Music.id > 5)
for m in m1:
print(f'{m.id}=={m.singer}')
m2 = Music.select().where((Music.id > 5) & (Music.singer == 'By2'))
for m in m2:
print(f'{m.id}=={m.singer}=={m._type}')
m3 = Music.select().where((Music.id > 6) | (Music.singer == '林志炫'))
for m in m3:
print(f'{m.id}=={m.singer}=={m.duration}')
模糊查询
def query_like():
m1 = Music.select().where(Music.name.contains('你'))
for m in m1:
print(f'{m.id}=={m.name}=={m._type}')
数据排序
类名.select.order_by 方法可用于数据排序
def query_order_by():
# m1 = Music.select().order_by(Music.id.asc()) # 默认升序排列
# m1 = Music.select().order_by(+Music.id) # 默认升序排列
m1 = Music.select().order_by(Music.id.desc()) # 降序排列
m1 = Music.select().order_by(-Music.id) # 降序排列
for m in m1:
print(f'{m.id}=={m.name}=={m._type}')
数据分页
类名.select.paginate 方法可用于分页
def query_by_page():
# select * from t_music limit 0,3
m1 = Music.select().paginate(1,3)
# 第一个参数表示第几页(从1开始), 第二个参数是表示每页显示几条
for m in m1:
print(f'{m.id}=={m.name}=={m._type}')
数据修改
在peewee中要更新数据,可以使用以下2种方式:
- 直接修改对象的属性,并保存(save)
- 通过类名.update方法修改,并执行(execute方法)
def update_data():
m = Music.get_by_id(2)
m.singer = '林小炫'
m.save()
# 生成一个SQL对象
# sql_object = Music.update(singer='林大炫').where(Music.id == 2)
# sql_object.execute()
m2 = Music.get_by_id(2)
print(m2.singer)
数据删除
在peewee中要删除数据,可以使用以下2种方式:
- 直接调用对象的
delete_instance
方法 - 通过
类名.delete
方法,并执行(execute方法)
def delete_data():
m = Music.get_by_id(9)
m.delete_instance()
# sql_object = Music.delete().where(Music.id > 5)
# sql_object.execute()
警告解决
运行代码,总是提示警告 Warning: (3090, "Changing sql mode'NO_AUTO_CREATE_USER' is deprecated. It will be removed in a future release.")result = self._query(query)
解决方案
set @@GLOBAL.sql_mode='';
set sql_mode='STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION';
异步peewee
现成的异步peewee https://github.com/05bit/peewee-async
PostgreSQL
pip install --pre peewee-async; pip install aiopg
MySQL
pip install --pre peewee-async; pip install aiomysql
使用方法
类的创建基本与peewee一样,需要注意的是:
- 创建peewee_async异步的数据库实例
- 修改Model里面的database属性为 peewee_async的实例
import peewee_async
from peewee import *
import asyncio
# 创建数据库模型类
db = peewee_async.MySQLDatabase('tornado_db',host='127.0.0.1',
port=3306,user='root',password='xxx')
manager = peewee_async.Manager(database=db)
# 创建模型类
class Company(Model):
# 默认会生成ID
name = CharField(verbose_name='公司名称')
full_name = CharField(verbose_name='公司全名')
year = IntegerField(verbose_name='开业时间')
class Meta:
database = db
# 设置表名
table_name = 't_company'
class Music(Model):
name = CharField(verbose_name='音乐名称')
singer = CharField(verbose_name='演唱者')
duration = CharField(verbose_name='时长')
_type = CharField(verbose_name='类型')
# 如果没有写field,会默认用另一张表的主键做外键
# ForeignKeyField会自动生成一个company_id作为外键
company = ForeignKeyField(Company,verbose_name='版权所属',backref='musics')
class Meta:
database = db
# 设置表名
table_name = 't_music'
# -------------------------------------------------------- #
def create_data():
Music.create(name='我以为我可以一了百了', singer='林志炫', duration="3:10", _tyep='pop', company=1)
async def create_data_by_manager():
await manager.create(Music,name='逆战', singer='张杰', duration='2:20', _type='pop', company=2)
async def query_data():
rs = await manager.execute(Music.select().where(Music.id>5))
for i in rs:
print(f'{i.id}=={i.name}=={i.singer}')
if __name__ == '__main__':
# create_data()
loop = asyncio.get_event_loop()
loop.run_until_complete(create_data_by_manager())
loop.run_until_complete(query_data())
loop.close()