Fix a bug and add a new thing

- Add support for logging Arcaea's errors
- Fix a bug when world maps' data don't have some unnecessary parts the client of iOS may break down
This commit is contained in:
Lost-MSth
2022-07-16 19:50:07 +08:00
parent 6801af197f
commit 47f05cdf1e
23 changed files with 549 additions and 613 deletions

View File

@@ -1,5 +1,5 @@
import base64
import functools
from functools import wraps
from core.error import ArcError, NoAccess
from core.sql import Connect
@@ -7,44 +7,40 @@ from core.user import UserAuth, UserLogin
from flask import Blueprint, jsonify, request
from setting import Config
from .func import error_return
from .func import arc_try, error_return
bp = Blueprint('auth', __name__, url_prefix='/auth')
@bp.route('/login', methods=['POST']) # 登录接口
@arc_try
def login():
if 'AppVersion' in request.headers: # 版本检查
if Config.ALLOW_APPVERSION:
if request.headers['AppVersion'] not in Config.ALLOW_APPVERSION:
return error_return(NoAccess('Wrong app version.', 1203))
raise NoAccess('Wrong app version.', 1203)
headers = request.headers
request.form['grant_type']
with Connect() as c:
try:
id_pwd = headers['Authorization']
id_pwd = base64.b64decode(id_pwd[6:]).decode()
name, password = id_pwd.split(':', 1)
if 'DeviceId' in headers:
device_id = headers['DeviceId']
else:
device_id = 'low_version'
id_pwd = headers['Authorization']
id_pwd = base64.b64decode(id_pwd[6:]).decode()
name, password = id_pwd.split(':', 1)
if 'DeviceId' in headers:
device_id = headers['DeviceId']
else:
device_id = 'low_version'
user = UserLogin(c)
user.login(name, password, device_id, request.remote_addr)
user = UserLogin(c)
user.login(name, password, device_id, request.remote_addr)
return jsonify({"success": True, "token_type": "Bearer", 'user_id': user.user_id, 'access_token': user.token})
except ArcError as e:
return error_return(e)
return error_return()
return jsonify({"success": True, "token_type": "Bearer", 'user_id': user.user_id, 'access_token': user.token})
def auth_required(request):
# arcaea登录验证写成了修饰器
def decorator(view):
@functools.wraps(view)
@wraps(view)
def wrapped_view(*args, **kwargs):
headers = request.headers

View File

@@ -7,27 +7,24 @@ from core.user import UserOnline
from flask import Blueprint, request
from .auth import auth_required
from .func import error_return, success_return
from .func import arc_try, success_return
bp = Blueprint('course', __name__, url_prefix='/course')
@bp.route('/me', methods=['GET'])
@auth_required(request)
@arc_try
def course_me(user_id):
with Connect() as c:
try:
user = UserOnline(c, user_id)
core = ItemCore(c)
core.item_id = 'core_course_skip_purchase'
core.select(user)
x = UserCourseList(c, user)
x.select_all()
return success_return({
'courses': x.to_dict_list(),
"stamina_cost": Constant.COURSE_STAMINA_COST,
"course_skip_purchase_ticket": core.amount
})
except ArcError as e:
return error_return(e)
return error_return()
user = UserOnline(c, user_id)
core = ItemCore(c)
core.item_id = 'core_course_skip_purchase'
core.select(user)
x = UserCourseList(c, user)
x.select_all()
return success_return({
'courses': x.to_dict_list(),
"stamina_cost": Constant.COURSE_STAMINA_COST,
"course_skip_purchase_ticket": core.amount
})

View File

@@ -1,49 +1,43 @@
from flask import Blueprint, request
from core.sql import Connect
from core.error import ArcError
from core.user import UserOnline, code_get_id
from .func import error_return, success_return
from flask import Blueprint, request
from .auth import auth_required
from .func import arc_try, success_return
bp = Blueprint('friend', __name__, url_prefix='/friend')
@bp.route('/me/add', methods=['POST']) # 加好友
@auth_required(request)
@arc_try
def add_friend(user_id):
with Connect() as c:
try:
friend_code = request.form['friend_code']
friend_id = code_get_id(c, friend_code)
user = UserOnline(c, user_id)
user.add_friend(friend_id)
friend_code = request.form['friend_code']
friend_id = code_get_id(c, friend_code)
user = UserOnline(c, user_id)
user.add_friend(friend_id)
return success_return({
"user_id": user.user_id,
"updatedAt": "2020-09-07T07:32:12.740Z",
"createdAt": "2020-09-06T10:05:18.471Z",
"friends": user.friends
})
except ArcError as e:
return error_return(e)
return error_return()
return success_return({
"user_id": user.user_id,
"updatedAt": "2020-09-07T07:32:12.740Z",
"createdAt": "2020-09-06T10:05:18.471Z",
"friends": user.friends
})
@bp.route('/me/delete', methods=['POST']) # 删好友
@auth_required(request)
@arc_try
def delete_friend(user_id):
with Connect() as c:
try:
friend_id = int(request.form['friend_id'])
user = UserOnline(c, user_id)
user.delete_friend(friend_id)
friend_id = int(request.form['friend_id'])
user = UserOnline(c, user_id)
user.delete_friend(friend_id)
return success_return({
"user_id": user.user_id,
"updatedAt": "2020-09-07T07:32:12.740Z",
"createdAt": "2020-09-06T10:05:18.471Z",
"friends": user.friends
})
except ArcError as e:
return error_return(e)
return error_return()
return success_return({
"user_id": user.user_id,
"updatedAt": "2020-09-07T07:32:12.740Z",
"createdAt": "2020-09-06T10:05:18.471Z",
"friends": user.friends
})

