Merge pull request #94 from Lost-MSth/dev

Update to v2.11.0
This commit is contained in:
Lost-MSth
2023-03-05 22:58:19 +08:00
committed by GitHub
40 changed files with 1611 additions and 498 deletions

View File

@@ -73,17 +73,18 @@ It is just so interesting. What it can do is under exploration.
>
> Tips: When updating, please keep the original database in case of data loss.
### Version 2.10.3
### Version 2.11.0
- 适用于Arcaea 4.2.0版本 For Arcaea 4.2.0
- 新搭档 **拉格兰Aria** 已解锁 Unlock the character **Lagrange(Aria)**. (Lack of its values)
- 搭档 **忘却Apophenia**解锁 Unlock the character **Lethe(Apophenia)**.
- 新增选项取消歌曲文件哈希预计算 Add an option to disable song file hash pre-calculation.
- 新增对世界模式中地图本地限制歌曲解锁或挑战解锁以及地图中台阶上限制歌曲难度的支持 Add support for restricting songs' difficulty in the map's steps of world mode and locally restricting unlocking songs or challenges in the map of world mode.
- 恢复使用云存档覆盖成绩的功能 Restore the feature that cloud save can be used to cover best scores.
- 捕获`Authorization`不在请求头导致的报错 Capture error that the request does not have `Authorization` in header.
- 修复客户端版本校验中请求头不存在`AppVersion`也能通过校验的逻辑错误 Fix a bug that headers without `AppVersion` are allowed in client version checking.
- 新增增删改歌曲信息的API接口 Add some API endpoints, including creating, changing, deleting song info.
- 适用于Arcaea 4.3.0版本 For Arcaea 4.3.0
- 新搭档 **霞玛(大~宇~宙)**、**米露可(大~宇~宙)**、**紫黑**、**百合咲美香** 已解锁 Unlock the character **Shama(UNiVERSE)**, **Milk(UNiVERSE)**, **Shikoku**, **Mika Yurisaki**.
- 搭档 **依莉丝**觉醒 Uncap the character **Ilith**.
- 为觉醒 **依莉丝** 以及 **百合咲美香** 的技能提供支持 Add support for the skills of uncapped **Ilith** and **Mika Yurisaki**.
- 为 Beyond 图倍增提供支持 Add support for beyond gauge boost.
- 为 Beyond 连锁图提供支持 Add support for beyond chain maps.
- 修复联机时无人房间仍可进入的问题 Fix a logic bug that the room without anyone can be entered in multiplayer.
- 对一些数值的算法进行了更改 Some changes in some values' algorithms.
- 小重构 Link Play 子程序 Refactor simply for Link Play subprogram.
- 新增增删改兑换码、购买项目、登陆奖励、物品的API接口 Add some API endpoints, including creating, changing, deleting about redeem, purchase, login present and item.
## 运行环境与依赖 Running environment and requirements

View File

@@ -1,10 +1,14 @@
from flask import Blueprint
from . import users
from . import songs
from . import token
from . import (users, songs, token, system, items,
purchases, presents, redeems)
bp = Blueprint('api', __name__, url_prefix='/api/v1')
bp.register_blueprint(users.bp)
bp.register_blueprint(songs.bp)
bp.register_blueprint(token.bp)
bp.register_blueprint(system.bp)
bp.register_blueprint(items.bp)
bp.register_blueprint(purchases.bp)
bp.register_blueprint(presents.bp)
bp.register_blueprint(redeems.bp)

View File

