mirror of
https://github.com/Lost-MSth/Arcaea-server.git
synced 2026-02-04 13:17:27 +08:00
- For Arcaea 6.8.2 - Add support for "skill_vita_arc". - Now `/users/<user_id> PUT` API endpoint can change the user's custom banner. #223 - Fix a bug that the playing result of beyond world map shows zero value at partner multiply term.
234 lines
8.6 KiB
Python
234 lines
8.6 KiB
Python
from flask import Blueprint, request
|
||
|
||
from core.api_user import APIUser
|
||
from core.config_manager import Config
|
||
from core.error import InputError, NoAccess, NoData
|
||
from core.score import Potential, UserScoreList
|
||
from core.sql import Connect, Query, Sql
|
||
from core.user import UserChanger, UserInfo, UserRegister
|
||
from core.util import get_today_timestamp
|
||
|
||
from .api_auth import api_try, request_json_handle, role_required
|
||
from .api_code import error_return, success_return
|
||
from .constant import Constant
|
||
|
||
bp = Blueprint('users', __name__, url_prefix='/users')
|
||
|
||
|
||
@bp.route('', methods=['POST'])
|
||
@role_required(request, ['change'])
|
||
@request_json_handle(request, ['name', 'password', 'email'])
|
||
@api_try
|
||
def users_post(data, user):
|
||
'''注册一个用户'''
|
||
with Connect() as c:
|
||
new_user = UserRegister(c)
|
||
new_user.set_name(data['name'])
|
||
new_user.set_password(data['password'])
|
||
new_user.set_email(data['email'])
|
||
new_user.register()
|
||
return success_return({'user_id': new_user.user_id, 'user_code': new_user.user_code})
|
||
|
||
|
||
@bp.route('', methods=['GET'])
|
||
@role_required(request, ['select'])
|
||
@request_json_handle(request, optional_keys=Constant.QUERY_KEYS)
|
||
@api_try
|
||
def users_get(data, user):
|
||
'''查询全用户信息'''
|
||
A = ['user_id', 'name', 'user_code']
|
||
B = ['user_id', 'name', 'user_code', 'join_date',
|
||
'rating_ptt', 'time_played', 'ticket', 'world_rank_score']
|
||
with Connect() as c:
|
||
query = Query(A, A, B).from_dict(data)
|
||
x = Sql(c).select('user', query=query)
|
||
r = []
|
||
for i in x:
|
||
r.append(UserInfo(c).from_list(i))
|
||
|
||
if not r:
|
||
raise NoData(api_error_code=-2)
|
||
|
||
return success_return([{
|
||
'user_id': x.user_id,
|
||
'name': x.name,
|
||
'join_date': x.join_date,
|
||
'user_code': x.user_code,
|
||
'rating_ptt': x.rating_ptt,
|
||
'character_id': x.character.character_id,
|
||
'is_char_uncapped': x.character.is_uncapped,
|
||
'is_char_uncapped_override': x.character.is_uncapped_override,
|
||
'is_hide_rating': x.is_hide_rating,
|
||
'ticket': x.ticket
|
||
} for x in r])
|
||
|
||
|
||
@bp.route('/<int:user_id>', methods=['GET'])
|
||
@role_required(request, ['select', 'select_me'])
|
||
@api_try
|
||
def users_user_get(user, user_id):
|
||
'''查询用户信息'''
|
||
if user_id <= 0:
|
||
return error_return(InputError(api_error_code=-110))
|
||
# 查别人需要select权限
|
||
if user_id != user.user_id and not user.role.has_power('select'):
|
||
return error_return(NoAccess('No permission', api_error_code=-1), 403)
|
||
|
||
with Connect() as c:
|
||
u = UserInfo(c, user_id)
|
||
return success_return(u.to_dict())
|
||
|
||
|
||
@bp.route('/<int:user_id>', methods=['PUT'])
|
||
@role_required(request, ['change'])
|
||
@request_json_handle(request, optional_keys=['name', 'password', 'user_code', 'ticket', 'email', 'custom_banner'], must_change=True)
|
||
@api_try
|
||
def users_user_put(data, user, user_id):
|
||
'''修改一个用户'''
|
||
with Connect() as c:
|
||
u = UserChanger(c, user_id)
|
||
r = {}
|
||
r['user_id'] = user_id
|
||
if 'name' in data:
|
||
u.set_name(data['name'])
|
||
r['name'] = u.name
|
||
if 'password' in data:
|
||
if data['password'] == '':
|
||
u.password = ''
|
||
r['password'] = ''
|
||
else:
|
||
u.set_password(data['password'])
|
||
r['password'] = u.hash_pwd
|
||
if 'email' in data:
|
||
u.set_email(data['email'])
|
||
r['email'] = u.email
|
||
if 'user_code' in data:
|
||
u.set_user_code(data['user_code'])
|
||
r['user_code'] = u.user_code
|
||
if 'ticket' in data:
|
||
if not isinstance(data['ticket'], int):
|
||
raise InputError('Ticket must be int')
|
||
u.ticket = data['ticket']
|
||
r['ticket'] = u.ticket
|
||
if 'custom_banner' in data:
|
||
if not isinstance(data['custom_banner'], str):
|
||
raise InputError('Value `custom_banner` must be str')
|
||
u.custom_banner = data['custom_banner']
|
||
r['custom_banner'] = u.custom_banner
|
||
u.update_columns(d=r)
|
||
return success_return(r)
|
||
|
||
|
||
@bp.route('/<int:user_id>/b30', methods=['GET'])
|
||
@role_required(request, ['select', 'select_me'])
|
||
@api_try
|
||
def users_user_b30_get(user, user_id):
|
||
'''查询用户b30'''
|
||
if user_id <= 0:
|
||
return error_return(InputError(api_error_code=-110))
|
||
# 查别人需要select权限
|
||
if user_id != user.user_id and not user.role.has_power('select'):
|
||
return error_return(NoAccess('No permission', api_error_code=-1), 403)
|
||
|
||
with Connect() as c:
|
||
x = UserScoreList(c, UserInfo(c, user_id))
|
||
x.query.limit = 30
|
||
x.select_from_user()
|
||
if not x.scores:
|
||
raise NoData(
|
||
f'No best30 data of user `{user_id}`', api_error_code=-3)
|
||
x.select_song_name()
|
||
r = x.to_dict_list()
|
||
rating_sum = sum(i.rating for i in x.scores)
|
||
return success_return({'user_id': user_id, 'b30_ptt': rating_sum / 30, 'data': r})
|
||
|
||
|
||
@bp.route('/<int:user_id>/best', methods=['GET'])
|
||
@role_required(request, ['select', 'select_me'])
|
||
@request_json_handle(request, optional_keys=Constant.QUERY_KEYS)
|
||
@api_try
|
||
def users_user_best_get(data, user, user_id):
|
||
'''查询用户所有best成绩'''
|
||
if user_id <= 0:
|
||
return error_return(InputError(api_error_code=-110))
|
||
# 查别人需要select权限
|
||
if user_id != user.user_id and not user.role.has_power('select'):
|
||
return error_return(NoAccess('No permission', api_error_code=-1), 403)
|
||
|
||
with Connect() as c:
|
||
x = UserScoreList(c, UserInfo(c, user_id))
|
||
x.query.from_dict(data)
|
||
x.select_from_user()
|
||
if not x.scores:
|
||
raise NoData(
|
||
f'No best score data of user `{user_id}`', api_error_code=-3)
|
||
r = x.to_dict_list()
|
||
return success_return({'user_id': user_id, 'data': r})
|
||
|
||
|
||
@bp.route('/<int:user_id>/r30', methods=['GET'])
|
||
@role_required(request, ['select', 'select_me'])
|
||
@api_try
|
||
def users_user_r30_get(user, user_id):
|
||
'''查询用户r30'''
|
||
|
||
if user_id <= 0:
|
||
return error_return(InputError(api_error_code=-110))
|
||
# 查别人需要select权限
|
||
if user_id != user.user_id and not user.role.has_power('select'):
|
||
return error_return(NoAccess('No permission', api_error_code=-1), 403)
|
||
|
||
with Connect() as c:
|
||
p = Potential(c, UserInfo(c, user_id))
|
||
return success_return({'user_id': user_id, 'r10_ptt': p.recent_10 / 10, 'data': p.recent_30_to_dict_list()})
|
||
|
||
|
||
@bp.route('/<int:user_id>/role', methods=['GET'])
|
||
@role_required(request, ['select', 'select_me'])
|
||
@api_try
|
||
def users_user_role_get(user, user_id):
|
||
'''查询用户role和powers'''
|
||
|
||
if user_id <= 0:
|
||
return error_return(InputError(api_error_code=-110))
|
||
|
||
if user_id == user.user_id:
|
||
return success_return({'user_id': user.user_id, 'role': user.role.role_id, 'powers': [i.power_id for i in user.role.powers]})
|
||
# 查别人需要select权限
|
||
if not user.role.has_power('select'):
|
||
return error_return(NoAccess('No permission', api_error_code=-1), 403)
|
||
|
||
with Connect() as c:
|
||
x = APIUser(c, user_id)
|
||
x.select_role_and_powers()
|
||
return success_return({'user_id': x.user_id, 'role': x.role.role_id, 'powers': [i.power_id for i in x.role.powers]})
|
||
|
||
|
||
@bp.route('/<int:user_id>/rating', methods=['GET'])
|
||
@role_required(request, ['select', 'select_me'])
|
||
@request_json_handle(request, optional_keys=['start_timestamp', 'end_timestamp', 'duration'])
|
||
@api_try
|
||
def users_user_rating_get(data, user, user_id):
|
||
'''查询用户历史rating,`duration`是相对于今天的天数'''
|
||
# 查别人需要select权限
|
||
if user_id != user.user_id and not user.role.has_power('select'):
|
||
return error_return(NoAccess('No permission', api_error_code=-1), 403)
|
||
|
||
start_timestamp = data.get('start_timestamp', None)
|
||
end_timestamp = data.get('end_timestamp', None)
|
||
duration = data.get('duration', None)
|
||
sql = '''select time, rating_ptt from user_rating where user_id = ?'''
|
||
sql_data = [user_id]
|
||
if start_timestamp is not None and end_timestamp is not None:
|
||
sql += ''' and time between ? and ?'''
|
||
sql_data += [start_timestamp, end_timestamp]
|
||
elif duration is not None:
|
||
sql += ''' and time between ? and ?'''
|
||
t = get_today_timestamp()
|
||
sql_data += [t - duration * 24 * 3600, t]
|
||
|
||
with Connect(Config.SQLITE_LOG_DATABASE_PATH) as c:
|
||
c.execute(sql, sql_data)
|
||
r = c.fetchall()
|
||
return success_return({'user_id': user_id, 'data': [{'time': i[0], 'rating_ptt': i[1]} for i in r]})
|