Tornado框架async异步调用数据库

作者: pdnbplus | 发布时间: 2024/06/20 | 阅读量: 256

# Tornado框架async异步调用数据库 -- 潘登同学的Tornado学习笔记

环境搭建

  • 把前端代码拿过来
  • 数据库连接上
from tornado import web
from tornado import ioloop
# 用来处理请求,并响应结果
class IndexHandler(web.RequestHandler):
    async def get(self):
        self.render('personal.html')  # 注意在app中设置模板路径

    async def post(self):
        pass

if __name__ == "__main__":
    app = web.Application([
        ('/',IndexHandler),
    ],debug=True,
    template_path = './demo2/',
    static_path='./demo2/',
    static_url_prefix='/static/',
    )
    # 设置监听端口
    app.listen(5000)
    # 通过时间循环来监听访问的端口
    ioloop.IOLoop.current().start()

异步操作数据库

pip install aiomysql

aiomysql ,它的底层大量的使用了 pymysql ,只是它通过 asyncio 实现的访问数据库

写一个测试用例吧...

import aiomysql
import asyncio
from tornado import ioloop
async def select_db():
    # 获取客户端的连接处
    async with aiomysql.create_pool(host="127.0.0.1",port=3306,user='root',password='xxx',db='tornado_db') as pool:
        # 获取一个链接,用来获取游标
        async with pool.acquire() as con:
            # 获取一个用游标,用来操作数据库
            async with con.cursor() as cur:
                # 执行sql
                sql = 'select * from t_user'
                await cur.execute(sql)
                rs = await cur.fetchone()
                print(rs)

if __name__ == '__main__':
    # select_db()   直接运行会警告
    # 第二种方式 asyncio
    # loop = asyncio.get_event_loop()
    # loop.run_until_complete(select_db())

    # 第三种方式 使用tornado
    ioloop.IOLoop.current().run_sync(select_db)

aiomysql的使用

使用aiomysql进行增查改操作

from tornado import web
from tornado import ioloop
import aiomysql

# 用来处理请求,并响应结果
class IndexHandler(web.RequestHandler):
    def initialize(self,mysql):
        self.mysql = mysql

    async def get(self):
        # 获取数据库信息
        # 获取客户端的连接处
        async with aiomysql.create_pool(host=self.mysql['host'],
                                        port=self.mysql['port'],
                                        user=self.mysql['user'],
                                        password=self.mysql['password'],
                                        db=self.mysql['db']) as pool:
            # 获取一个链接,用来获取游标
            async with pool.acquire() as con:
                # 获取一个用游标,用来操作数据库
                async with con.cursor() as cur:
                    # 执行sql
                    sql = 'select * from t_user'
                    await cur.execute(sql)
                    rs = await cur.fetchone()
        self.render('personal.html',user=rs)  # 注意在app中设置模板路径

    async def post(self):
        uname = self.get_argument('uname')
        nick_name = self.get_argument('nick_name')
        email = self.get_argument('email')
        password = self.get_argument('password')
        phone = self.get_argument('phone')
        language = self.get_argument('language')
        try:
            id = self.get_argument('id')
        except:
            id = None
        args = [uname,nick_name,email,password,phone,language]
        async with aiomysql.create_pool(
            host=self.mysql['host'],
            port=self.mysql['port'],
            user=self.mysql['user'],
            password=self.mysql['password'],
            db=self.mysql['db'],
        ) as pool:
            async with pool.acquire() as con:
                async with con.cursor() as cur:
                    if not id:
                        sql = 'insert into t_user value(0,%s,%s,%s,%s,%s,%s)'
                        # 执行增加
                        await cur.execute(sql,args)
                        # 提交事务
                        await con.commit()
                        # 获取生成的id
                        id = cur.lastrowid
                    else:
                        sql = 'update t_user set uname=%s, nick_name=%s, email=%s, pwd=%s, phone=%s, language=%s where id=%s'
                        await cur.execute(sql,args+[id])
                        await con.commit()
                    args.insert(0, id)
        self.render('personal.html',user=args)