@@ -1,15 +1,17 @@
from functools import wraps
from traceback import format_exc
from base64 import b64decode
from functools import wraps
from json import loads
from traceback import format_exc
from flask import current_app
from core.api_user import APIUser
from core.config_manager import Config
from core.error import ArcError, NoAccess, PostError
from core.error import ArcError, InputError, NoAccess, PostError
from core.sql import Connect
from flask import current_app
from .api_code import error_return
from .constant import Constant
def role_required(request, powers=[]):
@@ -48,7 +50,7 @@ def role_required(request, powers=[]):
return decorator
def request_json_handle(request, required_keys: list = [], optional_keys: list = [], must_change: bool = False):
def request_json_handle(request, required_keys: list = [], optional_keys: list = [], must_change: bool = False, is_batch: bool = False):
'''
提取post参数返回dict写成了修饰器\
parameters: \
@@ -64,7 +66,7 @@ def request_json_handle(request, required_keys: list = [], optional_keys: list =
data = {}
if request.data:
json_data = request.json
json_data = request.get_json()
else:
if request.method == 'GET' and 'query' in request.args:
# 处理axios没法GET传data的问题
@@ -78,15 +80,24 @@ def request_json_handle(request, required_keys: list = [], optional_keys: list =
for key in required_keys:
if key not in json_data:
return error_return(PostError(f'Missing parameter: {key}', api_error_code=-100))
return error_return(InputError(f'Missing parameter: {key}', api_error_code=-100))
data[key] = json_data[key]
for key in optional_keys:
if key in json_data:
data[key] = json_data[key]
if is_batch:
for key in Constant.PATCH_KEYS:
if key in json_data:
data[key] = json_data[key]
if not isinstance(data[key], list):
return error_return(InputError(f'Parameter {key} must be a list', api_error_code=-100))
if not data:
return error_return(InputError('No change', api_error_code=-100))
else:
for key in optional_keys:
if key in json_data:
data[key] = json_data[key]
if must_change and not data:
return error_return(PostError('No change', api_error_code=-100))
if must_change and not data:
return error_return(InputError('No change', api_error_code=-100))
return view(data, *args, **kwargs)

View File

@@ -18,6 +18,11 @@ CODE_MSG = {
-104: 'Invalid sort order parameter',
-105: 'Invalid URL parameter',
-110: 'Invalid user_id',
-120: 'Invalid item type',
-121: 'No such item',
-122: 'Item already exists',
-123: 'The collection already has this item',
-124: 'The collection does not have this item',
-200: 'No permission', # 2xx用户相关错误
-201: 'Wrong username or password',
-202: 'User is banned',

View File

@@ -1,2 +1,4 @@
class Constant:
QUERY_KEYS = ['limit', 'offset', 'query', 'fuzzy_query', 'sort']
PATCH_KEYS = ['create', 'update', 'remove']

115
latest version/api/items.py Normal file
View File

@@ -0,0 +1,115 @@
from flask import Blueprint, request
from core.error import DataExist, InputError, NoData
from core.item import Item, ItemFactory
from core.sql import Connect, Query, Sql
from .api_auth import api_try, request_json_handle, role_required
from .api_code import success_return
from .constant import Constant
bp = Blueprint('items', __name__, url_prefix='/items')
@bp.route('', methods=['GET'])
@role_required(request, ['select'])
@request_json_handle(request, optional_keys=Constant.QUERY_KEYS)
@api_try
def items_get(data, user):
'''查询全物品信息'''
with Connect() as c:
query = Query(['item_id', 'type'], ['item_id'],
['item_id']).from_dict(data)
x = Sql(c).select('item', query=query)
r: 'list[Item]' = []
for i in x:
r.append(ItemFactory.from_dict({
'item_id': i[0],
'type': i[1],
'is_available': i[2] == 1
}))
if not r:
raise NoData(api_error_code=-2)
return success_return([x.to_dict(has_is_available=True, has_amount=False) for x in r])
ALLOW_ITEM_TYPE = ['pack', 'single', 'world_song', 'character']
@bp.route('', methods=['POST'])
@role_required(request, ['change'])
@request_json_handle(request, required_keys=['item_id', 'type'], optional_keys=['is_available'])
@api_try
def items_post(data, user):
'''新增物品'''
if data['type'] not in ALLOW_ITEM_TYPE:
raise InputError(
f'Invalid item type: `{data["type"]}`', api_error_code=-120)
with Connect() as c:
item = ItemFactory.from_dict(data, c=c)
if item.select_exists():
raise DataExist(
f'Item `{item.item_type}`: `{item.item_id}` already exists', api_error_code=-122)
item.insert()
return success_return(item.to_dict(has_is_available=True, has_amount=False))
@bp.route('/<string:item_type>/<string:item_id>', methods=['DELETE'])
@role_required(request, ['change'])
@api_try
def items_item_delete(user, item_type, item_id):
'''删除物品'''
if item_type not in ALLOW_ITEM_TYPE:
raise InputError(
f'Invalid item type: `{item_type}`', api_error_code=-120)
with Connect() as c:
item = ItemFactory.from_dict({
'item_id': item_id,
'type': item_type
}, c=c)
if not item.select_exists():
raise NoData(
f'No such item `{item_type}`: `{item_id}`', api_error_code=-121)
item.delete()
return success_return()
@bp.route('/<string:item_type>/<string:item_id>', methods=['PUT'])
@role_required(request, ['change'])
@request_json_handle(request, optional_keys=['is_available'], must_change=True)
@api_try
def items_item_put(data, user, item_type, item_id):
'''修改物品'''
if item_type not in ALLOW_ITEM_TYPE:
raise InputError(
f'Invalid item type: `{item_type}`', api_error_code=-120)
if not isinstance(data['is_available'], bool):
raise InputError('`is_available` must be a boolean',
api_error_code=-101)
with Connect() as c:
item = ItemFactory.from_dict({
'item_id': item_id,
'type': item_type,
'is_available': data['is_available']
}, c=c)
if not item.select_exists():
raise NoData(
f'No such item `{item_type}`: `{item_id}`', api_error_code=-121)
item.update()
return success_return(item.to_dict(has_is_available=True, has_amount=False))
@bp.route('/<string:item_type>/<string:item_id>', methods=['GET'])
@role_required(request, ['select'])
@api_try
def items_item_get(user, item_type, item_id):
'''查询单个物品信息'''
with Connect() as c:
item = ItemFactory.from_dict({
'item_id': item_id,
'type': item_type
}, c=c)
item.select()
return success_return(item.to_dict(has_is_available=True, has_amount=False))

View File

@@ -0,0 +1,122 @@
from flask import Blueprint, request
from core.error import DataExist, InputError, NoData
from core.item import ItemFactory
from core.present import Present
from core.sql import Connect, Query, Sql
from .api_auth import api_try, request_json_handle, role_required
from .api_code import success_return
from .constant import Constant
bp = Blueprint('presents', __name__, url_prefix='/presents')
@bp.route('', methods=['GET'])
@role_required(request, ['select'])
@request_json_handle(request, optional_keys=Constant.QUERY_KEYS)
@api_try
def presents_get(data, user):
'''查询全present信息'''
with Connect() as c:
query = Query(['present_id'], ['present_id', 'description'], [
'present_id', 'expire_ts']).from_dict(data)
x = Sql(c).select('present', query=query)
r = [Present().from_list(i) for i in x]
if not r:
raise NoData(api_error_code=-2)
return success_return([x.to_dict(has_items=False) for x in r])
@bp.route('', methods=['POST'])
@role_required(request, ['insert'])
@request_json_handle(request, required_keys=['present_id', 'description', 'expire_ts'], optional_keys=['items'])
@api_try
def presents_post(data, user):
'''添加present注意可以有items不存在的item会自动创建'''
with Connect() as c:
p = Present(c).from_dict(data)
if p.select_exists():
raise DataExist(
f'Present `{p.present_id}` already exists')
p.insert_all()
return success_return(p.to_dict(has_items='items' in data))
@bp.route('/<string:present_id>', methods=['GET'])
@role_required(request, ['select'])
@api_try
def presents_present_get(user, present_id: str):
'''查询单个present信息'''
with Connect() as c:
p = Present(c).select(present_id)
p.select_items()
return success_return(p.to_dict())
@bp.route('/<string:present_id>', methods=['DELETE'])
@role_required(request, ['delete'])
@api_try
def presents_present_delete(user, present_id: str):
'''删除present会连带删除present_item'''
with Connect() as c:
Present(c).select(present_id).delete_all()
return success_return()
@bp.route('/<string:present_id>', methods=['PUT'])
@role_required(request, ['change'])
@request_json_handle(request, optional_keys=['description', 'expire_ts'], must_change=True)
@api_try
def presents_present_put(data, user, present_id: str):
'''更新present信息注意不能有items'''
with Connect() as c:
p = Present(c).select(present_id)
if 'description' in data:
p.description = str(data['description'])
if 'expire_ts' in data:
p.expire_ts = int(data['expire_ts'])
p.update()
return success_return(p.to_dict(has_items=False))
@bp.route('/<string:present_id>/items', methods=['GET'])
@role_required(request, ['select'])
@api_try
def presents_present_items_get(user, present_id: str):
'''查询present的items'''
with Connect() as c:
p = Present(c)
p.present_id = present_id
p.select_items()
return success_return([x.to_dict(has_is_available=True) for x in p.items])
@bp.route('/<string:present_id>/items', methods=['PATCH'])
@role_required(request, ['change'])
@request_json_handle(request, is_batch=True)
@api_try
def presents_present_items_patch(data, user, present_id: str):
'''增删改单个present的items'''
with Connect() as c:
p = Present(c)
p.present_id = present_id
p.select_items()
p.remove_items([ItemFactory.from_dict(x, c=c)
for x in data.get('remove', [])])
p.add_items([ItemFactory.from_dict(x, c=c)
for x in data.get('create', [])])
updates = data.get('update', [])
for x in updates:
if 'amount' not in x:
raise InputError('`amount` is required in `update`')
if not isinstance(x['amount'], int) or x['amount'] <= 0:
raise InputError(
'`amount` must be a positive integer', api_error_code=-101)
p.update_items([ItemFactory.from_dict(x, c=c) for x in updates])
return success_return([x.to_dict(has_is_available=True) for x in p.items])

View File

@@ -0,0 +1,163 @@
from flask import Blueprint, request
from core.error import DataExist, InputError, NoData
from core.item import ItemFactory
from core.purchase import Purchase
from core.sql import Connect, Query, Sql
from .api_auth import api_try, request_json_handle, role_required
from .api_code import success_return
from .constant import Constant
bp = Blueprint('purchases', __name__, url_prefix='/purchases')
@bp.route('', methods=['GET'])
@role_required(request, ['select'])
@request_json_handle(request, optional_keys=Constant.QUERY_KEYS)
@api_try
def purchases_get(data, user):
'''查询全购买信息'''
with Connect() as c:
query = Query(['purchase_name', 'discount_reason'], ['purchase_name'], [
'purchase_name', 'price', 'orig_price', 'discount_from', 'discount_to']).from_dict(data)
x = Sql(c).select('purchase', query=query)
r = [Purchase().from_list(i) for i in x]
if not r:
raise NoData(api_error_code=-2)
return success_return([x.to_dict(has_items=False, show_real_price=False) for x in r])
@bp.route('', methods=['POST'])
@role_required(request, ['change'])
@request_json_handle(request, required_keys=['purchase_name', 'orig_price'], optional_keys=['price', 'discount_from', 'discount_to', 'discount_reason', 'items'])
@api_try
def purchases_post(data, user):
'''新增购买注意可以有items不存在的item会自动创建'''
with Connect() as c:
purchase = Purchase(c).from_dict(data)
if purchase.select_exists():
raise DataExist(
f'Purchase `{purchase.purchase_name}` already exists')
purchase.insert_all()
return success_return(purchase.to_dict(has_items='items' in data, show_real_price=False))
@bp.route('/<string:purchase_name>', methods=['GET'])
@role_required(request, ['select'])
@api_try
def purchases_purchase_get(user, purchase_name: str):
'''查询单个购买信息'''
with Connect() as c:
return success_return(Purchase(c).select(purchase_name).to_dict(show_real_price=False))
@bp.route('/<string:purchase_name>', methods=['DELETE'])
@role_required(request, ['change'])
@api_try
def purchases_purchase_delete(user, purchase_name: str):
'''删除单个购买信息会连带删除purchase_item'''
with Connect() as c:
Purchase(c).select(purchase_name).delete_all()
return success_return()
@bp.route('/<string:purchase_name>', methods=['PUT'])
@role_required(request, ['change'])
@request_json_handle(request, optional_keys=['price', 'orig_price', 'discount_from', 'discount_to', 'discount_reason'], must_change=True)
@api_try
def purchases_purchase_put(data, user, purchase_name: str):
'''修改单个购买信息注意不能有items'''
with Connect() as c:
purchase = Purchase(c).select(purchase_name)
t = ['price', 'orig_price', 'discount_from', 'discount_to']
for i in t:
if i in data:
setattr(purchase, i, int(data[i]))
if 'discount_reason' in data:
purchase.discount_reason = str(data['discount_reason'])
purchase.update()
return success_return(purchase.to_dict(has_items=False, show_real_price=False))
@bp.route('/<string:purchase_name>/items', methods=['GET'])
@role_required(request, ['select'])
@api_try
def purchases_purchase_items_get(user, purchase_name: str):
'''查询单个购买的所有items'''
with Connect() as c:
p = Purchase(c)
p.purchase_name = purchase_name
p.select_items()
return success_return([x.to_dict(has_is_available=True) for x in p.items])
# @bp.route('/<string:purchase_name>/items', methods=['POST'])
# @role_required(request, ['change'])
# @request_json_handle(request, required_keys=['item_id', 'type'], optional_keys=['amount'])
# @api_try
# def purchases_purchase_items_post(data, user, purchase_name: str):
# '''新增单个购买的批量items'''
# with Connect() as c:
# p = Purchase(c)
# p.purchase_name = purchase_name
# p.select_items()
# p.add_items([ItemFactory().from_dict(data)])
# return success_return([x.to_dict(has_is_available=True) for x in p.items])
@bp.route('/<string:purchase_name>/items', methods=['PATCH'])
@role_required(request, ['change'])
@request_json_handle(request, is_batch=True)
@api_try
def purchases_purchase_items_patch(data, user, purchase_name: str):
'''增删改单个购买的批量items'''
with Connect() as c:
p = Purchase(c)
p.purchase_name = purchase_name
p.select_items()
p.remove_items([ItemFactory.from_dict(x, c=c)
for x in data.get('remove', [])])
p.add_items([ItemFactory.from_dict(x, c=c)
for x in data.get('create', [])])
updates = data.get('update', [])
for x in updates:
if 'amount' not in x:
raise InputError('`amount` is required in `update`')
if not isinstance(x['amount'], int) or x['amount'] <= 0:
raise InputError(
'`amount` must be a positive integer', api_error_code=-101)
p.update_items([ItemFactory.from_dict(x, c=c) for x in updates])
return success_return([x.to_dict(has_is_available=True) for x in p.items])
# @bp.route('/<string:purchase_name>/items/<string:item_type>/<string:item_id>', methods=['DELETE'])
# @role_required(request, ['change'])
# @api_try
# def purchases_purchase_items_item_delete(user, purchase_name: str, item_type: str, item_id: str):
# '''删除单个购买的单个item'''
# with Connect() as c:
# p = Purchase(c)
# p.purchase_name = purchase_name
# p.select_items()
# p.delete_items([ItemFactory().from_dict(
# {'item_type': item_type, 'item_id': item_id})])
# return success_return()
# @bp.route('/<string:purchase_name>/items/<string:item_type>/<string:item_id>', methods=['PUT'])
# @role_required(request, ['change'])
# @request_json_handle(request, optional_keys=['amount', 'is_available'], must_change=True)
# @api_try
# def purchases_purchase_items_item_put(data, user, purchase_name: str, item_type: str, item_id: str):
# '''修改单个购买的单个item'''
# with Connect() as c:
# p = Purchase(c)
# p.purchase_name = purchase_name
# pass
# return success_return()

View File

@@ -0,0 +1,119 @@
from flask import Blueprint, request
from core.error import DataExist, InputError, NoData
from core.item import ItemFactory
from core.redeem import Redeem
from core.sql import Connect, Query, Sql
from .api_auth import api_try, request_json_handle, role_required
from .api_code import success_return
from .constant import Constant
bp = Blueprint('redeems', __name__, url_prefix='/redeems')
@bp.route('', methods=['GET'])
@role_required(request, ['select'])
@request_json_handle(request, optional_keys=Constant.QUERY_KEYS)
@api_try
def redeems_get(data, user):
'''查询全redeem信息'''
with Connect() as c:
query = Query(['code', 'type'], ['code'], ['code']).from_dict(data)
x = Sql(c).select('redeem', query=query)
r = [Redeem().from_list(i) for i in x]
if not r:
raise NoData(api_error_code=-2)
return success_return([x.to_dict(has_items=False) for x in r])
@bp.route('', methods=['POST'])
@role_required(request, ['insert'])
@request_json_handle(request, required_keys=['code', 'type'], optional_keys=['items'])
@api_try
def redeems_post(data, user):
'''添加redeem注意可以有items不存在的item会自动创建'''
with Connect() as c:
r = Redeem(c).from_dict(data)
if r.select_exists():
raise DataExist(
f'redeem `{r.code}` already exists')
r.insert_all()
return success_return(r.to_dict(has_items='items' in data))
@bp.route('/<string:code>', methods=['GET'])
@role_required(request, ['select'])
@api_try
def redeems_redeem_get(user, code: str):
'''查询单个redeem信息'''
with Connect() as c:
r = Redeem(c).select(code)
r.select_items()
return success_return(r.to_dict())
@bp.route('/<string:code>', methods=['DELETE'])
@role_required(request, ['delete'])
@api_try
def redeems_redeem_delete(user, code: str):
'''删除redeem会连带删除redeem_item'''
with Connect() as c:
Redeem(c).select(code).delete_all()
return success_return()
@bp.route('/<string:code>', methods=['PUT'])
@role_required(request, ['change'])
@request_json_handle(request, optional_keys=['type'], must_change=True)
@api_try
def redeems_redeem_put(data, user, code: str):
'''更新redeem信息注意不能有items'''
with Connect() as c:
r = Redeem(c).select(code)
if 'type' in data:
r.redeem_type = int(data['type'])
r.update()
return success_return(r.to_dict(has_items=False))
@bp.route('/<string:code>/items', methods=['GET'])
@role_required(request, ['select'])
@api_try
def redeems_redeem_items_get(user, code: str):
'''查询redeem的items'''
with Connect() as c:
r = Redeem(c)
r.code = code
r.select_items()
return success_return([x.to_dict(has_is_available=True) for x in r.items])
@bp.route('/<string:code>/items', methods=['PATCH'])
@role_required(request, ['change'])
@request_json_handle(request, is_batch=True)
@api_try
def redeems_redeem_items_patch(data, user, code: str):
'''增删改单个redeem的items'''
with Connect() as c:
r = Redeem(c)
r.code = code
r.select_items()
r.remove_items([ItemFactory.from_dict(x, c=c)
for x in data.get('remove', [])])
r.add_items([ItemFactory.from_dict(x, c=c)
for x in data.get('create', [])])
updates = data.get('update', [])
for x in updates:
if 'amount' not in x:
raise InputError('`amount` is required in `update`')
if not isinstance(x['amount'], int) or x['amount'] <= 0:
raise InputError(
'`amount` must be a positive integer', api_error_code=-101)
r.update_items([ItemFactory.from_dict(x, c=c) for x in updates])
return success_return([x.to_dict(has_is_available=True) for x in r.items])

View File

@@ -0,0 +1,32 @@
from flask import Blueprint, request
from core.error import ArcError
from core.operation import BaseOperation
from .api_auth import api_try, role_required
from .api_code import success_return
bp = Blueprint('system', __name__, url_prefix='/system')
operation_dict = {i._name: i for i in BaseOperation.__subclasses__()}
@bp.route('/operations', methods=['GET'])
@role_required(request, ['system'])
@api_try
def operations_get(user):
return success_return(list(operation_dict.keys()))
@bp.route('/operations/<string:operation_name>', methods=['POST'])
@role_required(request, ['system'])
@api_try
def operations_operation_post(user, operation_name: str):
if operation_name not in operation_dict:
raise ArcError(
f'No such operation: `{operation_name}`', api_error_code=-1, status=404)
x = operation_dict[operation_name]()
x.set_params(**request.get_json())
x.run()
return success_return()

View File

@@ -1,9 +1,10 @@
from base64 import b64decode
from flask import Blueprint, request
from core.api_user import APIUser
from core.error import PostError
from core.sql import Connect
from flask import Blueprint, request
from .api_auth import api_try, request_json_handle, role_required
from .api_code import success_return

View File

@@ -1,9 +1,10 @@
from flask import Blueprint, request
from core.api_user import APIUser
from core.error import InputError, NoAccess, NoData
from core.score import Potential, UserScoreList
from core.sql import Connect, Query, Sql
from core.user import UserChanger, UserInfo, UserRegister
from core.api_user import APIUser
from flask import Blueprint, request
from .api_auth import api_try, request_json_handle, role_required
from .api_code import error_return, success_return

View File

@@ -70,16 +70,24 @@ class CharacterValue:
self.set_parameter(start, mid, end)
@staticmethod
def _calc_char_value_20(level, stata, statb, lva=1, lvb=20):
# 计算1~20级搭档数值的核心函数返回浮点数来自https://redive.estertion.win/arcaea/calc/
n = [0, 0, 0.0005831753900000081, 0.004665403120000065, 0.015745735529959858, 0.03732322495992008, 0.07289692374980007, 0.12596588423968, 0.2000291587694801, 0.29858579967923987, 0.42513485930893946,
0.5748651406910605, 0.7014142003207574, 0.7999708412305152, 0.8740341157603029, 0.9271030762501818, 0.962676775040091, 0.9842542644700301, 0.9953345968799998, 0.9994168246100001, 1]
e = n[lva] - n[lvb]
a = stata - statb
r = a / e
d = stata - n[lva] * r
def _calc_char_value_20_math(level: int, value_1: float, value_20: float) -> float:
# by Lost-MSth
# 4/6859 = 0.00058317539
if level <= 10:
return 0.00058317539 * (level - 1) ** 3 * (value_20 - value_1) + value_1
else:
return - 0.00058317539 * (20 - level) ** 3 * (value_20 - value_1) + value_20
return d + r * n[level]
# @staticmethod
# def _calc_char_value_20(level: int, stata, statb, lva=1, lvb=20) -> float:
# # 计算1~20级搭档数值的核心函数返回浮点数来自https://redive.estertion.win/arcaea/calc/
# n = [0, 0, 0.0005831753900000081, 0.004665403120000065, 0.015745735529959858, 0.03732322495992008, 0.07289692374980007, 0.12596588423968, 0.2000291587694801, 0.29858579967923987, 0.42513485930893946,
# 0.5748651406910605, 0.7014142003207574, 0.7999708412305152, 0.8740341157603029, 0.9271030762501818, 0.962676775040091, 0.9842542644700301, 0.9953345968799998, 0.9994168246100001, 1]
# e = n[lva] - n[lvb]
# a = stata - statb
# r = a / e
# d = stata - n[lva] * r
# return d + r * n[level]
@staticmethod
def _calc_char_value_30(level, stata, statb, lva=20, lvb=30):
@@ -93,7 +101,7 @@ class CharacterValue:
def get_value(self, level: Level):
if level.min_level <= level.level <= level.mid_level:
return self._calc_char_value_20(level.level, self.start, self.mid)
return self._calc_char_value_20_math(level.level, self.start, self.mid)
elif level.mid_level < level.level <= level.max_level:
return self._calc_char_value_30(level.level, self.mid, self.end)
else:
@@ -103,7 +111,9 @@ class CharacterValue:
class Character:
database_table_name = None
def __init__(self) -> None:
def __init__(self, c=None) -> None:
self.c = c
self.character_id = None
self.name = None
self.char_type = None
@@ -134,10 +144,16 @@ class Character:
# 应该是只有对立这样
return self.character_id == 1
def to_dict(self) -> dict:
pass
def from_list(self, l: list) -> 'Character':
pass
class UserCharacter(Character):
'''
用户角色类\
用户角色类
property: `user` - `User`类或子类的实例
'''
database_table_name = 'user_char_full' if Config.CHARACTER_FULL_UNLOCK else 'user_char'
@@ -332,9 +348,10 @@ class UserCharacter(Character):
def upgrade_by_core(self, user=None, core=None):
'''
以太之滴升级注意这里core.amount应该是负数\
parameter: `user` - `User`类或子类的实例\
`core` - `ItemCore`类或子类的实例
以太之滴升级注意这里core.amount应该是负数
parameter: `user` - `User`类或子类的实例
`core` - `ItemCore`类或子类的实例
'''
if user:
self.user = user
@@ -353,7 +370,7 @@ class UserCharacter(Character):
class UserCharacterList:
'''
用户拥有角色列表类\
用户拥有角色列表类
properties: `user` - `User`类或子类的实例
'''
database_table_name = 'user_char_full' if Config.CHARACTER_FULL_UNLOCK else 'user_char'

View File

@@ -1,6 +1,6 @@
from .config_manager import Config
ARCAEA_SERVER_VERSION = 'v2.10.3'
ARCAEA_SERVER_VERSION = 'v2.11.0'
class Constant:
@@ -22,6 +22,8 @@ class Constant:
LUNA_UNCAP_BONUS_PROGRESS = 7
AYU_UNCAP_BONUS_PROGRESS = 5
SKILL_FATALIS_WORLD_LOCKED_TIME = 3600000
SKILL_MIKA_SONGS = ['aprilshowers', 'seventhsense', 'oshamascramble',
'amazingmightyyyy', 'cycles', 'maxrage', 'infinity', 'temptation']
MAX_FRIEND_COUNT = Config.MAX_FRIEND_COUNT
@@ -99,4 +101,4 @@ class Constant:
DATABASE_MIGRATE_TABLES = ['user', 'friend', 'best_score', 'recent30', 'user_world', 'item', 'user_item', 'purchase', 'purchase_item', 'user_save',
'login', 'present', 'user_present', 'present_item', 'redeem', 'user_redeem', 'redeem_item', 'api_login', 'chart', 'user_course', 'user_char', 'user_role']
UPDATE_WITH_NEW_CHARACTER_DATA = Config.UPDATE_WITH_NEW_CHARACTER_DATA
UPDATE_WITH_NEW_CHARACTER_DATA = Config.UPDATE_WITH_NEW_CHARACTER_DATA

View File

@@ -50,7 +50,7 @@ class DatabaseInit:
'''初始化搭档信息'''
for i in range(0, len(self.init_data.char)):
skill_requires_uncap = 1 if i == 2 else 0
if i in [0, 1, 2, 4, 13, 26, 27, 28, 29, 36, 21, 42, 43, 11, 12, 19, 5]:
if i in [0, 1, 2, 4, 13, 26, 27, 28, 29, 36, 21, 42, 43, 11, 12, 19, 5, 10]:
max_level = 30
uncapable = 1
else:
@@ -97,12 +97,6 @@ class DatabaseInit:
with open(self.single_path, 'rb') as f:
self.insert_purchase_item(load(f))
self.c.execute(
'''select exists(select * from item where item_id='epilogue')''')
if self.c.fetchone() == (0,):
self.c.execute(
'''insert into item values('epilogue','pack',1)''')
def course_init(self) -> None:
'''初始化课题信息'''
courses = []

View File

@@ -5,10 +5,14 @@ from .error import InputError, ItemNotEnough, ItemUnavailable, NoData
class Item:
item_type = None
def __init__(self) -> None:
def __init__(self, c=None) -> None:
self.item_id = None
self.__amount = None
self.is_available = None
self.c = c
def __eq__(self, other: 'Item') -> bool:
return self.item_id == other.item_id and self.item_type == other.item_type
@property
def amount(self):
@@ -18,12 +22,13 @@ class Item:
def amount(self, value: int):
self.__amount = int(value)
def to_dict(self, has_is_available: bool = False) -> dict:
def to_dict(self, has_is_available: bool = False, has_amount: bool = True) -> dict:
r = {
'id': self.item_id,
'amount': self.amount,
'type': self.item_type
}
if has_amount:
r['amount'] = self.amount
if has_is_available:
r['is_available'] = self.is_available
return r
@@ -32,6 +37,32 @@ class Item:
# parameter: user - User类或子类的实例
pass
def select_exists(self):
self.c.execute('''select exists(select * from item where item_id=? and type=?)''',
(self.item_id, self.item_type))
return bool(self.c.fetchone()[0])
def insert(self, ignore: bool = False):
sql = '''insert into item values(?,?,?)''' if not ignore else '''insert or ignore into item values(?,?,?)'''
self.c.execute(sql, (self.item_id, self.item_type, self.is_available))
def delete(self):
self.c.execute('''delete from item where item_id=? and type=?''',
(self.item_id, self.item_type))
def update(self):
self.c.execute('''update item set is_available=? where item_id=? and type=?''',
(self.is_available, self.item_id, self.item_type))
def select(self):
self.c.execute('''select is_available from item where item_id=? and type=?''',
(self.item_id, self.item_type))
x = self.c.fetchone()
if not x:
raise NoData(
f'No such item `{self.item_type}`: `{self.item_id}`', api_error_code=-121)
self.is_available = x[0]
class UserItem(Item):
@@ -40,7 +71,7 @@ class UserItem(Item):
self.c = c
self.user = None
def select(self, user=None):
def select_user_item(self, user=None):
'''
查询用户item\
parameter: `user` - `User`类或子类的实例
@@ -302,7 +333,8 @@ class ItemFactory:
elif item_type == 'course_banner':
return CourseBanner(self.c)
else:
raise InputError('The item type `%s` is wrong.' % item_type)
raise InputError(
f'The item type `{item_type}` is invalid.', api_error_code=-120)
@classmethod
def from_dict(cls, d: dict, c=None):