View File

@@ -1,5 +1,9 @@
from flask import jsonify
from functools import wraps
from traceback import format_exc
from core.error import ArcError
from flask import current_app, jsonify
from setting import Config
default_error = ArcError('Unknown Error')
@@ -56,7 +60,7 @@ def error_return(e: ArcError = default_error): # 错误返回
if e.extra_data:
r['extra'] = e.extra_data
return jsonify(r)
return jsonify(r), e.status
def success_return(value=None):
@@ -64,3 +68,21 @@ def success_return(value=None):
if value is not None:
r['value'] = value
return jsonify(r)
def arc_try(view):
'''替代try/except记录`ArcError`为warning'''
@wraps(view)
def wrapped_view(*args, **kwargs):
try:
data = view(*args, **kwargs)
if data is None:
return error_return()
else:
return data
except ArcError as e:
if Config.ALLOW_WARNING_LOG:
current_app.logger.warning(format_exc())
return error_return(e)
return wrapped_view

View File

@@ -7,7 +7,7 @@ from flask import Blueprint, request
from setting import Config
from .auth import auth_required
from .func import error_return, success_return
from .func import arc_try, success_return
bp = Blueprint('multiplayer', __name__, url_prefix='/multiplayer')
@@ -16,66 +16,57 @@ conn1, conn2 = Pipe()
@bp.route('/me/room/create', methods=['POST']) # 创建房间
@auth_required(request)
@arc_try
def room_create(user_id):
if not Config.UDP_PORT or Config.UDP_PORT == '':
return error_return(ArcError('The local udp server is down.', 151)), 404
raise ArcError('The local udp server is down.', 151, status=404)
with Connect() as c:
try:
x = LocalMultiPlayer(conn1)
user = Player(c, user_id)
user.get_song_unlock(request.json['clientSongMap'])
x.create_room(user)
r = x.to_dict()
r['endPoint'] = request.host.split(
':')[0] if Config.LINK_PLAY_HOST == '' else Config.LINK_PLAY_HOST
r['port'] = int(Config.UDP_PORT)
return success_return(r)
except ArcError as e:
return error_return(e), 400
return error_return()
x = LocalMultiPlayer(conn1)
user = Player(c, user_id)
user.get_song_unlock(request.json['clientSongMap'])
x.create_room(user)
r = x.to_dict()
r['endPoint'] = request.host.split(
':')[0] if Config.LINK_PLAY_HOST == '' else Config.LINK_PLAY_HOST
r['port'] = int(Config.UDP_PORT)
return success_return(r)
@bp.route('/me/room/join/<room_code>', methods=['POST']) # 加入房间
@auth_required(request)
@arc_try
def room_join(user_id, room_code):
if not Config.UDP_PORT or Config.UDP_PORT == '':
return error_return(ArcError('The local udp server is down.', 151)), 404
raise ArcError('The local udp server is down.', 151, status=404)
with Connect() as c:
try:
x = LocalMultiPlayer(conn1)
user = Player(c, user_id)
user.get_song_unlock(request.json['clientSongMap'])
room = Room()
room.room_code = room_code
x.join_room(room, user)
r = x.to_dict()
r['endPoint'] = request.host.split(
':')[0] if Config.LINK_PLAY_HOST == '' else Config.LINK_PLAY_HOST
r['port'] = int(Config.UDP_PORT)
return success_return(r)
except ArcError as e:
return error_return(e), 400
return error_return()
x = LocalMultiPlayer(conn1)
user = Player(c, user_id)
user.get_song_unlock(request.json['clientSongMap'])
room = Room()
room.room_code = room_code
x.join_room(room, user)
r = x.to_dict()
r['endPoint'] = request.host.split(
':')[0] if Config.LINK_PLAY_HOST == '' else Config.LINK_PLAY_HOST
r['port'] = int(Config.UDP_PORT)
return success_return(r)
@bp.route('/me/update', methods=['POST']) # 更新房间
@auth_required(request)
@arc_try
def multiplayer_update(user_id):
if not Config.UDP_PORT or Config.UDP_PORT == '':
return error_return(ArcError('The local udp server is down.', 151)), 404
raise ArcError('The local udp server is down.', 151, status=404)
with Connect() as c:
try:
x = LocalMultiPlayer(conn1)
user = Player(c, user_id)
user.token = int(request.json['token'])
x.update_room(user)
r = x.to_dict()
r['endPoint'] = request.host.split(
':')[0] if Config.LINK_PLAY_HOST == '' else Config.LINK_PLAY_HOST
r['port'] = int(Config.UDP_PORT)
return success_return(r)
except ArcError as e:
return error_return(e), 400
return error_return()
x = LocalMultiPlayer(conn1)
user = Player(c, user_id)
user.token = int(request.json['token'])
x.update_room(user)
r = x.to_dict()
r['endPoint'] = request.host.split(
':')[0] if Config.LINK_PLAY_HOST == '' else Config.LINK_PLAY_HOST
r['port'] = int(Config.UDP_PORT)
return success_return(r)

