需求:在已有的flask项目上添加权限控制系统, 不同的用户组有不同的访问权限,用户不单独设置权限(可以设) 效果: 用户进入主页自动跳转到登陆界面, 登陆后前端将后端返回的token写入session,每次访问时将token放入headers 中,后台通过token判断用户是否登陆以及是哪个用户、有什么权限
1. 创建用户、用户组数据库
首先,我们需要在数据库中创建两个表, User和PermissionGroup, 不命名为group是因为group在mysql中时关键字,会造成很多麻烦。为了方便的管理User和PermissionGroup多对多的关系,再创建一个UserGroupMap的对应关系表 ``` from itsdangerous import TimedJSONWebSignatureSerializer as Serializer from itsdangerous import BadSignature, SignatureExpired from passlib.apps import custom_app_context as pwd_context
class PermissionGroup(db.Model): id = db.Column(db.Integer, primary_key=True) group_name = db.Column(db.String(255), unique=True) status = db.Column(db.Integer, default=0) # 0 inactive, 1 active create_date = db.Column(db.DateTime, default=datetime.datetime.now()) permission_list = db.Column(db.String(255), default=’’)
class User(db.Model): id = db.Column(db.Integer, primary_key=True) user_name = db.Column(db.String(255), unique=True) password_hash = db.Column(db.String(128), default=’’) email = db.Column(db.String(255), unique=True) status = db.Column(db.Integer, default=0) # 0 inactive, 1 active create_date = db.Column(db.DateTime, default=datetime.datetime.now())
1
2
3
4
5
6
7
8
9
10
11
def hash_password(self, password):
self.password_hash = pwd_context.encrypt(password)
def verify_password(self, password):
return pwd_context.verify(password, self.password_hash)
def generate_auth_token(self, expiration=3600):
s = Serializer(app.config['SECRET_KEY'], expires_in=expiration)
print app.config['SECRET_KEY']
print s
return s.dumps({'id': self.id})
class UserGroupMap(db.Model): id = db.Column(db.Integer, primary_key=True) user_id = db.Column(db.Integer) permission_group_id = db.Column(db.Integer)
1
2
3
4
5
6
7
8
9
10
11
12
13
可以看到User 下有三个类函数, hash_password用来存储用户密码加密后的字段,verify_password用来验证用户密码。generate_auth_token 用来生成访问令牌。
出于安全原因,用户的原始密码将不被存储,密码在注册时被散列后存储到数据库中。使用散列密码的话,如果用户数据库不小心落入恶意攻击者的手里,他们也很难从散列中解析到真实的密码。
>密码 决不能 很明确地存储在用户数据库中。
为了创建密码散列,我将会使用 PassLib 库,一个专门用于密码散列的 Python 包。
PassLib 提供了多种散列算法供选择。custom_app_context 是一个易于使用的基于 sha256_crypt 的散列算法。
## 2. token的生成、获取和判断
generate_auth_token() 方法生成一个以用户 id 值为值,’id’ 为关键字的字典的加密令牌。令牌中同时加入了一个过期时间,默认为十分钟(600 秒)。验证令牌是在 verify_auth_token() 静态方法中实现的。静态方法被使用在这里,是因为一旦令牌被解码了用户才可得知。如果令牌被解码了,相应的用户将会被查询出来并且返回。
生成令牌的代码如下:
class AuthToken(Resource):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@login_required
def get(self):
token = g.user.generate_auth_token(21600) # 默认token有效期为6小时
return jsonify(
{'data': {'token': token.decode('ascii'),
'duration': 21600, 'user_id': int(g.user.id)}
},
res=ErrorCode.OPERATOR_SUCCESS,
msg=ErrorCode.OPERATOR_SUCCESS_MSG) ``` @login_required 为修饰器, 要想获得令牌需要先登录 ``` def login_required(func):
@wraps(func)
def wrapper(*args, **kwargs):
user = ''
if 'token' in request.headers:
token = request.headers['token']
user = User.verify_auth_token(token)
if not user:
return jsonify({'res': ErrorCode.OPERATOR_FAILED,
'msg':'useless token'})
elif 'username' in request.headers and 'password' in
request.headers:
username = request.headers['username']
password = request.headers['password']
user = User.query.
filter(or_(User.user_name ==username,
User.email == username)).first()
if not user or not user.verify_password(password) or
user.status == 0:
return jsonify({'res': ErrorCode.OPERATOR_FAILED,
'msg': 'wrong password or user is
inactive'})
g.user = user
return func(*args, **kwargs)
return wrapper ``` 用户访问受权限限制的资源时需要在header中加入'token'字段, 服务器这边验证token的代码为: ``` class ResourceTest(Resource):
@token_required
def get(self):
user_info = get_user_info(g.user)
return dict(res=ErrorCode.OPERATOR_SUCCESS,
msg=ErrorCode.OPERATOR_SUCCESS_MSG, data=user_info) ``` 要访问ResourceTest资源需要先验证token ``` from functools import wraps from flask import request, jsonify, g from itsdangerous import TimedJSONWebSignatureSerializer
as Serializer from itsdangerous import BadSignature, SignatureExpired
def token_required(func): “”” 通过token验证用户的合法性 :param func: :param token: 放在header中(下载时为解决前段问题, token放在url中) :return: “”” @wraps(func) def wrapper(args, **kwargs): if ‘is_download’ in request.args and ‘token’ in request.args: token = request.args.get(‘token’) else: if ‘token’ not in request.headers: return jsonify({‘res’: ErrorCode.OPERATOR_FAILED, ‘msg’: ‘token is in need, please login to get token’}) token = request.headers[‘token’] s = Serializer(app.config[‘SECRET_KEY’]) try: data = s.loads(token) except SignatureExpired: return jsonify({‘res’: ErrorCode.OPERATOR_FAILED, ‘msg’: ‘expired token’}) except BadSignature: return jsonify({‘res’: ErrorCode.OPERATOR_FAILED, ‘msg’: ‘useless token’}) g.user = User.query.get(data[‘id’]) return func(args, **kwargs) return wrapper
1
2
## 3. 权限控制修饰器
通过上边的几个函数我们已经可以在用户访问某个资源前先验证他的身份, 接下来就是根据他的身份判断他是否有这个权限,同样, 我们用函数修饰器。
def verify_permissions(permission_list): “”” 用来验证用户是否拥有permisson_list中的权限, 需要在验证token后使用, 用作各个功能的装饰器 :param permission_list: 功能列表, 如[111, 112] :return: “”” def _verify_permission(func): @wraps(func) def wrapper(args, **kwargs): if g.user.status == 0: return jsonify({‘res’: ErrorCode.OPERATOR_FAILED, ‘msg’: ‘user status is inactive’}) user_permission_list = get_user_info(g.user)[‘permission_list’] if set(permission_list).issubset(set(user_permission_list)): return func(args, **kwargs) else: return jsonify({‘res’: ErrorCode.PERMISSION_DENIED, ‘msg’: ErrorCode.PERMISSION_DENIED_MSG}) return wrapper return _verify_permission
1
由于一个用户可以有多个权限,所以我们这里用列表的形式判断用户是否拥有此权限, 比如访问一个函数需要权限1和2, 而用户的权限列表里有1, 2, 则可以访问。
class UserList(Resource):
1
2
3
4
5
@token_required
@verify_permissions(['admin']])
def get(self):
...
return 'success' ```