View File

@@ -116,8 +116,8 @@ class RemoteMultiPlayer:
self.data_recv = received.split('|')
if self.data_recv[0] != '0':
raise ArcError('Link Play error.',
int(self.data_recv[0]), status=400)
code = int(self.data_recv[0])
raise ArcError(f'Link Play error code: {code}', code, status=400)
def create_room(self, user: 'Player' = None) -> None:
'''创建房间'''

View File

@@ -1,4 +1,5 @@
from .download import DownloadList
from .error import NoData
from .save import SaveData
from .score import Score
from .sql import Connect, Sql
@@ -214,6 +215,11 @@ class UnlockUserItem(BaseOperation):
def _one_user_insert(self):
with Connect() as c:
c.execute(
'''select exists(select * from user where user_id = ?)''', (self.user.user_id,))
if not c.fetchone()[0]:
raise NoData(
f'No such user: `{self.user.user_id}`', api_error_code=-110)
c.execute(
f'''select item_id, type from item where type in ({','.join(['?'] * len(self.item_types))})''', self.item_types)
sql_list = [(self.user.user_id, i[0], i[1])

View File

@@ -1,8 +1,7 @@
from time import time
from core.item import ItemFactory
from .error import ArcError, NoData
from .error import ArcError, DataExist, NoData
from .item import ItemFactory
class Present:
@@ -18,15 +17,39 @@ class Present:
def is_expired(self) -> bool:
return self.expire_ts < int(time() * 1000)
def to_dict(self) -> dict:
return {
def to_dict(self, has_items: bool = True) -> dict:
r = {
'present_id': self.present_id,
'expire_ts': self.expire_ts,
'description': self.description,
'items': [x.to_dict() for x in self.items]
'description': self.description
}
if has_items:
r['items'] = [x.to_dict() for x in self.items]
return r
def select(self, present_id: str = None) -> None:
def from_dict(self, d: dict) -> 'Present':
self.present_id = d['present_id']
self.expire_ts = int(d['expire_ts'])
self.description = d.get('description', '')
self.items = [ItemFactory.from_dict(
i, c=self.c) for i in d.get('items', [])]
return self
def from_list(self, l: list) -> 'Present':
self.present_id = l[0]
self.expire_ts = int(l[1]) if l[1] else 0
self.description = l[2] if l[2] else ''
return self
def select_exists(self) -> bool:
'''
查询present是否存在
'''
self.c.execute(
'''select exists(select * from present where present_id=?)''', (self.present_id,))
return bool(self.c.fetchone()[0])
def select(self, present_id: str = None) -> 'Present':
'''
用present_id查询信息
'''
@@ -39,8 +62,8 @@ class Present:
if x is None:
raise NoData('The present `%s` does not exist.' % self.present_id)
self.expire_ts = x[1] if x[1] else 0
self.description = x[2] if x[2] else ''
self.from_list(x)
return self
def select_items(self) -> None:
'''
@@ -48,15 +71,79 @@ class Present:
'''
self.c.execute(
'''select * from present_item where present_id=:a''', {'a': self.present_id})
x = self.c.fetchall()
if not x:
raise NoData('The present `%s` does not have any items.' %
self.present_id)
self.items = [ItemFactory.from_dict({
'item_id': i[1],
'type': i[2],
'amount': i[3] if i[3] else 1
}, self.c) for i in x]
}, self.c) for i in self.c.fetchall()]
def insert_items(self) -> None:
for i in self.items:
self.c.execute('''insert or ignore into item values(?,?,?)''',
(i.item_id, i.item_type, i.is_available))
self.c.execute('''insert or ignore into present_item values(?,?,?,?)''',
(self.present_id, i.item_id, i.item_type, i.amount))
def insert(self) -> None:
self.c.execute('''insert into present values(?,?,?)''',
(self.present_id, self.expire_ts, self.description))
def insert_all(self) -> None:
self.insert()
self.insert_items()
def delete(self) -> None:
self.c.execute(
'''delete from present where present_id=?''', (self.present_id,))
def delete_present_item(self) -> None:
self.c.execute(
'''delete from present_item where present_id=?''', (self.present_id,))
def delete_all(self) -> None:
self.delete_present_item()
self.delete()
def update(self) -> None:
self.c.execute('''update present set expire_ts=?, description=? where present_id=?''',
(self.expire_ts, self.description, self.present_id))
def remove_items(self, items: list) -> None:
'''删除present_item表中的物品'''
for i in items:
if i not in self.items:
raise NoData(
f'No such item `{i.item_type}`: `{i.item_id}` in present `{self.present_id}`', api_error_code=-124)
self.c.executemany('''delete from present_item where present_id=? and item_id=? and type=?''', [
(self.present_id, i.item_id, i.item_type) for i in items])
for i in items:
self.items.remove(i)
def add_items(self, items: list) -> None:
'''添加物品到present_item表'''
for i in items:
if not i.select_exists():
raise NoData(
f'No such item `{i.item_type}`: `{i.item_id}`', api_error_code=-121)
if i in self.items:
raise DataExist(
f'Item `{i.item_type}`: `{i.item_id}` already exists in present `{self.present_id}`', api_error_code=-123)
self.c.executemany('''insert into present_item values(?,?,?,?)''', [
(self.present_id, i.item_id, i.item_type, i.amount) for i in items])
self.items.extend(items)
def update_items(self, items: list) -> None:
'''更新present_item表中的物品'''
for i in items:
if i not in self.items:
raise NoData(
f'No such item `{i.item_type}`: `{i.item_id}` in present `{self.present_id}`', api_error_code=-124)
self.c.executemany('''update present_item set amount=? where present_id=? and item_id=? and type=?''', [
(i.amount, self.present_id, i.item_id, i.item_type) for i in items])
for i in items:
self.items[self.items.index(i)].amount = i.amount
class UserPresent(Present):

View File

@@ -1,6 +1,6 @@
from time import time
from .error import NoData, TicketNotEnough
from .error import DataExist, InputError, NoData, TicketNotEnough
from .item import ItemFactory
@@ -34,31 +34,35 @@ class Purchase:
if self.discount_reason == 'anni5tix':
x = ItemFactory(self.c).get_item('anni5tix')
x.item_id = 'anni5tix'
x.select(self.user)
x.select_user_item(self.user)
if x.amount >= 1:
return 0
return self.price
return self.orig_price
def to_dict(self) -> dict:
price = self.price_displayed
def to_dict(self, has_items: bool = True, show_real_price: bool = True) -> dict:
if show_real_price:
price = self.price_displayed
r = {
'name': self.purchase_name,
'price': price,
'price': price if show_real_price else self.price,
'orig_price': self.orig_price,
'items': [x.to_dict(has_is_available=True) for x in self.items]
}
if has_items:
r['items'] = [x.to_dict(has_is_available=True) for x in self.items]
if self.discount_from > 0 and self.discount_to > 0:
r['discount_from'] = self.discount_from
r['discount_to'] = self.discount_to
if self.discount_reason == 'anni5tix' and price == 0:
if not show_real_price or (self.discount_reason == 'anni5tix' and price == 0):
r['discount_reason'] = self.discount_reason
return r
def from_dict(self, d: dict) -> 'Purchase':
self.purchase_name = d['name']
self.price = d['price']
self.orig_price = d['orig_price']
self.purchase_name = d.get('name') or d.get('purchase_name')
if not self.purchase_name:
raise InputError('purchase_name is required')
self.orig_price = int(d['orig_price'])
self.price = d.get('price', self.orig_price)
self.discount_from = d.get('discount_from', -1)
self.discount_to = d.get('discount_to', -1)
self.discount_reason = d.get('discount_reason', '')
@@ -67,24 +71,50 @@ class Purchase:
return self
def from_list(self, l: list) -> 'Purchase':
self.purchase_name = l[0]
self.price = l[1]
self.orig_price = l[2]
self.discount_from = l[3] if l[3] else -1
self.discount_to = l[4] if l[4] else -1
self.discount_reason = l[5] if l[5] else ''
return self
def insert_all(self) -> None:
'''向数据库插入包括item表和purchase_item表'''
self.insert()
self.insert_items()
def insert(self) -> None:
'''向数据库插入不包括item表和purchase_item表'''
self.c.execute('''insert into purchase values(?,?,?,?,?,?)''',
(self.purchase_name, self.price, self.orig_price, self.discount_from, self.discount_to, self.discount_reason))
self.insert_items()
# def insert_only_purchase_item(self) -> None:
# '''向数据库插入purchase_item表'''
# for i in self.items:
# self.c.execute('''insert into purchase_item values(?,?,?,?)''',
# (self.purchase_name, i.item_id, i.item_type, i.amount))
def insert_items(self) -> None:
'''向数据库插入物品,注意已存在的物品不会变更'''
for i in self.items:
self.c.execute(
'''select exists(select * from item where item_id=?)''', (i.item_id,))
if self.c.fetchone() == (0,):
self.c.execute('''insert into item values(?,?,?)''',
(i.item_id, i.item_type, i.is_available))
self.c.execute('''insert or ignore into item values(?,?,?)''',
(i.item_id, i.item_type, i.is_available))
self.c.execute('''insert into purchase_item values(?,?,?,?)''',
self.c.execute('''insert or ignore into purchase_item values(?,?,?,?)''',
(self.purchase_name, i.item_id, i.item_type, i.amount))
def select_exists(self, purchase_name: str = None) -> bool:
'''
用purchase_name查询存在性
'''
if purchase_name:
self.purchase_name = purchase_name
self.c.execute(
'''select exists(select * from purchase where purchase_name=?)''', (self.purchase_name,))
return self.c.fetchone() == (1,)
def select(self, purchase_name: str = None) -> 'Purchase':
'''
用purchase_name查询信息
@@ -93,11 +123,11 @@ class Purchase:
self.purchase_name = purchase_name
self.c.execute(
'''select * from purchase where purchase_name=:name''', {'name': purchase_name})
'''select * from purchase where purchase_name=:name''', {'name': self.purchase_name})
x = self.c.fetchone()
if not x:
raise NoData('The purchase `%s` does not exist.' %
purchase_name, 501)
raise NoData(
f'Purchase `{self.purchase_name}` does not exist.', 501)
self.price = x[1]
self.orig_price = x[2]
@@ -112,9 +142,9 @@ class Purchase:
self.c.execute(
'''select item_id, type, amount from purchase_item where purchase_name=:a''', {'a': self.purchase_name})
x = self.c.fetchall()
if not x:
raise NoData('The items of the purchase `%s` does not exist.' %
self.purchase_name, 501)
# if not x:
# raise NoData(
# f'The items of the purchase `{self.purchase_name}` does not exist.', 501)
self.items = []
t = None
@@ -162,6 +192,61 @@ class Purchase:
for i in self.items:
i.user_claim_item(self.user)
def delete_purchase_item(self) -> None:
'''删除purchase_item表'''
self.c.execute(
'''delete from purchase_item where purchase_name=?''', (self.purchase_name, ))
def delete(self) -> None:
'''删除purchase表'''
self.c.execute(
'''delete from purchase where purchase_name=?''', (self.purchase_name, ))
def delete_all(self) -> None:
'''删除purchase表和purchase_item表'''
self.delete_purchase_item()
self.delete()
def update(self) -> None:
'''更新purchase表'''
self.c.execute('''update purchase set price=:a, orig_price=:b, discount_from=:c, discount_to=:d, discount_reason=:e where purchase_name=:f''', {
'a': self.price, 'b': self.orig_price, 'c': self.discount_from, 'd': self.discount_to, 'e': self.discount_reason, 'f': self.purchase_name})
def add_items(self, items: list) -> None:
'''添加purchase_item表'''
for i in items:
if not i.select_exists():
raise NoData(
f'No such item `{i.item_type}`: `{i.item_id}`', api_error_code=-121)
if i in self.items:
raise DataExist(
f'Item `{i.item_type}`: `{i.item_id}` already exists in purchase `{self.purchase_name}`', api_error_code=-123)
self.c.executemany('''insert into purchase_item values (?, ?, ?, ?)''', [
(self.purchase_name, i.item_id, i.item_type, i.amount) for i in items])
self.items.extend(items)
def remove_items(self, items: list) -> None:
'''删除purchase_item表'''
for i in items:
if i not in self.items:
raise NoData(
f'No such item `{i.item_type}`: `{i.item_id}` in purchase `{self.purchase_name}`', api_error_code=-124)
self.c.executemany('''delete from purchase_item where purchase_name=? and item_id=? and type=?''', [
(self.purchase_name, i.item_id, i.item_type) for i in items])
for i in items:
self.items.remove(i)
def update_items(self, items: list) -> None:
'''更新purchase_item表只能更新amount'''
for i in items:
if i not in self.items:
raise NoData(
f'No such item `{i.item_type}`: `{i.item_id}` in purchase `{self.purchase_name}`', api_error_code=-124)
self.c.executemany('''update purchase_item set amount=? where purchase_name=? and item_id=? and type=?''', [
(i.amount, self.purchase_name, i.item_id, i.item_type) for i in items])
for i in items:
self.items[self.items.index(i)].amount = i.amount
class PurchaseList:
'''

View File

@@ -1,4 +1,4 @@
from .error import NoData, RedeemUnavailable
from .error import DataExist, NoData, RedeemUnavailable
from .item import ItemFactory
@@ -11,29 +11,117 @@ class Redeem:
self.items: list = []
self.fragment: int = None
def select(self, code: str = None) -> None:
def to_dict(self, has_items: bool = True) -> dict:
r = {
'code': self.code,
'type': self.redeem_type
}
if has_items:
r['items'] = [x.to_dict() for x in self.items]
return r
def from_dict(self, d: dict) -> 'Redeem':
self.code = str(d['code'])
self.redeem_type = int(d.get('type') or d.get('redeem_type', 0))
self.items = [ItemFactory.from_dict(
i, c=self.c) for i in d.get('items', [])]
return self
def from_list(self, l: list) -> 'Redeem':
self.code = l[0]
self.redeem_type = l[1]
return self
def select_exists(self) -> bool:
self.c.execute(
'''select exists(select * from redeem where code=?)''', (self.code,))
return bool(self.c.fetchone()[0])
def select(self, code: str = None) -> 'Redeem':
if code:
self.code = code
self.c.execute('''select * from redeem where code=:a''',
{'a': self.code})
x = self.c.fetchone()
if x is None:
raise NoData('The redeem `%s` does not exist.' % self.code, 504)
raise NoData(f'The redeem `{self.code}` does not exist.', 504)
self.redeem_type = x[1]
return self
def select_items(self) -> None:
self.c.execute('''select * from redeem_item where code=:a''',
{'a': self.code})
x = self.c.fetchall()
if not x:
raise NoData(
'The redeem `%s` does not have any items.' % self.code)
self.items = [ItemFactory.from_dict({
'item_id': i[1],
'type': i[2],
'amount': i[3] if i[3] else 1
}, self.c) for i in x]
}, self.c) for i in self.c.fetchall()]
def insert(self) -> None:
self.c.execute('''insert into redeem values(?,?)''',
(self.code, self.redeem_type))
def insert_items(self) -> None:
for i in self.items:
i.insert(ignore=True)
self.c.execute('''insert into redeem_item values(?,?,?,?)''', (
self.code, i.item_id, i.item_type, i.amount))
def insert_all(self) -> None:
self.insert()
self.insert_items()
def delete(self) -> None:
self.c.execute('''delete from redeem where code=?''', (self.code,))
def delete_redeem_item(self) -> None:
self.c.execute(
'''delete from redeem_item where code=?''', (self.code,))
def delete_all(self) -> None:
self.delete_redeem_item()
self.delete()
def update(self) -> None:
self.c.execute('''update redeem set type=? where code=?''',
(self.redeem_type, self.code))
def remove_items(self, items: list) -> None:
'''删除redeem_item表中的物品'''
for i in items:
if i not in self.items:
raise NoData(
f'No such item `{i.item_type}`: `{i.item_id}` in redeem `{self.code}`', api_error_code=-124)
self.c.executemany('''delete from redeem_item where code=? and item_id=? and type=?''', [
(self.code, i.item_id, i.item_type) for i in items])
for i in items:
self.items.remove(i)
def add_items(self, items: list) -> None:
'''添加物品到redeem_item表'''
for i in items:
if not i.select_exists():
raise NoData(
f'No such item `{i.item_type}`: `{i.item_id}`', api_error_code=-121)
if i in self.items:
raise DataExist(
f'Item `{i.item_type}`: `{i.item_id}` already exists in redeem `{self.code}`', api_error_code=-123)
self.c.executemany('''insert into redeem_item values(?,?,?,?)''', [
(self.code, i.item_id, i.item_type, i.amount) for i in items])
self.items.extend(items)
def update_items(self, items: list) -> None:
'''更新redeem_item表中的物品'''
for i in items:
if i not in self.items:
raise NoData(
f'No such item `{i.item_type}`: `{i.item_id}` in redeem `{self.code}`', api_error_code=-124)
self.c.executemany('''update redeem_item set amount=? where code=? and item_id=? and type=?''', [
(i.amount, self.code, i.item_id, i.item_type) for i in items])
for i in items:
self.items[self.items.index(i)].amount = i.amount
class UserRedeem(Redeem):

