1:开篇
1.在实践中检验真理 在实践中发展真理
陆陆续续的整理整个学习Fastapi框架一些小笔记的过程发现,自己其实对某些知识点掌握到实际应用的时候还是容易忘记,而且部分知识点也是难以理解和使用,熬了一段时间之后,也算是完整了公司内部的后台从flask迁移到Fastapi框架上。
再迁移的过程中深知,对于python异步的使用,还是需要多多实践,才能深入去理解和应用asyncio异步IO的使用,也累积了自己一点使用这个框架过程中一些使用小经验。
为了检验自己的知识掌握程度和系统化的进行学习和使用这个框架,另外不辜负关注我的公众号的粉丝们(虽然没多少个,但是仍然感激他们的不离不弃,毕竟我写的东西感觉太凌乱,太烂了~哈哈)之前说有没有实践,所以不管怎么样,还是把自己这些时间整理的脚手架搭建的一些知识点再进行系统的补充一下。
2.内容概要
相关的文章叙述主要通过实战叙述我们日常一些 API 构建过程一些流程,其实无非而已是开发-测试-部署三个范畴:
开发环境的搭建(docker环境搭建)
API设计一些规范描述
对脚手架个功能点的整理
项目配置文件处理
API日志记录处理
异步redis缓存处理
同步数据库整合使用
异步数据库的整合和使用
全局错误异常处理
全局的Http请求响应报文的处理
扩展第三方插件-限流器
扩展第三方的插件-错误统计处理
扩展-第三方插件-全局的认证JWT
扩展-第三方插件-消息队列的整合
API版本的规划和处理
Api相关的单元测试和引入相关性能分析
可能的话(还是带出K8S的应用(我还在学习中~))
然后就是完整一个API服务的部署,这里主要是结合Docker和Drone进行相关部署
内容上,可能也会随时改变,但是整体可能还是会围绕上面几个概要点展开。
以上几点是大概接下来我的文章我自己会去整理的,因为我的表达能力有限,可能有些地方诉述会词不达意,希望海涵,也希望各位大佬批评指出。
PS:脚手架纯属个人经验的不断优化累积的沉淀结果,属于个人经验之谈,仅供参考!如有错误,还请各位大佬指正!
3.关于脚手架
3.1 序言
首先,经过之前一系列的基础知识的学习,大致上如何把这个框架应用到我们的自己的业务环境中的,就需要适当进行整合和封装一下,让它更加快捷方便自己的业务开放。
如何的去封装整合一个属于合适自己的脚手架,这个要看自己喜欢是没有的风格了吧,另外如果公司中也有相关的规范要求的话,那你只能根据公司的规范要求来设计。
但是总体上,我个人觉得一个脚手架其实是离不开几点要素,我自己的话,则封装一些常用几个功能(其实也是上面提到几个要点):
支持 Swagger 接口文档生成(fastapi纯天然自带基因)
需要支持对应的日志收集-我这里使用loguru进行日志记录
扩展支持相关的JWT的接口鉴权验证
支持相关的接口的cors跨域支持
对中间件扩展的支持(但是fastapi中间件扩展如何涉及读取reques中的body等信息会有问题,需要考虑自己的需求)
如果你喜欢配置文件类型的读取的方式,还可以进行相关配置文件解析
全局的统一响应返回体(无论是错误还是正常的)
内部的日志的自定义traceid链路追踪(包含第三方的请求处理)
支持相关的rate 接口限流(基于redis)
支持全局的异常捕获,可以进行相关通知
sentry异常的捕获上传处理
3.2 脚手架整体结构说明
因为如果项目是针对单一的项目,不是一个大型的项目的话,可以不需提取公共的模块出来,如果有有必要的公共的模块想共享使用的话,就可以提取公共的出来!我这里暂时不提炼出来。
以下是我的自己脚手架的整体的一个结构:
PS:纯属个人的组织方式,仅供参考
3.3计划基于脚手架之上会案例结构小示例:
使用异步的方式进行对接三地方接口(天气预报接口)
基于Vue制作一个简单的用户管理系统(使用别人的Vue模板是最快滴)
长远计划,弄一个数据自己的后台系统
2.搭建开始
一个脚手架的搭建首先从规划我们的项目结构开始(当然需要根据自己企业自身的而定咯)。首先就是如下截图,是我自己规划的结构图:
规划好的项目结构之后,那接下里我们第一步就是开始先定义一个我们自己的一个fastapi的对象以及初始化这个基础的对象的时候一些配置信息读取。
因为之前使用flask,习惯了那种工厂模式方式来初始化我们的对象,所以这里也沿袭了那种风格来定义:
从上面的可以大概理解出我们的整个的应用启动的时候,需要哪些配件了!
2.1 全局配置文件说明
因为启动一个APP对象的时候,涉及到一些配置信息的时候,我们的需要统一去管理,为了更细致区分,甚至我自己划分了更明晰的配置功能:
比如上图所示的,有:
全局认证的配置信息
文档配置信息
数据库的配置信息
redis的配置信息
PS:通常生产环境,根据个人需求吧,为了安全性,肯定是把一些重要信息通过写入环境变量来读取!可以结合一下读取环境变量的方式来解析对应的配置信息!
dotenv 这个库可以了解一下!
主要配置文件信息有:
auth_conf.py:
#!/usr/bin/evn python
# -*- coding: utf-8 -*-
"""
-------------------------------------------------
文件名称 : auth_conf
文件功能描述 : 功能描述
创建人 : 小钟同学
创建时间 : 2021/6/9
-------------------------------------------------
修改描述-2021/6/9:
-------------------------------------------------
"""
from functools import lru_cache
from pydantic import BaseSettings
import pprint
import secrets
from typing import List
pp = pprint.PrettyPrinter(indent=4)
class AuthUrlSettings(BaseSettings):
# token相关-加密算法
JWT_ALGORITHM: str = "HS256" #
# 秘钥生成
# JWT_SECRET_KEY: str = secrets.token_urlsafe(32) # 随机生成的base64位字符串
JWT_SECRET_KEY: str = '09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7'
# token配置的有效期
JWT_ACCESS_TOKEN_EXPIRE_MINUTES: int = 60 * 24 * 3 # token的时效 3 天 = 60 * 24 * 3
JWT_REFRESH_EXPIRES_DAYS: int = 1
# 跨域设置
ORIGINS: List[str] = ["*"]
class Config:
env_file = ".env"
case_sensitive = True
env_file_encoding = 'utf-8'
ADMIN_WHILE_ROUTE = [
# '/sys/user/logout',
'/5gmsg/sys/user/login',
'/nw/sys/user/login',
'/',
'/check',
'/check23',
'/jcg_admin/api/v1/login',
'/websocket/1',
'/openapi_url',
'/nw/sys/user/login',
'/nw/sys/user/loginceshi'
]
@lru_cache()
def get_auth_settings():
return AuthUrlSettings()
auth = get_auth_settings()
复制代码
docs_conf.py(初始化我们的Fastapi对象的时候使用到):
from functools import lru_cache
from pydantic import BaseSettings
import pprint
pp = pprint.PrettyPrinter(indent=4)
class DocsSettings(BaseSettings):
"""配置类"""
API_V1_STR: str = ""
# 文档接口描述相关的配置
DOCS_URL = API_V1_STR + '/docs'
REDOC_URL = API_V1_STR + '/redocs'
# OPENAPI_URL配置我们的openapi,json的地址
OPENAPI_URL = API_V1_STR + '/openapi_url'
# 接口描述
TITLE = "5G消息管理系统后台"
# 首页描述文档的详细介绍信息
DESC = """
`xxxxxx消息管理系统后台用`
- 前端:使用 ANT VBEN的框架进行搭建
- 后端: 同步模式的多线程模式+ 单线程模式的协程模式
- 技术栈 :FastAPI+ POSTGRESQL+自制ORM
**具体项目描述**
- [1] [系统管理后台模块]
- [2] [商户管理模块]
- [3] [消息模板模块]
"""
TAGS_METADATA = [
{
"name": "后台管理系统",
"description": "后台所有的公司的相关的权限管理",
},
# {
# "name": "xxxxxx消息管理模块",
# "description": "后台所有的公司的相关的权限管理",
# },
# {
# "name": "xxxxx",
# "description": "xxxxx后台所有的公司的相关的权限管理",
# "externalDocs": {
# "description": "子文档信息",
# "url": "https://fastapi.tiangolo.com/",
# },
# },
]
# 配置代理相关的参数信息
SERVERS = [
{"url": "/", "description": "本地调试环境"},
{"url": "https://xx.xx.com", "description": "线上测试环境"},
{"url": "https://xx2.xx2.com", "description": "线上生产环境"},
]
@lru_cache()
def get_settings():
return DocsSettings()
# 配置实例的对象的创建
docs = get_settings()
复制代码
pgdb_conf.py:
from functools import lru_cache
from pydantic import BaseSettings
import pprint
pp = pprint.PrettyPrinter(indent=4)
class DatabaseSettings(BaseSettings):
DEPLOY_HOST: str = '0.0.0.0'
DEPLOY_PORT: int = 8888
DEPLOY_DEBUG: bool = False
DEPLOY_RELOAD: bool = False
DEPLOY_ACCESS_LOG: bool = False
@lru_cache()
def get_settings():
return DatabaseSettings()
# 配置实例的对象的创建
pgconf = get_settings()
复制代码
redis_conf.py:
from functools import lru_cache
from pydantic import BaseSettings
import pprint
from pydantic import AnyUrl, BaseSettings
import os
pp = pprint.PrettyPrinter(indent=4)
class RedisSettings(BaseSettings):
DEPLOY_HOST: str = '0.0.0.0'
DEPLOY_PORT: int = 8888
DEPLOY_DEBUG: bool = False
DEPLOY_RELOAD: bool = False
DEPLOY_ACCESS_LOG: bool = False
# redis://:root12345@127.0.0.1:6379/0?encoding=utf-8
redis_url: AnyUrl = os.environ.get("REDIS_URL", "redis://127.0.0.1:6379/0?encoding=utf-8")
redis_password: str = os.getenv("REDIS_PASSWORD", "")
redis_db: int = int(os.getenv("REDIS_DB", "0"))
# 哨兵机制的链接的配置
use_redis_sentinel: bool = (
True if os.getenv("REDIS_USE_SENTINEL", "0") == "1" else False
)
redis_sentinel_port: int = int(os.getenv("REDIS_SENTINEL_PORT", "26379"))
redis_sentinel_url: str = os.getenv("REDIS_SENTINEL_URL", "")
redis_sentinel_password: str = os.getenv("REDIS_SENTINEL_PASSWORD", "")
redis_sentinel_master_name: str = os.getenv(
"REDIS_SENTINEL_MASTER_NAME", "molmaster"
)
@lru_cache()
def get_settings():
return RedisSettings()
# 配置实例的对象的创建
redisconf = get_settings()
复制代码
以上是是所以关于配置信息说明:
2.2 App示例对象创建
一个Fastapi的实例就是一个应用服务,这个服务需要哪些配件,再启动的时候就需要配置好。所以有了以下规划:
主要内容就是:
创建一个APP对应的时候,顺便实例化我们的一些插件或导入的API服务等,一下是详细的方法的说明:
所以我们的展开的时候也会按上面的相关注册功能模块展开。
2.2.1 注册日志模块的处理
在一个应用中日志对我们的后续的异常定位是必不可缺的一部分,所以我们的需要对我们的相关的请求日志做好写入本地,便于后续回溯问题定位。
这里的日志模块处理,我使用的是loguru,这个库其实是挺不错的日志库,也支持异步的写入,所以再异步里的,感觉使用上应该是很理想的。
再定义日志模块之前,需要考虑的问题点:
日志存贮目录
日志记录格式
日志切割处理
那基于上述几个问题,我们把我们的日志记录,也进行一个插件化的方式来引入。
所以再我们的ext模块下,就有对应日志插件的处理:
定义日志:
loger_config.py 文件内容:
import time
# 封装一下关于记录序号的日志记录用于全链路的日志请求的日志
from datetime import datetime
from loguru import logger
def creat_customize_log_loguru(pro_path=None):
'''
:param pro_path: 当前需要生产的日志文件的存在路径
:return:
'''
import os
if not pro_path:
# BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
pro_path = os.path.split(os.path.realpath(__file__))[0]
# 定义info_log文件名称
log_file_path = os.path.join(pro_path, 'log/info_{time:YYYYMMDD}.log')
# 定义err_log文件名称
err_log_file_path = os.path.join(pro_path, 'log/error_{time:YYYYMMDD}.log')
from sys import stdout
LOGURU_FORMAT: str = '<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | <level>{level: <16}</level> | <bold>{message}</bold>'
# 这句话很关键避免多次的写入我们的日志
logger.configure(handlers=[{'sink': stdout, 'format': LOGURU_FORMAT}])
# 这个也可以启动避免多次的写入的作用,但是我们的 app:register_logger:40 -无法输出
# logger.remove()
# 错误日志不需要压缩
format = " {time:YYYY-MM-DD HH:mm:ss:SSS} | process_id:{process.id} process_name:{process.name} | thread_id:{thread.id} thread_name:{thread.name} | {level} |\n {message}"
# enqueue=True表示 开启异步写入
# 使用 rotation 参数实现定时创建 log 文件,可以实现每天 0 点新创建一个 log 文件输出了
logger.add(err_log_file_path, format=format, rotation='00:00', encoding='utf-8', level='ERROR', enqueue=True) # Automatically rotate too big file
# 对应不同的格式
format2 = " {time:YYYY-MM-DD HH:mm:ss:SSS} | process_id:{process.id} process_name:{process.name} | thread_id:{thread.id} thread_name:{thread.name} | {level} | {message}"
# enqueue=True表示 开启异步写入
# 使用 rotation 参数实现定时创建 log 文件,可以实现每天 0 点新创建一个 log 文件输出了
logger.add(log_file_path, format=format2, rotation='00:00', compression="zip", encoding='utf-8', level='INFO', enqueue=True) # Automatically rotate too big file
复制代码
PS:注意点,再使用这个日志处理器的时候,下面句很关键避免多次的写入我们的日志,没有这个的话,会在记录日志的时候重复写入多次!
logger.configure(handlers=[{'sink': stdout, 'format': LOGURU_FORMAT}])
定义日志记录对象:
为什么需要这样的处理,因为之前几篇文章,我也有叙说过,关于fastapi在中间件处理日志的时候问题,使用中间件的方式记录日志的话,我们的无法对 request: Request 二次使用,而我的自己对日志需求就是如下示例:
2021-08-03 10:21:14:718 | process_id:24088 process_name:MainProcess | thread_id:2684 thread_name:MainThread | INFO | {"traceid": "DKVNm2JsxA2wVhY2hvFSQa", "trace_index": 1, "event_type": "request", "msg": {"useragent": {"os": "Windows 10", "browser": "QQ Browser 10.8.4405", "device": {"family": "Other", "brand": null, "model": null}}, "url": "/check", "method": "GET", "ip": "127.0.0.1", "params": {}, "ts": "2021-08-03 10:21:14"}}
2021-08-03 10:21:14:719 | process_id:24088 process_name:MainProcess | thread_id:2684 thread_name:MainThread | INFO | {"traceid": "DKVNm2JsxA2wVhY2hvFSQa", "trace_index": 2, "event_type": "response", "msg": {"status_code": 200, "cost_time": "0.00", "rsp": "ok", "ts": "2021-08-03 10:21:14"}}
复制代码
因为我的请求日志和记录响应的日志是分开记录,所以我上面处理方式和某大佬提出的是有点出路的。基于我自己上面那种需求,所以我就分开记录,分开记录就需要一个 thread_id:所有有下面使用自定义ContextLogerRoute的实现:
文件名称:
contexr_logger_route.py
复制代码
文件内容:
from time import perf_counter
from loguru import logger
from fastapi import APIRouter, FastAPI, Request, Response, Body
from fastapi.routing import APIRoute
from typing import Callable, List
from fastapi.responses import Response
from apps.utils import json_helper
import uuid
import shortuuid
from datetime import datetime
from user_agents import parse
from urllib.parse import parse_qs
# 因为Fastapi无法再中间二次消费请求的问题,只能通过自定义的路由的方式来进行日志的记录
class ContextLogerRoute(APIRoute):
pass
# 配置需要特殊记录的请求的头的值的信息
nesss_access_heads_keys = []
# 封装一下关于记录序号的日志记录用于全链路的日志请求的日志
@staticmethod
async def async_trace_add_log_record(request: Request, event_type='', msg={}, remarks=''):
'''
:param event_type: 日志记录事件描述
:param msg: 日志记录信息字典
:param remarks: 日志备注信息
:return:
'''
# print("我当前的请求ID:",request.app.state.curr_request,id(request.app.state.curr_request))
# print("我当前的请求ID:", request,id(request))
#
# print("我当前的请求ID:", request.app.state.curr_request.state.ssss)
# print("我当前的请求ID:", request.state.ssss)
# 如果没有这个标记的属性的,说明这个接口的不需要记录啦!
if hasattr(request.state, 'traceid'):
# 自增编号索引序
trace_links_index = request.state.trace_links_index = getattr(request.state, 'trace_links_index') + 1
log = {
# 自定义一个新的参数复制到我们的请求上下文的对象中
'traceid': getattr(request.state, 'traceid'),
# 定义链路所以序号
'trace_index': trace_links_index,
# 时间类型描述描述
'event_type': event_type,
# 日志内容详情
'msg': msg,
# 日志备注信息
'remarks': remarks,
}
# 为少少相关记录,删除不必要的为空的日志内容信息,
if not remarks:
log.pop('remarks')
if not msg:
log.pop('msg')
try:
log_msg = json_helper.dict_to_json_ensure_ascii(log) # 返回文本
logger.info(log_msg)
except:
logger.info(getattr(request.state, 'traceid') + ':索引:' + str(getattr(request.state, 'trace_links_index')) + ':日志信息写入异常')
async def _init_trace_start_log_record(self, request: Request):
'''
请求记录初始化
:return:
'''
# 配置当前的清除的上下文对象
# request.app.
path_info = request.url.path
if path_info not in ['/favicon.ico'] and 'websocket' not in path_info:
if request.method != 'OPTIONS':
# 追踪索引
request.state.trace_links_index = 0
# 追踪ID
# request.traceid = str(uuid.uuid4()).replace('-', '')
request.state.traceid = shortuuid.uuid()
# 计算时间
request.state.start_time = perf_counter()
# 获取请求来源的IP,请求的方法
ip, method, url = request.client.host, request.method, request.url.path
# print('scope', request.scope)
# 先看表单有没有数据:
try:
body_form = await request.form()
except:
body_form = None
body = None
try:
body_bytes = await request.body()
if body_bytes:
try:
body = await request.json()
except:
pass
if body_bytes:
try:
body = body_bytes.decode('utf-8')
except:
body = body_bytes.decode('gb2312')
except:
pass
# 从头部里面获取出对应的请求头信息,用户用户机型等信息获取
user_agent = parse(request.headers["user-agent"])
browser = user_agent.browser.version
if len(browser) >= 2:
browser_major, browser_minor = browser[0], browser[1]
else:
browser_major, browser_minor = 0, 0
user_os = user_agent.os.version
if len(user_os) >= 2:
os_major, os_minor = user_os[0], user_os[1]
else:
os_major, os_minor = 0, 0
log_msg = {
# 'headers': str(request.headers),
# 'user_agent': str(request.user_agent),
# 记录请求头信息----如果需要特殊的获取某些请求的记录则做相关的配置即可
'headers': [request.headers.get(i, '') for i in self.nesss_access_heads_keys] if self.nesss_access_heads_keys else None,
# 记录请求URL信息
"useragent":
{
"os": "{} {}".format(user_agent.os.family, user_agent.os.version_string),
'browser': "{} {}".format(user_agent.browser.family, user_agent.browser.version_string),
"device": {
"family": user_agent.device.family,
"brand": user_agent.device.brand,
"model": user_agent.device.model,
}
},
'url': url,
# 记录请求方法
'method': method,
# 记录请求来源IP
'ip': ip,
# 'path': request.path,
# 记录请求提交的参数信息
'params': {
'query_params': parse_qs(str(request.query_params)),
'from': body_form,
'body': body
},
# 记录请求的开始时间
"ts": f'{datetime.now():%Y-%m-%d %H:%M:%S%z}'
# 'start_time': f'{(start_time)}',
}
if not log_msg['headers']:
log_msg.pop('headers')
if not log_msg['params']['query_params']:
log_msg['params'].pop('query_params')
if not log_msg['params']['from']:
log_msg['params'].pop('from')
if not log_msg['params']['body']:
log_msg['params'].pop('body')
# 执行写入--日志具体的内容信息
await self.async_trace_add_log_record(request, event_type='request', msg=log_msg)
async def _init_trace_end_log_record(self, request: Request, response: Response):
# https://stackoverflow.com/questions/64115628/get-starlette-request-body-in-the-middleware-context
# 如果响应图的类型,仅仅记录字符串类型的结尾的日志信息
if 'image' not in response.media_type and hasattr(request.state, 'traceid'):
start_time = getattr(request.state, 'start_time')
end_time = f'{(perf_counter() - start_time):.2f}'
# 获取响应报文信息内容
rsp = None
if isinstance(response, Response):
rsp = str(response.body, encoding='utf-8')
log_msg = {
# 记录请求耗时
"status_code": response.status_code,
'cost_time': end_time,
# 记录请求响应的最终报文信息--eval的作用是去除相关的 转义符号 ""ok""===》ok
'rsp': json_helper.json_to_dict(rsp),
"ts": f'{datetime.now():%Y-%m-%d %H:%M:%S%z}'
}
await self.async_trace_add_log_record(request, event_type='response', msg=log_msg)
def get_route_handler(self) -> Callable:
original_route_handler = super().get_route_handler()
# 自定义路由的方式内容
async def custom_route_handler(request: Request) -> Response:
# 请求前的处理-日志的初始化操作
await self._init_trace_start_log_record(request)
response: Response = await original_route_handler(request)
# 一个API请求处理完成后的-日志收尾记录
await self._init_trace_end_log_record(request, response)
return response
return custom_route_handler
复制代码
有了上面的插件之后,那接下里就可以直接再app对象里面注册了.
有了上面仅仅是对我们的日志对象和我们日志信息的配置的初始化,下一步我们还需要对应我们的路由的日志也进行初始化:
给我们的路由添加自定义的路由实现,用于请求日志的记录:
有了上面的注册时候,如果在有需要记录日志的地方只需要使用:
await ContextLogerRoute.async_trace_add_log_record(self.app.state.curr_request, event_type='Third party interface', msg=info_interface)
复制代码
验证示例如下:
从上面看我们可以把整个请求的链路日志给记录下来了!
2.2.2 注册全局异常捕获信息
全局异常处理器,对于统一异常的拦截捕获和统一响应报文,是非常关键滴!
2.2.2.1 异常类插件-扩展位置:
2.2.2.2 异常类插件- 注册方式:
思考,其实我们的所有的插件扩展思路无法就是把我们app对象注册到我们的扩展中,让他们可以在我们的扩展里面引用app对象相关调用,所以顺着这个思路其实我们扩展我们所谓的插件,这样就可以很方面定义自己需要东西了!
如我们的全局异常的对象主要使用的我们的app的异常拦截的,所以我们的可以自定义自己的一个异常的捕获类,来处理相关的所有的异常。
2.2.2.3 异常类插件- 类实现:
#!/usr/bin/evn python
# -*- coding: utf-8 -*-
"""
-------------------------------------------------
文件名称 : __init__.py
文件功能描述 : 功能描述
创建人 : 小钟同学
创建时间 : 2021/7/15
-------------------------------------------------
修改描述-2021/7/15:
-------------------------------------------------
"""
from fastapi import FastAPI, Request
from apps.response.json_response import *
from starlette.exceptions import HTTPException as StarletteHTTPException
from fastapi.exceptions import HTTPException as FastapiHTTPException
from fastapi.exceptions import RequestValidationError
from pydantic.errors import *
from apps.ext.logger import logger
import traceback
from apps.utils.singleton_helper import Singleton
@Singleton
class ApiExceptionHandler():
def __init__(self, app=None, *args, **kwargs):
super().__init__(*args, **kwargs)
if app is not None:
self.init_app(app)
def init_app(self, app: FastAPI):
# @app.exception_handler(StarletteHTTPException)
# @app.exception_handler(RequestValidationError)
# @app.exception_handler(Exception)
app.add_exception_handler(Exception, handler=self.all_exception_handler)
# 捕获StarletteHTTPException返回的错误异常,如返回405的异常的时候,走的是这个地方
app.add_exception_handler(StarletteHTTPException, handler=self.http_exception_handler)
app.add_exception_handler(RequestValidationError, handler=self.validation_exception_handler)
async def validation_exception_handler(self, request: Request, exc: RequestValidationError):
# print("参数提交异常错误selfself", exc.errors()[0].get('loc'))
# 路径参数错误
# 判断错误类型
if isinstance(exc.raw_errors[0].exc, IntegerError):
pass
elif isinstance(exc.raw_errors[0].exc, MissingError):
pass
return ParameterException(http_status_code=400, api_code=400, message='参数校验错误', result={
"detail": exc.errors(),
"body": exc.body
})
async def all_exception_handler(self, request: Request, exc: Exception):
'''
全局的捕获抛出的HTTPException异常,注意这里需要使用StarletteHTTPException的才可以
:param request:
:param exc:
:return:
'''
# log_msg = f"捕获到系统错误:请求路径:{request.url.path}\n错误信息:{traceback.format_exc()}"
if isinstance(exc, StarletteHTTPException) or isinstance(exc, FastapiHTTPException):
if exc.status_code == 405:
return MethodnotallowedException()
if exc.status_code == 404:
return NotfoundException()
elif exc.status_code == 429:
return LimiterResException()
elif exc.status_code == 500:
return InternalErrorException()
elif exc.status_code == 400:
# 有部分的地方直接的选择使用raise的方式抛出了异常,这里也需要进程处理
# raise HTTPException(HTTP_400_BAD_REQUEST, 'Invalid token')
return BadrequestException(msg=exc.detail)
return BadrequestException()
else:
# 其他内部的异常的错误拦截处理
logger.exception(exc)
traceback.print_exc()
return InternalErrorException()
async def http_exception_handler(self, request: Request, exc: StarletteHTTPException):
'''
全局的捕获抛出的HTTPException异常,注意这里需要使用StarletteHTTPException的才可以
:param request:
:param exc:
:return:
'''
# 这里全局监听了我们的所有的HTTP响应,包括了200 的也会尽到这里来!
# log_msg = f"捕获到系统错误:请求路径:{request.url.path}\n错误信息:{traceback.format_exc()}"
if exc.status_code == 405:
return MethodnotallowedException()
if exc.status_code == 404:
return NotfoundException()
elif exc.status_code == 429:
return LimiterResException()
elif exc.status_code == 500:
return InternalErrorException()
elif exc.status_code == 400:
# 有部分的地方直接的选择使用raise的方式抛出了异常,这里也需要进程处理
# raise HTTPException(HTTP_400_BAD_REQUEST, 'Invalid token')
return BadrequestException(msg=exc.detail)
复制代码
PS:上面实现其实可以使用两种方式,一个是定义函数的方式,然后通过add_exception_handler处理我们的异常;另一种其实就是直接装饰器的方式,为了清晰点我们的采取的第一种方案!
2.2.2.4 异常类插件- 验证:
验证函数故意引发异常:
结果:
异常引发捕获位置:
对应的引发异常响应报文定义:
2.2.2.5 补充全局响应报文定义):
定义的位置:
默认其实是对JSON解析的有三种,我们为方便分开了三个,这个只显示一个!
json_response.py
```
#!/usr/bin/evn python
# -*- coding: utf-8 -*-
"""
-------------------------------------------------
文件名称 : json_response
文件功能描述 : 功能描述
创建人 : 小钟同学
创建时间 : 2021/7/15
-------------------------------------------------
修改描述-2021/7/15:
-------------------------------------------------
"""
from typing import Any, Dict, Optional
# 自定义返回的错误的响应体信息
# ORJSONResponse一依赖于:orjson
from fastapi.responses import JSONResponse
import time
from fastapi.encoders import jsonable_encoder
class ApiResponse(JSONResponse):
# 定义返回响应码--如果不指定的话则默认都是返回200
http_status_code = 200
# 默认成功
api_code = 0
# 默认Node.如果是必选的,去掉默认值即可
result: Optional[Dict[str, Any]] = None # 结果可以是{} 或 []
message = '成功'
success = True
timestamp = int(time.time() * 1000)
def __init__(self, success= None, http_status_code=None, api_code=None, result=None, message=None, **options):
if result:
self.result = result
if message:
self.message = message
if api_code:
self.api_code = api_code
if success != None:
self.success = success
if http_status_code:
self.http_status_code = http_status_code
# 返回内容体
body = dict(
message=self.message,
code=self.api_code,
success=self.success,
result=self.result,
timestamp=self.timestamp,
# 形如request="POST v1/client/register"
# request=request.method + ' ' + self.get_url_no_param()
)
# customize_headers = {
# # 'Access-Control-Allow-Origin': '*',
# # access-control-allow-methods: DELETE, GET, OPTIONS, PATCH, POST, PUT
# 'access-control-allow-methods': 'DELETE, GET, OPTIONS, PATCH, POST, PUT',
# 'Access-Control-Allow-Origin': '*',
# 'Access-Control-Allow-Headers': '*,X-Access-Token,School-Teacher-Token,school-teacher-token,T-Access-Token,x-access-token,Referer, Accept, Origin, User-Agent,X-Requested-With, Content-Type, X-File-Name',
# # 'Access-Control-Request-Headers': 'Content-Type,Access-Token',
# 'Content-Type': 'application/json;charset=UTF-8'
# }
# jsonable_encoder 处理不同字符串返回 比如时间戳 datatime类型的处理
super(ApiResponse, self).__init__(status_code=self.http_status_code, content=jsonable_encoder(body), **options)
# 这个render会自动调用,如果这里需要特殊的处理的话,可以重写这个地方
# def render(self, content: Any) -> bytes:
#
# return dict_to_json_ensure_ascii_indent(content)
class BadrequestException(ApiResponse):
http_status_code = 400
api_code = 10031
result = None # 结果可以是{} 或 []
message = '错误的请求'
success = False
class LimiterResException(ApiResponse):
http_status_code = 429
api_code = 429
result = None # 结果可以是{} 或 []
message = '访问的速度过快'
success = False
class ParameterException(ApiResponse):
http_status_code = 400
result = {}
message = '参数校验错误,请检查提交的参数信息'
api_code = 10031
success = False
class UnauthorizedException(ApiResponse):
http_status_code = 401
result = {}
message = '未经许可授权'
api_code = 10032
success = False
class ForbiddenException(ApiResponse):
http_status_code = 403
result = {}
message = '失败!当前访问没有权限,或操作的数据没权限!'
api_code = 10033
success = False
class NotfoundException(ApiResponse):
http_status_code = 404
result = {}
message = '访问地址不存在'
api_code = 10034
success = False
class MethodnotallowedException(ApiResponse):
http_status_code = 405
result = {}
message = '不允许使用此方法提交访问'
api_code = 10034
success = False
class OtherException(ApiResponse):
http_status_code = 800
result = {}
message = '未知的其他HTTPEOOER异常'
api_code = 10034
success = False
class InternalErrorException(ApiResponse):
http_status_code = 500
result = {}
message = '程序员哥哥睡眠不足,系统崩溃了!'
api_code = 500
success = False
class InvalidTokenException(ApiResponse):
http_status_code = 401
api_code = 401
message = '很久没操作,令牌失效'
success = False
class ExpiredTokenException(ApiResponse):
http_status_code = 422
message = '很久没操作,令牌过期'
api_code = 10050
success = False
class FileTooLargeException(ApiResponse):
http_status_code = 413
api_code = 413
result = None # 结果可以是{} 或 []
message = '文件体积过大'
class FileTooManyException(ApiResponse):
http_status_code = 413
message = '文件数量过多'
api_code = 10120
result = None # 结果可以是{} 或 []
class FileExtensionException(ApiResponse):
http_status_code = 401
message = '文件扩展名不符合规范'
api_code = 10121
result = None # 结果可以是{} 或 []
class Success(ApiResponse):
http_status_code = 200
api_code = 200
result = None # 结果可以是{} 或 []
message = '自定义成功返回'
success = True
class Fail(ApiResponse):
http_status_code = 200
api_code = 200
result = None # 结果可以是{} 或 []
message = '自定义成功返回'
success = False
```
复制代码
注意点:
上面的一个地方是引入jsonable_encoder 解决的事是部分的json数据类型的的问题!
2.2.3 全局配置跨域设置
这个跨域比较简单,这里不展开叙述,这里提一点就是可以针对某个地址ijnx跨域设置!通过设置:allow_origins来设置自持跨域的白名单。
2.2.4 注册全局中间件的注册
通常全局中间件处理一般对处理认证的,因为大部分的认证都是所有的接口的,所以为了方便,使用中间件的方式进行认证处理是最简单的方式。
这里我们使用全局认证的插件来说明:
2.2.4.1 全局中间件-认证中间件位置:
2.2.4.2 全局中间件-注册方式:
2.2.4.3 全局中间件-实现类:
#!/usr/bin/evn python
# -*- coding: utf-8 -*-
"""
-------------------------------------------------
文件名称 : auth
文件功能描述 : 功能描述
创建人 : 小钟同学
创建时间 : 2021/6/7
-------------------------------------------------
修改描述-2021/6/7:
-------------------------------------------------
"""
from time import perf_counter
from loguru import logger
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from apps.config.auth_conf import auth as auth_conf
from apps.response.json_response import ForbiddenException, InvalidTokenException, ExpiredTokenException, JSONResponse
from apps.ext.jwt.simple_auth import SimpleAuth as Auth
from fastapi import HTTPException
from starlette.status import HTTP_400_BAD_REQUEST
class AuthMiddleware(BaseHTTPMiddleware):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# 认证的形式 1:使用默认的,2 使用自定义的
self.auth_type = 2
def check_auth_token(self, request):
'''
第一步:检测URL地址和携带认证请求头字段信息
:param request:
:return:
'''
# 如果是使用系统自带的认证的虎,它的需要的认证请求头是必须是Authorization的这个的,当然也可以自定义,不过还不知道咋玩
while_auth_ulr = auth_conf.ADMIN_WHILE_ROUTE
# 只有不在白名单的地址需要进行认证的授权的校验
if request.url.path not in while_auth_ulr and 'sys/randomImag' not in request.url.path and 'docs' not in request.url.path:
if self.auth_type == 1:
token = request.headers.get('Authorization', None)
if not token:
return ForbiddenException()
else:
# 从头部提取关键的授权码信息
token = request.headers.get('X-Access-Token', None)
if not token:
# 从get里面进行提取
return ForbiddenException()
# 下面这种方式,会到全局异常捕获那进行处理
# raise HTTPException(HTTP_400_BAD_REQUEST, 'Invalid token')
return token
def authenticate_credentials(self, token):
'''
第2步:检测URL地址和携带认证请求头字段信息
:param token:
:return:
'''
isok, state, token_userinfo_result = Auth.verify_bearer_token_state(token=token)
if not isok and state == 1:
return InvalidTokenException()
if not isok and state == 2:
return ExpiredTokenException()
return token_userinfo_result
async def authenticate_credentials_user_info(self, token_userinfo_result):
'''
进行TOken内部的包含的用户信息的验证
:param token:
:return:
'''
isok, isstatus = False, 2
if not isok:
return ForbiddenException(msg='该用户已经不存在,请联系管理员!')
# 用户状态(1-正常,2-冻结)
# if isstatus.get('status') == 2:
# return ForbiddenException(msg='该用户已经被冻结,请联系管理员!')
async def dispatch(self, request: Request, call_next):
# # ---协程对象的返回-使用方法封装后---返回值的处理需要使用这样方式进行---注意返回的时候处理
# if isinstance(token_result, JSONResponse):
#
# return token_result
# 1:检测是否协调认证信息,没有则返回错误提示,有则返回对应的Token的值
# 如果是使用系统自带的认证的虎,它的需要的认证请求头是必须是Authorization的这个的,当然也可以自定义,不过还不知道咋玩
while_auth_ulr = auth_conf.ADMIN_WHILE_ROUTE
# print('while_auth_ulr',while_auth_ulr)
# 只有不在白名单的地址需要进行认证的授权的校验
# print("鉴权出来11111111111111111")
# print('aaaaaaaaawhile_auth_ulr', while_auth_ulr)
if request.scope["method"]!='OPTIONS' and request.url.path not in while_auth_ulr and 'sys/randomImag' not in request.url.path and 'docs' not in request.url.path:
if self.auth_type == 1:
token = request.headers.get('Authorization', None)
if not token:
return ForbiddenException()
else:
# 从头部提取关键的授权码信息
# print("鉴权出来11111111111111111")
token = request.headers.get('X-Access-Token', None)
# print("鉴权出来11111111111111111",token)
if not token:
# 从get里面进行提取
return ForbiddenException()
isok, state, token_userinfo_result = Auth.verify_bearer_token_state(token=token)
if not isok and state == 1:
return InvalidTokenException()
if not isok and state == 2:
return ExpiredTokenException()
# 写入当前请求上下的当前对象
request.state.token_userinfo_result = token_userinfo_result
response = await call_next(request)
return response
复制代码
2.2.4.3 全局中间件-认证验证:
新增一个接口,没加入白名单:
开启我们的中间件热的认证:
访问没加白的地址:
2.2.5 注册全局的启动和关闭事件
这个地方其实没什么需要特殊的全局处理,所以没什么内容,当然如果后续的如数据库的地方,就需要处理下,但是不是使用这种方式来处理了!而是也是插件的方式来处理!所以这里没什么内容需要展开的,或者可以忽略!
2.2.6 注册全局第三方扩展插件实例
这里第三方的扩展的示例,可以根据自身的需要是否需要再这里进行注册实例化。
我这里的话其实可以选择实例化,比如实例化一个自定义实现了async_client,这样可以就可以全局再其他地址直接使用这个对象实例了!
2.2.6.1 插件 AsynClientSession 实例-位置:
2.2.6.2 插件 AsynClientSession 实例-注册:
2.2.6.3 插件 AsynClientSession 实例-类实现:
这个自定义实现,主要是增加第三方接口请求的时候相关的日志,
#!/usr/bin/evn python
# -*- coding: utf-8 -*-
"""
-------------------------------------------------
文件名称 : __init__.py
文件功能描述 : 功能描述
创建人 : 小钟同学
创建时间 : 2021/7/16
-------------------------------------------------
修改描述-2021/7/16: Http异步的客户端的请求日志封装
-------------------------------------------------
"""
from dataclasses import dataclass
from aiohttp import ClientSession
from dotmap import DotMap
import traceback
import aiohttp
# 加入日志记录
from apps.ext.logger.contexr_logger_route import ContextLogerRoute
from fastapi import Request,FastAPI
from apps.utils.singleton_helper import Singleton
from urllib.parse import parse_qs
from fastapi import FastAPI
@Singleton
@dataclass
class AsynClientSession():
pass
def __init__(self, aiohttp_session: ClientSession = None,app: FastAPI=None):
self.session = aiohttp_session
# 如果有APPC传入则直接的进行初始化的操作即可
if app is not None:
self.init_app(app)
def init_app( self,app: FastAPI):
self.app = app
async def request(self,api_url, method='GET', headers={},islogrecord=False, params=None):
try:
if islogrecord and not getattr(self.app.state,'curr_request'):
raise Exception('需传入FastapiApp对象,并需要注册全局设置请求体对象的上下文中间件')
if not self.session:
# 使用with会自动的关闭链接-Unclosed client session
async with aiohttp.ClientSession() as session:
async with session.request(url=api_url, method=method, headers=headers, params=params) as resp:
# 处理抛出异常状态又
resp.raise_for_status()
if resp.status in (401, 403):
raise Exception("接口请求异常!401或403错误")
# print('resp.content_type',resp.content_type)
try:
response = await resp.json()
except:
response = await resp.text()
# 日志记录
if islogrecord and self.app:
info_interface = {
'url': api_url,
'method': method,
'headers': str(headers) if headers else '',
'params': parse_qs(str(params)),
'state_code': str(resp.status),
'result': response,
}
await ContextLogerRoute.async_trace_add_log_record(self.app.state.curr_request, event_type='Third party interface', msg=info_interface)
else:
async with self.session.request(url=api_url, method=method, headers=headers, params=params) as resp:
# 处理抛出异常状态又
resp.raise_for_status()
if resp.status in (401, 403):
raise Exception("接口请求异常!401或403错误")
response = await resp.json()
# 需要手动的进行关闭
await self.session.close()
return response
except Exception:
traceback.print_exc()
async_client= AsynClientSession()
if __name__ == '__main__':
from asyncio import run
async def main():
results = await async_client.request(api_url='http://127.0.0.1:8080/check',islogrecord=False)
print(results)
run(main())
复制代码
2.2.6.4 插件 AsynClientSession 实例-验证:
查看请求记录日志信息:
2.2.7 批量导入注册路由
因为我们的一个项目里面可能包含的路由比较多,如果一个一个的去
app.include_router(router)
复制代码
我个人是不太喜欢这种方式!所以我自己参考了以前flask的模式,进行批量的导入注册, 也就是说,寻找某个模块下某个实例对象属性,进行统一的
app.include_router(router)
复制代码
具体的实现方式如下步骤:
2.2.7.1 指定导入模块:
2.2.7.2 批量导入工具类-指定导入项目目录:
知道了要查询的目录,然后遍历下面有没有关于
bp
复制代码
实例属性,然后动态进行挂载:
router = getattr(module, key_attribute)
# 已经全局挂载还需要吗?
# router.route_class = ContextLogerRoute
app.include_router(router)
复制代码
2.2.7.3 批量导入工具类-实现:
from fastapi import FastAPI, FastAPI
from fastapi import APIRouter
from ZtjDirImport import DirImport
from apps.utils.modules_helper import find_modules, import_string
def print_all_routes_info(app: FastAPI):
for ro in app.routes:
print('name:', ro.name, '=====>', 'path:', ro.path)
# 通过模块的属性值来导入---注意是模块 不是__init
def register_nestable_blueprint(app=None, project_name=None, api_name='api', key_attribute='bp', hongtu='hongtu'):
'''
自动的导入的蓝图模块
:param app:
:return:
'''
if not app:
import warnings
warnings.warn('路由注册失败,需要传入Flask对象实例')
return None
if project_name:
# include_packages 这个设置为True很关键,它包含了 检测 对于的_init__内的属性,这个对于外层的遍历的来说很关键
modules = find_modules(f'{project_name}.{api_name}', include_packages=True, recursive=True)
for name in modules:
module = import_string(name)
if hasattr(module, key_attribute):
# app.register_blueprint(module.mmpbp)
# lantu = getattr(module,key_attribute)
# print('sdasda',getattr(module,key_attribute).__dict__)
app.include_router(getattr(module, key_attribute))
# app.register_blueprint(getattr(module,key_attribute))
if hasattr(module, hongtu): pass
# print('符合紅土', name)
# getattr(module, hongtu).register(lantu)
else:
import warnings
warnings.warn('路由注册失败,外部项目名称还没定义')
def register_nestable_blueprint_for_log(app=None, project_name=None, api_name='api',scan_name='api', key_attribute='bp', hongtu='hongtu'):
'''
自动的导入的蓝图模块
:param app:
:return:
'''
if not app:
import warnings
warnings.warn('路由注册失败,需要传入Fastapi对象实例')
return None
if project_name:
# include_packages 这个设置为True很关键,它包含了 检测 对于的_init__内的属性,这个对于外层的遍历的来说很关键
modules = find_modules(f'{project_name}.{api_name}', include_packages=True, recursive=True)
from apps.ext.logger.contexr_logger_route import ContextLogerRoute
for name in modules:
module = import_string(name)
# 只找某个模块开始的,避免无意义的其他扫描
if not name.endswith(scan_name):
continue
if hasattr(module, key_attribute):
# app.register_blueprint(module.mmpbp)
# lantu = getattr(module,key_attribute)
router = getattr(module, key_attribute)
# 已经全局挂载还需要吗?
# router.route_class = ContextLogerRoute
app.include_router(router)
# app.register_blueprint(getattr(module,key_attribute))
if hasattr(module, hongtu): pass
# print('符合紅土', name)
# getattr(module, hongtu).register(lantu)
else:
import warnings
warnings.warn('路由注册失败,外部项目名称还没定义')
复制代码
关于里面使用到的
from apps.utils.modules_helper import find_modules, import_string
复制代码
主要是使用flask的代码:
import sys
# from werkzeug.utils import find_modules, import_string
import pkgutil
def import_string(import_name, silent=False):
"""Imports an object based on a string. This is useful if you want to
use import paths as endpoints or something similar. An import path can
be specified either in dotted notation (``xml.sax.saxutils.escape``)
or with a colon as object delimiter (``xml.sax.saxutils:escape``).
If `silent` is True the return value will be `None` if the import fails.
:param import_name: the dotted name for the object to import.
:param silent: if set to `True` import errors are ignored and
`None` is returned instead.
:return: imported object
"""
# force the import name to automatically convert to strings
# __import__ is not able to handle unicode strings in the fromlist
# if the module is a package
import_name = str(import_name).replace(":", ".")
try:
try:
__import__(import_name)
except ImportError:
if "." not in import_name:
raise
else:
return sys.modules[import_name]
module_name, obj_name = import_name.rsplit(".", 1)
module = __import__(module_name, globals(), locals(), [obj_name])
try:
return getattr(module, obj_name)
except AttributeError as e:
raise ImportError(e)
except ImportError as e:
print("导入异常", e)
def find_modules(import_path, include_packages=False, recursive=False):
"""Finds all the modules below a package. This can be useful to
automatically import all views controllers so that their metaclasses
function decorators have a chance to register themselves on the
application.
Packages are not returned unless `include_packages` is `True`. This can
also recursively list modules but in that case it will import all the
packages to get the correct load path of that module.
:param import_path: the dotted name for the package to find child modules.
:param include_packages: set to `True` if packages should be returned, too.
:param recursive: set to `True` if recursion should happen.
:return: generator
"""
module = import_string(import_path)
path = getattr(module, "__path__", None)
if path is None:
raise ValueError("%r is not a package" % import_path)
basename = module.__name__ + "."
for _importer, modname, ispkg in pkgutil.iter_modules(path):
modname = basename + modname
if ispkg:
if include_packages:
yield modname
if recursive:
for item in find_modules(modname, include_packages, True):
yield item
else:
yield modname
def get_modules(package="."):
"""
获取包名下所有非__init__的模块名
"""
import os
modules = []
files = os.listdir(package)
for file in files:
if not file.startswith("__"):
name, ext = os.path.splitext(file)
modules.append(name)
print("名称", name)
return modules
复制代码
2.2.7.4 批量导入工具类-最终效果:
项目接口的定义:
接口定义示例:
不需要太多的干预处理直接的批量导入相关API接口定义:
以上就是关于路由批量导入的一些简要说明,后续有机会再针对这个定义API再展开一下!
2.3 插件异步redis示例扩展
我们的异步redis也是已插件的方式注册和实例化,主要的是以
上面这种方式引入的我们的app对象来接管注册对应的钩子函数,并在钩子函数处理相关事件处理。
2.3.1 插件定义位置:
2.3.1 插件类实现定义:
#!/usr/bin/evn python
# -*- coding: utf-8 -*-
"""
-------------------------------------------------
文件名称 : __init__.py
文件功能描述 : 功能描述
创建人 : 小钟同学
创建时间 : 2021/7/27
-------------------------------------------------
修改描述-2021/7/27:
-------------------------------------------------
# await app.state.redis.set("my-key", "valueaaaaaaaaaaaa")
# value = await app.state.redis.get("my-key")
# print(value)
# print("HASH字典的操作")
# await self.hmset_dict("hash", key1="value1", key2="value2", key3=123)
# result = await self.hgetall("hash")
# print("HASH字典的操作",result)
# result = await self.add_str_ex('sdsds','sssssssssssssss')
# print(result)
# value = await app.state.redis.get("sdsds")
# print(value)
"""
from aioredis import Redis, create_redis_pool, create_sentinel
from apps.config.redis_conf import redisconf
from typing import Tuple, Any
from fastapi import FastAPI
from apps.utils.singleton_helper import Singleton
from contextlib import asynccontextmanager
import asyncio
import json
import datetime
from typing import Set, Any, Optional
# from functools import cached_property, lru_cache
# Python 3.8的cached_property
@Singleton
class AsyncRedisClient():
def __init__(self, app: FastAPI = None):
# 如果有APPC传入则直接的进行初始化的操作即可
self.redis = None
if app is not None:
self.init_app(app)
def init_app(self, app: FastAPI):
self.app = app
@app.on_event("startup")
async def startup_event():
app.state.redis = await self.init_redis_pool()
# 初始化缓冲器对象
from apps.ext.cache import FastAPICache
from apps.ext.cache.backends.redis import RedisBackend
FastAPICache.init(RedisBackend(self.redis_db), prefix="xxx-cache")
# 登入状态集
await self.setbit('login_status', 100010, 1)
await self.setbit('login_status', 100011, 1)
erer = await self.getbit('login_status', 100011)
print('100011的在线状态', erer)
erer = await self.getbit('login_status', 100012)
print('100012的在线状态', erer)
erer = await self.getbit('login_status', 100010)
print('100010的在线状态', erer)
await self.setbit('login_status', 100010, 0)
erer = await self.getbit('login_status', 100010)
print('100010的在线状态', erer)
# 签到处理
# key 可以设计成 uid:sign:{userId}:{yyyyMM},月份的每一天的值 - 1 可以作为 offset(因为 offset 从 0 开始,所以 offset = 日期 - 1)。key 可以设计成 uid:sign:{userId}:{yyyyMM},月份的每一天的值 - 1 可以作为 offset(因为 offset 从 0 开始,所以 offset = 日期 - 1)。
await self.setbit('uid:sign:{0}:{1}'.format(100010, 202105), 15, 1)
await self.setbit('uid:sign:{0}:{1}'.format(100010, 202105), 14, 1)
await self.setbit('uid:sign:{0}:{1}'.format(100010, 202105), 13, 1)
await self.setbit('uid:sign:{0}:{1}'.format(100010, 202105), 12, 1)
await self.setbit('uid:sign:{0}:{1}'.format(100010, 202105), 11, 1)
erer = await self.getbit('uid:sign:{0}:{1}'.format(100010, 202105), 15)
print('100010在202105的16号的签到情况', erer)
erer = await self.getbit('uid:sign:{0}:{1}'.format(100010, 202105), 11)
print('100010在202105的16号的签到情况', erer)
erer = await self.bitcount('uid:sign:{0}:{1}'.format(100010, 202105))
print('100010在202105签到总次数', erer)
erer = await self.bitpos('uid:sign:{0}:{1}'.format(100010, 202105), 1)
print('100010在202105首次打开的日期,也就是第一次标记为1的bit的位置应该是11》测试一下看看', erer)
await self.setbit('uid:sign:{0}:{1}'.format(100010, 202105), 11, 0)
erer = await self.bitpos('uid:sign:{0}:{1}'.format(100010, 202105), 1)
print('100010在202105首次打开的日期,也就是第一次标记为1的bit的位置应该是11》测试xiugaiwehou一下看看', erer)
erer = await self.set_sign_status(100010)
print('100010在202105签到:', erer)
erer = await self.get_sign_status(100010)
print('100010在202105签到:', erer)
erer = await self.get_user_week_sign_status(100010)
print('100010这周的签到的情况:', erer)
erer = await self.get_user_month_sign_status(100010)
print('100010这月的签到的情况:', erer)
@app.on_event("shutdown")
async def shutdown_event():
app.state.redis.close()
await app.state.redis.wait_closed()
async def get_redis(self):
return self.redis
# @cached_property
async def init_redis_pool(self) -> Redis:
if redisconf.use_redis_sentinel:
# 创建哨兵机制模型的下的链接对象
sentinel = await create_sentinel(
[(redisconf.redis_sentinel_url, redisconf.redis_sentinel_port)],
db=redisconf.redis_db,
password=redisconf.redis_password,
encoding="utf-8",
)
self.redis_db = sentinel.master_for(redisconf.redis_sentinel_master_name)
else:
# 创建连接池的方式
self.redis_db = await create_redis_pool(
redisconf.redis_url,
# password=redisconf.redis_password,
# encoding="utf-8",
# db=redisconf.redis_db,
)
# result = await self.set_json('sdsds',{
# 'sdas':2323,
# 'sdas222': {
# '你':'唉是就是基22地'
# }
# })
# print(result)
# result = await self.get_json('sdsds')
# print(result)
return self.redis
async def get_with_ttl(self, key: str) -> Tuple[int, str]:
async with self.redis_db.pipeline(transaction=True) as pipe:
return await (pipe.ttl(key).get(key).execute())
async def get(self, key) -> str:
return await self.redis_db.get(key)
async def set(self, key: str, value: str, expire: int = None):
return await self.redis_db.set(key, value, ex=expire)
async def setex(self, key, seconds, value):
print("ssssssssss")
return await self.redis_db.setex(key, seconds, value)
async def pttl(self, key: str) -> int:
"""Get PTTL from a Key"""
return int(await self.redis_db.pttl(key))
async def ttl(self, key: str) -> int:
"""Get TTL from a Key"""
return int(await self.redis_db.ttl(key))
async def pexpire(self, key: str, pexpire: int) -> bool:
return bool(await self.redis_db.pexpire(key, pexpire))
async def expire(self, key: str, expire: int) -> bool:
return bool(await self.redis_db.expire(key, expire))
async def incr(self, key: str) -> int:
"""Increases an Int Key"""
return int(await self.redis_db.incr(key))
async def decr(self, key: str) -> int:
"""Decreases an Int Key"""
return int(await self.redis_db.decr(key))
async def hmset_dict(self, key, **val) -> str:
return await self.redis_db.hmset_dict(key, **val)
async def hgetall(self, key, ):
return await self.redis_db.hgetall(key, encoding="utf-8")
# 不存在则加入,否则不变
async def add_str_nx(self, key, values): # value可以为复杂的json
return await self.redis_db.setnx(key, values)
# 加入缓存,存在会替换,并加入过期时间
async def add_str_ex(self, key, values, time=10): # value可以为复杂的json
return await self.redis_db.setex(key, time, values)
async def clear(self, namespace: str = None, key: str = None) -> int:
if namespace:
lua = f"for i, name in ipairs(redis.call('KEYS', '{namespace}:*')) do redis.call('DEL', name); end"
return await self.redis_db.eval(lua, numkeys=0)
elif key:
return await self.redis_db.delete(key)
async def check_lock(self, key):
"""
检查当前KEY是否有锁
"""
key = 'lock:%s' % key
status = await self.redis_db.get(key)
if status:
return True
else:
return False
async def acquire_lock(self, key, expire=30, step=0.03):
"""
为当前KEY加锁, 默认30秒自动解锁
"""
key = 'lock:%s' % key
while 1:
get_stored = await self.redis_db.get(key)
if get_stored:
await asyncio.sleep(step)
else:
lock = await self.redis_db.setnx(key, 1)
if lock:
await self.redis_db.expire(key, expire)
return True
async def release_lock(self, key):
"""
释放当前KEY的锁
"""
key = 'lock:%s' % key
await self.safe_delete(key)
@asynccontextmanager
async def with_lock(self, key, expire=30, step=0.03):
"""
@desc redis分布式锁封装
:param key: 缓存key
:param expire: 锁失效时间
:param step: 每次尝试获取锁的间隔
:return:
for example:
with RedisCacheProxy().with_lock("key_name") as lock:
"do something"
"""
try:
t = await self.acquire_lock(key, expire, step)
yield t
finally:
await self.release_lock(key)
async def get_many(self, keys: list) -> list:
"""
@desc 批量获取字符串
:params keys: [chan1, char2]
"""
data = await self.redis_db.mget(*keys, encoding="utf-8")
return data
async def set_many(self, data: dict):
"""批量设置字符串缓存"""
data = await self.redis_db.mset(data)
return data
async def get_data(self, key: str) -> str:
"""获取字符串数据并尝试转换json"""
value = await self.redis_db.get(key)
if value:
try:
value = json.loads(value.decode("utf-8"))
except:
pass
return value
async def set_data(self, key: str, value, ex: int = None):
"""尝试转正json字符串存储"""
try:
value = json.dumps(value)
except:
pass
return self.redis_db.set(key, value, ex=ex)
async def delete(self, key):
"""直接删除一个key"""
await self.redis_db.delete(key)
async def safe_delete(self, key: str):
"""失效一个key"""
await self.redis_db.expire(key, -1)
async def delete_many(self, keys: list) -> None:
"""批量key失效"""
await self.redis_db.delete(*keys)
async def exists(self, key: str) -> bool:
"""查询key是否存在"""
data = await self.redis_db.exists(key)
return data
def hget(self, key: str, field: str):
"""获取hash类型一个键值"""
return self.redis_db.hget(key, field)
def hmget(self, key: str, fields: list):
"""
批量获取hash类型键值
:param key:
:param fields:
:return:
"""
return self.redis_db.hmget(key, fields)
async def hget_data(self, key: str, field: str) -> Any:
"""获取hash的单个key"""
data = await self.redis_db.hget(key, field)
return json.loads(data) if data else None
async def hmget_data(self, key: str, fields: list) -> list:
"""
@desc hash类型获取缓存返回一个list
"""
data = await self.redis_db.hmget(key, *fields)
return [json.loads(i) if i is not None else None for i in data]
async def hmget2dict_data(self, key: str, fields: list) -> dict:
"""
@desc hash类型获取缓存返回一个dict,尝试转换json格式
"""
cache_list = await self.redis_db.hmget(key, fields)
return dict(zip(fields, [json.loads(i) if i is not None else None for i in cache_list]))
async def get_json(self, key: str) -> dict:
"""
@desc 获取json格式的字典数据
"""
data = await self.redis_db.hgetall(key)
if data:
return {k: json.loads(v) for k, v in dict(data).items()}
return {}
async def set_json(self, key: str, value: dict, ex: int = None):
"""
@desc 使用hash存贮json结构的数据
:return:
"""
cache_data = []
for k, v in value.items():
cache_data.extend([k, json.dumps(v)])
if not cache_data:
return True
pipe = self.redis_db.pipeline()
pipe.hmset(key, *cache_data)
if ex:
pipe.expire(key, int(ex))
res = await pipe.execute()
return res
async def sadd(self, key: str, values: list) -> int:
"""添加元素"""
if not values:
return 0
count = await self.redis_db.sadd(key, *values)
return count
async def spop(self, key: str, count: int = None) -> list:
"""从集合弹出元素"""
count = 1 if not count else count
values = await self.redis_db.spop(key, count=count)
return values if values else []
async def smembers(self, key: str) -> list:
"""返回一个集合所有元素"""
values = await self.redis_db.smembers(key)
return values if values else []
async def smembers_back_set(self, key: str) -> Set:
"""Gets Set Members"""
return set(await self.redis_connection.smembers(key))
async def scard(self, key: str) -> int:
"""获取一个集合的元素个数"""
count = await self.redis_db.scard(key)
return count
async def zadd(self,key, *args, **kwargs):
# redis zadd操作(批量设置值至args有序集合中)
if not (args or kwargs):
return False
count = await self.redis_db.zadd(key, *args, **kwargs)
return count
async def zrem(self,key, member, *members):
# redis zrem操作(删除name有序集合中的特定元素)
if not key:
return False
count = await self.redis_db.zrem(key,member, *members)
return count
async def zincrby(self,key, name, value, amount=1):
# 如果在key为name的zset中已经存在元素value,则该元素的score增加amount,否则向该集合中添加该元素,其score的值为amount
if not (name or value):
return False
return await self.redis_db.zincrby(key, value, amount)
async def zrevrank(self,key, value):
if not value:
return False
return await self.redis_db.zrevrank(key, value)
async def zscore(self, key,member):
if not member:
return False
return self.redis_db.zscore(key, member)
async def setbit(self, key: str, offset: int, value: int) -> int:
"""
1:设置或者清空 key 的 value 在 offset 处的 bit 值(只能是 0 或者 1)
2:只需要一个 key = login_status 表示存储用户登陆状态集合数据, 将用户 ID 作为 offset,在线就设置为 1,下线设置 0。
3:需要注意的是 offset 从 0 开始
"""
count = await self.redis_db.setbit(key, offset, value)
return count
async def getbit(self, key: str, offset: int) -> int:
"""
1:获取 key 的 value 在 offset 处的 bit 位的值,当 key 不存在时,返回 0。
"""
count = await self.redis_db.getbit(key, offset)
return count
async def bitcount(self, key: Any) -> int:
"""
该指令用于统计给定的 bit 数组中,值 = 1 的 bit 位的数量。
"""
count = await self.redis_db.bitcount(key)
return count
async def bitpos(self, key: Any, bit: Any, start=None, end=None) -> int:
"""
1:返回数据表示 Bitmap 中第一个值为 bitValue 的 offset 位置。
2:在默认情况下, 命令将检测整个位图, 用户可以通过可选的 start 参数和 end 参数指定要检测的范围。
"""
count = await self.redis_db.bitpos(key, bit, start=start, end=end)
return count
# 签到功能的处理
async def set_sign_status(self, user_id: int, _singe_key='sign_in:', day=None, statue=1) -> int:
# 用户签到: 使用日期的来做key
if not day:
day = str(datetime.datetime.now())[:10]
return await self.setbit('{}:{}'.format(_singe_key, day), user_id, statue)
# 获取用户签到的状-当前日志用户今日签到状态,默认是当前的日期
async def get_sign_status(self, user_id: int, _singe_key='sign_in:', day=None) -> int:
if not day:
day = str(datetime.datetime.now())[:10]
return await self.getbit('{}:{}'.format(_singe_key, day), user_id)
# 查询用户求出这个周的签到状况,和总数
async def get_user_week_sign_status(self, user_id: int, _singe_key='sign_in:') -> tuple:
now = datetime.datetime.now()
# 周一是1 周日是7 now.weekday()则是周一是0,周日是6
weekday = now.isoweekday()
pipe = self.redis_db.pipeline()
for d in range(weekday):
check_day = str(now - datetime.timedelta(days=1) * d)[:10]
pipe.getbit('{}:{}'.format(_singe_key, check_day), user_id)
res = await pipe.execute()
return res[::-1],sum(res[::-1])
# 查询用户求出这个月的签到状和总数
async def get_user_month_sign_status(self, user_id: int, _singe_key='sign_in:') -> tuple:
now = datetime.datetime.now()
# 周一是1 周日是7 now.weekday()则是周一是0,周日是6
day = now.day
pipe = self.redis_db.pipeline()
for d in range(day):
check_day = str(now - datetime.timedelta(days=1) * d)[:10]
pipe.getbit('{}:{}'.format(_singe_key, check_day), user_id)
res = await pipe.execute()
return res[::-1],sum(res[::-1])
async_redis_client = AsyncRedisClient()
复制代码
2.3.1 插件类初始化和验证:
我们再app启动的时候就示例这个redis异步客户端实例,并且进行相关一些验证测试:
3.总结
关于脚手架内容概要提到几个点,上面讲述的几乎已全部包含了!鉴于文章太长!后续如果可以的话,再继续展开!!!
以上仅仅是个人结合自己的实际需求,做学习的实践笔记!如有笔误!欢迎批评指正!感谢各位大佬!
结尾
END
简书:www.jianshu.com/u/d6960089b…
掘金:juejin.cn/user/296393…
公众号:微信搜【小儿来一壶枸杞酒泡茶】
小钟同学 | 文 【欢迎一起学习交流】| QQ:308711822