欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

python中flask接口的增删查改

程序员文章站 2023-10-27 15:17:52
import jsonfrom flask import Flask, requestapp = Flask(__name__)import decimal# 防止json报错class DecimalEncoder(json.JSONEncoder): def default(self, o): if isinstance(o, decimal.Decimal): return float(o) return super(De...
import json
from flask import Flask, request

app = Flask(__name__)
import decimal


# 防止json报错
class DecimalEncoder(json.JSONEncoder):
    def default(self, o):
        if isinstance(o, decimal.Decimal):
            return float(o)
        return super(DecimalEncoder, self).default(o)


# 只接受get方法访问
@app.route("/getUser", methods=["GET"])
def getUser():
    # 默认返回内容
    ret_dict = {"code": 200, "msg": "success", "data": False}
    # 判断入参是否为空
    if request.args.to_dict() == {}:
        ret_dict["code"] = 5004
        ret_dict["msg"] = "param invalid"
        return json.dumps(ret_dict, ensure_ascii=False)
    # 获取传入的params参数
    param = request.args.to_dict()
    my_id = param.get("my_id")
    resp = getUserModel(my_id)

    # 对参数进行操作
    ret_dict["data"] = resp
    return json.dumps(ret_dict, ensure_ascii=False, cls=DecimalEncoder)


@app.route("/delete", methods=["DELETE"])
def delete():
    
    ret_dict = {"code": 200, "msg": "success", "data": False}#成功标准
    param  = request.args.to_dict()

    my_id = param.get("my_id")
    deleteUserModel(my_id)
    return  json.dumps(ret_dict,ensure_ascii=False)
# 改
@app.route("/update", methods=["PATCH"])
def update():
    # 默认返回内容
    ret_dict = {"code": 200, "msg": "success", "data": False}
    data = {}
    param = request.args.to_dict()
    for k  in ["my_id","name","player_name","height"]:
        data[k] = param.get(k)
    ppd= updateuser(data)

    # 对参数进行操作
    ret_dict["data"] = ppd
    return json.dumps(ret_dict, ensure_ascii=False, cls=DecimalEncoder)
@app.route("/add", methods=["POST"])
def add():
    ret_dict = {"code":200,"msg":"success","date":False}
    ad={}
    param = request.args.to_dict()
    for k in ["my_id", "name", "player_name", "height"]:
        ad[k] = param.get(k)
    pdd = adduser(ad)

    # 对参数进行操作
    ret_dict["data"] = pdd
    return json.dumps(ret_dict, ensure_ascii=False, cls=DecimalEncoder)





import sqlalchemy as sq
from sqlalchemy.orm import sessionmaker
from sqlalchemy import Column, String, Integer, Float, create_engine
from sqlalchemy.ext.declarative import declarative_base

# 连接数据库
engine = create_engine('mysql+pymysql://用户名:密码@数据库ip/数据库名')
# 注意下载pymysql的库
# 创建基类
Base = declarative_base()


# 定义User对象
class Model(Base):
    # 表的名字
    __tablename__ = 'user'

    # 表的结构
    my_id = Column(Integer, primary_key=True)
    # primary_key若为True,这列就是表的主键
    name = Column(String(20))
    player_name = Column(String(255))
    height = Column(Float(3, 2))

    # 创造字典
    def dict(self):
        return {c.name: getattr(self, c.name, None)
                for c in self.__table__.columns}

    # 改
    # def updateUser2( name, play_name, heghit):
    #     user = session.query(Model).filter_by(my_id=my_id).first()
    #     if user is not None:
    #         user.name = name
    #         user.play_name = play_name
    #         user.heghit = heghit
    #     session.commit()


DBSession = sessionmaker(bind=engine)
# 创建 session 对象:n
session = DBSession()
Base.metadata.create_all(engine)


# 加
def adduser(ad):
    art = Model(player_name=ad["player_name"],height=ad["height"],name=ad["name"])
    session.add(art)
    session.commit()


# 查
def getUserModel( my_id):
    row = session.query(Model).get(my_id)
    if row is None:
        return {}
    else:
        return row.dict()


# 删
def deleteUserModel( my_id):
    lie = session.query(Model).filter_by(my_id=my_id).first()
    if lie is not None:
        session.delete(lie)
    session.commit()
# 改
def updateuser(data):
    user = session.query(Model).filter_by(my_id=data["my_id"]).first()
    if user is not None:
        user.name=data["name"]
        user.player_name=data["player_name"]
        user.height=data["height"]
    session.commit()

# 关闭模型
def close():
    session.close()


# 数据库调用
# row= getUser(session)
# print([row.dict() for row in row])
# updateUser("")
#  u.delete()
# adduser(session)
# rows  = getUser(session)
# print([row.dict() for row in rows])
# close()
if __name__ == '__main__':
    app.run()

本文地址:https://blog.csdn.net/ddym66/article/details/107563934

相关标签: python 数据库