View File

@@ -209,6 +209,7 @@ class UserPlay(UserScore):
self.stamina_multiply: int = None
self.fragment_multiply: int = None
self.prog_boost_multiply: int = None
self.beyond_boost_gauge_usage: int = None
self.ptt: Potential = None # 临时用来计算用户ptt的
self.world_play: 'WorldPlay' = None
@@ -228,7 +229,7 @@ class UserPlay(UserScore):
r['user_rating'] = self.user.rating_ptt
r['finale_challenge_higher'] = self.rating > self.ptt.value
r['global_rank'] = self.user.global_rank
r['finale_play_value'] = self.rating * 5 # emmmm
r['finale_play_value'] = 9.065 * self.rating ** 0.5 # by Lost-MSth
return r
@property
@@ -285,24 +286,29 @@ class UserPlay(UserScore):
self.stamina_multiply = int(x[8])
self.fragment_multiply = int(x[9])
self.prog_boost_multiply = int(x[10])
self.beyond_boost_gauge_usage = int(x[11])
self.is_world_mode = True
self.course_play_state = -1
def set_play_state_for_world(self, stamina_multiply: int = 1, fragment_multiply: int = 100, prog_boost_multiply: int = 0) -> None:
def set_play_state_for_world(self, stamina_multiply: int = 1, fragment_multiply: int = 100, prog_boost_multiply: int = 0, beyond_boost_gauge_usage: int = 0) -> None:
self.song_token = b64encode(urandom(64)).decode()
self.stamina_multiply = int(stamina_multiply)
self.fragment_multiply = int(fragment_multiply)
self.prog_boost_multiply = int(prog_boost_multiply)
if self.prog_boost_multiply != 0:
self.c.execute('''select prog_boost from user where user_id=:a''', {
self.beyond_boost_gauge_usage = int(beyond_boost_gauge_usage)
if self.prog_boost_multiply != 0 or self.beyond_boost_gauge_usage != 0:
self.c.execute('''select prog_boost, beyond_boost_gauge from user where user_id=:a''', {
'a': self.user.user_id})
x = self.c.fetchone()
if x and x[0] == 300:
self.prog_boost_multiply = 300
if x:
self.prog_boost_multiply = 300 if x[0] == 300 else 0
if x[1] < self.beyond_boost_gauge_usage or (self.beyond_boost_gauge_usage != 100 and self.beyond_boost_gauge_usage != 200):
# 注意偷懒了没判断是否是beyond图
self.beyond_boost_gauge_usage = 0
self.clear_play_state()
self.c.execute('''insert into songplay_token values(:t,:a,:b,:c,'',-1,0,0,:d,:e,:f)''', {
'a': self.user.user_id, 'b': self.song.song_id, 'c': self.song.difficulty, 'd': self.stamina_multiply, 'e': self.fragment_multiply, 'f': self.prog_boost_multiply, 't': self.song_token})
self.c.execute('''insert into songplay_token values(:t,:a,:b,:c,'',-1,0,0,:d,:e,:f,:g)''', {
'a': self.user.user_id, 'b': self.song.song_id, 'c': self.song.difficulty, 'd': self.stamina_multiply, 'e': self.fragment_multiply, 'f': self.prog_boost_multiply, 'g': self.beyond_boost_gauge_usage, 't': self.song_token})
self.user.select_user_about_current_map()
self.user.current_map.select_map_info()
@@ -334,8 +340,8 @@ class UserPlay(UserScore):
self.course_play.score = 0
self.course_play.clear_type = 3 # 设置为PM即最大值
self.c.execute('''insert into songplay_token values(?,?,?,?,?,?,?,?,1,100,0)''', (self.song_token, self.user.user_id, self.song.song_id,
self.song.difficulty, self.course_play.course_id, self.course_play_state, self.course_play.score, self.course_play.clear_type))
self.c.execute('''insert into songplay_token values(?,?,?,?,?,?,?,?,1,100,0,0)''', (self.song_token, self.user.user_id, self.song.song_id,
self.song.difficulty, self.course_play.course_id, self.course_play_state, self.course_play.score, self.course_play.clear_type))
self.user.select_user_about_stamina()
if use_course_skip_purchase:
x = ItemCore(self.c)

View File

@@ -363,8 +363,10 @@ class DatabaseMigrator:
if db1_pk != db2_pk:
return False
sql2.insert_many(table_name, [], sql1.select(
table_name, list(filter(lambda x: x in db2_name, db1_name))), insert_type='replace')
public_column = list(filter(lambda x: x in db2_name, db1_name))
sql2.insert_many(table_name, public_column, sql1.select(
table_name, public_column), insert_type='replace')
return True
@@ -381,13 +383,6 @@ class DatabaseMigrator:
c.executemany('''insert into user_char_full values(?,?,?,?,?,?)''', [
(j[0], i[0], i[1], exp, i[2], 0) for j in y])
@staticmethod
def update_user_epilogue(c) -> None:
'''给用户添加epilogue包'''
c.execute('''select user_id from user''')
Sql(c).insert_many('user_item', [], [(i[0], 'epilogue', 'pack', 1)
for i in c.fetchall()], insert_type='ignore')
def update_database(self) -> None:
'''
将c1数据库不存在数据加入或覆盖到c2数据库上
@@ -402,7 +397,6 @@ class DatabaseMigrator:
self.update_one_table(c1, c2, 'character')
self.update_user_char_full(c2) # 更新user_char_full
self.update_user_epilogue(c2) # 更新user的epilogue
class MemoryDatabase:

View File

@@ -305,7 +305,8 @@ class UserInfo(User):
self.recent_score = Score()
self.favorite_character = None
self.max_stamina_notification_enabled = False
self.prog_boost = 0
self.prog_boost: int = 0
self.beyond_boost_gauge: float = 0
self.next_fragstam_ts: int = None
self.world_mode_locked_end_ts: int = None
@@ -496,6 +497,7 @@ class UserInfo(User):
"is_skill_sealed": self.is_skill_sealed,
"current_map": self.current_map.map_id,
"prog_boost": self.prog_boost,
"beyond_boost_gauge": self.beyond_boost_gauge,
"next_fragstam_ts": self.next_fragstam_ts,
"max_stamina_ts": self.stamina.max_stamina_ts,
"stamina": self.stamina.stamina,
@@ -553,6 +555,7 @@ class UserInfo(User):
self.stamina = UserStamina(self.c, self)
self.stamina.set_value(x[32], x[33])
self.world_mode_locked_end_ts = x[34] if x[34] else -1
self.beyond_boost_gauge = x[35] if x[35] else 0
return self
@@ -610,7 +613,7 @@ class UserInfo(User):
查询user表有关世界模式打歌的信息
'''
self.c.execute(
'''select character_id, max_stamina_ts, stamina, is_skill_sealed, is_char_uncapped, is_char_uncapped_override, current_map, world_mode_locked_end_ts from user where user_id=?''', (self.user_id,))
'''select character_id, max_stamina_ts, stamina, is_skill_sealed, is_char_uncapped, is_char_uncapped_override, current_map, world_mode_locked_end_ts, beyond_boost_gauge from user where user_id=?''', (self.user_id,))
x = self.c.fetchone()
if not x:
raise NoData('No user.', 108, -3)
@@ -623,6 +626,7 @@ class UserInfo(User):
self.character.is_uncapped_override = x[5] == 1
self.current_map = UserMap(self.c, x[6], self)
self.world_mode_locked_end_ts = x[7] if x[7] else -1
self.beyond_boost_gauge = x[8] if x[8] else 0
@property
def global_rank(self) -> int:

View File

@@ -117,6 +117,7 @@ class Map:
self.require_localunlock_songid: str = None
self.require_localunlock_challengeid: str = None
self.chain_info: dict = None
@property
def rewards(self) -> list:
@@ -140,7 +141,7 @@ class Map:
def to_dict(self) -> dict:
if self.chapter is None:
self.select_map_info()
return {
r = {
'map_id': self.map_id,
'is_legacy': self.is_legacy,
'is_beyond': self.is_beyond,
@@ -162,6 +163,9 @@ class Map:
'require_localunlock_challengeid': self.require_localunlock_challengeid,
'steps': [s.to_dict() for s in self.steps],
}
if self.chain_info is not None:
r['chain_info'] = self.chain_info
return r
def from_dict(self, raw_dict: dict) -> 'Map':
self.is_legacy = raw_dict.get('is_legacy')
@@ -179,8 +183,11 @@ class Map:
self.coordinate = raw_dict.get('coordinate')
self.custom_bg = raw_dict.get('custom_bg', '')
self.stamina_cost = raw_dict.get('stamina_cost')
self.require_localunlock_songid = raw_dict.get('require_localunlock_songid', '')
self.require_localunlock_challengeid = raw_dict.get('require_localunlock_challengeid', '')
self.require_localunlock_songid = raw_dict.get(
'require_localunlock_songid', '')
self.require_localunlock_challengeid = raw_dict.get(
'require_localunlock_challengeid', '')
self.chain_info = raw_dict.get('chain_info')
self.steps = [Step().from_dict(s) for s in raw_dict.get('steps')]
return self
@@ -301,7 +308,7 @@ class UserMap(Map):
if self.require_type in ['pack', 'single']:
item = ItemFactory(self.c).get_item(self.require_type)
item.item_id = self.require_id
item.select(self.user)
item.select_user_item(self.user)
if not item.amount:
self.is_locked = True
@@ -447,8 +454,9 @@ class WorldPlay:
self.step_value: float = None
self.prog_tempest: float = None
self.overdrive_extra: float = None
self.character_bonus_progress: float = None
self.prog_skill_increase: float = None
self.over_skill_increase: float = None
def to_dict(self) -> dict:
arcmap: 'UserMap' = self.user.current_map
@@ -476,14 +484,17 @@ class WorldPlay:
},
"current_stamina": self.user.stamina.stamina,
"max_stamina_ts": self.user.stamina.max_stamina_ts,
'world_mode_locked_end_ts': self.user.world_mode_locked_end_ts
'world_mode_locked_end_ts': self.user.world_mode_locked_end_ts,
'beyond_boost_gauge': self.user.beyond_boost_gauge
}
if self.overdrive_extra is not None:
r['char_stats']['overdrive'] += self.overdrive_extra
if self.over_skill_increase is not None:
r['char_stats']['over_skill_increase'] = self.over_skill_increase
if self.prog_skill_increase is not None:
r['char_stats']['prog_skill_increase'] = self.prog_skill_increase
if self.prog_tempest is not None:
r['char_stats']['prog'] += self.prog_tempest
r['char_stats']['prog'] += self.prog_tempest # 没试过要不要这样
r['char_stats']['prog_tempest'] = self.prog_tempest
if self.character_bonus_progress is not None:
@@ -503,16 +514,45 @@ class WorldPlay:
r['fragment_multiply'] = self.user_play.fragment_multiply
if self.user_play.prog_boost_multiply != 0:
r['prog_boost_multiply'] = self.user_play.prog_boost_multiply
if self.user_play.beyond_boost_gauge_usage != 0:
r['beyond_boost_gauge_usage'] = self.user_play.beyond_boost_gauge_usage
return r
@property
def beyond_boost_gauge_addition(self) -> float:
# guessed by Lost-MSth
return 2.45 * self.user_play.rating ** 0.5 + 27
@property
def step_times(self) -> float:
return self.user_play.stamina_multiply * self.user_play.fragment_multiply / 100 * (self.user_play.prog_boost_multiply+100) / 100
prog_boost_multiply = self.user_play.prog_boost_multiply + 100
beyond_boost_times = 1
if self.user_play.beyond_gauge == 1:
if prog_boost_multiply > 100:
prog_boost_multiply -= 100
if self.user_play.beyond_boost_gauge_usage == 100:
beyond_boost_times = 2
elif self.user_play.beyond_boost_gauge_usage == 200:
beyond_boost_times = 3
return self.user_play.stamina_multiply * self.user_play.fragment_multiply / 100 * prog_boost_multiply / 100 * beyond_boost_times
@property
def exp_times(self) -> float:
return self.user_play.stamina_multiply * (self.user_play.prog_boost_multiply+100) / 100
prog_boost_multiply = self.user_play.prog_boost_multiply + 100
beyond_boost_times = 1
if self.user_play.beyond_gauge == 1:
if prog_boost_multiply > 100:
prog_boost_multiply -= 100
if self.user_play.beyond_boost_gauge_usage == 100:
beyond_boost_times = 2
elif self.user_play.beyond_boost_gauge_usage == 200:
beyond_boost_times = 3
return self.user_play.stamina_multiply * prog_boost_multiply / 100 * beyond_boost_times
def get_step(self) -> None:
if self.user_play.beyond_gauge == 0:
@@ -521,6 +561,8 @@ class WorldPlay:
self.character_used.level)
if self.prog_tempest:
prog += self.prog_tempest
if self.prog_skill_increase:
prog += self.prog_skill_increase
self.step_value = self.base_step_value * prog / 50 * self.step_times
else:
@@ -539,8 +581,8 @@ class WorldPlay:
overdrive = self.character_used.overdrive.get_value(
self.character_used.level)
if self.overdrive_extra:
overdrive += self.overdrive_extra
if self.over_skill_increase:
overdrive += self.over_skill_increase
self.step_value = self.base_step_value * overdrive / \
50 * self.step_times * affinity_multiplier
@@ -553,6 +595,20 @@ class WorldPlay:
self.user_play.clear_play_state()
self.user.select_user_about_world_play()
if self.user_play.beyond_gauge == 0:
# 更新byd大招蓄力条
self.user.beyond_boost_gauge += self.beyond_boost_gauge_addition
if self.user.beyond_boost_gauge > 200:
self.user.beyond_boost_gauge = 200
self.user.update_user_one_column(
'beyond_boost_gauge', self.user.beyond_boost_gauge)
elif self.user_play.beyond_boost_gauge_usage != 0 and self.user_play.beyond_boost_gauge_usage <= self.user.beyond_boost_gauge:
self.user.beyond_boost_gauge -= self.user_play.beyond_boost_gauge_usage
if abs(self.user.beyond_boost_gauge) <= 1e-5:
self.user.beyond_boost_gauge = 0
self.user.update_user_one_column(
'beyond_boost_gauge', self.user.beyond_boost_gauge)
self.character_used = Character()
self.user.character.select_character_info()
@@ -598,9 +654,13 @@ class WorldPlay:
if self.user_play.beyond_gauge == 0:
if self.character_used.character_id == 35 and self.character_used.skill_id_displayed:
self._special_tempest()
elif self.character_used.skill_id_displayed == 'ilith_awakened_skill':
self._ilith_awakened_skill()
else:
if self.character_used.skill_id_displayed == 'skill_vita':
self._skill_vita()
if self.character_used.skill_id_displayed == 'skill_mika':
self._skill_mika()
def after_climb(self) -> None:
factory_dict = {'eto_uncap': self._eto_uncap, 'ayu_uncap': self._ayu_uncap,
@@ -627,9 +687,9 @@ class WorldPlay:
vita技能overdrive随回忆率提升提升量最多为10\
此处采用线性函数
'''
self.overdrive_extra = 0
self.over_skill_increase = 0
if 0 < self.user_play.health <= 100:
self.overdrive_extra = self.user_play.health / 10
self.over_skill_increase = self.user_play.health / 10
def _eto_uncap(self) -> None:
'''eto觉醒技能获得残片奖励时世界模式进度加7'''
@@ -688,3 +748,20 @@ class WorldPlay:
self.character_bonus_progress = -self.step_value / 2 / self.step_times
self.step_value = self.step_value / 2
self.user.current_map.reclimb(self.step_value)
def _ilith_awakened_skill(self) -> None:
'''
ilith 觉醒技能,曲目通关时步数+6wiki 说是 prog 值+6
'''
if self.user_play.health > 0:
self.prog_skill_increase = 6
def _skill_mika(self) -> None:
'''
mika 技能,通关特定曲目能力值翻倍
'''
if self.user_play.song.song_id in Constant.SKILL_MIKA_SONGS and self.user_play.clear_type != 0:
self.over_skill_increase = self.character_used.overdrive.get_value(
self.character_used.level)
self.prog_skill_increase = self.character_used.prog.get_value(
self.character_used.level)

View File

@@ -1,45 +1,45 @@
class InitData:
char = ['hikari', 'tairitsu', 'kou', 'sapphire', 'lethe', 'hikari&tairitsu(reunion)', 'Tairitsu(Axium)', 'Tairitsu(Grievous Lady)', 'stella', 'Hikari & Fisica', 'ilith', 'eto', 'luna', 'shirabe', 'Hikari(Zero)', 'Hikari(Fracture)', 'Hikari(Summer)', 'Tairitsu(Summer)', 'Tairitsu & Trin',
'ayu', 'Eto & Luna', 'yume', 'Seine & Hikari', 'saya', 'Tairitsu & Chuni Penguin', 'Chuni Penguin', 'haruna', 'nono', 'MTA-XXX', 'MDA-21', 'kanae', 'Hikari(Fantasia)', 'Tairitsu(Sonata)', 'sia', 'DORO*C', 'Tairitsu(Tempest)', 'brillante', 'Ilith(Summer)', 'etude', 'Alice & Tenniel', 'Luna & Mia', 'areus', 'seele', 'isabelle', 'mir', 'lagrange', 'linka', 'nami', 'Saya & Elizabeth', 'lily', 'kanae(midsummer)', 'alice&tenniel(minuet)', 'tairitsu(elegy)', 'marija', 'vita', 'hikari(fatalis)', 'saki', 'setsuna', 'amane', 'kou(winter)', 'lagrange(aria)', 'lethe(apophenia)']
'ayu', 'Eto & Luna', 'yume', 'Seine & Hikari', 'saya', 'Tairitsu & Chuni Penguin', 'Chuni Penguin', 'haruna', 'nono', 'MTA-XXX', 'MDA-21', 'kanae', 'Hikari(Fantasia)', 'Tairitsu(Sonata)', 'sia', 'DORO*C', 'Tairitsu(Tempest)', 'brillante', 'Ilith(Summer)', 'etude', 'Alice & Tenniel', 'Luna & Mia', 'areus', 'seele', 'isabelle', 'mir', 'lagrange', 'linka', 'nami', 'Saya & Elizabeth', 'lily', 'kanae(midsummer)', 'alice&tenniel(minuet)', 'tairitsu(elegy)', 'marija', 'vita', 'hikari(fatalis)', 'saki', 'setsuna', 'amane', 'kou(winter)', 'lagrange(aria)', 'lethe(apophenia)', 'shama(UNiVERSE)', 'milk(UNiVERSE)', 'shikoku', 'mika yurisaki']
skill_id = ['gauge_easy', '', '', '', 'note_mirror', 'skill_reunion', '', 'gauge_hard', 'frag_plus_10_pack_stellights', 'gauge_easy|frag_plus_15_pst&prs', 'gauge_hard|fail_frag_minus_100', 'frag_plus_5_side_light', 'visual_hide_hp', 'frag_plus_5_side_conflict', 'challenge_fullcombo_0gauge', 'gauge_overflow', 'gauge_easy|note_mirror', 'note_mirror', 'visual_tomato_pack_tonesphere',
'frag_rng_ayu', 'gaugestart_30|gaugegain_70', 'combo_100-frag_1', 'audio_gcemptyhit_pack_groovecoaster', 'gauge_saya', 'gauge_chuni', 'kantandeshou', 'gauge_haruna', 'frags_nono', 'gauge_pandora', 'gauge_regulus', 'omatsuri_daynight', '', '', 'sometimes(note_mirror|frag_plus_5)', 'scoreclear_aa|visual_scoregauge', 'gauge_tempest', 'gauge_hard', 'gauge_ilith_summer', '', 'note_mirror|visual_hide_far', 'frags_ongeki', 'gauge_areus', 'gauge_seele', 'gauge_isabelle', 'gauge_exhaustion', 'skill_lagrange', 'gauge_safe_10', 'frags_nami', 'skill_elizabeth', 'skill_lily', 'skill_kanae_midsummer', '', '', 'visual_ghost_skynotes', 'skill_vita', 'skill_fatalis', 'frags_ongeki_slash', 'frags_ongeki_hard', 'skill_amane', 'skill_kou_winter', '', 'gauge_hard|note_mirror']
'frag_rng_ayu', 'gaugestart_30|gaugegain_70', 'combo_100-frag_1', 'audio_gcemptyhit_pack_groovecoaster', 'gauge_saya', 'gauge_chuni', 'kantandeshou', 'gauge_haruna', 'frags_nono', 'gauge_pandora', 'gauge_regulus', 'omatsuri_daynight', '', '', 'sometimes(note_mirror|frag_plus_5)', 'scoreclear_aa|visual_scoregauge', 'gauge_tempest', 'gauge_hard', 'gauge_ilith_summer', '', 'note_mirror|visual_hide_far', 'frags_ongeki', 'gauge_areus', 'gauge_seele', 'gauge_isabelle', 'gauge_exhaustion', 'skill_lagrange', 'gauge_safe_10', 'frags_nami', 'skill_elizabeth', 'skill_lily', 'skill_kanae_midsummer', '', '', 'visual_ghost_skynotes', 'skill_vita', 'skill_fatalis', 'frags_ongeki_slash', 'frags_ongeki_hard', 'skill_amane', 'skill_kou_winter', '', 'gauge_hard|note_mirror', 'skill_shama', 'skill_milk', 'skill_shikoku', 'skill_mika']
skill_id_uncap = ['', '', 'frags_kou', '', 'visual_ink', '', '', '', '', '', '', 'eto_uncap', 'luna_uncap', 'shirabe_entry_fee',
'', '', '', '', '', 'ayu_uncap', '', 'frags_yume', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '']
skill_id_uncap = ['', '', 'frags_kou', '', 'visual_ink', '', '', '', '', '', 'ilith_awakened_skill', 'eto_uncap', 'luna_uncap', 'shirabe_entry_fee',
'', '', '', '', '', 'ayu_uncap', '', 'frags_yume', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '']
skill_unlock_level = [0, 0, 0, 0, 8, 0, 0, 0, 0, 0, 0, 0, 8, 8, 8, 0, 0, 0, 0, 0,
0, 0, 0, 8, 0, 14, 0, 0, 8, 8, 8, 0, 0, 0, 0, 0, 0, 0, 0, 0, 8, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 8, 0, 0, 8, 0, 0, 0, 0, 0]
0, 0, 0, 8, 0, 14, 0, 0, 8, 8, 8, 0, 0, 0, 0, 0, 0, 0, 0, 0, 8, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 8, 0, 0, 8, 0, 0, 0, 0, 0, 0, 0, 0, 0]
frag1 = [55, 55, 60, 50, 47, 79, 47, 57, 41, 22, 50, 54, 60, 56, 78, 42, 41, 61, 52, 50, 52, 32,
42, 55, 45, 58, 43, 0.5, 68, 50, 62, 45, 45, 52, 44, 27, 59, 0, 45, 50, 50, 47, 47, 61, 43, 42, 38, 25, 58, 50, 61, 45, 45, 38, 34, 27, 18, 56, 47, 30, 0, 57]
42, 55, 45, 58, 43, 0.5, 68, 50, 62, 45, 45, 52, 44, 27, 59, 0, 45, 50, 50, 47, 47, 61, 43, 42, 38, 25, 58, 50, 61, 45, 45, 38, 34, 27, 18, 56, 47, 30, 45, 57, 55.5, 47, 33, 26]
prog1 = [35, 55, 47, 50, 60, 70, 60, 70, 58, 45, 70, 45, 42, 46, 61, 67, 49, 44, 28, 45, 24, 46, 52,
59, 62, 33, 58, 25, 63, 69, 50, 45, 45, 51, 34, 70, 62, 70, 45, 32, 32, 61, 47, 47, 37, 42, 50, 50, 45, 41, 61, 45, 45, 58, 50, 130, 18, 57, 55, 50, 0, 70]
59, 62, 33, 58, 25, 63, 69, 50, 45, 45, 51, 34, 70, 62, 70, 45, 32, 32, 61, 47, 47, 37, 42, 50, 50, 45, 41, 61, 45, 45, 58, 50, 130, 18, 57, 55, 50, 45, 70, 37.5, 29, 44, 26]
overdrive1 = [35, 55, 25, 50, 47, 70, 72, 57, 41, 7, 10, 32, 65, 31, 61, 53, 31, 47, 38, 12, 39, 18,
48, 65, 45, 55, 44, 25, 46, 44, 33, 45, 45, 37, 25, 27, 50, 20, 45, 63, 21, 47, 61, 47, 65, 80, 38, 30, 49, 15, 34, 45, 45, 38, 67, 120, 44, 33, 55, 50, 0, 57]
48, 65, 45, 55, 44, 25, 46, 44, 33, 45, 45, 37, 25, 27, 50, 20, 45, 63, 21, 47, 61, 47, 65, 80, 38, 30, 49, 15, 34, 45, 45, 38, 67, 120, 44, 33, 55, 50, 45, 57, 31, 29, 65, 26]
frag20 = [78, 80, 90, 75, 70, 79, 70, 79, 65, 40, 50, 80, 90, 82, 0, 61, 67, 92, 85, 50, 86, 52,
65, 85, 67, 88, 64, 0.5, 95, 70, 95, 50, 80, 87, 71, 50, 85, 0, 80, 75, 50, 70, 70, 90, 65, 80, 61, 50, 68, 60, 90, 67, 50, 60, 51, 50, 35, 85, 47, 50]
65, 85, 67, 88, 64, 0.5, 95, 70, 95, 50, 80, 87, 71, 50, 85, 0, 80, 75, 50, 70, 70, 90, 65, 80, 61, 50, 68, 60, 90, 67, 50, 60, 51, 50, 35, 85, 47, 50, 75, 80, 89.5, 50, 50, 51]
prog20 = [61, 80, 70, 75, 90, 70, 90, 102, 84, 78, 105, 67, 63, 68, 0, 99, 80, 66, 46, 83, 40, 73,
80, 90, 93, 50, 86, 78, 89, 98, 75, 80, 50, 64, 55, 100, 90, 110, 80, 50, 74, 90, 70, 70, 56, 80, 79, 55, 65, 59, 90, 50, 90, 90, 75, 210, 35, 86, 92, 80]
80, 90, 93, 50, 86, 78, 89, 98, 75, 80, 50, 64, 55, 100, 90, 110, 80, 50, 74, 90, 70, 70, 56, 80, 79, 55, 65, 59, 90, 50, 90, 90, 75, 210, 35, 86, 92, 80, 75, 100, 60, 50, 68, 51]
overdrive20 = [61, 80, 47, 75, 70, 70, 95, 79, 65, 31, 50, 59, 90, 58, 0, 78, 50, 70, 62, 49, 64,
46, 73, 95, 67, 84, 70, 78, 69, 70, 50, 80, 80, 63, 25, 50, 72, 55, 50, 95, 55, 70, 90, 70, 99, 80, 61, 40, 69, 62, 51, 90, 67, 60, 100, 200, 85, 50, 92, 50]
46, 73, 95, 67, 84, 70, 78, 69, 70, 50, 80, 80, 63, 25, 50, 72, 55, 50, 95, 55, 70, 90, 70, 99, 80, 61, 40, 69, 62, 51, 90, 67, 60, 100, 200, 85, 50, 92, 50, 75, 80, 49.5, 50, 100, 51]
frag30 = [88, 90, 100, 75, 80, 89, 70, 79, 65, 40, 50, 90, 100, 92, 0, 61, 67, 92, 85, 50, 86, 62,
65, 85, 67, 88, 74, 0.5, 105, 80, 95, 50, 80, 87, 71, 50, 95, 0, 80, 75, 50, 70, 80, 100, 65, 80, 61, 50, 68, 60, 90, 67, 50, 60, 51, 50, 35, 85, 47, 50]
65, 85, 67, 88, 74, 0.5, 105, 80, 95, 50, 80, 87, 71, 50, 95, 0, 80, 75, 50, 70, 80, 100, 65, 80, 61, 50, 68, 60, 90, 67, 50, 60, 51, 50, 35, 85, 47, 50, 75, 80, 89.5, 50, 50, 51]
prog30 = [71, 90, 80, 75, 100, 80, 90, 102, 84, 78, 105, 77, 73, 78, 0, 99, 80, 66, 46, 93, 40, 83,
80, 90, 93, 50, 96, 88, 99, 108, 75, 80, 50, 64, 55, 100, 100, 110, 80, 50, 74, 90, 80, 80, 56, 80, 79, 55, 65, 59, 90, 50, 90, 90, 75, 210, 35, 86, 92, 80]
prog30 = [71, 90, 80, 75, 100, 80, 90, 102, 84, 78, 110, 77, 73, 78, 0, 99, 80, 66, 46, 93, 40, 83,
80, 90, 93, 50, 96, 88, 99, 108, 75, 80, 50, 64, 55, 100, 100, 110, 80, 50, 74, 90, 80, 80, 56, 80, 79, 55, 65, 59, 90, 50, 90, 90, 75, 210, 35, 86, 92, 80, 75, 100, 60, 50, 68, 51]
overdrive30 = [71, 90, 57, 75, 80, 80, 95, 79, 65, 31, 50, 69, 100, 68, 0, 78, 50, 70, 62, 59, 64,
56, 73, 95, 67, 84, 80, 88, 79, 80, 50, 80, 80, 63, 25, 50, 82, 55, 50, 95, 55, 70, 100, 80, 99, 80, 61, 40, 69, 62, 51, 90, 67, 60, 100, 200, 85, 50, 92, 50]
56, 73, 95, 67, 84, 80, 88, 79, 80, 50, 80, 80, 63, 25, 50, 82, 55, 50, 95, 55, 70, 100, 80, 99, 80, 61, 40, 69, 62, 51, 90, 67, 60, 100, 200, 85, 50, 92, 50, 75, 80, 49.5, 50, 100, 51]
char_type = [1, 0, 0, 0, 0, 0, 0, 2, 0, 1, 2, 0, 0, 0, 2, 3, 1, 0, 0, 0, 1,
0, 0, 0, 0, 0, 0, 0, 2, 2, 0, 0, 0, 0, 0, 2, 2, 2, 0, 0, 0, 2, 2, 2, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 2, 3, 0, 2, 2, 0]
0, 0, 0, 0, 0, 0, 0, 2, 2, 0, 0, 0, 0, 0, 2, 2, 2, 0, 0, 0, 2, 2, 2, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 2, 3, 0, 2, 2, 0, 0, 2, 0, 0, 2, 0]
char_core = {
0: [{'core_id': 'core_hollow', 'amount': 25}, {'core_id': 'core_desolate', 'amount': 5}],
@@ -57,14 +57,15 @@ class InitData:
43: [{'core_id': 'core_chunithm', 'amount': 15}],
11: [{'core_id': 'core_binary', 'amount': 25}, {'core_id': 'core_hollow', 'amount': 5}],
12: [{'core_id': 'core_binary', 'amount': 25}, {'core_id': 'core_desolate', 'amount': 5}],
19: [{'core_id': 'core_colorful', 'amount': 30}]
19: [{'core_id': 'core_colorful', 'amount': 30}],
10: [{'core_id': 'core_umbral', 'amount': 30}], # TODO: check
}
cores = ['core_hollow', 'core_desolate', 'core_chunithm', 'core_crimson',
'core_ambivalent', 'core_scarlet', 'core_groove', 'core_generic', 'core_binary', 'core_colorful', 'core_course_skip_purchase']
'core_ambivalent', 'core_scarlet', 'core_groove', 'core_generic', 'core_binary', 'core_colorful', 'core_course_skip_purchase', 'core_umbral']
world_songs = ["babaroque", "shadesoflight", "kanagawa", "lucifer", "anokumene", "ignotus", "rabbitintheblackroom", "qualia", "redandblue", "bookmaker", "darakunosono", "espebranch", "blacklotus", "givemeanightmare", "vividtheory", "onefr", "gekka", "vexaria3", "infinityheaven3", "fairytale3", "goodtek3", "suomi", "rugie", "faintlight", "harutopia", "goodtek", "dreaminattraction", "syro", "diode", "freefall", "grimheart", "blaster",
"cyberneciacatharsis", "monochromeprincess", "revixy", "vector", "supernova", "nhelv", "purgatorium3", "dement3", "crossover", "guardina", "axiumcrisis", "worldvanquisher", "sheriruth", "pragmatism", "gloryroad", "etherstrike", "corpssansorganes", "lostdesire", "blrink", "essenceoftwilight", "lapis", "solitarydream", "lumia3", "purpleverse", "moonheart3", "glow", "enchantedlove", "take", "lifeispiano", "vandalism", "nexttoyou3", "lostcivilization3", "turbocharger", "bookmaker3", "laqryma3", "kyogenkigo", "hivemind", "seclusion", "quonwacca3", "bluecomet", "energysynergymatrix", "gengaozo", "lastendconductor3", "antithese3", "qualia3", "kanagawa3", "heavensdoor3", "pragmatism3", "nulctrl", "avril", "ddd", "merlin3", "omakeno3", "nekonote", "sanskia", 'altair', 'mukishitsu', 'trapcrow', 'redandblue3', 'ignotus3', 'singularity3', 'dropdead3', 'arcahv', 'freefall3', 'partyvinyl3', 'tsukinimurakumo', 'mantis', 'worldfragments', 'astrawalkthrough', 'chronicle']
"cyberneciacatharsis", "monochromeprincess", "revixy", "vector", "supernova", "nhelv", "purgatorium3", "dement3", "crossover", "guardina", "axiumcrisis", "worldvanquisher", "sheriruth", "pragmatism", "gloryroad", "etherstrike", "corpssansorganes", "lostdesire", "blrink", "essenceoftwilight", "lapis", "solitarydream", "lumia3", "purpleverse", "moonheart3", "glow", "enchantedlove", "take", "lifeispiano", "vandalism", "nexttoyou3", "lostcivilization3", "turbocharger", "bookmaker3", "laqryma3", "kyogenkigo", "hivemind", "seclusion", "quonwacca3", "bluecomet", "energysynergymatrix", "gengaozo", "lastendconductor3", "antithese3", "qualia3", "kanagawa3", "heavensdoor3", "pragmatism3", "nulctrl", "avril", "ddd", "merlin3", "omakeno3", "nekonote", "sanskia", 'altair', 'mukishitsu', 'trapcrow', 'redandblue3', 'ignotus3', 'singularity3', 'dropdead3', 'arcahv', 'freefall3', 'partyvinyl3', 'tsukinimurakumo', 'mantis', 'worldfragments', 'astrawalkthrough', 'chronicle', 'trappola3', 'letsrock']
world_unlocks = ["scenery_chap1", "scenery_chap2",
"scenery_chap3", "scenery_chap4", "scenery_chap5", "scenery_chap6", "scenery_chap7"]

View File

@@ -646,5 +646,23 @@
"price": 700,
"discount_from": 1646784000000,
"discount_to": 1647388799000
},
{
"name": "maimai_append_1",
"items": [
{
"type": "pack",
"id": "maimai_append_1",
"is_available": true
},
{
"type": "core",
"amount": 4,
"id": "core_generic",
"is_available": true
}
],
"orig_price": 400,
"price": 400
}
]

View File

@@ -1306,5 +1306,59 @@
],
"orig_price": 100,
"price": 100
},
{
"name": "hiirogekka",
"items": [
{
"type": "single",
"id": "hiirogekka",
"is_available": true
},
{
"type": "core",
"amount": 1,
"id": "core_generic",
"is_available": true
}
],
"orig_price": 100,
"price": 100
},
{
"name": "manicjeer",
"items": [
{
"type": "single",
"id": "manicjeer",
"is_available": true
},
{
"type": "core",
"amount": 1,
"id": "core_generic",
"is_available": true
}
],
"orig_price": 100,
"price": 100
},
{
"name": "crimsonthrone",
"items": [
{
"type": "single",
"id": "crimsonthrone",
"is_available": true
},
{
"type": "core",
"amount": 1,
"id": "core_generic",
"is_available": true
}
],
"orig_price": 100,
"price": 100
}
]

View File

@@ -33,7 +33,8 @@ ban_flag text,
next_fragstam_ts int,
max_stamina_ts int,
stamina int,
world_mode_locked_end_ts int
world_mode_locked_end_ts int,
beyond_boost_gauge real default 0
);
create table if not exists login(access_token text,
user_id int,
@@ -182,7 +183,8 @@ course_score int,
course_clear_type int,
stamina_multiply int,
fragment_multiply int,
prog_boost_multiply int
prog_boost_multiply int,
beyond_boost_gauge_usage int
);
create table if not exists item(item_id text,
type text,

View File

@@ -15,6 +15,8 @@ class Config:
--------------------------------------------------
'''
DEBUG = False
TIME_LIMIT = 3600000
COMMAND_INTERVAL = 1000000

View File

@@ -1,93 +1,18 @@
import base64
import binascii
import logging
import random
import socketserver
import threading
import time
from os import urandom
# import binascii
from .aes import decrypt, encrypt
from .config import Config
from .udp_class import Player, Room, bi
from .store import Store, TCPRouter, clear_player, clear_room
from .udp_class import bi
from .udp_parser import CommandParser
# token: {'key': key, 'room': Room, 'player_index': player_index, 'player_id': player_id}
link_play_data = {}
room_id_dict = {} # 'room_id': Room
room_code_dict = {} # 'room_code': Room
player_dict = {} # 'player_id' : Player
clean_timer = 0
lock = threading.RLock()
logging.basicConfig(format='[%(asctime)s] %(levelname)s in %(module)s: %(message)s',
level=logging.INFO)
def random_room_code():
# 随机生成房间号
re = ''
for _ in range(4):
re += chr(random.randint(65, 90))
for _ in range(2):
re += str(random.randint(0, 9))
return re
def unique_random(dataset, length=8, random_func=None):
'''无重复随机且默认非0'''
if random_func is None:
x = bi(urandom(length))
while x in dataset or x == 0:
x = bi(urandom(length))
else:
x = random_func()
while x in dataset:
x = random_func()
return x
def clear_player(token):
# 清除玩家信息和token
del player_dict[link_play_data[token]['player_id']]
del link_play_data[token]
def clear_room(room):
# 清除房间信息
room_id = room.room_id
room_code = room.room_code
del room_id_dict[room_id]
del room_code_dict[room_code]
del room
def memory_clean(now):
# 内存清理
with lock:
clean_room_list = []
clean_player_list = []
for token in link_play_data:
room = link_play_data[token]['room']
if now - room.timestamp >= Config.TIME_LIMIT:
clean_room_list.append(room.room_id)
if now - room.players[link_play_data[token]['player_index']].last_timestamp // 1000 >= Config.TIME_LIMIT:
clean_player_list.append(token)
for room_id in room_id_dict:
if now - room_id_dict[room_id].timestamp >= Config.TIME_LIMIT:
clean_room_list.append(room_id)
for room_id in clean_room_list:
if room_id in room_id_dict:
clear_room(room_id_dict[room_id])
for token in clean_player_list:
clear_player(token)
class UDP_handler(socketserver.BaseRequestHandler):
def handle(self):
client_msg, server = self.request
@@ -96,32 +21,34 @@ class UDP_handler(socketserver.BaseRequestHandler):
iv = client_msg[8:20]
tag = client_msg[20:32]
ciphertext = client_msg[32:]
if int.from_bytes(token, byteorder='little') in link_play_data:
user = link_play_data[bi(token)]
else:
if bi(token) not in Store.link_play_data:
return None
user = Store.link_play_data[bi(token)]
plaintext = decrypt(user['key'], b'', iv, ciphertext, tag)
except Exception as e:
logging.error(e)
return None
# print(binascii.b2a_hex(plaintext))
# if Config.DEBUG:
# logging.info(
# f'UDP-From-{self.client_address[0]}-{binascii.b2a_hex(plaintext)}')
commands = CommandParser(
user['room'], user['player_index']).get_commands(plaintext)
if user['room'].players[user['player_index']].player_id == 0:
clear_player(bi(token))
temp = []
for i in commands:
if i[:3] == b'\x06\x16\x12':
temp.append(i)
commands = temp
if user['room'].player_num == 0:
clear_room(user['room'])
commands = [i for i in commands if i[2] == 0x12]
# 处理不能正确被踢的问题
for i in commands:
iv, ciphertext, tag = encrypt(user['key'], i, b'')
# print(binascii.b2a_hex(i))
# if Config.DEBUG:
# logging.info(
# f'UDP-To-{self.client_address[0]}-{binascii.b2a_hex(i)}')
server.sendto(token + iv + tag[:12] +
ciphertext, self.client_address)
@@ -132,126 +59,18 @@ class TCP_handler(socketserver.StreamRequestHandler):
self.data = self.rfile.readline().strip()
message = self.data.decode('utf-8')
# print(message)
if Config.DEBUG:
logging.info(f'TCP-From-{self.client_address[0]}-{message}')
data = message.split('|')
if data[0] != Config.AUTHENTICATION:
self.wfile.write(b'No authentication')
logging.warning('TCP-%s-No authentication' %
self.client_address[0])
logging.warning(f'TCP-{self.client_address[0]}-No authentication')
return None
global clean_timer
now = round(time.time() * 1000)
if now - clean_timer >= Config.TIME_LIMIT:
logging.info('Start cleaning memory...')
clean_timer = now
memory_clean(now)
self.wfile.write(data_swap(data[1:]).encode('utf-8'))
def data_swap(data: list) -> str:
# data: list[str] = [command, ...]
if data[0] == '1':
# 开房
# data = ['1', name, song_unlock, ]
# song_unlock: base64 str
name = data[1]
song_unlock = base64.b64decode(data[2])
key = urandom(16)
with lock:
room_id = unique_random(room_id_dict)
room = Room()
room.room_id = room_id
room_id_dict[room_id] = room
player_id = unique_random(player_dict, 3)
player = Player()
player.player_id = player_id
player.set_player_name(name)
player_dict[player_id] = player
player.song_unlock = song_unlock
room.song_unlock = song_unlock
room.host_id = player_id
room.players[0] = player
room.player_num = 1
room_code = unique_random(
room_code_dict, random_func=random_room_code)
room.room_code = room_code
room_code_dict[room_code] = room
token = room_id
player.token = token
link_play_data[token] = {'key': key,
'room': room,
'player_index': 0,
'player_id': player_id}
logging.info('TCP-Create room `%s` by player `%s`' % (room_code, name))
return '|'.join([str(x) for x in (0, room_code, room_id, token, base64.b64encode(key).decode('utf-8'), player_id)])
elif data[0] == '2':
# 入房
# data = ['2', name, song_unlock, room_code]
# song_unlock: base64 str
room_code = data[3].upper()
with lock:
if room_code not in room_code_dict:
# 房间号错误
return '1202'
room = room_code_dict[room_code]
if room.player_num == 4:
# 满人
return '1201'
elif room.state != 2:
# 无法加入
return '1205'
name = data[1]
song_unlock = base64.b64decode(data[2])
key = urandom(16)
with lock:
token = unique_random(link_play_data)
player_id = unique_random(player_dict, 3)
player = Player()
player.player_id = player_id
player.set_player_name(name)
player.token = token
player_dict[player_id] = player
player.song_unlock = song_unlock
room.update_song_unlock()
for i in range(4):
if room.players[i].player_id == 0:
room.players[i] = player
player_index = i
break
link_play_data[token] = {'key': key,
'room': room,
'player_index': player_index,
'player_id': player_id}
logging.info('TCP-Player `%s` joins room `%s`' % (name, room_code))
return '|'.join([str(x) for x in (0, room_code, room.room_id, token, base64.b64encode(key).decode('utf-8'), player_id, base64.b64encode(room.song_unlock).decode('utf-8'))])
elif data[0] == '3':
# 房间信息更新
# data = ['3', token]
token = int(data[1])
with lock:
if token in link_play_data:
r = link_play_data[token]
logging.info('TCP-Room `%s` info update' % room_code)
return '|'.join([str(x) for x in (0, r['room'].room_code, r['room'].room_id, base64.b64encode(r['key']).decode('utf-8'), r['room'].players[r['player_index']].player_id, base64.b64encode(r['room'].song_unlock).decode('utf-8'))])
else:
return '108'
r = TCPRouter(data[1:]).handle()
if Config.DEBUG:
logging.info(f'TCP-To-{self.client_address[0]}-{r}')
self.wfile.write(r.encode('utf-8'))
def link_play(ip: str = Config.HOST, udp_port: int = Config.UDP_PORT, tcp_port: int = Config.TCP_PORT):

View File

@@ -0,0 +1,231 @@
import logging
from base64 import b64decode, b64encode
from os import urandom
from random import randint
from threading import RLock
from time import time
from .config import Config
from .udp_class import Player, Room, bi
class Store:
# token: {'key': key, 'room': Room, 'player_index': player_index, 'player_id': player_id}
link_play_data = {}
room_id_dict = {} # 'room_id': Room
room_code_dict = {} # 'room_code': Room
player_dict = {} # 'player_id' : Player
lock = RLock()
def random_room_code():
re = ''
for _ in range(4):
re += chr(randint(65, 90))
for _ in range(2):
re += str(randint(0, 9))
return re
def unique_random(dataset, length=8, random_func=None):
'''无重复随机且默认非0没处理可能的死循环'''
if random_func is None:
x = bi(urandom(length))
while x in dataset or x == 0:
x = bi(urandom(length))
else:
x = random_func()
while x in dataset:
x = random_func()
return x
def clear_player(token):
# 清除玩家信息和token
del Store.player_dict[Store.link_play_data[token]['player_id']]
del Store.link_play_data[token]
def clear_room(room):
# 清除房间信息
room_id = room.room_id
room_code = room.room_code
del Store.room_id_dict[room_id]
del Store.room_code_dict[room_code]
del room
def memory_clean(now):
# 内存清理,应对玩家不正常退出
with Store.lock:
clean_room_list = []
clean_player_list = []
for token in Store.link_play_data:
room = Store.link_play_data[token]['room']
if now - room.timestamp >= Config.TIME_LIMIT:
clean_room_list.append(room.room_id)
if now - room.players[Store.link_play_data[token]['player_index']].last_timestamp // 1000 >= Config.TIME_LIMIT:
clean_player_list.append(token)
for room_id in Store.room_id_dict:
if now - Store.room_id_dict[room_id].timestamp >= Config.TIME_LIMIT:
clean_room_list.append(room_id)
for room_id in clean_room_list:
if room_id in Store.room_id_dict:
clear_room(Store.room_id_dict[room_id])
for token in clean_player_list:
clear_player(token)
class TCPRouter:
clean_timer = 0
router = {
'0': 'debug',
'1': 'create_room',
'2': 'join_room',
'3': 'update_room',
}
def __init__(self, data: list):
self.data = data # data: list[str] = [command, ...]
def debug(self):
if Config.DEBUG:
return eval(self.data[1])
return 'ok'
@staticmethod
def clean_check():
now = round(time() * 1000)
if now - TCPRouter.clean_timer >= Config.TIME_LIMIT:
logging.info('Start cleaning memory...')
TCPRouter.clean_timer = now
memory_clean(now)
def handle(self) -> str:
self.clean_check()
if self.data[0] not in self.router:
return None
r = getattr(self, self.router[self.data[0]])()
if isinstance(r, tuple):
return '|'.join(map(str, r))
return str(r)
@staticmethod
def generate_player(name: str) -> Player:
player_id = unique_random(Store.player_dict, 3)
player = Player()
player.player_id = player_id
player.set_player_name(name)
Store.player_dict[player_id] = player
return player
@staticmethod
def generate_room() -> Room:
room_id = unique_random(Store.room_id_dict)
room = Room()
room.room_id = room_id
room.timestamp = round(time() * 1000)
Store.room_id_dict[room_id] = room
room_code = unique_random(
Store.room_code_dict, random_func=random_room_code)
room.room_code = room_code
Store.room_code_dict[room_code] = room
return room
def create_room(self) -> tuple:
# 开房
# data = ['1', name, song_unlock, ]
# song_unlock: base64 str
name = self.data[1]
song_unlock = b64decode(self.data[2])
key = urandom(16)
with Store.lock:
room = self.generate_room()
player = self.generate_player(name)
player.song_unlock = song_unlock
room.song_unlock = song_unlock
room.host_id = player.player_id
room.players[0] = player
token = room.room_id
player.token = token
Store.link_play_data[token] = {
'key': key,
'room': room,
'player_index': 0,
'player_id': player.player_id
}
logging.info(f'TCP-Create room `{room.room_code}` by player `{name}`')
return (0, room.room_code, room.room_id, token, b64encode(key).decode('utf-8'), player.player_id)
def join_room(self) -> tuple:
# 入房
# data = ['2', name, song_unlock, room_code]
# song_unlock: base64 str
room_code = self.data[3].upper()
key = urandom(16)
name = self.data[1]
song_unlock = b64decode(self.data[2])
with Store.lock:
if room_code not in Store.room_code_dict:
# 房间号错误 / 房间不存在
return '1202'
room: Room = Store.room_code_dict[room_code]
player_num = room.player_num
if player_num == 4:
# 满人
return '1201'
elif player_num == 0:
# 房间不存在
return '1202'
elif room.state != 2:
# 无法加入
return '1205'
token = unique_random(Store.link_play_data)
player = self.generate_player(name)
player.token = token
player.song_unlock = song_unlock
room.update_song_unlock()
for i in range(4):
if room.players[i].player_id == 0:
room.players[i] = player
player_index = i
break
Store.link_play_data[token] = {
'key': key,
'room': room,
'player_index': player_index,
'player_id': player.player_id
}
logging.info(f'TCP-Player `{name}` joins room `{room_code}`')
return (0, room_code, room.room_id, token, b64encode(key).decode('utf-8'), player.player_id, b64encode(room.song_unlock).decode('utf-8'))
def update_room(self) -> tuple:
# 房间信息更新
# data = ['3', token]
token = int(self.data[1])
with Store.lock:
if token not in Store.link_play_data:
return '108'
r = Store.link_play_data[token]
room = r['room']
logging.info(f'TCP-Room `{room.room_code}` info update')
return (0, room.room_code, room.room_id, b64encode(r['key']).decode('utf-8'), room.players[r['player_index']].player_id, b64encode(room.song_unlock).decode('utf-8'))

View File

@@ -1,3 +1,5 @@
from time import time
from .config import Config
@@ -61,11 +63,10 @@ class Room:
self.song_idx = 0xffff
self.last_song_idx = 0xffff
self.song_unlock = b'\x00' * Config.LINK_PLAY_UNLOCK_LENGTH
self.song_unlock = b'\xFF' * Config.LINK_PLAY_UNLOCK_LENGTH
self.host_id = 0
self.players = [Player(), Player(), Player(), Player()]
self.player_num = 0
self.interval = 1000
self.times = 100
@@ -73,7 +74,33 @@ class Room:
self.round_switch = 0
self.command_queue = []
self.command_queue_length = 0
@property
def command_queue_length(self) -> int:
return len(self.command_queue)
@property
def player_num(self) -> int:
self.check_player_online()
return sum(i.player_id != 0 for i in self.players)
def check_player_online(self, now: int = None):
# 检测玩家是否被自动踢出房间 / 离线判断
now = round(time() * 1000000) if now is None else now
flag = False
player_index_list = []
for i, x in enumerate(self.players):
if x.player_id == 0 or x.last_timestamp == 0:
continue
if now - x.last_timestamp >= Config.PLAYER_TIMEOUT:
self.delete_player(i)
flag = True
player_index_list.append(i)
elif x.online == 1 and now - x.last_timestamp >= Config.PLAYER_PRE_TIMEOUT:
x.online = 0
player_index_list.append(i)
return flag, player_index_list
def get_players_info(self):
# 获取所有玩家信息
@@ -113,7 +140,6 @@ class Room:
def delete_player(self, player_index: int):
# 删除某个玩家
self.player_num -= 1
if self.players[player_index].player_id == self.host_id:
self.make_round()

View File

@@ -6,31 +6,23 @@ from .udp_sender import CommandSender
class CommandParser:
route = [None, 'command_01', 'command_02', 'command_03', 'command_04', 'command_05',
'command_06', 'command_07', 'command_08', 'command_09', 'command_0a', 'command_0b']
def __init__(self, room: Room, player_index: int = 0) -> None:
self.room = room
self.player_index = player_index
self.s = CommandSender(self.room)
def get_commands(self, command):
self.command = command
l = {b'\x06\x16\x01': self.command_01,
b'\x06\x16\x02': self.command_02,
b'\x06\x16\x03': self.command_03,
b'\x06\x16\x04': self.command_04,
b'\x06\x16\x05': self.command_05,
b'\x06\x16\x06': self.command_06,
b'\x06\x16\x07': self.command_07,
b'\x06\x16\x08': self.command_08,
b'\x06\x16\x09': self.command_09,
b'\x06\x16\x0a': self.command_0a,
b'\x06\x16\x0b': self.command_0b
}
r = l[command[:3]]()
r = getattr(self, self.route[self.command[2]])()
re = []
flag_13 = False
for i in range(max(bi(self.command[12:16]), self.room.players[self.player_index].start_command_num), self.room.command_queue_length):
if self.room.command_queue[i][:3] == b'\x06\x16\x13':
if self.room.command_queue[i][2] == 0x13:
if flag_13:
break
flag_13 = True
@@ -52,16 +44,13 @@ class CommandParser:
if i.player_id == player_id and i.online == 1:
self.room.host_id = player_id
x = CommandSender(self.room)
x.random_code = self.command[16:24]
self.room.command_queue_length += 1
self.room.command_queue.append(x.command_10())
self.s.random_code = self.command[16:24]
self.room.command_queue.append(self.s.command_10())
return None
def command_02(self):
x = CommandSender(self.room)
x.random_code = self.command[16:24]
self.s.random_code = self.command[16:24]
song_idx = bi(self.command[24:26])
flag = 2
@@ -69,17 +58,14 @@ class CommandParser:
flag = 0
self.room.state = 3
self.room.song_idx = song_idx
self.room.command_queue_length += 1
self.room.command_queue.append(x.command_11())
self.room.command_queue_length += 1
self.room.command_queue.append(x.command_13())
self.room.command_queue.append(self.s.command_11())
self.room.command_queue.append(self.s.command_13())
return [x.command_0d(flag)]
return [self.s.command_0d(flag)]
def command_03(self):
# 尝试进入结算
x = CommandSender(self.room)
x.random_code = self.command[16:24]
self.s.random_code = self.command[16:24]
player = self.room.players[self.player_index]
player.score = bi(self.command[24:28])
player.cleartype = self.command[28]
@@ -89,20 +75,17 @@ class CommandParser:
player.last_timestamp -= Config.COMMAND_INTERVAL
self.room.last_song_idx = self.room.song_idx
self.room.command_queue_length += 1
self.room.command_queue.append(x.command_12(self.player_index))
self.room.command_queue.append(self.s.command_12(self.player_index))
if self.room.is_finish():
self.room.make_finish()
self.room.command_queue_length += 1
self.room.command_queue.append(x.command_13())
self.room.command_queue.append(self.s.command_13())
return None
def command_04(self):
# 踢人
x = CommandSender(self.room)
x.random_code = self.command[16:24]
self.s.random_code = self.command[16:24]
player_id = bi(self.command[24:32])
flag = 2
if self.room.players[self.player_index].player_id == self.room.host_id and player_id != self.room.host_id:
@@ -110,51 +93,42 @@ class CommandParser:
if self.room.players[i].player_id == player_id:
flag = 1
self.room.delete_player(i)
self.room.command_queue_length += 1
self.room.command_queue.append(x.command_12(i))
self.room.command_queue.append(self.s.command_12(i))
self.room.update_song_unlock()
self.room.command_queue_length += 1
self.room.command_queue.append(x.command_14())
self.room.command_queue.append(self.s.command_14())
break
return [x.command_0d(flag)]
return [self.s.command_0d(flag)]
def command_05(self):
pass
def command_06(self):
x = CommandSender(self.room)
x.random_code = self.command[16:24]
self.s.random_code = self.command[16:24]
self.room.state = 1
self.room.song_idx = 0xffff
self.room.command_queue_length += 1
self.room.command_queue.append(x.command_13())
self.room.command_queue.append(self.s.command_13())
return None
def command_07(self):
x = CommandSender(self.room)
x.random_code = self.command[16:24]
self.s.random_code = self.command[16:24]
self.room.players[self.player_index].song_unlock = self.command[24:536]
self.room.update_song_unlock()
self.room.command_queue_length += 1
self.room.command_queue.append(x.command_14())
self.room.command_queue.append(self.s.command_14())
return None
def command_08(self):
self.room.round_switch = bi(self.command[24:25])
x = CommandSender(self.room)
x.random_code = self.command[16:24]
self.room.command_queue_length += 1
self.room.command_queue.append(x.command_13())
self.s.random_code = self.command[16:24]
self.room.command_queue.append(self.s.command_13())
return None
def command_09(self):
re = []
x = CommandSender(self.room)
x.random_code = self.command[16:24]
self.s.random_code = self.command[16:24]
player = self.room.players[self.player_index]
if bi(self.command[12:16]) == 0:
@@ -162,29 +136,17 @@ class CommandParser:
self.room.state = 1
self.room.update_song_unlock()
player.start_command_num = self.room.command_queue_length
self.room.command_queue_length += 1
self.room.command_queue.append(x.command_15())
self.room.command_queue.append(self.s.command_15())
else:
if x.timestamp - player.last_timestamp >= Config.COMMAND_INTERVAL:
re.append(x.command_0c())
player.last_timestamp = x.timestamp
if self.s.timestamp - player.last_timestamp >= Config.COMMAND_INTERVAL:
re.append(self.s.command_0c())
player.last_timestamp = self.s.timestamp
flag_13 = False
# 离线判断
for i in range(4):
if i != self.player_index:
t = self.room.players[i]
if t.player_id != 0:
if t.last_timestamp != 0:
if t.online == 1 and x.timestamp - t.last_timestamp >= Config.PLAYER_PRE_TIMEOUT:
t.online = 0
self.room.command_queue_length += 1
self.room.command_queue.append(x.command_12(i))
elif t.online == 0 and x.timestamp - t.last_timestamp >= Config.PLAYER_TIMEOUT:
self.room.delete_player(i)
self.room.command_queue_length += 1
self.room.command_queue.append(x.command_12(i))
flag_13 = True
flag_13, player_index_list = self.room.check_player_online(
self.s.timestamp)
for i in player_index_list:
self.room.command_queue.append(self.s.command_12(i))
flag_11 = False
flag_12 = False
@@ -276,7 +238,7 @@ class CommandParser:
if player.timer != 0 or self.room.state != 8:
for i in self.room.players:
i.extra_command_queue.append(
x.command_0e(self.player_index))
self.s.command_0e(self.player_index))
if self.room.is_ready(8, 1):
flag_13 = True
@@ -293,14 +255,12 @@ class CommandParser:
flag_13 = True
if flag_11:
self.room.command_queue_length += 1
self.room.command_queue.append(x.command_11())
self.room.command_queue.append(self.s.command_11())
if flag_12:
self.room.command_queue_length += 1
self.room.command_queue.append(x.command_12(self.player_index))
self.room.command_queue.append(
self.s.command_12(self.player_index))
if flag_13:
self.room.command_queue_length += 1
self.room.command_queue.append(x.command_13())
self.room.command_queue.append(self.s.command_13())
return re
@@ -308,28 +268,22 @@ class CommandParser:
# 退出房间
self.room.delete_player(self.player_index)
x = CommandSender(self.room)
self.room.command_queue_length += 1
self.room.command_queue.append(x.command_12(self.player_index))
self.room.command_queue.append(self.s.command_12(self.player_index))
if self.room.state == 3 or self.room.state == 2:
self.room.state = 1
self.room.song_idx = 0xffff
# self.room.command_queue_length += 1
# self.room.command_queue.append(x.command_11())
self.room.command_queue_length += 1
self.room.command_queue.append(x.command_13())
self.room.command_queue_length += 1
self.room.command_queue.append(x.command_14())
# self.room.command_queue.append(self.s.command_11())
self.room.command_queue.append(self.s.command_13())
self.room.command_queue.append(self.s.command_14())
return None
def command_0b(self):
# 推荐歌曲
song_idx = bi(self.command[16:18])
x = CommandSender(self.room)
for i in range(4):
if self.player_index != i and self.room.players[i].online == 1:
self.room.players[i].extra_command_queue.append(
x.command_0f(self.player_index, song_idx))
self.s.command_0f(self.player_index, song_idx))
return None