View File

@@ -10,7 +10,7 @@ from flask import Blueprint, jsonify, request
from werkzeug.datastructures import ImmutableMultiDict
from .auth import auth_required
from .func import error_return, success_return
from .func import arc_try, error_return, success_return
from .present import present_info
from .purchase import bundle_pack, bundle_bundle
from .score import song_score_friend
@@ -27,21 +27,18 @@ def game_info():
@bp.route('/serve/download/me/song', methods=['GET']) # 歌曲下载
@auth_required(request)
@arc_try
def download_song(user_id):
with Connect() as c:
try:
x = DownloadList(c, UserOnline(c, user_id))
x.song_ids = request.args.getlist('sid')
x.url_flag = json.loads(request.args.get('url', 'true'))
x.clear_user_download()
if x.is_limited and x.url_flag:
raise ArcError('You have reached the download limit.', 903)
x = DownloadList(c, UserOnline(c, user_id))
x.song_ids = request.args.getlist('sid')
x.url_flag = json.loads(request.args.get('url', 'true'))
x.clear_user_download()
if x.is_limited and x.url_flag:
raise ArcError('You have reached the download limit.', 903)
x.add_songs()
return success_return(x.urls)
except ArcError as e:
return error_return(e)
return error_return()
x.add_songs()
return success_return(x.urls)
@bp.route('/finale/progress', methods=['GET'])

View File

@@ -5,34 +5,28 @@ from core.user import UserOnline
from flask import Blueprint, request
from .auth import auth_required
from .func import error_return, success_return
from .func import arc_try, error_return, success_return
bp = Blueprint('present', __name__, url_prefix='/present')
@bp.route('/me', methods=['GET']) # 用户奖励信息
@auth_required(request)
@arc_try
def present_info(user_id):
with Connect() as c:
try:
x = UserPresentList(c, UserOnline(c, user_id))
x.select_user_presents()
x = UserPresentList(c, UserOnline(c, user_id))
x.select_user_presents()
return success_return(x.to_dict_list())
except ArcError as e:
return error_return(e)
return error_return()
return success_return(x.to_dict_list())
@bp.route('/me/claim/<present_id>', methods=['POST']) # 礼物确认
@auth_required(request)
@arc_try
def claim_present(user_id, present_id):
with Connect() as c:
try:
x = UserPresent(c, UserOnline(c, user_id))
x.claim_user_present(present_id)
x = UserPresent(c, UserOnline(c, user_id))
x.claim_user_present(present_id)
return success_return()
except ArcError as e:
return error_return(e)
return error_return()
return success_return()

View File