if __name__ == "__main__":
    settings = {
        'template_path': './demo2/',
        'static_path': './demo2/',
        'static_url_prefix': '/static/',
        'debug': True,
        'mysql':{
            'host': '127.0.0.1',
            'port': 3306,
            'user': 'root',
            'password': 'xxx',
            'db': 'tornado_db',
        }
    }
    app = web.Application([
        web.URLSpec('/',IndexHandler,{'mysql':settings.get('mysql')}),
    ],**settings)
    # 设置监听端口
    app.listen(5000)
    # 通过时间循环来监听访问的端口
    ioloop.IOLoop.current().start()

ORM--peewee的使用

官方文档

pip install pymysql
pip install peewee

使用方法

  • 创建数据库的实例
  • 创建一个指定数据库的模型类
    • 继承 peewee.Model
    • 重写类 Meta 并指定database属性为数据库实例

字段类型

peewee字段类型

常用字段

  • AutoField: 专门用于设置主键且自增长的字段 -- integer
  • IntegerField: 整数类型 -- integer
  • FloatField: 浮点类型 -- real
  • DecimalField: 钱类型 -- numeric
  • CharField: 字符串类型 -- varchar
  • TextField: 字符串类型(没有长度限制) -- text
  • UUIDField: 字符串类型 -- varchar(40)
  • TimestampField: 时间戳 -- integer(秒数)
  • DateTimeField: 年月日时分秒 -- datetime
  • BooleanField: 布尔类型 -- bool
  • ForeignKeyField: 外键 -- integer

常用参数

  • null=False:允许为空
  • index=False:不是索引
  • unique=False:不要求唯一
  • column_name=None:映射表中的字段名,如果为None,那么采用变量名当做字段名
  • default=None:默认值
  • primary_key=True:为主键
  • constraints=None:约束
  • verbose_name:字段注释
from peewee import *

# 创建数据库模型类
db = MySQLDatabase('tornado_db',host='127.0.0.1',
                    port=3306,user='root',passwd='xxx')

# 创建模型类
class Company(Model):
    # 默认会生成ID
    name = CharField(verbose_name='公司名称')
    full_name = CharField()
    year = IntegerField(2050)

    class Meta:
        database = db

        # 设置表名
        table_name = 't_company'

# 映射表结构
def init_table():
    db.create_tables([Company])

if __name__ == '__main__':
    init_table()

增加数据

def add_one_data():
    c1 = Company(name = '抖吟', full_name = '抖出强大,吟唱人生', year = 2018)
    c1.save()

def add_one_data1():
    # 这里的company既可以写对象(比如上面c1类似的) 也可以写他的id
    m1 = Music(name='大雨', singer='周深', duration='2:30', _type='pop', company=1)
    m1.save()

数据查询

普通查询

def query_data():
    m1 = Music.get(Music.id == 2)
    m2 = Music.get_by_id(3)
    m3 = Music[3]  # 与上一句类似
    print(m1.singer)
    print(m2.singer)
    print(m3.singer)

数据筛选

类名.select 方法可用于筛选字段

def query_data2():
    temp_rs = Music.select()  # 默认是所有列与数据只会生成SQL
    print(temp_rs)
    temp_iter = Music.select().execute() # 生成的是一个可迭代对象
    for m in temp_iter:
        print(f'{m.id}=={m.name}--{m.singer}=={m._type}')

    temp_iter = Music.select(Music.id,Music.name,Music.singer).execute() # 过滤列
    for m in temp_iter:
        print(f'{m.id}=={m.name}--{m.singer}=={m._type}')

过滤条件

类名.select.where 方法可用于过滤条件

def query_data3():
    m1 = Music.select().where(Music.id > 5)
    for m in m1:
        print(f'{m.id}=={m.singer}')

    m2 = Music.select().where((Music.id > 5) & (Music.singer == 'By2'))
    for m in m2:
        print(f'{m.id}=={m.singer}=={m._type}')

    m3 = Music.select().where((Music.id > 6) | (Music.singer == '林志炫'))
    for m in m3:
        print(f'{m.id}=={m.singer}=={m.duration}')

模糊查询

