暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Flask + Ant Vue 实战 (10)-JWT鉴权中间件

小儿来一壶枸杞酒泡茶 2021-02-25
1929

前言

一个接口通常我们需要做相关鉴权认证才可以执行后续相关逻辑处理。类似后台系统,用户需登入成功后, 只有验证通过才赋予相关访问接口的权限。

我们的后台就是基于jwt token的登入认证机制完成。

什么是JWT

JWT是JSON Web Token的缩写,目前我们的主流的无状态认证方案,多数是基于此,而不是基于session的方式。

后台的JWT Token生成过程

  1. 用户输入账号和密码信息, 服务器认证匹配
  2. 服务器认证以后,服务器不保存任何 session 数据,而是生成一个JSON格式的对象

(为了防止用户篡改数据,服务器在生成这个对象的时候,会加上签名,这个签名是JWT必须具备的)

  1. 返回token给到客户端
  2. 客户端后续相关其他接口认证都需要通过token提交到服务器认证(需要鉴权认证接口)

客户端提交token的方式可以根据自己来订,通常是通过 请求的头信息Authorization字段里进行提交

JWT Token 优缺点

  • 优点
  1. 无服务状态,服务器不需要保存任何 session 数据,可扩展性好
  2. 可以有效避免了CSRF 攻击( 跨站请求伪造,属于网络攻击领域范围)
  3. 适用于单点登入
  4. 支持跨域访问

  • 缺点
  1. 安全性问题,无法在使用过程中废止某个 token,或者更改 token 的权限,无法在服务端注销。

JWT 签发后在到有效期前始终有效,除非服务器部署额外的逻辑(一次作废,做JWT判断)

  1. 通常JTW本身包含用户一些认证信息,看我们需要写入的信息来定,所以用户信息我们一般不写入JWT中。
  2. 性能问题,可能JWT的信息过多。

JWT Token 延伸, token失效续签问题

用户输入账号和密码信息, 服务器认证匹配成功后返回两个 token。

  • acessToken 它的过期时间设置比较短一些。
  • refreshToken   它的过期时间设置比acessToken长一些。

具体流程是:

  1. 服务端认证成功后返回acessToken和refreshToken
  2. 客户端提交acessToken服务端
  3. 服务端校验 accessToken 的有效性,如已过期,客户端再将 refreshToken 传给服务端
  4. 服务端校验 refreshToken 的有效性,生成新的accessToken和refreshToken 给客户端

FLask 中的JWT的应用

1.安装pyjwt库

PS: 安装版本为1.7.1的版本

2.common包的ext下定义一个Jwt模块

主要职责是,token的签发和校验认证

#!/usr/bin/evn python
# coding=utf-8
# + + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + +
#        ┏┓   ┏┓+ +
#    ┏┛┻━━━┛┻┓ + +
#    ┃       ┃  
#    ┃   ━   ┃ ++ + + +
#    ████━████ ┃+
#    ┃       ┃ +
#    ┃   ┻   ┃
#    ┃       ┃ + +
#    ┗━┓   ┏━┛
#      ┃   ┃           
#      ┃   ┃ + + + +
#      ┃   ┃    Codes are far away from bugs with the animal protecting   
#      ┃   ┃ +     神兽保佑,代码无bug  
#      ┃   ┃
#      ┃   ┃  +         
#      ┃    ┗━━━┓ + +
#      ┃        ┣┓
#      ┃        ┏┛
#      ┗┓┓┏━┳┓┏┛ + + + +
#       ┃┫┫ ┃┫┫
#       ┗┻┛ ┗┻┛+ + + +
# + + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + +"""
"""
# 版权说明
# + + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + +

Licensed under the Apache License, Version 2.0 (the "
License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "
AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
# + + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + ++ + + +

# @Time  : 2020/12/22 13:03

# @Author : mayn

# @Project : ZyxFlaskadminSystem

# @FileName: __init__.py.py

# @Software: PyCharm

# 作者:小钟同学

# 著作权归作者所有

# 文件功能描述: 
"
""

import jwt
import datetime


class Auth():
    # 签名秘钥
    secret = 'super-man$&123das%qzq'
    # iss: 该JWT的签发者,是否使用是可选的;
    iss = 'xiaozhongtongxue'
    aud = 'www.xiaozhong.com'

    # 1000天有效期
    @classmethod
    def create_token_by_data(cls, sub='', data={}, scopes=['open'], exp_time=60 * 60 * 24 * 5):
        """
        生成对应的JWT的token值
        :param sub:    参数名称
        :param data:      参与签名的参数信息
        :param secret:   是否要求进行空检测,True必须检测
        :param exp_time:  token过期时间,按秒来计算
        :return:        返回处理后的参数
        "