@@ -1,6 +1,6 @@
from time import time
from core.error import ArcError, ItemUnavailable
from core.error import InputError, ItemUnavailable, PostError
from core.item import ItemFactory, Stamina6
from core.purchase import Purchase, PurchaseList
from core.redeem import UserRedeem
@@ -9,35 +9,29 @@ from core.user import UserOnline
from flask import Blueprint, request
from .auth import auth_required
from .func import error_return, success_return
from .func import arc_try, success_return
bp = Blueprint('purchase', __name__, url_prefix='/purchase')
@bp.route('/bundle/pack', methods=['GET']) # 曲包信息
@auth_required(request)
@arc_try
def bundle_pack(user_id):
with Connect() as c:
try:
x = PurchaseList(c, UserOnline(c, user_id)
).select_from_type('pack')
return success_return(x.to_dict_list())
except ArcError as e:
return error_return(e)
return error_return()
x = PurchaseList(c, UserOnline(c, user_id)
).select_from_type('pack')
return success_return(x.to_dict_list())
@bp.route('/bundle/single', methods=['GET']) # 单曲购买信息获取
@auth_required(request)
@arc_try
def get_single(user_id):
with Connect() as c:
try:
x = PurchaseList(c, UserOnline(c, user_id)
).select_from_type('single')
return success_return(x.to_dict_list())
except ArcError as e:
return error_return(e)
return error_return()
x = PurchaseList(c, UserOnline(c, user_id)
).select_from_type('single')
return success_return(x.to_dict_list())
@bp.route('/bundle/bundle', methods=['GET']) # 捆绑包
@@ -72,99 +66,87 @@ def bundle_bundle():
@bp.route('/me/pack', methods=['POST']) # 曲包和单曲购买
@auth_required(request)
@arc_try
def buy_pack_or_single(user_id):
with Connect() as c:
try:
if 'pack_id' in request.form:
purchase_name = request.form['pack_id']
elif 'single_id' in request.form:
purchase_name = request.form['single_id']
else:
return success_return()
if 'pack_id' in request.form:
purchase_name = request.form['pack_id']
elif 'single_id' in request.form:
purchase_name = request.form['single_id']
else:
return success_return()
x = Purchase(c, UserOnline(c, user_id)).select(purchase_name)
x.buy()
x = Purchase(c, UserOnline(c, user_id)).select(purchase_name)
x.buy()
return success_return({
'user_id': x.user.user_id,
'ticket': x.user.ticket,
'packs': x.user.packs,
'singles': x.user.singles,
'characters': x.user.characters_list
})
except ArcError as e:
return error_return(e)
return error_return()
return success_return({
'user_id': x.user.user_id,
'ticket': x.user.ticket,
'packs': x.user.packs,
'singles': x.user.singles,
'characters': x.user.characters_list
})
@bp.route('/me/item', methods=['POST']) # 特殊购买world模式boost和stamina
@auth_required(request)
@arc_try
def buy_special(user_id):
with Connect() as c:
try:
if 'item_id' not in request.form:
return error_return()
item_id = request.form['item_id']
if 'item_id' not in request.form:
raise PostError('`item_id` is required')
item_id = request.form['item_id']
x = Purchase(c, UserOnline(c, user_id))
x.purchase_name = item_id
x.price = 50
x.orig_price = 50
x.discount_from = -1
x.discount_to = -1
x.items = [ItemFactory(c).get_item(item_id)]
x.buy()
x = Purchase(c, UserOnline(c, user_id))
x.purchase_name = item_id
x.price = 50
x.orig_price = 50
x.discount_from = -1
x.discount_to = -1
x.items = [ItemFactory(c).get_item(item_id)]
x.buy()
r = {'user_id': x.user.user_id, 'ticket': x.user.ticket}
if item_id == 'stamina6':
r['stamina'] = x.user.stamina.stamina
r['max_stamina_ts'] = x.user.stamina.max_stamina_ts
r['world_mode_locked_end_ts'] = -1
return success_return(r)
except ArcError as e:
return error_return(e)
return error_return()
r = {'user_id': x.user.user_id, 'ticket': x.user.ticket}
if item_id == 'stamina6':
r['stamina'] = x.user.stamina.stamina
r['max_stamina_ts'] = x.user.stamina.max_stamina_ts
r['world_mode_locked_end_ts'] = -1
return success_return(r)
@bp.route('/me/stamina/<buy_stamina_type>', methods=['POST']) # 购买体力
@auth_required(request)
@arc_try
def purchase_stamina(user_id, buy_stamina_type):
with Connect() as c:
try:
if buy_stamina_type != 'fragment':
return error_return()
if buy_stamina_type != 'fragment':
raise InputError('Invalid type of buying stamina')
user = UserOnline(c, user_id)
user.select_user_one_column('next_fragstam_ts', -1)
now = int(time()*1000)
if user.next_fragstam_ts > now:
return ItemUnavailable('Buying stamina by fragment is not available yet.', 905)
user = UserOnline(c, user_id)
user.select_user_one_column('next_fragstam_ts', -1)
now = int(time()*1000)
if user.next_fragstam_ts > now:
return ItemUnavailable('Buying stamina by fragment is not available yet.', 905)
user.update_user_one_column(
'next_fragstam_ts', now + 24 * 3600 * 1000)
s = Stamina6(c)
s.user_claim_item(user)
return success_return({
"user_id": user.user_id,
"stamina": user.stamina.stamina,
"max_stamina_ts": user.stamina.max_stamina_ts,
"next_fragstam_ts": user.next_fragstam_ts,
'world_mode_locked_end_ts': -1
})
except ArcError as e:
return error_return(e)
return error_return()
user.update_user_one_column(
'next_fragstam_ts', now + 24 * 3600 * 1000)
s = Stamina6(c)
s.user_claim_item(user)
return success_return({
"user_id": user.user_id,
"stamina": user.stamina.stamina,
"max_stamina_ts": user.stamina.max_stamina_ts,
"next_fragstam_ts": user.next_fragstam_ts,
'world_mode_locked_end_ts': -1
})
@bp.route('/me/redeem', methods=['POST']) # 兑换码
@auth_required(request)
@arc_try
def redeem(user_id):
with Connect() as c:
try:
x = UserRedeem(c, UserOnline(c, user_id))
x.claim_user_redeem(request.form['code'])
x = UserRedeem(c, UserOnline(c, user_id))
x.claim_user_redeem(request.form['code'])
return success_return({"coupon": "fragment" + str(x.fragment) if x.fragment > 0 else ""})
except ArcError as e:
return error_return(e)
return error_return()
return success_return({"coupon": "fragment" + str(x.fragment) if x.fragment > 0 else ""})