def query_like():
    m1 = Music.select().where(Music.name.contains('你'))
    for m in m1:
        print(f'{m.id}=={m.name}=={m._type}')

数据排序

类名.select.order_by 方法可用于数据排序

def query_order_by():
    # m1 = Music.select().order_by(Music.id.asc())  # 默认升序排列
    # m1 = Music.select().order_by(+Music.id)  # 默认升序排列
    m1 = Music.select().order_by(Music.id.desc())  # 降序排列
    m1 = Music.select().order_by(-Music.id)  # 降序排列

    for m in m1:
        print(f'{m.id}=={m.name}=={m._type}')

数据分页

类名.select.paginate 方法可用于分页

def query_by_page():
    #  select * from t_music limit 0,3
    m1 = Music.select().paginate(1,3)
    # 第一个参数表示第几页(从1开始), 第二个参数是表示每页显示几条

    for m in m1:
        print(f'{m.id}=={m.name}=={m._type}')

数据修改

在peewee中要更新数据,可以使用以下2种方式:

  • 直接修改对象的属性,并保存(save)
  • 通过类名.update方法修改,并执行(execute方法)
def update_data():
    m = Music.get_by_id(2)
    m.singer = '林小炫'
    m.save()
    # 生成一个SQL对象
    # sql_object = Music.update(singer='林大炫').where(Music.id == 2)
    # sql_object.execute()

    m2 = Music.get_by_id(2)
    print(m2.singer)

数据删除

在peewee中要删除数据,可以使用以下2种方式:

  • 直接调用对象的 delete_instance 方法
  • 通过 类名.delete 方法,并执行(execute方法)
def delete_data():
    m = Music.get_by_id(9)
    m.delete_instance()

    # sql_object = Music.delete().where(Music.id > 5)
    # sql_object.execute()

警告解决

运行代码,总是提示警告 Warning: (3090, "Changing sql mode'NO_AUTO_CREATE_USER' is deprecated. It will be removed in a future release.")result = self._query(query)

解决方案

set @@GLOBAL.sql_mode='';
set sql_mode='STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION';

异步peewee

现成的异步peewee https://github.com/05bit/peewee-async

PostgreSQL
pip install --pre peewee-async; pip install aiopg

MySQL
pip install --pre peewee-async; pip install aiomysql

使用方法

类的创建基本与peewee一样,需要注意的是:

  • 创建peewee_async异步的数据库实例
  • 修改Model里面的database属性为 peewee_async的实例
import peewee_async
from peewee import *
import asyncio

# 创建数据库模型类
db = peewee_async.MySQLDatabase('tornado_db',host='127.0.0.1',
                    port=3306,user='root',password='xxx')
manager = peewee_async.Manager(database=db)

# 创建模型类
class Company(Model):
    # 默认会生成ID
    name = CharField(verbose_name='公司名称')
    full_name = CharField(verbose_name='公司全名')
    year = IntegerField(verbose_name='开业时间')

    class Meta:
        database = db

        # 设置表名
        table_name = 't_company'

class Music(Model):
    name = CharField(verbose_name='音乐名称')
    singer = CharField(verbose_name='演唱者')
    duration = CharField(verbose_name='时长')
    _type = CharField(verbose_name='类型')
    # 如果没有写field,会默认用另一张表的主键做外键
    # ForeignKeyField会自动生成一个company_id作为外键
    company = ForeignKeyField(Company,verbose_name='版权所属',backref='musics')

    class Meta:
        database = db

        # 设置表名
        table_name = 't_music'

# -------------------------------------------------------- #

def create_data():
    Music.create(name='我以为我可以一了百了', singer='林志炫', duration="3:10", _tyep='pop', company=1)

async def create_data_by_manager():
    await manager.create(Music,name='逆战', singer='张杰', duration='2:20', _type='pop', company=2)

async def query_data():
    rs = await manager.execute(Music.select().where(Music.id>5))
    for i in rs:
        print(f'{i.id}=={i.name}=={i.singer}')

if __name__ == '__main__':
    # create_data()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(create_data_by_manager())
    loop.run_until_complete(query_data())
    loop.close()