Flask框架-Graphql的使用 -- 潘登同学的flask学习笔记
Graphql
官网:https://graphql.cn/
GraphQL与RESTful关系
- GraphQL和RESTful一样,都是一种网站架构,一种前后端通信规范,不涉及语言,不同语言有不同的实现方案。
- GraphQL目前被认为是革命性的API工具,因为它可以让客户端在请求中指定希望得到的数据,而不像传统的RESTful那样只能呆板地在服务端进行预定义。
- 这样它就让前、后端团队的协作变得比以往更加的通畅,从而能够让组织更好地运作。
- 而实际上,GraphQL与RESTful都是基于HTTP进行数据的请求与接收,而且GraphQL也内置了很多RESTful模型的元素在里面。
- 那么在技术层面上,GraphQL和RESTful这两种API模型到底有什么异同呢?他们归根到底其实没多大区别,只不过GraphQL做了一些小改进,使得开发体验产生了较大的改变。
相同点
- 都有资源这个概念,而且都能通过ID去获取资源
- 都可以通过HTTP GET方式来获取资源
- 都可以使用JSON作为响应格式
差异点
- 在RESTful中,你所访问的路径就是该资源的唯一标识(ID);在GraphQL中,该标识与访问方式并不相关
- 在RESTful中,资源的返回结构与返回数量是由服务端决定;在GraphQL,服务端只负责定义哪些资源是可用的,由客户端自己决定需要得到什么资源
- 如果是对资源的简单查询,GraphQL与RESTful是类似的,都是通过指定资源的名称以及相关参数来取得,但不同的是,你可以根据资源之间的关联关系来发起一个复杂请求,而在RESTful中你只能定义一些特殊的URL参数来获取到特殊的响应,或者是通过发起多个请求、再自行把响应得到的数据进行组装才行。RESTful对数据的描述形式是一连串的URL端点,而GraphQL则是由相互之间有所关联的schema组成。
路由处理器(Route Handlers)vs 解析器(Resolvers)
这是一个Restful的路由代码
class HelloWorld(Resource):
def get(self):
return {'say': 'hello world'}
api.add_resource(HelloWorld, '/hello')
这里我们得到了一个可以返回“world”这个字符串的/hello端点。从这个例子我们可以看到一个RESTful API请求的的生命周期:
- 服务器收到请求并提取出HTTP方法名(比如这里就是GET方法)与URL路径
- API框架找到提前注册好的、请求路径与请求方法都匹配的代码
- 该段代码被执行,并得到相应结果
- API框架对结果进行序列化,添加上适当的状态码与响应头后,返回给客户端
而这是一个GraphQL的代码
class Query(ObjectType):
"""定义一个字符串属性域hello"""
say = String()
def resolve_say(root,info):
return f'Hello World!'
我们看到,这里并没有针对某个URL路径提供函数,而是把Query类型中的hello字段映射到一个函数上了。
在GraphQL中这样的函数我们称之为解析器(Resolver)。
然后我们就可以这样发起一个查询:
query {
say
}
至此,总结一下服务器对一个 GraphQL 请求的执行过程:
- 服务器收到 HTTP 请求,取出其中的 GraphQL 查询
- 遍历查询语句,调用里面每个字段所对应的 Resolver。在这个例子里,只有 Query 这个类型中的一个字段 hello
- Resolver 函数被执行并返回相应结果
- GraphQL 框架把结果根据查询语句的要求进行组装因此我们将会得到如下响应
{ "say": "Hello world!" }
还可以多次调用同一个Resolvers
query {
say
secondSay: say
}
形象表达为:
GraphQL的使用(Python版)
官方文档 https://graphql.cn/code/#python
安装GraphQL
pip install graphene
第一个GraphQL程序
from graphene import ObjectType,Schema,String
# 查询用的
class Query(ObjectType):
say = String()
def resolve_say(self,info):
return 'Hello GraphQL!!'
if __name__ == '__main__':
schema = Schema(query=Query)
# 定义查询
query_string = '''
{
say
}
'''
# 执行查询
rs = schema.execute(query_string)
print(rs.data)
FLaks中应用GraphQL--graphene
pip install Flask-GraphQL
from flask import Flask
from flask_graphql import GraphQLView
from graphene import ObjectType,Schema,String
# 创建对象
app = Flask(__name__)
# 路由地址
@app.route("/")
def index():
return "pandeng"
class Query(ObjectType):
hello = String()
def resolve_hello(self,info):
return 'Hello FLask GraphQL!!'
schema = Schema(query=Query)
# 与挂载类试图相似
app.add_url_rule('/graphql',view_func=GraphQLView.as_view('graphql',schema=schema,graphiql=True))
if __name__ == '__main__':
app.run(debug=True)
报错解决
cannot import name 'get_default_backend' from 'graphql
pip install graphql-core==2.2.1
pip install graphene==2.1.8
ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.graphql-server-core 1.2.0 requires graphql-core<3,>=2.3, but you have graphql-core 2.2.1 which is incompatible.graphql-relay 3.1.0 requires graphql-core>=3.1, but you have graphql-core 2.2.1 which is incompatible.graphene 3.0 requires graphql-core~=3.1.2, but you have graphql-core 2.2.1 which is incompatible.
这一段的报错就是全部重装,然后最后再执行一下上面的代码,不要用pip来重装,到site-package中,把与GraphQL有关的全删了;
graphene属性参数
from flask import Flask
from flask_graphql import GraphQLView
from graphene import ObjectType,Schema,String
# 创建对象
app = Flask(__name__)
# 路由地址
@app.route("/")
def index():
return "pandeng"
class Query(ObjectType):
hello = String()
# 需求 将输出的数据的JSON对象的键改为my_info(也会改调用的方法名),性别默认为男,个人描述为必填项
me = String(
name='my_info',
sex=String(default_value='男'),
desc = String(required=True))
def resolve_hello(self,info):
return 'Hello FLask GraphQL!!'
# self是一个None,而info是一个解析器,记住这两个是必填项就行
def resolve_me(self,info,sex,desc):
# 函数这里设置的参数要与上面命名的对应上
return f'PD,sex:{sex}, desc:{desc}'
schema = Schema(query=Query)
app.add_url_rule('/graphql',view_func=GraphQLView.as_view('graphql',schema=schema,graphiql=True))
if __name__ == '__main__':
app.run(debug=True)
最终结果
数据类型
- graphene.String:字符串
- graphene.Int:整数
- graphene.Float:浮点
- graphene.Boolean:布尔值
- graphene.ID:唯一的字符串
- graphene.Data:年月日
- graphene.Time:时分秒
- graphene.DateTime:年月日时分秒
- graphene.Decimal:钱
- graphene.JSONString:JSON字符串
- graphene.NonNull:数据返回不可能为空
- graphene.List:列表
基本数据类型
import graphene
from flask import Flask
from flask_graphql import GraphQLView
import datetime
import decimal
class Query(graphene.ObjectType):
str_= graphene.String()
int_ = graphene.Int()
float_ = graphene.Float()
boolean_ = graphene.Boolean()
id_ = graphene.ID()
def resolve_str_(self,info):
return 'Hello'
def resolve_int_(self,info):
return 1
def resolve_float_(self,info):
return 1.1
def resolve_boolean_(self,info):
return True
def resolve_id_(self,info):
return 1.2
data_ = graphene.Date()
time_ = graphene.Time()
date_time = graphene.DateTime()
decimal_ = graphene.Decimal()
def resolve_data_(self,info):
return datetime.date(2050,1,2)
def resolve_time_(self,info):
return datetime.time(1,2,3)
def resolve_date_time(self,info):
return datetime.datetime(2050,1,2,3,4,5)
def resolve_decimal_(self,info):
return decimal.Decimal("10.30")
json_ = graphene.JSONString()
def resolve_json_(self,info):
return {"name":"graphql"}
if __name__ =='__main__':
schema = graphene.Schema(query= Query)
app = Flask(__name__)
app.add_url_rule('/graphql',view_func=GraphQLView.as_view('grapql',schema=schema, graphiql=True))
app.run(debug=True)
NonNull与List
class Query(graphene.ObjectType):
uname = graphene.String(required=True)
# NonNull中不能是方法,也就是不能使graphene.String()
user_name = graphene.NonNull(graphene.String) # Exception: NonNull could not have a mounted String() as inner type. Try with NonNull(String).
uname_list = graphene.List(graphene.String)
uname_list2 = graphene.List(graphene.NonNull(graphene.String))
uname_list3 = graphene.NonNull(graphene.List(graphene.String))
def resolve_uname(self,info):
return None
def resolve_user_name(self,info):
return None
def resolve_uname_list(self,info):
return None
def resolve_uname_list2(self,info):
return [None]
def resolve_uname_list3(self,info):
return [None]
自定义数据类型
import graphene
from flask import Flask
from flask_graphql import GraphQLView
# 记得继承graphene.ObjectType
class Person(graphene.ObjectType):
first_name = graphene.String()
last_name = graphene.String()
class Query(graphene.ObjectType):
person = graphene.Field(Person)
persons = graphene.List(Person)
def resolve_person(self,info):
# return Person(first_name="p",last_name="d")
# 上下两种方法都可以
return {"first_name":"p","last_name":"d"}
def resolve_persons(self,info):
# return [Person(first_name="p",last_name="d")
# Person(first_name="d",last_name="p")]
# 上下两种方法都可以
return [
{"first_name":"尚学堂","last_name":"Mr.Gao"},
{"first_name":"尚学堂","last_name":"Mr.Li"}
]
if __name__ =='__main__':
schema = graphene.Schema(query= Query)
app = Flask(__name__)
app.add_url_rule('/graphql',view_func=GraphQLView.as_view('grapql',schema=schema, graphiql=True))
app.run(debug=True)
GraphQL接口
这个接口是,工厂方法模式(Factory method pattern),是一种实现了"工厂"概念的面向对象设计模式。就像其他创建型模式一样,它也是处理在不指定对象具体类型的情况下创建对象的问题。工厂方法模式的实质是"定义一个创建对象的接口,但让实现这个接口的类来决定实例化哪个类。工厂方法让类的实例化推迟到子类中进行。
之前的python基础中也实现过工厂模式,这里还是看JAVA怎么说吧 https://zhuanlan.zhihu.com/p/110419316
我们来实现一个创建动物的工厂模式,复习以下元类与类装饰器
'''
动态创建类 元类-->类
用途
动态创建类
使用:
类 = type(类名,(父类...),{属性,方法})
'''
Person = type('Person',(),{})
p1 = Person()
print(p1)
##静态创建类
class Animal():
def __init__(self,color):
self.color = color
def eat(self):
print('我要吃手手')
def sleep(self):
print('我要睡觉觉')
Dog = type('Dog',(Animal,),{'age':3,'sleep':sleep})
dahuang = Dog('yellow')
print(dahuang.age)
dahuang.sleep()
#%%类装饰器
'''
在不修改函数源代码的前提下,增加新的功能
'''
class AAA():
def __init__(self,func):
# print('我是AAA.init()')
self.__func = func
def __call__(self,*args,**kwargs):
self.addfunc()
self.__func()
def addfunc(self):
print('新增用户权限验证')
print('新增系统日志处理')
@AAA
#类装饰器相当于 test = AAA(test)
def test():
print('我是功能一')
test()
然后我们用graphene的方式来实现工厂模式,设计动物类,然后产生老鼠和鸟的类
import graphene
from flask import Flask
from flask_graphql import GraphQLView
class Animal(graphene.Interface):
id = graphene.ID()
name = graphene.String()
class Mouse(graphene.ObjectType):
# Class Meta与type的操作基本差不多,功能差不多
class Meta:
interfaces =(Animal,)
run = graphene.String()
class Bird(graphene.ObjectType):
class Meta:
interfaces =(Animal,)
fly = graphene.String()
class Query(graphene.ObjectType):
mouse = graphene.Field(Mouse)
bird = graphene.Field(Bird)
animal = graphene.Field(Animal,type_ = graphene.Int(required = True))
def resolve_mouse(self,info):
# 在自定义数据类型中测试了可以这样return
return {'id':1,'name':'杰瑞','run':'跑呀~Tom来了!!'}
def resolve_bird(self,info):
return {'id':2,'name':'鹦鹉','fly':'跑呀~tina来了!!'}
# 返回元类的时候不能笑之前那样写,要先创建一个子类,然后返回去时,就只会返回元类的方法属性等
def resolve_animal(self,info,type_):
# return Animal(id=3,name='动物') # "message": "An Interface cannot be intitialized",
if type_ == 1:
return Mouse(id= 4, name='米老鼠',run='追唐老鸭啦~~~')
else:
return Bird(id = 3,name='鹦鹉',fly = '跑呀~Tom来了!!')
if __name__ =='__main__':
schema = graphene.Schema(query= Query)
app = Flask(__name__)
app.add_url_rule('/graphql',view_func=GraphQLView.as_view('grapql',schema=schema, graphiql=True))
app.run(debug=True)
枚举的使用
沿用上面的代码
class Role(graphene.Enum):
JR = 1
YW = 2
TM = 3
# 修改一下属性参数
animal = graphene.Field(Animal,type_ =Role())
# 修改一下resolve_animal()
def resolve_animal(self,info,type_):
if type_ == Role.JR:
return Mouse(id= 4, name='米老鼠',run='追唐老鸭啦~~~')
elif type_ == Role.YW:
return Bird(id = 3,name='鹦鹉',fly = '跑呀~Tom来了!!')
else:
return Mouse(id= 5,name="Tom",run="别让杰瑞跑了~~快追!!")
GraphQL查询
先创建数据,数据大概是这样的: 元类是一个角色,子类是人类和机器人;然后有一个枚举类型
'''
===================== data =========================
上面是创建数据的操作
'''
human_data = {}
droid_data = {}
def setup():
global human_data, droid_data
luke = Human(
id="1000",
name="Luke Skywalker",
friends=["1002", "1003", "2000", "2001"],
appears_in=[4, 5, 6],
home_planet="Tatooine",
)
vader = Human(
id="1001",
name="Darth Vader",
friends=["1004"],
appears_in=[4, 5, 6],
home_planet="Tatooine",
)
han = Human(
id="1002",
name="Han Solo",
friends=["1000", "1003", "2001"],
appears_in=[4, 5, 6],
home_planet=None,
)
leia = Human(
id="1003",
name="Leia Organa",
friends=["1000", "1002", "2000", "2001"],
appears_in=[4, 5, 6],
home_planet="Alderaan",
)
tarkin = Human(
id="1004",
name="Wilhuff Tarkin",
friends=["1001"],
appears_in=[4],
home_planet=None,
)
human_data = {
"1000": luke,
"1001": vader,
"1002": han,
"1003": leia,
"1004": tarkin,
}
c3po = Droid(
id="2000",
name="C-3PO",
friends=["1000", "1002", "1003", "2001"],
appears_in=[4, 5, 6],
primary_function="Protocol",
)
r2d2 = Droid(
id="2001",
name="R2-D2",
friends=["1000", "1002", "1003"],
appears_in=[4, 5, 6],
primary_function="Astromech",
)
droid_data = {"2000": c3po, "2001": r2d2}
def get_character(id):
return human_data.get(id) or droid_data.get(id)
def get_friends(character):
return map(get_character, character.friends)
def get_hero(episode):
if episode == 5:
return human_data["1000"]
return droid_data["2001"]
def get_human(id):
return human_data.get(id)
def get_droid(id):
return droid_data.get(id)
'''
===================== schema =========================
下面是类的设置与查询设置
'''
import graphene
from flask_graphql import GraphQLView
from flask import Flask
class Episode(graphene.Enum): # 剧集
NEWHOPE = 4 # 星球大战4 新希望
EMPIRE = 5 # 黑金帝国
JEDI = 6 # 星球大战 绝地
class Character(graphene.Interface): # 角色 元类
id = graphene.ID()
name = graphene.String()
friends = graphene.List(lambda: Character)
appears_in = graphene.List(Episode) # 出演
def resolve_friends(self, info):
# 返回朋友的一个列表
# The character friends is a list of strings
return [get_character(f) for f in self.friends]
class Human(graphene.ObjectType): # 人类
class Meta:
interfaces = (Character,)
home_planet = graphene.String() # 地球家园
class Droid(graphene.ObjectType): # 机器人
class Meta:
interfaces = (Character,)
primary_function = graphene.String() # 主要功能
class Query(graphene.ObjectType):
# 查询参数只有三个
hero = graphene.Field(Character, episode=Episode())
human = graphene.Field(Human, id=graphene.String())
droid = graphene.Field(Droid, id=graphene.String())
def resolve_hero(root, info, episode=None):
return get_hero(episode)
def resolve_human(root, info, id):
return get_human(id)
def resolve_droid(root, info, id):
return get_droid(id)
if __name__ == '__main__':
setup()
schema = graphene.Schema(query=Query)
app = Flask(__name__)
app.add_url_rule('/graphql', view_func=GraphQLView.as_view('graphql',schema=schema, graphiql=True))
app.run(debug=True)
- 1.查一个,查询语句
human(id:"1002"){
id
name
friends {
id
name
}
appearsIn
}
- 2.当我们想在一个界面查询多个数据时,会报错
{
human(id:"1002"){
id
name
friends {
id
}
appearsIn
}
human(id:"1003"){
id
name
friends {
id
}
appearsIn
}
}
按照他的提示,给他起个别名,结果正常
{
h1:human(id:"1002"){
id
name
friends {
id
}
appearsIn
}
h2:human(id:"1003"){
id
name
friends {
id
}
appearsIn
}
}
- 3.查询中简化查询语句,结果一致
# 将查询语句封装成一个类 起个名字hf
fragment hf on Human{
id
name
homePlanet
}
# 进行查询
{
h1:human(id:"1002"){
...hf
}
h2:human(id:"1003"){
...hf
}
}
- 4.之前我们查元类的时候,只会返回元类特有的属性,如果我们想查询子类也有的属性值,加
...on
{
hero(episode:EMPIRE){
id
name
friends {
id
}
appearsIn
# 想查询homePlanet,但是hero是元类,没有homePlanet属性
# 但是hero(episode:EMPIRE)返回的是一个id为1000的人类,人类是有homePlanet属性的
# 增加一个...on Human{}
...on Human{
homePlanet
}
...on Droid{
primaryFunction
# 能正常查但是没结果
}
}
}
- 5.给查询语句定义名称,动态传递参数,在查询下面输入参数
query q1($id:Episode) {
# $id理解为变量,Episode理解为参数
hero(episode:$id){
id
name
friends {
id
}
appearsIn
...on Droid{
primaryFunction
}
...on Human{
homePlanet
}
}
}
从这一套的操作的时候,就能发现,GraphQL是需要什么才查什么,就是比Restful更加灵活
GraphQL增删改
使用Mutation
关键字,增加数据的代码是比较难理解,注释写的很清晰了,一些很难理解的就是固定写法了,核心其实就是创建数据,存储数据,返回相应结果;复杂的略过即可。
import graphene
from flask import Flask
from flask_graphql import GraphQLView
users =[]
# 第0步:数据的定义
class Person(graphene.ObjectType):
name = graphene.String()
age = graphene.Int()
# 第二步创建增加数据的函数
class CreatePerson(graphene.Mutation):
# 定义可以增加的属性,类似查询参数时的 hero = graphene.Field(Character, episode=Episode())
person = graphene.Field(Person)
msg = graphene.String()
class Arguments: # 设置可以传进来的参数
name = graphene.String()
age = graphene.Int()
# 接受传进来的参数,类似查询时候传进来的查询参数,而这里是增加数据而已
def mutate(self,info,name,age):
# 存储的逻辑
p = Person(name =name,age = age)
# 这里没有连数据库,直接用的本地list
users.append(p)
# 响应请求的结果 返回查询结果
return CreatePerson(person = p,msg="success")
# Mutation 第一步
class Mutation(graphene.ObjectType): # 映射对外使用字段
# 增加数据 的函数
create_person = CreatePerson.Field()
class Query(graphene.ObjectType):
persons = graphene.List(Person)
def resolve_persons(self,info):
return users
if __name__ =='__main__':
schema = graphene.Schema(query=Query,mutation=Mutation)
app = Flask(__name__)
app.add_url_rule('/graphql',view_func=GraphQLView.as_view('grapql',schema=schema, graphiql=True))
app.run(debug=True)
增加语句
mutation add1{
createPerson(age:19,name:"pandeng"){
person {
name
age
}
msg
}
}
注意
在GraphQL中其实是前端在查询,所以字符串要双引号,不然报错
可以看出虽然写的很复杂,但是最后调用的时候还是很舒服的
import graphene
from flask import Flask
from flask_graphql import GraphQLView
class Person(graphene.ObjectType):
name = graphene.String()
age = graphene.Int()
users =[
Person(name="sxt",age="18"),
Person(name="Mr.Li",age= "20")
]
class UpdatePerson(graphene.Mutation):
person = graphene.Field(Person)
msg = graphene.String()
class Arguments:
name = graphene.String()
age = graphene.Int()
def mutate(self,info,name,age):
for u in users:
if u.name == name:
u.age = age
return UpdatePerson(person =u,msg ="success")
return UpdatePerson(msg='fail')
class DeletePerson(graphene.Mutation):
person = graphene.Field(Person)
msg = graphene.String()
class Arguments:
name = graphene.String()
def mutate(self,info,name):
for u in users:
if u.name == name:
users.remove(u)
return UpdatePerson(person =u,msg ="success")
return UpdatePerson(msg='fail')
# Mutation
class Mutation(graphene.ObjectType): # 映射对外使用字段
# 这里可以理解为注册到Mutation上,就像之前注册到Query上一样
update_person = UpdatePerson.Field()
delete_person = DeletePerson.Field()
class Query(graphene.ObjectType):
persons = graphene.List(Person)
def resolve_persons(self,info):
return users
if __name__ =='__main__':
schema = graphene.Schema(query=Query,mutation=Mutation)
app = Flask(__name__)
app.add_url_rule('/graphql',view_func=GraphQLView.as_view('grapql',schema=schema, graphiql=True))
app.run(debug=True)
GraphQL连接数据库SQLAlchemy
pip install graphene-sqlalchemy
新建一个数据库
CREATE DATABASE GraphQL_db
DEFAULT CHARACTER SET = 'utf8mb4';
先创建数据
from random import randint
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy import Column, ForeignKey,Integer,String,and_,Text,Table,func,create_engine
# 数据库的变量
HOST = '127.0.0.1' # 自己本机的就是127.0.0.1 或 localhost
PORT = 3306
DATA_BASE = 'GraphQL_db'
USER = 'root'
PWD = 'xxx'
# # DB_URI = f'数据库的名+驱动名://{USER}:{PWD}@{HOST}:{PORT}/{DATA_BASE}'
DB_URI = f'mysql+pymysql://{USER}:{PWD}@{HOST}:{PORT}/{DATA_BASE}'
engine = create_engine(DB_URI)
# 创建一个基础类
Base = declarative_base(engine)
Session = sessionmaker(engine)
class User(Base):
__tablename__ = "t_user"
id = Column(Integer,primary_key=True,autoincrement=True)
uname = Column(String(50),nullable=False)
age = Column(Integer)
# newss = relationship('News') 不推荐
def __repr__(self) -> str:
return f'<User: id={self.id} uname={self.uname} age={self.age}>'
def create_data():
Base.metadata.create_all()
with Session() as session:
for i in range(10):
user = User(uname=f'uname{i}',age=randint(5,20))
session.add(user)
session.commit()
if __name__ == '__main__':
create_data()
Base.query = scoped_session(Session).query_property()
关键 ,先贴上源码:
接上面代码
from random import randint
import graphene
from graphene_sqlalchemy import SQLAlchemyObjectType
from flask import Flask
from flask_graphql import GraphQLView
# 这句话才能是允许查询
Graph_Session = scoped_session(Session)
Base.query = Graph_Session.query_property()
class graphene_user(SQLAlchemyObjectType):
# 继承了SQLAlchemyObjectType 就可以直接继承之前创建的User了
class Meta:
model = User
class Query(graphene.ObjectType):
# 注册 查询所有
users = graphene.List(graphene_user)
user= graphene.Field(graphene_user,id=graphene.String())
# 解析
def resolve_users(self,info):
query = graphene_user.get_query(info)
return query.all()
def resolve_user(self,info,id):
return graphene_user.get_node(info,id)
if __name__ =='__main__':
schema = graphene.Schema(query=Query)
# 这样可以测试接口
query_with_argument = 'query qi{user(id:"3"){uname,id,age}}'
result = schema.execute(query_with_argument)
print(result.data['user'])
app = Flask(__name__)
app.add_url_rule('/graphql',view_func=GraphQLView.as_view('grapql',schema=schema, graphiql=True))
app.run(debug=True)
查询结果:
新增数据
from random import randint
import graphene
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker,scoped_session
from sqlalchemy import Column, ForeignKey,Integer,String,and_,Text,Table,func,create_engine
from graphene_sqlalchemy import SQLAlchemyObjectType
from flask import Flask
from flask_graphql import GraphQLView
# 数据库的变量
HOST = '127.0.0.1' # 自己本机的就是127.0.0.1 或 localhost
PORT = 3306
DATA_BASE = 'GraphQL_db'
USER = 'root'
PWD = 'xxx'
# # DB_URI = f'数据库的名+驱动名://{USER}:{PWD}@{HOST}:{PORT}/{DATA_BASE}'
DB_URI = f'mysql+pymysql://{USER}:{PWD}@{HOST}:{PORT}/{DATA_BASE}'
engine = create_engine(DB_URI)
# 创建一个基础类
Base = declarative_base(engine)
Session = sessionmaker(engine)
# 这句话才能是允许查询
Graph_Session = scoped_session(Session)
Base.query = Graph_Session.query_property()
class User(Base):
__tablename__ = "t_user"
id = Column(Integer,primary_key=True,autoincrement=True)
uname = Column(String(50),nullable=False)
age = Column(Integer)
# newss = relationship('News') 不推荐
def __repr__(self) -> str:
return f'<User: id={self.id} uname={self.uname} age={self.age}>'
def create_data():
Base.metadata.drop_all()
Base.metadata.create_all()
with Session() as session:
for i in range(10):
user = User(uname=f'uname{i}',age=randint(5,20))
session.add(user)
session.commit()
class graphene_user(SQLAlchemyObjectType):
# 继承了SQLAlchemyObjectType 就可以直接继承之前创建的User了
class Meta:
model = User
class CreateUser(graphene.Mutation):
user = graphene.Field(graphene_user)
msg = graphene.Boolean()
# 指定参数
class Arguments:
# id = graphene.String() # 不传了因为是自增长
uname = graphene.String()
age = graphene.Int()
# 解析
def mutate(self,info,uname,age):
user = User(uname=uname,age=age)
# 加入数据库
# 使用with Session形式报错,无法响应到前端,但是能添加数据
# Instance <User at 0x1eb97368a60> is not bound to a Session;
# attribute refresh operation cannot proceed
# with Session() as session:
# session.add(user)
# session.commit()
Graph_Session.add(user)
Graph_Session.commit()
return CreateUser(user=user,msg=True)
class Mutation(graphene.ObjectType):
# 注册添加一个
create_user = CreateUser.Field()
class Query(graphene.ObjectType):
# 注册 查询所有
users= graphene.List(graphene_user)
# 注意前面导包导进来的是sqlalchemy的String,而这里要用GraphQL的string
user= graphene.Field(graphene_user,id=graphene.String())
# 解析
def resolve_users(self,info):
query = graphene_user.get_query(info)
return query.all()
def resolve_user(self,info,id):
return graphene_user.get_node(info,id)
if __name__ =='__main__':
# create_data()
schema = graphene.Schema(query=Query,mutation=Mutation)
app = Flask(__name__)
app.add_url_rule('/graphql',view_func=GraphQLView.as_view('grapql',schema=schema, graphiql=True))
app.run(debug=True)
对于修改和删除
就是把前面的修改删除拿下来改吧改吧就ok了
报错解决
Instance <User at 0x1eb97368a60> is not bound to a Session; attribute refresh operation cannot proceed
错误代码,不能正常响应,但是可以添加数据到数据库
class CreateUser(graphene.Mutation):
user = graphene.Field(graphene_user)
msg = graphene.Boolean()
# 指定参数
class Arguments:
# id = graphene.String() # 不传了因为是自增长
uname = graphene.String()
age = graphene.Int()
# 解析
def mutate(self,info,uname,age):
user = User(uname=uname,age=age)
# 加入数据库
# 使用with Session形式报错,无法响应到前端,但是能添加数据
with Session() as session:
session.add(user)
session.commit()
return CreateUser(user=user,msg=True)
原因: 过早关闭session,(我也不知道,之前这样做都是可以的),修改为
class CreateUser(graphene.Mutation):
user = graphene.Field(graphene_user)
msg = graphene.Boolean()
# 指定参数
class Arguments:
# id = graphene.String() # 不传了因为是自增长
uname = graphene.String()
age = graphene.Int()
# 解析
def mutate(self,info,uname,age):
user = User(uname=uname,age=age)
# 加入数据库
# 修改后!!!!!!!!!!!!!!!!!!!!!!!!
Graph_Session.add(user)
Graph_Session.commit()
return CreateUser(user=user,msg=True)
使用graphene_sqlalchemy
from random import randint
import graphene
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker,scoped_session
from sqlalchemy import Column, ForeignKey,Integer,String,and_,Text,Table,func,create_engine
from graphene_sqlalchemy import SQLAlchemyConnectionField, SQLAlchemyObjectType
from flask import Flask
from flask_graphql import GraphQLView
# 数据库的变量
HOST = '127.0.0.1' # 自己本机的就是127.0.0.1 或 localhost
PORT = 3306
DATA_BASE = 'GraphQL_db'
USER = 'root'
PWD = 'mysql2002'
# # DB_URI = f'数据库的名+驱动名://{USER}:{PWD}@{HOST}:{PORT}/{DATA_BASE}'
DB_URI = f'mysql+pymysql://{USER}:{PWD}@{HOST}:{PORT}/{DATA_BASE}'
engine = create_engine(DB_URI)
# 创建一个基础类
Base = declarative_base(engine)
Session = sessionmaker(engine)
# 这句话才能是允许查询
Graph_Session = scoped_session(Session)
Base.query = Graph_Session.query_property()
class User(Base):
__tablename__ = "t_user"
id = Column(Integer,primary_key=True,autoincrement=True)
uname = Column(String(50),nullable=False)
age = Column(Integer)
# newss = relationship('News') 不推荐
def __repr__(self) -> str:
return f'<User: id={self.id} uname={self.uname} age={self.age}>'
def create_data():
Base.metadata.drop_all()
Base.metadata.create_all()
with Session() as session:
for i in range(10):
user = User(uname=f'uname{i}',age=randint(5,20))
session.add(user)
session.commit()
class graphene_user(SQLAlchemyObjectType):
# 继承了SQLAlchemyObjectType 就可以直接继承之前创建的User了
class Meta:
model = User
# 接口 继承graphene.relay.Node
# 因为Query是一个通用的查询,可以查询很多类,所以只要继承了这个接口,就能用他的方法查询
Interfaces = (graphene.relay.Node,)
class Query(graphene.ObjectType):
node = graphene.relay.Node.Field()
user = SQLAlchemyConnectionField(graphene_user.connection)
if __name__ =='__main__':
# create_data()
schema = graphene.Schema(query=Query)
app = Flask(__name__)
app.add_url_rule('/graphql',view_func=GraphQLView.as_view('grapql',schema=schema, graphiql=True))
app.run(debug=True)
规范操作后出现null
然后我们的数据库中也是有数据的
那怎么办? 去看源码,点进Node中,因为Node是接口元类
那我们就把刚才的源码中提到的type name:id
(这里的name:id,nama是指查询的那个子类,id是主键)去加一波密,在网站中输入base64,进行编码,结果为Z3JhcGhlbmVfdXNlcjoxMQ==
重新查询,查询成功
node查询总结
虽然这样查询比较繁琐,但是全都集成进了node节点,真正实现了一个节点进行查询,实现了这幅图的效果;
前端就使用base64的库将参数进行传递加密再进行传递即可;
后续再增加数据,就只需要再写一个model
(比如User)对应的子类graphene_model
(比如graphene_user),一样继承Node节点,最后挂载在Query
上即可;
graphene_sqlalchemy中connection查询
- first:前几个
- last:后几个 last:x
- after:索引x 往后(不含x) after:
arrayconnection: x
- before:索引x 往前(不含x)
刚才我们挂载的时候写的一条语句似乎还没有用上,我们去使用一下
class Query(graphene.ObjectType):
node = graphene.relay.Node.Field()
user = SQLAlchemyConnectionField(graphene_user.connection)
注意到右边还有一些参数,好像能排序,试试年龄升序
还能返回前几个数据和后几个数据 ,first
与last
看到还有befor与after,在一通翻源码下,我们知道了还是一个base64的编码arrayconnection:
把arrayconnection:4
拿去转码,转为base64(YXJyYXljb25uZWN0aW9uOjQ=
,放进before后面与after后面
组合操作
需求:对应python切片[2:5]
组合操作,after:1 first:3
或者 before:5 first:3
me:id去加一波密,在网站中输入base64,进行编码,结果为
Z3JhcGhlbmVfdXNlcjoxMQ==`
组合操作,after:1 first:3
或者 before:5 first:3