View File

@@ -1,7 +1,7 @@
from time import time
from core.course import CoursePlay
from core.error import ArcError, InputError
from core.error import InputError
from core.rank import RankList
from core.score import UserPlay
from core.sql import Connect
@@ -9,7 +9,7 @@ from core.user import UserOnline
from flask import Blueprint, request
from .auth import auth_required
from .func import error_return, success_return
from .func import arc_try, success_return
bp = Blueprint('score', __name__, url_prefix='/score')
@@ -21,6 +21,7 @@ def score_token():
@bp.route('/token/world', methods=['GET']) # 世界模式成绩上传所需的token
@auth_required(request)
@arc_try
def score_token_world(user_id):
stamina_multiply = int(
@@ -30,127 +31,108 @@ def score_token_world(user_id):
prog_boost_multiply = int(
request.args['prog_boost_multiply']) if 'prog_boost_multiply' in request.args else 0
with Connect() as c:
try:
x = UserPlay(c, UserOnline(c, user_id))
x.song.set_chart(request.args['song_id'], int(
request.args['difficulty']))
x.set_play_state_for_world(stamina_multiply,
fragment_multiply, prog_boost_multiply)
return success_return({
"stamina": x.user.stamina.stamina,
"max_stamina_ts": x.user.stamina.max_stamina_ts,
"token": x.song_token
}
)
except ArcError as e:
return error_return(e)
return error_return()
x = UserPlay(c, UserOnline(c, user_id))
x.song.set_chart(request.args['song_id'], int(
request.args['difficulty']))
x.set_play_state_for_world(stamina_multiply,
fragment_multiply, prog_boost_multiply)
return success_return({
"stamina": x.user.stamina.stamina,
"max_stamina_ts": x.user.stamina.max_stamina_ts,
"token": x.song_token
}
)
@bp.route('/token/course', methods=['GET']) # 课题模式成绩上传所需的token
@auth_required(request)
@arc_try
def score_token_course(user_id):
with Connect() as c:
try:
use_course_skip_purchase = request.args.get(
'use_course_skip_purchase', 'false') == 'true'
use_course_skip_purchase = request.args.get(
'use_course_skip_purchase', 'false') == 'true'
user = UserOnline(c, user_id)
user_play = UserPlay(c, user)
user_play.song_token = request.args.get('previous_token', None)
user_play.get_play_state()
user = UserOnline(c, user_id)
user_play = UserPlay(c, user)
user_play.song_token = request.args.get('previous_token', None)
user_play.get_play_state()
status = 'created'
if user_play.course_play_state == -1:
# 没有token课题模式刚开始
course_play = CoursePlay(c, user, user_play)
course_play.course_id = request.args['course_id']
user_play.course_play = course_play
user_play.set_play_state_for_course(
use_course_skip_purchase)
elif 0 <= user_play.course_play_state <= 3:
# 验证token
user_play.update_token_for_course()
else:
# 课题模式已经结束
user_play.clear_play_state()
user.select_user_about_stamina()
status = 'cleared' if user_play.course_play_state == 4 else 'failed'
status = 'created'
if user_play.course_play_state == -1:
# 没有token课题模式刚开始
course_play = CoursePlay(c, user, user_play)
course_play.course_id = request.args['course_id']
user_play.course_play = course_play
user_play.set_play_state_for_course(
use_course_skip_purchase)
elif 0 <= user_play.course_play_state <= 3:
# 验证token
user_play.update_token_for_course()
else:
# 课题模式已经结束
user_play.clear_play_state()
user.select_user_about_stamina()
status = 'cleared' if user_play.course_play_state == 4 else 'failed'
return success_return({
"stamina": user.stamina.stamina,
"max_stamina_ts": user.stamina.max_stamina_ts,
"token": user_play.song_token,
'status': status
})
except ArcError as e:
return error_return(e)
return error_return()
return success_return({
"stamina": user.stamina.stamina,
"max_stamina_ts": user.stamina.max_stamina_ts,
"token": user_play.song_token,
'status': status
})
@bp.route('/song', methods=['POST']) # 成绩上传
@auth_required(request)
@arc_try
def song_score_post(user_id):
with Connect() as c:
try:
x = UserPlay(c, UserOnline(c, user_id))
x.song_token = request.form['song_token']
x.song_hash = request.form['song_hash']
x.song.set_chart(
request.form['song_id'], request.form['difficulty'])
x.set_score(request.form['score'], request.form['shiny_perfect_count'], request.form['perfect_count'], request.form['near_count'],
request.form['miss_count'], request.form['health'], request.form['modifier'], int(time() * 1000), request.form['clear_type'])
x.beyond_gauge = int(request.form['beyond_gauge'])
x.submission_hash = request.form['submission_hash']
if not x.is_valid:
raise InputError('Invalid score.', 107)
x.upload_score()
return success_return(x.to_dict())
except ArcError as e:
return error_return(e)
return error_return()
x = UserPlay(c, UserOnline(c, user_id))
x.song_token = request.form['song_token']
x.song_hash = request.form['song_hash']
x.song.set_chart(
request.form['song_id'], request.form['difficulty'])
x.set_score(request.form['score'], request.form['shiny_perfect_count'], request.form['perfect_count'], request.form['near_count'],
request.form['miss_count'], request.form['health'], request.form['modifier'], int(time() * 1000), request.form['clear_type'])
x.beyond_gauge = int(request.form['beyond_gauge'])
x.submission_hash = request.form['submission_hash']
if not x.is_valid:
raise InputError('Invalid score.', 107)
x.upload_score()
return success_return(x.to_dict())
@bp.route('/song', methods=['GET']) # TOP20
@auth_required(request)
@arc_try
def song_score_top(user_id):
with Connect() as c:
try:
rank_list = RankList(c)
rank_list.song.set_chart(request.args.get(
'song_id'), request.args.get('difficulty'))
rank_list.select_top()
return success_return(rank_list.to_dict_list())
except ArcError as e:
return error_return(e)
return error_return()
rank_list = RankList(c)
rank_list.song.set_chart(request.args.get(
'song_id'), request.args.get('difficulty'))
rank_list.select_top()
return success_return(rank_list.to_dict_list())
@bp.route('/song/me', methods=['GET']) # 我的排名默认最多20
@auth_required(request)
@arc_try
def song_score_me(user_id):
with Connect() as c:
try:
rank_list = RankList(c)
rank_list.song.set_chart(request.args.get(
'song_id'), request.args.get('difficulty'))
rank_list.select_me(UserOnline(c, user_id))
return success_return(rank_list.to_dict_list())
except ArcError as e:
return error_return(e)
return error_return()
rank_list = RankList(c)
rank_list.song.set_chart(request.args.get(
'song_id'), request.args.get('difficulty'))
rank_list.select_me(UserOnline(c, user_id))
return success_return(rank_list.to_dict_list())
@bp.route('/song/friend', methods=['GET']) # 好友排名默认最多50
@auth_required(request)
@arc_try
def song_score_friend(user_id):
with Connect() as c:
try:
rank_list = RankList(c)
rank_list.song.set_chart(request.args.get(
'song_id'), request.args.get('difficulty'))
rank_list.select_friend(UserOnline(c, user_id))
return success_return(rank_list.to_dict_list())
except ArcError as e:
return error_return(e)
return error_return()
rank_list = RankList(c)
rank_list.song.set_chart(request.args.get(
'song_id'), request.args.get('difficulty'))
rank_list.select_friend(UserOnline(c, user_id))
return success_return(rank_list.to_dict_list())

View File

@@ -8,186 +8,160 @@ from flask import Blueprint, request
from setting import Config
from .auth import auth_required
from .func import error_return, success_return
from .func import arc_try, success_return
bp = Blueprint('user', __name__, url_prefix='/user')
@bp.route('', methods=['POST']) # 注册接口
@arc_try
def register():
if 'AppVersion' in request.headers: # 版本检查
if Config.ALLOW_APPVERSION:
if request.headers['AppVersion'] not in Config.ALLOW_APPVERSION:
return error_return(NoAccess('Wrong app version.', 1203))
raise NoAccess('Wrong app version.', 1203)
with Connect() as c:
try:
new_user = UserRegister(c)
new_user.set_name(request.form['name'])
new_user.set_password(request.form['password'])
new_user.set_email(request.form['email'])
if 'device_id' in request.form:
device_id = request.form['device_id']
else:
device_id = 'low_version'
new_user = UserRegister(c)
new_user.set_name(request.form['name'])
new_user.set_password(request.form['password'])
new_user.set_email(request.form['email'])
if 'device_id' in request.form:
device_id = request.form['device_id']
else:
device_id = 'low_version'
new_user.register()
new_user.register()
# 注册后自动登录
user = UserLogin(c)
user.login(new_user.name, new_user.password,
device_id, request.remote_addr)
return success_return({'user_id': user.user_id, 'access_token': user.token})
except ArcError as e:
return error_return(e)
return error_return()
# 注册后自动登录
user = UserLogin(c)
user.login(new_user.name, new_user.password,
device_id, request.remote_addr)
return success_return({'user_id': user.user_id, 'access_token': user.token})
@bp.route('/me', methods=['GET']) # 用户信息
@auth_required(request)
@arc_try
def user_me(user_id):
with Connect() as c:
try:
return success_return(UserOnline(c, user_id).to_dict())
except ArcError as e:
return error_return(e)
return error_return()
return success_return(UserOnline(c, user_id).to_dict())
@bp.route('/me/character', methods=['POST']) # 角色切换
@auth_required(request)
@arc_try
def character_change(user_id):
with Connect() as c:
try:
user = UserOnline(c, user_id)
user.change_character(
int(request.form['character']), request.form['skill_sealed'] == 'true')
user = UserOnline(c, user_id)
user.change_character(
int(request.form['character']), request.form['skill_sealed'] == 'true')
return success_return({'user_id': user.user_id, 'character': user.character.character_id})
except ArcError as e:
return error_return(e)
return error_return()
return success_return({'user_id': user.user_id, 'character': user.character.character_id})
# 角色觉醒切换
@bp.route('/me/character/<int:character_id>/toggle_uncap', methods=['POST'])
@auth_required(request)
@arc_try
def toggle_uncap(user_id, character_id):
with Connect() as c:
try:
user = User()
user.user_id = user_id
character = UserCharacter(c, character_id)
character.change_uncap_override(user)
character.select_character_info(user)
return success_return({'user_id': user.user_id, 'character': [character.to_dict()]})
except ArcError as e:
return error_return(e)
return error_return()
user = User()
user.user_id = user_id
character = UserCharacter(c, character_id)
character.change_uncap_override(user)
character.select_character_info(user)
return success_return({'user_id': user.user_id, 'character': [character.to_dict()]})
# 角色觉醒
@bp.route('/me/character/<int:character_id>/uncap', methods=['POST'])
@auth_required(request)
@arc_try
def character_first_uncap(user_id, character_id):
with Connect() as c:
try:
user = UserOnline(c, user_id)
character = UserCharacter(c, character_id)
character.select_character_info(user)
character.character_uncap(user)
return success_return({'user_id': user.user_id, 'character': [character.to_dict()], 'cores': user.cores})
except ArcError as e:
return error_return(e)
return error_return()
user = UserOnline(c, user_id)
character = UserCharacter(c, character_id)
character.select_character_info(user)
character.character_uncap(user)
return success_return({'user_id': user.user_id, 'character': [character.to_dict()], 'cores': user.cores})
# 角色使用以太之滴
@bp.route('/me/character/<int:character_id>/exp', methods=['POST'])
@auth_required(request)
@arc_try
def character_exp(user_id, character_id):
with Connect() as c:
try:
user = UserOnline(c, user_id)
character = UserCharacter(c, character_id)
character.select_character_info(user)
core = ItemCore(c)
core.amount = - int(request.form['amount'])
core.item_id = 'core_generic'
character.upgrade_by_core(user, core)
return success_return({'user_id': user.user_id, 'character': [character.to_dict], 'cores': user.cores})
except ArcError as e:
return error_return(e)
return error_return()
user = UserOnline(c, user_id)
character = UserCharacter(c, character_id)
character.select_character_info(user)
core = ItemCore(c)
core.amount = - int(request.form['amount'])
core.item_id = 'core_generic'
character.upgrade_by_core(user, core)
return success_return({'user_id': user.user_id, 'character': [character.to_dict], 'cores': user.cores})
@bp.route('/me/save', methods=['GET']) # 从云端同步
@auth_required(request)
@arc_try
def cloud_get(user_id):
with Connect() as c:
try:
user = User()
user.user_id = user_id
save = SaveData(c)
save.select_all(user)
return success_return(save.to_dict())
except ArcError as e:
return error_return(e)
return error_return()
user = User()
user.user_id = user_id
save = SaveData(c)
save.select_all(user)
return success_return(save.to_dict())
@bp.route('/me/save', methods=['POST']) # 向云端同步
@auth_required(request)
@arc_try
def cloud_post(user_id):
with Connect() as c:
try:
user = User()
user.user_id = user_id
save = SaveData(c)
save.set_value(
'scores_data', request.form['scores_data'], request.form['scores_checksum'])
save.set_value(
'clearlamps_data', request.form['clearlamps_data'], request.form['clearlamps_checksum'])
save.set_value(
'clearedsongs_data', request.form['clearedsongs_data'], request.form['clearedsongs_checksum'])
save.set_value(
'unlocklist_data', request.form['unlocklist_data'], request.form['unlocklist_checksum'])
save.set_value(
'installid_data', request.form['installid_data'], request.form['installid_checksum'])
save.set_value('devicemodelname_data',
request.form['devicemodelname_data'], request.form['devicemodelname_checksum'])
save.set_value(
'story_data', request.form['story_data'], request.form['story_checksum'])
save.set_value(
'finalestate_data', request.form['finalestate_data'], request.form['finalestate_checksum'])
user = User()
user.user_id = user_id
save = SaveData(c)
save.set_value(
'scores_data', request.form['scores_data'], request.form['scores_checksum'])
save.set_value(
'clearlamps_data', request.form['clearlamps_data'], request.form['clearlamps_checksum'])
save.set_value(
'clearedsongs_data', request.form['clearedsongs_data'], request.form['clearedsongs_checksum'])
save.set_value(
'unlocklist_data', request.form['unlocklist_data'], request.form['unlocklist_checksum'])
save.set_value(
'installid_data', request.form['installid_data'], request.form['installid_checksum'])
save.set_value('devicemodelname_data',
request.form['devicemodelname_data'], request.form['devicemodelname_checksum'])
save.set_value(
'story_data', request.form['story_data'], request.form['story_checksum'])
save.set_value(
'finalestate_data', request.form['finalestate_data'], request.form['finalestate_checksum'])
save.update_all(user)
return success_return({'user_id': user.user_id})
except ArcError as e:
return error_return(e)
return error_return()
save.update_all(user)
return success_return({'user_id': user.user_id})
@bp.route('/me/setting/<set_arg>', methods=['POST']) # 三个设置
@auth_required(request)
@arc_try
def sys_set(user_id, set_arg):
with Connect() as c:
try:
value = request.form['value']
user = UserOnline(c, user_id)
if 'favorite_character' == set_arg:
user.change_favorite_character(int(value))
else:
value = 'true' == value
if 'is_hide_rating' == set_arg or 'max_stamina_notification_enabled' == set_arg:
user.update_user_one_column(set_arg, value)
return success_return(user.to_dict())
except ArcError as e:
return error_return(e)
return error_return()
value = request.form['value']
user = UserOnline(c, user_id)
if 'favorite_character' == set_arg:
user.change_favorite_character(int(value))
else:
value = 'true' == value
if 'is_hide_rating' == set_arg or 'max_stamina_notification_enabled' == set_arg:
user.update_user_one_column(set_arg, value)
return success_return(user.to_dict())
@bp.route('/me/request_delete', methods=['POST']) # 删除账号
@auth_required(request)
@arc_try
def user_delete(user_id):
return error_return(ArcError('Cannot delete the account.', 151)), 404
raise ArcError('Cannot delete the account.', 151, status=404)

View File

@@ -1,57 +1,47 @@
from core.error import ArcError
from core.sql import Connect
from core.user import UserOnline
from core.world import UserMap, get_world_all
from flask import Blueprint, request
from .auth import auth_required
from .func import error_return, success_return
from .func import arc_try, success_return
bp = Blueprint('world', __name__, url_prefix='/world')
@bp.route('/map/me', methods=['GET']) # 获得世界模式信息,所有地图
@auth_required(request)
@arc_try
def world_all(user_id):
with Connect() as c:
try:
user = UserOnline(c, user_id)
user.select_user_about_current_map()
return success_return({
"current_map": user.current_map.map_id,
"user_id": user_id,
"maps": [x.to_dict(has_map_info=True, has_rewards=True) for x in get_world_all(c, user)]
})
except ArcError as e:
return error_return(e)
return error_return()
user = UserOnline(c, user_id)
user.select_user_about_current_map()
return success_return({
"current_map": user.current_map.map_id,
"user_id": user_id,
"maps": [x.to_dict(has_map_info=True, has_rewards=True) for x in get_world_all(c, user)]
})
@bp.route('/map/me', methods=['POST']) # 进入地图
@auth_required(request)
@arc_try
def world_in(user_id):
with Connect() as c:
try:
arcmap = UserMap(c, request.form['map_id'], UserOnline(c, user_id))
if arcmap.unlock():
return success_return(arcmap.to_dict())
except ArcError as e:
return error_return(e)
return error_return()
arcmap = UserMap(c, request.form['map_id'], UserOnline(c, user_id))
if arcmap.unlock():
return success_return(arcmap.to_dict())
@bp.route('/map/me/<map_id>', methods=['GET']) # 获得单个地图完整信息
@auth_required(request)
@arc_try
def world_one(user_id, map_id):
with Connect() as c:
try:
arcmap = UserMap(c, map_id, UserOnline(c, user_id))
arcmap.change_user_current_map()
return success_return({
"user_id": user_id,
"current_map": map_id,
"maps": [arcmap.to_dict(has_map_info=True, has_steps=True)]
})
except ArcError as e:
return error_return(e)
return error_return()
arcmap = UserMap(c, map_id, UserOnline(c, user_id))
arcmap.change_user_current_map()
return success_return({
"user_id": user_id,
"current_map": map_id,
"maps": [arcmap.to_dict(has_map_info=True, has_steps=True)]
})