View File

@@ -1,46 +1,64 @@
import time
from time import time
from .udp_class import Room, b
class CommandSender:
def __init__(self, room: Room = Room()) -> None:
PROTOCOL_NAME = b'\x06\x16'
PROTOCOL_VERSION = b'\x09'
def __init__(self, room: Room = None) -> None:
self.room = room
self.timestamp = round(time.time() * 1000000)
self.timestamp = round(time() * 1000000)
self.random_code = b'\x11\x11\x11\x11\x00\x00\x00\x00'
@staticmethod
def command_encode(t: tuple):
r = b''.join(t)
x = 16 - len(r) % 16
return r + b(x) * x
def command_prefix(self, command: bytes):
length = self.room.command_queue_length
if command >= b'\x10':
length += 1
return (self.PROTOCOL_NAME, command, self.PROTOCOL_VERSION, b(self.room.room_id, 8), b(length, 4))
def command_0c(self):
return b'\x06\x16\x0c\x09' + b(self.room.room_id, 8) + b(self.room.command_queue_length, 4) + self.random_code + b(self.room.state) + b(self.room.countdown, 4) + b(self.timestamp, 8) + b'\x0b\x0b\x0b\x0b\x0b\x0b\x0b\x0b\x0b\x0b\x0b'
return self.command_encode((*self.command_prefix(b'\x0c'), self.random_code, b(self.room.state), b(self.room.countdown, 4), b(self.timestamp, 8)))
def command_0d(self, code: int):
return b'\x06\x16\x0d\x09' + b(self.room.room_id, 8) + b(self.room.command_queue_length, 4) + self.random_code + b(code) + b'\x07\x07\x07\x07\x07\x07\x07'
return self.command_encode((*self.command_prefix(b'\x0d'), self.random_code, b(code)))
def command_0e(self, player_index: int):
# 分数广播
player = self.room.players[player_index]
return b'\x06\x16\x0e\x09' + b(self.room.room_id, 8) + b(self.room.command_queue_length, 4) + b(player.player_id, 8) + b(player.character_id) + b(player.is_uncapped) + b(player.difficulty) + b(player.score, 4) + b(player.timer, 4) + b(player.cleartype) + b(player.player_state) + b(player.download_percent) + b'\x01' + b(player.last_score, 4) + b(player.last_timer, 4) + b(player.online)
return self.command_encode((*self.command_prefix(b'\x0e'), b(player.player_id, 8), b(player.character_id), b(player.is_uncapped), b(player.difficulty), b(player.score, 4), b(player.timer, 4), b(player.cleartype), b(player.player_state), b(player.download_percent), b'\x01', b(player.last_score, 4), b(player.last_timer, 4), b(player.online)))
def command_0f(self, player_index: int, song_idx: int):
# 歌曲推荐
player = self.room.players[player_index]
return b'\x06\x16\x0f\x09' + b(self.room.room_id, 8) + b(self.room.command_queue_length, 4) + b(player.player_id, 8) + b(song_idx, 2) + b'\x06\x06\x06\x06\x06\x06'
return self.command_encode((*self.command_prefix(b'\x0f'), b(player.player_id, 8), b(song_idx, 2)))
def command_10(self):
# 房主宣告
return b'\x06\x16\x10\x09' + b(self.room.room_id, 8) + b(self.room.command_queue_length, 4) + self.random_code + b(self.room.host_id, 8)
return self.command_encode((*self.command_prefix(b'\x10'), self.random_code, b(self.room.host_id, 8)))
def command_11(self):
return b'\x06\x16\x11\x09' + b(self.room.room_id, 8) + b(self.room.command_queue_length, 4) + self.random_code + self.room.get_players_info() + b'\x08\x08\x08\x08\x08\x08\x08\x08'
return self.command_encode((*self.command_prefix(b'\x11'), self.random_code, self.room.get_players_info()))
def command_12(self, player_index: int):
player = self.room.players[player_index]
return b'\x06\x16\x12\x09' + b(self.room.room_id, 8) + b(self.room.command_queue_length, 4) + self.random_code + b(player_index) + b(player.player_id, 8) + b(player.character_id) + b(player.is_uncapped) + b(player.difficulty) + b(player.score, 4) + b(player.timer, 4) + b(player.cleartype) + b(player.player_state) + b(player.download_percent) + b(player.online)
return self.command_encode((*self.command_prefix(b'\x12'), self.random_code, b(player_index), b(player.player_id, 8), b(player.character_id), b(player.is_uncapped), b(player.difficulty), b(player.score, 4), b(player.timer, 4), b(player.cleartype), b(player.player_state), b(player.download_percent), b(player.online)))
def command_13(self):
return b'\x06\x16\x13\x09' + b(self.room.room_id, 8) + b(self.room.command_queue_length, 4) + self.random_code + b(self.room.host_id, 8) + b(self.room.state) + b(self.room.countdown, 4) + b(self.timestamp, 8) + b(self.room.song_idx, 2) + b(self.room.interval, 2) + b(self.room.times, 7) + self.room.get_player_last_score() + b(self.room.last_song_idx, 2) + b(self.room.round_switch, 1) + b'\x01'
return self.command_encode((*self.command_prefix(b'\x13'), self.random_code, b(self.room.host_id, 8), b(self.room.state), b(self.room.countdown, 4), b(self.timestamp, 8), b(self.room.song_idx, 2), b(self.room.interval, 2), b(self.room.times, 7), self.room.get_player_last_score(), b(self.room.last_song_idx, 2), b(self.room.round_switch, 1)))
def command_14(self):
return b'\x06\x16\x14\x09' + b(self.room.room_id, 8) + b(self.room.command_queue_length, 4) + self.random_code + self.room.song_unlock + b'\x08\x08\x08\x08\x08\x08\x08\x08'
return self.command_encode((*self.command_prefix(b'\x14'), self.random_code, self.room.song_unlock))
def command_15(self):
return b'\x06\x16\x15\x09' + b(self.room.room_id, 8) + b(self.room.command_queue_length, 4) + self.room.get_players_info() + self.room.song_unlock + b(self.room.host_id, 8) + b(self.room.state) + b(self.room.countdown, 4) + b(self.timestamp, 8) + b(self.room.song_idx, 2) + b(self.room.interval, 2) + b(self.room.times, 7) + self.room.get_player_last_score() + b(self.room.last_song_idx, 2) + b(self.room.round_switch, 1) + b'\x09\x09\x09\x09\x09\x09\x09\x09\x09'
return self.command_encode((*self.command_prefix(b'\x15'), self.room.get_players_info(), self.room.song_unlock, b(self.room.host_id, 8), b(self.room.state), b(self.room.countdown, 4), b(self.timestamp, 8), b(self.room.song_idx, 2), b(self.room.interval, 2), b(self.room.times, 7), self.room.get_player_last_score(), b(self.room.last_song_idx, 2), b(self.room.round_switch, 1)))