""
        if not data:
            return False, {'token''''meg''需要签名信息不能为空'}

        payload = {
            "iss": cls.iss,  # iss: 该JWT的签发者,是否使用是可选的;
            "exp": datetime.datetime.utcnow() + datetime.timedelta(days=0, seconds=exp_time),  # 什么时候过期,这里是一个Unix时间戳,是否使用是可选的;
            "iat": datetime.datetime.utcnow(),  # 在什么时候签发的(UNIX时间),是否使用是可选的;
            "aud": cls.aud,  # 接收该JWT的一方,是否使用是可选的;#  如果在生成token的时候使用了aud参数,那么校验的时候也需要添加此参数
            "sub": sub,  # sub: 该JWT所面向的用户,是否使用是可选的;
            "scopes": scopes,  # 用户授权的作用域,使用逗号(,)分隔
            "data": data
        }
        # 不参与进行签名计算
        if not sub:
            payload.pop('sub')
        # token生成处理
        token = jwt.encode(payload, cls.secret, algorithm='HS256')
        # 返回授权token
        return True, str(token, 'utf-8')

    @classmethod
    def verify_bearer_token(cls, ischecck_sub=False, sub_in='', token=''):
        #  如果在生成token的时候使用了aud参数,那么校验的时候也需要添加此参数
        try:
            payload = jwt.decode(token, cls.secret, audience=cls.aud, algorithms=['HS256'])
            if ischecck_sub and sub_in != '':
                sub = payload['sub']
                if sub != sub_in:
                    return False, "无效的Token"

            if payload and ('data' in payload):
                # 验证通过返回对应的参与签名的字段信息
                return True, payload['data']
            else:
                raise jwt.InvalidTokenError

        except jwt.ExpiredSignatureError:
            return False, "Token过期"

        except jwt.InvalidTokenError:
            return False, "无效的Token"
        except:
            return False, "无效的Token"

    @classmethod
    def verify_bearer_token_state(cls, ischecck_sub=False,sub_in='', token=''):
        #  如果在生成token的时候使用了aud参数,那么校验的时候也需要添加此参数
        try:
            payload = jwt.decode(token, cls.secret, audience=cls.aud, algorithms=['HS256'])
            if ischecck_sub and sub_in != '':
                sub = payload['sub']
                if sub != sub_in:
                    return False, 1, "无效的Token"

            if payload and ('data' in payload):
                # 验证通过返回对应的参与签名的字段信息
                return True, 0, payload['data']
            else:
                raise jwt.InvalidTokenError

        except jwt.ExpiredSignatureError:
            return False, 2, "Token过期"

        except jwt.InvalidTokenError:
            return False, 1, "无效的Token"
        except:
            return False, 1, "无效的Token"



3.定义认证中间件MiddlewareJwtAuth

负责在请求前做拦截鉴权认证。

#!/usr/bin/evn python
# coding=utf-8
from flask import request
from common.exc import ForbiddenException, InvalidTokenException,ExpiredTokenException
from common.ext.jwt import Auth


class MiddlewareJwtAuth():

    def __init__(self, *args, **kwargs):
        pass
        # 定义头部提交自定义的请求头
        self.token_key = 'X-Access-Token'

    def init_app(self, app, _is_handlers_aout=True):
        self.app = app
        if _is_handlers_aout:
            self.register_handlers()
        return self

    def __before_request(self):
        pass
        # 对指定的URL进行需要鉴权认证校验
        if request.method.upper() != 'OPTIONS':
            while_auth_ulr = self.app.config.get('ADMIN_WHILE_ROUTE'"")
            if request.path not in while_auth_ulr and '/login' not in request.path:
                # 头部提取的token信息进行校验
                 if self.token_key in request.headers.keys():
                    token = request.headers[self.token_key]
                    if not token:
                        raise ForbiddenException(api_code=0, message='TOKEN信息不存在!', result=None, success=False)
                else:
                    raise ForbiddenException(api_code=0, message='TOKEN信息不存在!', result=None, success=False)

                # 如果存在TOKEN,验证TOEKN
                isok, state, token_result = Auth.verify_bearer_token_state(token=token)
                if not isok and state == 1:
                    raise InvalidTokenException()
                if not isok and state == 2:
                    raise ExpiredTokenException()

            # 执行其他逻辑验证处理

    def teardown_request(self, exc):
        '''
        请求结束的时候提交相关的日志
        :param exc:
        :return:
        '
''
        pass

    def register_handlers(self):
        self.app.before_request(self.__before_request)



PS:注意上面中间件我们的为了测试过滤了'/login' not in request.path,这个地址不需要认证鉴权,所以我们的直接让这请求放下跑了!

定义两个测试接口,一个用户登入之后签发Token.另一个用户测试中间件的鉴权拦截验证

from common.ext.xhttp import XHttp
if __name__ == '__main__':
    app = create_app()

    @app.route('/login')
    def login():
        # 模拟用户登入,并签发我们Token
        # 把用户信息和ID写入token
        date = {
            'username''xiaozhong',
            'userid''1',
        }
        from common.ext.jwt import Auth
        # 生成对于的此用户信息的token, 创建授权码,并设置过期的时间--授权码有效期为 24 * 31天
        _, token = Auth.create_token_by_data(sub='zyx', data=date, exp_time=60 * 60 * 24 * 2)
        # 返回Token
        return Success(result=token, message='登出成功')


    @app.route('/pay')
    def pay():
        # 模拟用户支付,并在我们的中间件里面做了相关的认证

        XHttp.get(url='http://www.baidu.com',isback_json=False,is_log=True)
        return '支付成功'

    app.run(host='127.0.0.1', port='8808', debug=True)



启动服务进行接口请求访问http://127.0.0.1:8808/login:

返回的内容:

{
    "message""登出成功",
    "code": 200,
    "success"true,
    "result""eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpc3MiOiJ4aWFvemhvbmd0b25neHVlIiwiZXhwIjoxNjE0NDExMTE4LCJpYXQiOjE2MTQyMzgzMTgsImF1ZCI6Ind3dy54aWFvemhvbmcuY29tIiwic3ViIjoienl4Iiwic2NvcGVzIjpbIm9wZW4iXSwiZGF0YSI6eyJ1c2VybmFtZSI6InhpYW96aG9uZyIsInVzZXJpZCI6IjEifX0.lDSBGm6houMka35uS_JFBbQYs1EYn4EpBjygfUS0cLU",
    "timestamp": 1614238317193
}

请求我们的支付接口http://127.0.0.1:8808/pay:

不带请求头的情况:

带请求头但是不是正确的头的情况:

带请求头且传入上次返回的正确Toekn情况:

综上我们的已经晚上了相关的JWT的生产和鉴权认证,后续需要完善的就是鉴权白名单的配置。

END

以上是Jwt做鉴权一些实践记录。

小钟同学 | 文  【原创】【转载请联系本人】| QQ:308711822



文章转载自小儿来一壶枸杞酒泡茶,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论