[Enhance] Add limiter & Building API

- Add a custom limiter and use it for limiting users' download instead of using database
> So add a requirement `limits`.

- Fix a character's value
- Change the email max length to 64
- Change something about API's roles and powers
- Add an API endpoint for getting users' roles and powers
This commit is contained in:
Lost-MSth
2022-10-12 15:27:45 +08:00
parent a04df8bba6
commit 68a83a29d2
13 changed files with 140 additions and 104 deletions

View File

@@ -9,50 +9,40 @@ from .user import UserOnline
class Power:
def __init__(self, c=None):
self.c = c
self.power_id: int = None
self.power_name: str = None
self.power_id: str = None
self.caption: str = None
@classmethod
def from_dict(cls, d: dict, c=None) -> 'Power':
p = cls(c)
p.power_id = d['power_id']
p.power_name = d['power_name']
p.caption = d['caption']
return p
def select_from_name(self, power_name: str) -> 'Power':
pass
class Role:
def __init__(self, c=None):
self.c = c
self.role_id: int = None
self.role_name: str = None
self.role_id: str = None
self.caption: str = None
self.powers: list = None
def has_power(self, power_name: str) -> bool:
def has_power(self, power_id: str) -> bool:
'''判断role是否有power'''
for i in self.powers:
if i.power_name == power_name:
return True
return False
return any(power_id == i.power_id for i in self.powers)
def select_from_id(self, role_id: int = None) -> 'Role':
'''用role_id查询role'''
if role_id is not None:
self.role_id = role_id
self.c.execute('''select role_name, caption from role where role_id = :a''',
self.c.execute('''select caption from role where role_id = :a''',
{'a': self.role_id})
x = self.c.fetchone()
if x is None:
raise NoData('The role `%s` does not exist.' %
self.role_id, api_error_code=-200)
self.role_name = x[0]
self.caption = x[1]
self.caption = x[0]
return self
def select_powers(self) -> None:
@@ -63,7 +53,7 @@ class Role:
x = self.c.fetchall()
for i in x:
self.powers.append(Power.from_dict(
{'power_id': i[0], 'power_name': i[1], 'caption': i[2]}, self.c))
{'power_id': i[0], 'caption': i[1]}, self.c))
class APIUser(UserOnline):
@@ -74,6 +64,13 @@ class APIUser(UserOnline):
self.ip: str = None
def set_role_system(self) -> None:
'''设置为最高权限用户API接口'''
self.user_id = 0
self.role = Role(self.c)
self.role.role_id = 'system'
self.role.select_powers()
def select_role(self) -> None:
'''查询user的role'''
self.c.execute('''select role_id from user_role where user_id = :a''',
@@ -82,10 +79,9 @@ class APIUser(UserOnline):
self.role = Role(self.c)
if x is None:
# 默认role为user
self.role.role_id = 1
self.role.role_id = 'user'
else:
self.role.role_id = int(x[0])
self.role.select_from_id()
self.role.role_id = x[0]
def select_role_and_powers(self) -> None:
'''查询user的role以及role的powers'''

View File

@@ -7,6 +7,7 @@ from flask import url_for
from .constant import Constant
from .error import NoAccess
from .limiter import ArcLimiter
from .user import User
from .util import get_file_md5, md5
@@ -50,6 +51,9 @@ class UserDownload:
properties: `user` - `User`类或子类的实例
'''
limiter = ArcLimiter(
str(Constant.DOWNLOAD_TIMES_LIMIT) + '/day', 'download')
def __init__(self, c=None, user=None) -> None:
self.c = c
self.user = user
@@ -60,19 +64,13 @@ class UserDownload:
self.token: str = None
self.token_time: int = None
def clear_user_download(self) -> None:
self.c.execute(
'''delete from user_download where user_id = :a and time <= :b''', {'a': self.user.user_id, 'b': int(time()) - 24*3600})
@property
def is_limited(self) -> bool:
'''是否达到用户最大下载量'''
if self.user is None:
self.select_for_check()
self.c.execute(
'''select count(*) from user_download where user_id = :a''', {'a': self.user.user_id})
y = self.c.fetchone()
return y is not None and y[0] > Constant.DOWNLOAD_TIMES_LIMIT
return not self.limiter.test(str(self.user.user_id))
@property
def is_valid(self) -> bool:
@@ -81,10 +79,9 @@ class UserDownload:
self.select_for_check()
return int(time()) - self.token_time <= Constant.DOWNLOAD_TIME_GAP_LIMIT
def insert_user_download(self) -> None:
'''记录下载信息'''
self.c.execute('''insert into user_download values(:a,:b,:c)''', {
'a': self.user.user_id, 'c': self.token, 'b': int(time())})
def download_hit(self) -> bool:
'''下载次数+1返回成功与否bool值'''
return self.limiter.hit(str(self.user.user_id))
def select_for_check(self) -> None:
'''利用token、song_id、file_name查询其它信息'''
@@ -93,7 +90,8 @@ class UserDownload:
x = self.c.fetchone()
if not x:
raise NoAccess('The token `%s` is not valid.' % self.token, status=403)
raise NoAccess('The token `%s` is not valid.' %
self.token, status=403)
self.user = User()
self.user.user_id = x[0]
self.token_time = x[1]

View File

@@ -0,0 +1,28 @@
from limits import parse, strategies
from limits.storage import storage_from_string
class ArcLimiter:
storage = storage_from_string("memory://")
strategy = strategies.FixedWindowRateLimiter(storage)
def __init__(self, limit: str = None, namespace: str = None):
self._limit = None
self.limit = limit
self.namespace = namespace
@property
def limit(self):
return self._limit
@limit.setter
def limit(self, value):
if value is None:
return
self._limit = parse(value)
def hit(self, key: str, cost: int = 1) -> bool:
return self.strategy.hit(self.limit, self.namespace, key, cost=cost)
def test(self, key: str) -> bool:
return self.strategy.test(self.limit, self.namespace, key)

View File

@@ -76,7 +76,7 @@ class UserRegister(User):
def set_email(self, email: str):
# 邮箱格式懒得多判断
if 4 <= len(email) <= 32 and '@' in email and '.' in email:
if 4 <= len(email) <= 64 and '@' in email and '.' in email:
self.c.execute(
'''select exists(select * from user where email = :email)''', {'email': email})
if self.c.fetchone() == (0,):