View File

@@ -1,13 +1,12 @@
import base64
from functools import wraps
from core.config_manager import Config
from core.error import ArcError, NoAccess
from core.sql import Connect
from core.user import UserAuth, UserLogin
from flask import Blueprint, g, jsonify, request
from flask import Blueprint, g, jsonify, request, current_app
from .func import arc_try, error_return
from .func import arc_try, error_return, header_check
bp = Blueprint('auth', __name__, url_prefix='/auth')
@@ -16,9 +15,9 @@ bp = Blueprint('auth', __name__, url_prefix='/auth')
@arc_try
def login():
headers = request.headers
if Config.ALLOW_APPVERSION: # 版本检查
if 'AppVersion' not in headers or headers['AppVersion'] not in Config.ALLOW_APPVERSION:
raise NoAccess('Invalid app version.', 1203)
e = header_check(request)
if e is not None:
raise e
request.form['grant_type']
with Connect() as c:
@@ -44,9 +43,11 @@ def auth_required(request):
headers = request.headers
if Config.ALLOW_APPVERSION: # 版本检查
if 'AppVersion' not in headers or headers['AppVersion'] not in Config.ALLOW_APPVERSION:
return error_return(NoAccess('Invalid app version.', 1203))
e = header_check(request)
if e is not None:
current_app.logger.warning(
f' - {e.error_code}|{e.api_error_code}: {e}')
return error_return(e)
with Connect() as c:
try:

