Files
Arcaea-server/latest version/core/api_user.py
Lost-MSth e548d5639f [Enhance] API for getting PTT record
- Add an API endpoint for getting the user's rating records.
- more detailed INFO messages for  login and register
- Add `finale/finale_end` endpoint #110
2023-05-23 21:25:27 +08:00

153 lines
5.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from hashlib import sha256
from os import urandom
from time import time
from .config_manager import Config
from .error import NoAccess, NoData, RateLimit, UserBan
from .limiter import ArcLimiter
from .user import UserOnline
class Power:
def __init__(self, c=None):
self.c = c
self.power_id: str = None
self.caption: str = None
@classmethod
def from_dict(cls, d: dict, c=None) -> 'Power':
p = cls(c)
p.power_id = d['power_id']
p.caption = d['caption']
return p
class Role:
def __init__(self, c=None):
self.c = c
self.role_id: str = None
self.caption: str = None
self.powers: list = None
def has_power(self, power_id: str) -> bool:
'''判断role是否有power'''
return any(power_id == i.power_id for i in self.powers)
def only_has_powers(self, power_ids: list, anti_power_ids: list = None) -> bool:
'''判断role是否全有power_ids里的power且没有anti_power_ids里的任何一个power'''
flags = [False] * len(power_ids)
if anti_power_ids is None:
anti_power_ids = []
for i in self.powers:
if i.power_id in anti_power_ids:
return False
for j, k in enumerate(power_ids):
if i.power_id == k:
flags[j] = True
return all(flags)
def select_from_id(self, role_id: int = None) -> 'Role':
'''用role_id查询role'''
if role_id is not None:
self.role_id = role_id
self.c.execute('''select caption from role where role_id = :a''',
{'a': self.role_id})
x = self.c.fetchone()
if x is None:
raise NoData(
f'The role `{self.role_id}` does not exist.', api_error_code=-200)
self.caption = x[0]
return self
def select_powers(self) -> None:
'''查询role的全部powers'''
self.powers = []
self.c.execute('''select * from power where power_id in (select power_id from role_power where role_id=:a)''', {
'a': self.role_id})
x = self.c.fetchall()
for i in x:
self.powers.append(Power.from_dict(
{'power_id': i[0], 'caption': i[1]}, self.c))
class APIUser(UserOnline):
limiter = ArcLimiter(Config.API_LOGIN_RATE_LIMIT, 'api_login')
def __init__(self, c=None, user_id=None) -> None:
super().__init__(c, user_id)
self.api_token: str = None
self.role: 'Role' = None
self.ip: str = None
def set_role_system(self) -> None:
'''设置为最高权限用户API接口'''
self.user_id = 0
self.role = Role(self.c)
self.role.role_id = 'system'
self.role.select_powers()
def select_role(self) -> None:
'''查询user的role'''
self.c.execute('''select role_id from user_role where user_id = :a''',
{'a': self.user_id})
x = self.c.fetchone()
self.role = Role(self.c)
if x is None:
# 默认role为user
self.role.role_id = 'user'
else:
self.role.role_id = x[0]
def select_role_and_powers(self) -> None:
'''查询user的role以及role的powers'''
self.select_role()
self.role.select_powers()
def select_user_id_from_api_token(self, api_token: str = None) -> None:
if api_token is not None:
self.api_token = api_token
self.c.execute('''select user_id from api_login where token = :token''', {
'token': self.api_token})
x = self.c.fetchone()
if x is None:
raise NoAccess('No token', api_error_code=-1)
self.user_id = x[0]
def logout(self) -> None:
self.c.execute(
'''delete from api_login where user_id=?''', (self.user_id,))
def login(self, name: str = None, password: str = None, ip: str = None) -> None:
if name is not None:
self.name = name
if password is not None:
self.password = password
if ip is not None:
self.ip = ip
if not self.limiter.hit(name):
raise RateLimit(
f'Too many login attempts of username {name}', api_error_code=-205)
self.c.execute('''select user_id, password from user where name = :a''', {
'a': self.name})
x = self.c.fetchone()
if x is None:
raise NoData(
f'The user `{self.name}` does not exist.', api_error_code=-201, status=401)
if x[1] == '':
raise UserBan(f'The user `{x[0]}` is banned.')
if self.hash_pwd != x[1]:
raise NoAccess(f'The password of user `{x[0]}` is incorrect.',
api_error_code=-201, status=401)
self.user_id = x[0]
now = int(time() * 1000)
self.api_token = sha256(
(str(self.user_id) + str(now)).encode("utf8") + urandom(8)).hexdigest()
self.logout()
self.c.execute('''insert into api_login values(?,?,?,?)''',
(self.user_id, self.api_token, now, self.ip))