mirror of
https://github.com/Lost-MSth/Arcaea-server.git
synced 2025-12-14 08:06:23 +08:00
- Add an API endpoint for getting the user's rating records. - more detailed INFO messages for login and register - Add `finale/finale_end` endpoint #110
153 lines
5.1 KiB
Python
153 lines
5.1 KiB
Python
from hashlib import sha256
|
||
from os import urandom
|
||
from time import time
|
||
|
||
from .config_manager import Config
|
||
from .error import NoAccess, NoData, RateLimit, UserBan
|
||
from .limiter import ArcLimiter
|
||
from .user import UserOnline
|
||
|
||
|
||
class Power:
|
||
def __init__(self, c=None):
|
||
self.c = c
|
||
self.power_id: str = None
|
||
self.caption: str = None
|
||
|
||
@classmethod
|
||
def from_dict(cls, d: dict, c=None) -> 'Power':
|
||
p = cls(c)
|
||
p.power_id = d['power_id']
|
||
p.caption = d['caption']
|
||
return p
|
||
|
||
|
||
class Role:
|
||
def __init__(self, c=None):
|
||
self.c = c
|
||
self.role_id: str = None
|
||
self.caption: str = None
|
||
|
||
self.powers: list = None
|
||
|
||
def has_power(self, power_id: str) -> bool:
|
||
'''判断role是否有power'''
|
||
return any(power_id == i.power_id for i in self.powers)
|
||
|
||
def only_has_powers(self, power_ids: list, anti_power_ids: list = None) -> bool:
|
||
'''判断role是否全有power_ids里的power,且没有anti_power_ids里的任何一个power'''
|
||
flags = [False] * len(power_ids)
|
||
if anti_power_ids is None:
|
||
anti_power_ids = []
|
||
for i in self.powers:
|
||
if i.power_id in anti_power_ids:
|
||
return False
|
||
for j, k in enumerate(power_ids):
|
||
if i.power_id == k:
|
||
flags[j] = True
|
||
return all(flags)
|
||
|
||
def select_from_id(self, role_id: int = None) -> 'Role':
|
||
'''用role_id查询role'''
|
||
if role_id is not None:
|
||
self.role_id = role_id
|
||
self.c.execute('''select caption from role where role_id = :a''',
|
||
{'a': self.role_id})
|
||
x = self.c.fetchone()
|
||
if x is None:
|
||
raise NoData(
|
||
f'The role `{self.role_id}` does not exist.', api_error_code=-200)
|
||
self.caption = x[0]
|
||
return self
|
||
|
||
def select_powers(self) -> None:
|
||
'''查询role的全部powers'''
|
||
self.powers = []
|
||
self.c.execute('''select * from power where power_id in (select power_id from role_power where role_id=:a)''', {
|
||
'a': self.role_id})
|
||
x = self.c.fetchall()
|
||
for i in x:
|
||
self.powers.append(Power.from_dict(
|
||
{'power_id': i[0], 'caption': i[1]}, self.c))
|
||
|
||
|
||
class APIUser(UserOnline):
|
||
limiter = ArcLimiter(Config.API_LOGIN_RATE_LIMIT, 'api_login')
|
||
|
||
def __init__(self, c=None, user_id=None) -> None:
|
||
super().__init__(c, user_id)
|
||
self.api_token: str = None
|
||
self.role: 'Role' = None
|
||
|
||
self.ip: str = None
|
||
|
||
def set_role_system(self) -> None:
|
||
'''设置为最高权限用户,API接口'''
|
||
self.user_id = 0
|
||
self.role = Role(self.c)
|
||
self.role.role_id = 'system'
|
||
self.role.select_powers()
|
||
|
||
def select_role(self) -> None:
|
||
'''查询user的role'''
|
||
self.c.execute('''select role_id from user_role where user_id = :a''',
|
||
{'a': self.user_id})
|
||
x = self.c.fetchone()
|
||
self.role = Role(self.c)
|
||
if x is None:
|
||
# 默认role为user
|
||
self.role.role_id = 'user'
|
||
else:
|
||
self.role.role_id = x[0]
|
||
|
||
def select_role_and_powers(self) -> None:
|
||
'''查询user的role,以及role的powers'''
|
||
self.select_role()
|
||
self.role.select_powers()
|
||
|
||
def select_user_id_from_api_token(self, api_token: str = None) -> None:
|
||
if api_token is not None:
|
||
self.api_token = api_token
|
||
self.c.execute('''select user_id from api_login where token = :token''', {
|
||
'token': self.api_token})
|
||
x = self.c.fetchone()
|
||
if x is None:
|
||
raise NoAccess('No token', api_error_code=-1)
|
||
self.user_id = x[0]
|
||
|
||
def logout(self) -> None:
|
||
self.c.execute(
|
||
'''delete from api_login where user_id=?''', (self.user_id,))
|
||
|
||
def login(self, name: str = None, password: str = None, ip: str = None) -> None:
|
||
if name is not None:
|
||
self.name = name
|
||
if password is not None:
|
||
self.password = password
|
||
if ip is not None:
|
||
self.ip = ip
|
||
if not self.limiter.hit(name):
|
||
raise RateLimit(
|
||
f'Too many login attempts of username {name}', api_error_code=-205)
|
||
|
||
self.c.execute('''select user_id, password from user where name = :a''', {
|
||
'a': self.name})
|
||
x = self.c.fetchone()
|
||
if x is None:
|
||
raise NoData(
|
||
f'The user `{self.name}` does not exist.', api_error_code=-201, status=401)
|
||
if x[1] == '':
|
||
raise UserBan(f'The user `{x[0]}` is banned.')
|
||
if self.hash_pwd != x[1]:
|
||
raise NoAccess(f'The password of user `{x[0]}` is incorrect.',
|
||
api_error_code=-201, status=401)
|
||
|
||
self.user_id = x[0]
|
||
now = int(time() * 1000)
|
||
self.api_token = sha256(
|
||
(str(self.user_id) + str(now)).encode("utf8") + urandom(8)).hexdigest()
|
||
|
||
self.logout()
|
||
self.c.execute('''insert into api_login values(?,?,?,?)''',
|
||
(self.user_id, self.api_token, now, self.ip))
|