View File

@@ -19,7 +19,7 @@ def course_me(user_id):
user = UserOnline(c, user_id)
core = ItemCore(c)
core.item_id = 'core_course_skip_purchase'
core.select(user)
core.select_user_item(user)
x = UserCourseList(c, user)
x.select_all()
return success_return({

View File

@@ -1,10 +1,18 @@
from functools import wraps
from traceback import format_exc
from core.config_manager import Config
from core.error import ArcError
from flask import current_app, g, jsonify
from core.config_manager import Config
from core.error import ArcError, NoAccess
has_arc_hash = False
try:
from core.arc_crypto import ArcHashChecker # type: ignore
has_arc_hash = True
except ModuleNotFoundError:
pass
default_error = ArcError('Unknown Error', status=500)
@@ -89,3 +97,16 @@ def arc_try(view):
return error_return(e)
return wrapped_view
def header_check(request) -> ArcError:
'''检查请求头是否合法'''
headers = request.headers
if Config.ALLOW_APPVERSION: # 版本检查
if 'AppVersion' not in headers or headers['AppVersion'] not in Config.ALLOW_APPVERSION:
return NoAccess('Invalid app version', 1203)
if has_arc_hash and not ArcHashChecker(request).check():
return NoAccess('Invalid request')
return None

View File

@@ -24,18 +24,16 @@ def score_token():
@arc_try
def score_token_world(user_id):
stamina_multiply = int(
request.args['stamina_multiply']) if 'stamina_multiply' in request.args else 1
fragment_multiply = int(
request.args['fragment_multiply']) if 'fragment_multiply' in request.args else 100
prog_boost_multiply = int(
request.args['prog_boost_multiply']) if 'prog_boost_multiply' in request.args else 0
stamina_multiply = int(request.args.get('stamina_multiply', 1))
fragment_multiply = int(request.args.get('fragment_multiply', 100))
prog_boost_multiply = int(request.args.get('prog_boost_multiply', 0))
beyond_boost_gauge_use = int(request.args.get('beyond_boost_gauge_use', 0))
with Connect() as c:
x = UserPlay(c, UserOnline(c, user_id))
x.song.set_chart(request.args['song_id'], int(
request.args['difficulty']))
x.set_play_state_for_world(stamina_multiply,
fragment_multiply, prog_boost_multiply)
x.set_play_state_for_world(
stamina_multiply, fragment_multiply, prog_boost_multiply, beyond_boost_gauge_use)
return success_return({
"stamina": x.user.stamina.stamina,
"max_stamina_ts": x.user.stamina.max_stamina_ts,

View File

@@ -8,7 +8,7 @@ from core.user import User, UserLogin, UserOnline, UserRegister
from flask import Blueprint, request
from .auth import auth_required
from .func import arc_try, success_return
from .func import arc_try, header_check, success_return
bp = Blueprint('user', __name__, url_prefix='/user')
@@ -17,9 +17,9 @@ bp = Blueprint('user', __name__, url_prefix='/user')
@arc_try
def register():
headers = request.headers
if Config.ALLOW_APPVERSION: # 版本检查
if 'AppVersion' not in headers or headers['AppVersion'] not in Config.ALLOW_APPVERSION:
raise NoAccess('Invalid app version.', 1203)
error = header_check(request)
if error is not None:
raise error
with Connect() as c:
new_user = UserRegister(c)