change something

This commit is contained in:
Lost-MSth
2020-10-18 17:49:33 +08:00
parent 3ecefcb802
commit bb7ba249e9
45 changed files with 0 additions and 20182 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -1,141 +0,0 @@
import sqlite3
import hashlib
import time
def arc_login(name: str, password: str) -> str: # 登录判断
# 查询数据库中的user表验证账号密码返回并记录token
# token采用user_id和时间戳连接后hash生成真的是瞎想的没用bear
# 密码和token的加密方式为 SHA-256
conn = sqlite3.connect('./database/arcaea_database.db')
c = conn.cursor()
hash_pwd = hashlib.sha256(password.encode("utf8")).hexdigest()
c.execute('''select user_id from user where name = :name and password = :password''', {
'name': name, 'password': hash_pwd})
x = c.fetchone()
if x is not None:
user_id = str(x[0])
now = int(time.time() * 1000)
token = hashlib.sha256((user_id + str(now)).encode("utf8")).hexdigest()
c.execute(
'''select exists(select * from login where user_id = :user_id)''', {"user_id": user_id})
if c.fetchone() == (1,): # 删掉多余token
c.execute('''delete from login where user_id = :user_id''',
{'user_id': user_id})
c.execute('''insert into login(access_token, user_id) values(:access_token, :user_id)''', {
'user_id': user_id, 'access_token': token})
conn.commit()
conn.close()
return token
conn.commit()
conn.close()
return None
def arc_register(name: str, password: str): # 注册
# 账号注册只记录hash密码和用户名生成user_id和user_code自动登录返回token
# token和密码的处理同登录部分
def build_user_code(c):
# 生成9位的user_code用的自然是随机
import random
flag = True
while flag:
user_code = ''.join([str(random.randint(0, 9)) for i in range(9)])
c.execute('''select exists(select * from user where user_code = :user_code)''',
{'user_code': user_code})
if c.fetchone() == (0,):
flag = False
return user_code
def build_user_id(c):
# 生成user_id往后加1
c.execute('''select max(user_id) from user''')
x = c.fetchone()
if x[0] is not None:
return x[0] + 1
else:
return 2000001
def insert_user_char(c, user_id):
# 为用户添加所有可用角色
for i in range(0, 38):
if i in [0, 1, 2, 4, 13, 26, 27, 28, 29, 36]:
sql = 'insert into user_char values('+str(user_id)+','+str(
i)+''',30,25000,25000,90,90,90,'',0,0,'',0,1,1)'''
c.execute(sql)
else:
if i != 5:
sql = 'insert into user_char values('+str(user_id)+','+str(
i)+''',30,25000,25000,90,90,90,'',0,0,'',0,0,0)'''
c.execute(sql)
conn = sqlite3.connect('./database/arcaea_database.db')
c = conn.cursor()
hash_pwd = hashlib.sha256(password.encode("utf8")).hexdigest()
c.execute(
'''select exists(select * from user where name = :name)''', {'name': name})
if c.fetchone() == (0,):
user_code = build_user_code(c)
user_id = build_user_id(c)
now = int(time.time() * 1000)
c.execute('''insert into user(user_id, name, password, join_date, user_code, rating_ptt,
character_id, is_skill_sealed, is_char_uncapped, is_char_uncapped_override, is_hide_rating, favorite_character, max_stamina_notification_enabled)
values(:user_id, :name, :password, :join_date, :user_code, 0, 0, 0, 0, 0, 0, -1, 0)
''', {'user_code': user_code, 'user_id': user_id, 'join_date': now, 'name': name, 'password': hash_pwd})
c.execute('''insert into recent30(user_id) values(:user_id)''', {
'user_id': user_id})
token = hashlib.sha256(
(str(user_id) + str(now)).encode("utf8")).hexdigest()
c.execute('''insert into login(access_token, user_id) values(:access_token, :user_id)''', {
'user_id': user_id, 'access_token': token})
insert_user_char(c, user_id)
conn.commit()
conn.close()
return user_id, token, 0
else:
conn.commit()
conn.close()
return None, None, 101
def token_get_id(token: str):
# 用token获取id没有考虑不同用户token相同情况说不定会有bug
conn = sqlite3.connect('./database/arcaea_database.db')
c = conn.cursor()
c.execute('''select user_id from login where access_token = :token''', {
'token': token})
x = c.fetchone()
if x is not None:
conn.commit()
conn.close()
return x[0]
else:
conn.commit()
conn.close()
return None
def code_get_id(user_code):
# 用user_code获取id
conn = sqlite3.connect('./database/arcaea_database.db')
c = conn.cursor()
c.execute('''select user_id from user where user_code = :a''',
{'a': user_code})
x = c.fetchone()
if x is not None:
conn.commit()
conn.close()
return x[0]
else:
conn.commit()
conn.close()
return None

Binary file not shown.

Binary file not shown.

View File

@@ -1,204 +0,0 @@
import sqlite3
import hashlib
import time
# 数据库初始化文件删掉arcaea_database.db文件后运行即可谨慎使用
conn = sqlite3.connect('arcaea_database.db')
c = conn.cursor()
c.execute('''create table if not exists user(user_id int primary key,
name text unique,
password text not null,
join_date char(20),
user_code char(10),
rating_ptt int,
character_id int,
is_skill_sealed int,
is_char_uncapped int,
is_char_uncapped_override int,
is_hide_rating int,
song_id text,
difficulty int,
score int,
shiny_perfect_count int,
perfect_count int,
near_count int,
miss_count int,
health int,
modifier int,
time_played int,
clear_type int,
rating real,
favorite_character int,
max_stamina_notification_enabled int
);''')
c.execute('''create table if not exists login(access_token text,
user_id int,
last_login_time int,
last_login_ip text,
last_login_device text
);''')
c.execute('''create table if not exists friend(user_id_me int,
user_id_other int,
primary key (user_id_me, user_id_other)
);''')
c.execute('''create table if not exists best_score(user_id int,
song_id text,
difficulty int,
score int,
shiny_perfect_count int,
perfect_count int,
near_count int,
miss_count int,
health int,
modifier int,
time_played int,
best_clear_type int,
clear_type int,
rating real,
primary key(user_id, song_id, difficulty)
);''')
c.execute('''create table if not exists user_char(user_id int,
character_id int,
level int,
exp real,
level_exp real,
frag int,
prog int,
overdrive int,
skill_id text,
skill_unlock_level int,
skill_requires_uncap int,
skill_id_uncap text,
char_type int,
is_uncapped int,
is_uncapped_override int,
primary key(user_id, character_id)
);''')
c.execute('''create table if not exists character(character_id int primary key,
name text
);''')
c.execute('''create table if not exists recent30(user_id int primary key,
r0 real,
song_id0 text,
r1 real,
song_id1 text,
r2 real,
song_id2 text,
r3 real,
song_id3 text,
r4 real,
song_id4 text,
r5 real,
song_id5 text,
r6 real,
song_id6 text,
r7 real,
song_id7 text,
r8 real,
song_id8 text,
r9 real,
song_id9 text,
r10 real,
song_id10 text,
r11 real,
song_id11 text,
r12 real,
song_id12 text,
r13 real,
song_id13 text,
r14 real,
song_id14 text,
r15 real,
song_id15 text,
r16 real,
song_id16 text,
r17 real,
song_id17 text,
r18 real,
song_id18 text,
r19 real,
song_id19 text,
r20 real,
song_id20 text,
r21 real,
song_id21 text,
r22 real,
song_id22 text,
r23 real,
song_id23 text,
r24 real,
song_id24 text,
r25 real,
song_id25 text,
r26 real,
song_id26 text,
r27 real,
song_id27 text,
r28 real,
song_id28 text,
r29 real,
song_id29 text
);''')
char = ['Hikari','Tairitsu','Kou','Sapphire','Lethe','','Tairitsu(Axium)'
,'Tairitsu(Grievous Lady)','Stella','Hikari & Fisica','Ilith','Eto','Luna'
,'Shirabe','Hikari(Zero)','Hikari(Fracture)','Hikari(Summer)','Tairitsu(Summer)'
,'Tairitsu&Trin','Ayu','Eto&Luna','Yume','Seine & Hikari','Saya','Tairitsu & Chuni Penguin'
,'Chuni Penguin','Haruna','Nono','MTA-XXX','MDA-21','Kanae','Hikari(Fantasia)','Tairitsu(Sonata)','Sia','DORO*C'
,'Tairitsu(Tempest)','Brillante','Ilith(Summer)']
for i in range(0, 38):
c.execute("insert into character values("+str(i)+",'"+char[i]+"');")
conn.commit()
conn.close()
def arc_register(name: str, password: str):
def build_user_code(c):
return '123456789'
def build_user_id(c):
return 2000000
def insert_user_char(c, user_id):
for i in range(0, 38):
if i in [0, 1, 2, 4, 13, 26, 27, 28, 29, 36]:
sql = 'insert into user_char values('+str(user_id)+','+str(
i)+''',30,25000,25000,90,90,90,'',0,0,'',0,1,1)'''
c.execute(sql)
else:
if i != 5:
sql = 'insert into user_char values('+str(user_id)+','+str(
i)+''',30,25000,25000,90,90,90,'',0,0,'',0,0,0)'''
c.execute(sql)
conn = sqlite3.connect('arcaea_database.db')
c = conn.cursor()
hash_pwd = hashlib.sha256(password.encode("utf8")).hexdigest()
c.execute(
'''select exists(select * from user where name = :name)''', {'name': name})
if c.fetchone() == (0,):
user_code = build_user_code(c)
user_id = build_user_id(c)
now = int(time.time() * 1000)
c.execute('''insert into user(user_id, name, password, join_date, user_code, rating_ptt,
character_id, is_skill_sealed, is_char_uncapped, is_char_uncapped_override, is_hide_rating, favorite_character, max_stamina_notification_enabled)
values(:user_id, :name, :password, :join_date, :user_code, 1250, 1, 0, 1, 0, 0, -1, 0)
''', {'user_code': user_code, 'user_id': user_id, 'join_date': now, 'name': name, 'password': hash_pwd})
c.execute('''insert into recent30(user_id) values(:user_id)''', {
'user_id': user_id})
c.execute('''insert into best_score values(2000000,'vexaria',3,10000000,100,0,0,0,100,0,1599667200000,3,3,10.8)''')
insert_user_char(c, user_id)
conn.commit()
conn.close()
return None
else:
conn.commit()
conn.close()
return None
arc_register('admin', 'admin123')

File diff suppressed because it is too large Load Diff

View File

@@ -1,418 +0,0 @@
from flask import Flask, request, jsonify, make_response
import configparser
import base64
import auth
import info
import setme
import arcscore
app = Flask(__name__)
wsgi_app = app.wsgi_app
def error_return(error_code): # 错误返回
# 100 无法在此ip地址下登录游戏
# 101 用户名占用
# 102 电子邮箱已注册
# 103 已有一个账号由此设备创建
# 104 用户名密码错误
# 105 24小时内登入两台设备
# 106 账户冻结
# 107 你没有足够的体力
# 401 用户不存在
# 403 无法连接至服务器
# 501 502 此物品目前无法获取
# 504 无效的序列码
# 505 此序列码已被使用
# 506 你已拥有了此物品
# 601 好友列表已满
# 602 此用户已是好友
# 604 你不能加自己为好友
# 其它 发生未知错误
return jsonify({
"success": False,
"error_code": error_code
})
@app.route('/')
def hello():
return "Hello World!"
@app.route('/coffee/12/auth/login', methods=['POST']) # 登录接口
def login():
headers = request.headers
id_pwd = headers['Authorization']
id_pwd = base64.b64decode(id_pwd[6:]).decode()
name, password = id_pwd.split(':', 1)
try:
token = auth.arc_login(name, password)
if token is not None:
r = {"success": True, "token_type": "Bearer"}
r['access_token'] = token
return jsonify(r)
else:
return error_return(104) # 用户名或密码错误
except:
return error_return(108)
@app.route('/coffee/12/user/', methods=['POST']) # 注册接口
def register():
name = request.form['name']
password = request.form['password']
try:
user_id, token, error_code = auth.arc_register(name, password)
if user_id is not None:
r = {"success": True, "value": {
'user_id': user_id, 'access_token': token}}
return jsonify(r)
else:
return error_return(error_code) # 应该是101用户名被占用毕竟电子邮箱、设备号没记录
except:
return error_return(108)
@app.route('/coffee/12/compose/aggregate', methods=['GET']) # 用户信息获取
def aggregate():
calls = request.args.get('calls')
headers = request.headers
token = headers['Authorization']
token = token[7:]
try:
user_id = auth.token_get_id(token)
if user_id is not None:
if calls == '[{ "endpoint": "/user/me", "id": 0 }]': # 极其沙雕的判断我猜get的参数就两种
r = info.arc_aggregate_small(user_id)
else:
r = info.arc_aggregate_big(user_id)
return jsonify(r)
else:
return error_return(108)
except:
return error_return(108)
@app.route('/coffee/12/user/me/character', methods=['POST']) # 角色切换
def character_change():
headers = request.headers
token = headers['Authorization']
token = token[7:]
character_id = request.form['character']
skill_sealed = request.form['skill_sealed']
try:
user_id = auth.token_get_id(token)
if user_id is not None:
flag = setme.change_char(user_id, character_id, skill_sealed)
if flag:
return jsonify({
"success": True,
"value": {
"user_id": user_id,
"character": character_id
}
})
else:
return error_return(108)
else:
return error_return(108)
except:
return error_return(108)
@app.route('/coffee/<path:path>/toggle_uncap', methods=['POST']) # 角色觉醒切换
def character_uncap(path):
character_id = int(path[22:])
headers = request.headers
token = headers['Authorization']
token = token[7:]
try:
user_id = auth.token_get_id(token)
if user_id is not None:
r = setme.change_char_uncap(user_id, character_id)
if r is not None:
return jsonify({
"success": True,
"value": {
"user_id": user_id,
"character": [r]
}
})
else:
return error_return(108)
else:
return error_return(108)
except:
return error_return(108)
@app.route('/coffee/12/friend/me/add', methods=['POST']) # 加好友
def add_friend():
headers = request.headers
token = headers['Authorization']
token = token[7:]
friend_code = request.form['friend_code']
try:
user_id = auth.token_get_id(token)
friend_id = auth.code_get_id(friend_code)
if user_id is not None and friend_id is not None:
r = setme.arc_add_friend(user_id, friend_id)
if r is not None and r != 602 and r != 604:
return jsonify({
"success": True,
"value": {
"user_id": user_id,
"updatedAt": "2020-09-07T07:32:12.740Z",
"createdAt": "2020-09-06T10:05:18.471Z",
"friends": r
}
})
else:
if r is not None:
return error_return(r)
else:
return error_return(108)
else:
if friend_id is None:
return error_return(401)
else:
return error_return(108)
except:
return error_return(108)
@app.route('/coffee/12/friend/me/delete', methods=['POST']) # 删好友
def delete_friend():
headers = request.headers
token = headers['Authorization']
token = token[7:]
friend_id = int(request.form['friend_id'])
try:
user_id = auth.token_get_id(token)
if user_id is not None and friend_id is not None:
r = setme.arc_delete_friend(user_id, friend_id)
if r is not None:
return jsonify({
"success": True,
"value": {
"user_id": user_id,
"updatedAt": "2020-09-07T07:32:12.740Z",
"createdAt": "2020-09-06T10:05:18.471Z",
"friends": r
}
})
else:
return error_return(108)
else:
if friend_id is None:
return error_return(401)
else:
return error_return(108)
except:
return error_return(108)
@app.route('/coffee/12/score/song/friend', methods=['GET']) # 好友排名默认最多50
def song_score_friend():
song_id = request.args.get('song_id')
difficulty = request.args.get('difficulty')
headers = request.headers
token = headers['Authorization']
token = token[7:]
try:
user_id = auth.token_get_id(token)
if user_id is not None:
r = arcscore.arc_score_friend(user_id, song_id, difficulty)
if r is not None:
return jsonify({
"success": True,
"value": r
})
else:
return error_return(108)
else:
return error_return(108)
except:
return error_return(108)
@app.route('/coffee/12/score/song/me', methods=['GET']) # 我的排名默认最多20
def song_score_me():
song_id = request.args.get('song_id')
difficulty = request.args.get('difficulty')
headers = request.headers
token = headers['Authorization']
token = token[7:]
try:
user_id = auth.token_get_id(token)
if user_id is not None:
r = arcscore.arc_score_me(user_id, song_id, difficulty)
if r is not None:
return jsonify({
"success": True,
"value": r
})
else:
return error_return(108)
else:
return error_return(108)
except:
return error_return(108)
@app.route('/coffee/12/score/song', methods=['GET']) # TOP20
def song_score_top():
song_id = request.args.get('song_id')
difficulty = request.args.get('difficulty')
headers = request.headers
token = headers['Authorization']
token = token[7:]
try:
user_id = auth.token_get_id(token)
if user_id is not None:
r = arcscore.arc_score_top(song_id, difficulty)
if r is not None:
return jsonify({
"success": True,
"value": r
})
else:
return error_return(108)
else:
return error_return(108)
except:
return error_return(108)
@app.route('/coffee/12/score/song', methods=['POST']) # 成绩上传
def song_score_post():
headers = request.headers
token = headers['Authorization']
token = token[7:]
song_id = request.form['song_id']
difficulty = int(request.form['difficulty'])
score = int(request.form['score'])
shiny_perfect_count = int(request.form['shiny_perfect_count'])
perfect_count = int(request.form['perfect_count'])
near_count = int(request.form['near_count'])
miss_count = int(request.form['miss_count'])
health = int(request.form['health'])
modifier = int(request.form['modifier'])
beyond_gauge = int(request.form['beyond_gauge'])
clear_type = int(request.form['clear_type'])
try:
user_id = auth.token_get_id(token)
if user_id is not None:
r = arcscore.arc_score_post(user_id, song_id, difficulty, score, shiny_perfect_count,
perfect_count, near_count, miss_count, health, modifier, beyond_gauge, clear_type)
if r is not None:
return jsonify({
"success": True,
"value": {"user_rating": r}
})
else:
return error_return(108)
else:
return error_return(108)
except:
return error_return(108)
@app.route('/coffee/12/score/token', methods=['GET']) # 成绩上传所需的token显然我不想验证
def score_token():
return jsonify({
"success": True,
"value": {
"token": "1145141919810"
}
})
@app.route('/coffee/12/user/me/save', methods=['GET']) # 从云端同步
def cloud_get():
headers = request.headers
token = headers['Authorization']
token = token[7:]
try:
user_id = auth.token_get_id(token)
if user_id is not None:
r = arcscore.arc_all_get(user_id)
if r is not None:
return jsonify({
"success": True,
"value": r
})
else:
return error_return(108)
else:
return error_return(108)
except:
return error_return(108)
@app.route('/coffee/12/user/me/save', methods=['POST']) # 向云端同步
def cloud_post():
headers = request.headers
token = headers['Authorization']
token = token[7:]
scores_data = request.form['scores_data']
clearlamps_data = request.form['clearlamps_data']
try:
user_id = auth.token_get_id(token)
if user_id is not None:
arcscore.arc_all_post(user_id, scores_data, clearlamps_data)
return jsonify({
"success": True,
"value": {
"user_id": user_id
}
})
else:
return error_return(108)
except:
return error_return(108)
@app.route('/coffee/12/purchase/me/redeem', methods=['POST']) # 兑换码,自然没有用
def redeem():
return error_return(504)
@app.route('/coffee/<path:path>', methods=['POST']) # 三个设置,写在最后降低优先级
def sys_set(path):
set_arg = path[10:]
headers = request.headers
token = headers['Authorization']
token = token[7:]
value = request.form['value']
try:
user_id = auth.token_get_id(token)
if user_id is not None:
setme.arc_sys_set(user_id, value, set_arg)
r = info.arc_aggregate_small(user_id)
r['value'] = r['value'][0]['value']
return jsonify(r)
else:
return error_return(108)
except:
return error_return(108)
def main():
config = configparser.ConfigParser()
path = r'setting.ini'
config.read(path, encoding="utf-8")
HOST = config.get('CONFIG','HOST')
PORT = config.get('CONFIG','PORT')
app.run(HOST, PORT)
if __name__ == '__main__':
main()
## Made By Lost 2020.9.11

View File

@@ -1 +0,0 @@
python main.py

View File

@@ -1,155 +0,0 @@
import sqlite3
import info
def b2int(x):
# int与布尔值转换
if x:
return 1
else:
return 0
def int2b(x):
# int与布尔值转换
if x is None or x == 0:
return False
else:
return True
def change_char(user_id, character_id, skill_sealed):
# 角色改变,包括技能封印的改变,返回成功与否的布尔值
conn = sqlite3.connect('./database/arcaea_database.db')
c = conn.cursor()
c.execute('''select is_uncapped, is_uncapped_override from user_char where user_id = :a and character_id = :b''',
{'a': user_id, 'b': character_id})
x = c.fetchone()
if x is not None:
if skill_sealed == 'false':
skill_sealed = False
else:
skill_sealed = True
c.execute('''update user set is_skill_sealed = :a, character_id = :b, is_char_uncapped = :c, is_char_uncapped_override = :d where user_id = :e''', {
'a': b2int(skill_sealed), 'b': character_id, 'c': x[0], 'd': x[1], 'e': user_id})
conn.commit()
conn.close()
return True
conn.commit()
conn.close()
return False
def change_char_uncap(user_id, character_id):
# 角色觉醒改变,返回字典
conn = sqlite3.connect('./database/arcaea_database.db')
c = conn.cursor()
c.execute('''select is_uncapped, is_uncapped_override from user_char where user_id = :a and character_id = :b''',
{'a': user_id, 'b': character_id})
x = c.fetchone()
r = None
if x is not None and x[0] == 1:
c.execute('''update user set is_char_uncapped_override = :a where user_id = :b''', {
'a': b2int(x[1] == 0), 'b': user_id})
c.execute('''update user_char set is_uncapped_override = :a where user_id = :b and character_id = :c''', {
'a': b2int(x[1] == 0), 'b': user_id, 'c': character_id})
c.execute('''select * from user_char where user_id = :a and character_id = :b''',
{'a': user_id, 'b': character_id})
y = c.fetchone()
c.execute(
'''select name from character where character_id = :x''', {'x': y[1]})
z = c.fetchone()
if z is not None:
char_name = z[0]
if y is not None:
r = {
"is_uncapped_override": int2b(y[14]),
"is_uncapped": int2b(y[13]),
"uncap_cores": [],
"char_type": y[12],
"skill_id_uncap": y[11],
"skill_requires_uncap": int2b(y[10]),
"skill_unlock_level": y[9],
"skill_id": y[8],
"overdrive": y[7],
"prog": y[6],
"frag": y[5],
"level_exp": y[4],
"exp": y[3],
"level": y[2],
"name": char_name,
"character_id": y[1]
}
conn.commit()
conn.close()
return r
def arc_sys_set(user_id, value, set_arg):
# 三个设置PTT隐藏、体力满通知、最爱角色无返回
conn = sqlite3.connect('./database/arcaea_database.db')
c = conn.cursor()
if 'favorite_character' in set_arg:
value = int(value)
c.execute('''update user set favorite_character = :a where user_id = :b''', {
'a': value, 'b': user_id})
else:
if value == 'false':
value = False
else:
value = True
if 'is_hide_rating' in set_arg:
c.execute('''update user set is_hide_rating = :a where user_id = :b''', {
'a': b2int(value), 'b': user_id})
if 'max_stamina_notification_enabled' in set_arg:
c.execute('''update user set max_stamina_notification_enabled = :a where user_id = :b''', {
'a': b2int(value), 'b': user_id})
conn.commit()
conn.close()
return None
def arc_add_friend(user_id, friend_id):
# 加好友返回好友列表或者是错误码602、604
if user_id == friend_id:
return 604
conn = sqlite3.connect('./database/arcaea_database.db')
c = conn.cursor()
c.execute('''select exists(select * from friend where user_id_me = :x and user_id_other = :y)''',
{'x': user_id, 'y': friend_id})
r = None
if c.fetchone() == (0,):
c.execute('''insert into friend values(:a, :b)''',
{'a': user_id, 'b': friend_id})
r = info.get_user_friend(c, user_id)
else:
return 602
conn.commit()
conn.close()
return r
def arc_delete_friend(user_id, friend_id):
# 删好友,返回好友列表
conn = sqlite3.connect('./database/arcaea_database.db')
c = conn.cursor()
c.execute('''select exists(select * from friend where user_id_me = :x and user_id_other = :y)''',
{'x': user_id, 'y': friend_id})
r = None
if c.fetchone() == (1,):
c.execute('''delete from friend where user_id_me = :x and user_id_other = :y''',
{'x': user_id, 'y': friend_id})
r = info.get_user_friend(c, user_id)
conn.commit()
conn.close()
return r

View File

@@ -1,3 +0,0 @@
[CONFIG]
HOST = 192.168.1.112
PORT = 80

File diff suppressed because it is too large Load Diff

View File

@@ -1,141 +0,0 @@
import sqlite3
import hashlib
import time
def arc_login(name: str, password: str) -> str: # 登录判断
# 查询数据库中的user表验证账号密码返回并记录token
# token采用user_id和时间戳连接后hash生成真的是瞎想的没用bear
# 密码和token的加密方式为 SHA-256
conn = sqlite3.connect('./database/arcaea_database.db')
c = conn.cursor()
hash_pwd = hashlib.sha256(password.encode("utf8")).hexdigest()
c.execute('''select user_id from user where name = :name and password = :password''', {
'name': name, 'password': hash_pwd})
x = c.fetchone()
if x is not None:
user_id = str(x[0])
now = int(time.time() * 1000)
token = hashlib.sha256((user_id + str(now)).encode("utf8")).hexdigest()
c.execute(
'''select exists(select * from login where user_id = :user_id)''', {"user_id": user_id})
if c.fetchone() == (1,): # 删掉多余token
c.execute('''delete from login where user_id = :user_id''',
{'user_id': user_id})
c.execute('''insert into login(access_token, user_id) values(:access_token, :user_id)''', {
'user_id': user_id, 'access_token': token})
conn.commit()
conn.close()
return token
conn.commit()
conn.close()
return None
def arc_register(name: str, password: str): # 注册
# 账号注册只记录hash密码和用户名生成user_id和user_code自动登录返回token
# token和密码的处理同登录部分
def build_user_code(c):
# 生成9位的user_code用的自然是随机
import random
flag = True
while flag:
user_code = ''.join([str(random.randint(0, 9)) for i in range(9)])
c.execute('''select exists(select * from user where user_code = :user_code)''',
{'user_code': user_code})
if c.fetchone() == (0,):
flag = False
return user_code
def build_user_id(c):
# 生成user_id往后加1
c.execute('''select max(user_id) from user''')
x = c.fetchone()
if x[0] is not None:
return x[0] + 1
else:
return 2000001
def insert_user_char(c, user_id):
# 为用户添加所有可用角色
for i in range(0, 38):
if i in [0, 1, 2, 4, 13, 26, 27, 28, 29, 36, 21]:
sql = 'insert into user_char values('+str(user_id)+','+str(
i)+''',30,25000,25000,90,90,90,'',0,0,'',0,1,1)'''
c.execute(sql)
else:
if i != 5:
sql = 'insert into user_char values('+str(user_id)+','+str(
i)+''',30,25000,25000,90,90,90,'',0,0,'',0,0,0)'''
c.execute(sql)
conn = sqlite3.connect('./database/arcaea_database.db')
c = conn.cursor()
hash_pwd = hashlib.sha256(password.encode("utf8")).hexdigest()
c.execute(
'''select exists(select * from user where name = :name)''', {'name': name})
if c.fetchone() == (0,):
user_code = build_user_code(c)
user_id = build_user_id(c)
now = int(time.time() * 1000)
c.execute('''insert into user(user_id, name, password, join_date, user_code, rating_ptt,
character_id, is_skill_sealed, is_char_uncapped, is_char_uncapped_override, is_hide_rating, favorite_character, max_stamina_notification_enabled)
values(:user_id, :name, :password, :join_date, :user_code, 0, 0, 0, 0, 0, 0, -1, 0)
''', {'user_code': user_code, 'user_id': user_id, 'join_date': now, 'name': name, 'password': hash_pwd})
c.execute('''insert into recent30(user_id) values(:user_id)''', {
'user_id': user_id})
token = hashlib.sha256(
(str(user_id) + str(now)).encode("utf8")).hexdigest()
c.execute('''insert into login(access_token, user_id) values(:access_token, :user_id)''', {
'user_id': user_id, 'access_token': token})
insert_user_char(c, user_id)
conn.commit()
conn.close()
return user_id, token, 0
else:
conn.commit()
conn.close()
return None, None, 101
def token_get_id(token: str):
# 用token获取id没有考虑不同用户token相同情况说不定会有bug
conn = sqlite3.connect('./database/arcaea_database.db')
c = conn.cursor()
c.execute('''select user_id from login where access_token = :token''', {
'token': token})
x = c.fetchone()
if x is not None:
conn.commit()
conn.close()
return x[0]
else:
conn.commit()
conn.close()
return None
def code_get_id(user_code):
# 用user_code获取id
conn = sqlite3.connect('./database/arcaea_database.db')
c = conn.cursor()
c.execute('''select user_id from user where user_code = :a''',
{'a': user_code})
x = c.fetchone()
if x is not None:
conn.commit()
conn.close()
return x[0]
else:
conn.commit()
conn.close()
return None

Binary file not shown.

Binary file not shown.

View File

@@ -1,204 +0,0 @@
import sqlite3
import hashlib
import time
# 数据库初始化文件删掉arcaea_database.db文件后运行即可谨慎使用
conn = sqlite3.connect('arcaea_database.db')
c = conn.cursor()
c.execute('''create table if not exists user(user_id int primary key,
name text unique,
password text not null,
join_date char(20),
user_code char(10),
rating_ptt int,
character_id int,
is_skill_sealed int,
is_char_uncapped int,
is_char_uncapped_override int,
is_hide_rating int,
song_id text,
difficulty int,
score int,
shiny_perfect_count int,
perfect_count int,
near_count int,
miss_count int,
health int,
modifier int,
time_played int,
clear_type int,
rating real,
favorite_character int,
max_stamina_notification_enabled int
);''')
c.execute('''create table if not exists login(access_token text,
user_id int,
last_login_time int,
last_login_ip text,
last_login_device text
);''')
c.execute('''create table if not exists friend(user_id_me int,
user_id_other int,
primary key (user_id_me, user_id_other)
);''')
c.execute('''create table if not exists best_score(user_id int,
song_id text,
difficulty int,
score int,
shiny_perfect_count int,
perfect_count int,
near_count int,
miss_count int,
health int,
modifier int,
time_played int,
best_clear_type int,
clear_type int,
rating real,
primary key(user_id, song_id, difficulty)
);''')
c.execute('''create table if not exists user_char(user_id int,
character_id int,
level int,
exp real,
level_exp real,
frag int,
prog int,
overdrive int,
skill_id text,
skill_unlock_level int,
skill_requires_uncap int,
skill_id_uncap text,
char_type int,
is_uncapped int,
is_uncapped_override int,
primary key(user_id, character_id)
);''')
c.execute('''create table if not exists character(character_id int primary key,
name text
);''')
c.execute('''create table if not exists recent30(user_id int primary key,
r0 real,
song_id0 text,
r1 real,
song_id1 text,
r2 real,
song_id2 text,
r3 real,
song_id3 text,
r4 real,
song_id4 text,
r5 real,
song_id5 text,
r6 real,
song_id6 text,
r7 real,
song_id7 text,
r8 real,
song_id8 text,
r9 real,
song_id9 text,
r10 real,
song_id10 text,
r11 real,
song_id11 text,
r12 real,
song_id12 text,
r13 real,
song_id13 text,
r14 real,
song_id14 text,
r15 real,
song_id15 text,
r16 real,
song_id16 text,
r17 real,
song_id17 text,
r18 real,
song_id18 text,
r19 real,
song_id19 text,
r20 real,
song_id20 text,
r21 real,
song_id21 text,
r22 real,
song_id22 text,
r23 real,
song_id23 text,
r24 real,
song_id24 text,
r25 real,
song_id25 text,
r26 real,
song_id26 text,
r27 real,
song_id27 text,
r28 real,
song_id28 text,
r29 real,
song_id29 text
);''')
char = ['Hikari','Tairitsu','Kou','Sapphire','Lethe','','Tairitsu(Axium)'
,'Tairitsu(Grievous Lady)','Stella','Hikari & Fisica','Ilith','Eto','Luna'
,'Shirabe','Hikari(Zero)','Hikari(Fracture)','Hikari(Summer)','Tairitsu(Summer)'
,'Tairitsu&Trin','Ayu','Eto&Luna','Yume','Seine & Hikari','Saya','Tairitsu & Chuni Penguin'
,'Chuni Penguin','Haruna','Nono','MTA-XXX','MDA-21','Kanae','Hikari(Fantasia)','Tairitsu(Sonata)','Sia','DORO*C'
,'Tairitsu(Tempest)','Brillante','Ilith(Summer)']
for i in range(0, 38):
c.execute("insert into character values("+str(i)+",'"+char[i]+"');")
conn.commit()
conn.close()
def arc_register(name: str, password: str):
def build_user_code(c):
return '123456789'
def build_user_id(c):
return 2000000
def insert_user_char(c, user_id):
for i in range(0, 38):
if i in [0, 1, 2, 4, 13, 26, 27, 28, 29, 36, 21]:
sql = 'insert into user_char values('+str(user_id)+','+str(
i)+''',30,25000,25000,90,90,90,'',0,0,'',0,1,1)'''
c.execute(sql)
else:
if i != 5:
sql = 'insert into user_char values('+str(user_id)+','+str(
i)+''',30,25000,25000,90,90,90,'',0,0,'',0,0,0)'''
c.execute(sql)
conn = sqlite3.connect('arcaea_database.db')
c = conn.cursor()
hash_pwd = hashlib.sha256(password.encode("utf8")).hexdigest()
c.execute(
'''select exists(select * from user where name = :name)''', {'name': name})
if c.fetchone() == (0,):
user_code = build_user_code(c)
user_id = build_user_id(c)
now = int(time.time() * 1000)
c.execute('''insert into user(user_id, name, password, join_date, user_code, rating_ptt,
character_id, is_skill_sealed, is_char_uncapped, is_char_uncapped_override, is_hide_rating, favorite_character, max_stamina_notification_enabled)
values(:user_id, :name, :password, :join_date, :user_code, 1250, 1, 0, 1, 0, 0, -1, 0)
''', {'user_code': user_code, 'user_id': user_id, 'join_date': now, 'name': name, 'password': hash_pwd})
c.execute('''insert into recent30(user_id) values(:user_id)''', {
'user_id': user_id})
c.execute('''insert into best_score values(2000000,'vexaria',3,10000000,100,0,0,0,100,0,1599667200000,3,3,10.8)''')
insert_user_char(c, user_id)
conn.commit()
conn.close()
return None
else:
conn.commit()
conn.close()
return None
arc_register('admin', 'admin123')

File diff suppressed because it is too large Load Diff

View File

@@ -1,418 +0,0 @@
from flask import Flask, request, jsonify, make_response
import configparser
import base64
import auth
import info
import setme
import arcscore
app = Flask(__name__)
wsgi_app = app.wsgi_app
def error_return(error_code): # 错误返回
# 100 无法在此ip地址下登录游戏
# 101 用户名占用
# 102 电子邮箱已注册
# 103 已有一个账号由此设备创建
# 104 用户名密码错误
# 105 24小时内登入两台设备
# 106 账户冻结
# 107 你没有足够的体力
# 401 用户不存在
# 403 无法连接至服务器
# 501 502 此物品目前无法获取
# 504 无效的序列码
# 505 此序列码已被使用
# 506 你已拥有了此物品
# 601 好友列表已满
# 602 此用户已是好友
# 604 你不能加自己为好友
# 其它 发生未知错误
return jsonify({
"success": False,
"error_code": error_code
})
@app.route('/')
def hello():
return "Hello World!"
@app.route('/coffee/12/auth/login', methods=['POST']) # 登录接口
def login():
headers = request.headers
id_pwd = headers['Authorization']
id_pwd = base64.b64decode(id_pwd[6:]).decode()
name, password = id_pwd.split(':', 1)
try:
token = auth.arc_login(name, password)
if token is not None:
r = {"success": True, "token_type": "Bearer"}
r['access_token'] = token
return jsonify(r)
else:
return error_return(104) # 用户名或密码错误
except:
return error_return(108)
@app.route('/coffee/12/user/', methods=['POST']) # 注册接口
def register():
name = request.form['name']
password = request.form['password']
try:
user_id, token, error_code = auth.arc_register(name, password)
if user_id is not None:
r = {"success": True, "value": {
'user_id': user_id, 'access_token': token}}
return jsonify(r)
else:
return error_return(error_code) # 应该是101用户名被占用毕竟电子邮箱、设备号没记录
except:
return error_return(108)
@app.route('/coffee/12/compose/aggregate', methods=['GET']) # 用户信息获取
def aggregate():
calls = request.args.get('calls')
headers = request.headers
token = headers['Authorization']
token = token[7:]
try:
user_id = auth.token_get_id(token)
if user_id is not None:
if calls == '[{ "endpoint": "/user/me", "id": 0 }]': # 极其沙雕的判断我猜get的参数就两种
r = info.arc_aggregate_small(user_id)
else:
r = info.arc_aggregate_big(user_id)
return jsonify(r)
else:
return error_return(108)
except:
return error_return(108)
@app.route('/coffee/12/user/me/character', methods=['POST']) # 角色切换
def character_change():
headers = request.headers
token = headers['Authorization']
token = token[7:]
character_id = request.form['character']
skill_sealed = request.form['skill_sealed']
try:
user_id = auth.token_get_id(token)
if user_id is not None:
flag = setme.change_char(user_id, character_id, skill_sealed)
if flag:
return jsonify({
"success": True,
"value": {
"user_id": user_id,
"character": character_id
}
})
else:
return error_return(108)
else:
return error_return(108)
except:
return error_return(108)
@app.route('/coffee/<path:path>/toggle_uncap', methods=['POST']) # 角色觉醒切换
def character_uncap(path):
character_id = int(path[22:])
headers = request.headers
token = headers['Authorization']
token = token[7:]
try:
user_id = auth.token_get_id(token)
if user_id is not None:
r = setme.change_char_uncap(user_id, character_id)
if r is not None:
return jsonify({
"success": True,
"value": {
"user_id": user_id,
"character": [r]
}
})
else:
return error_return(108)
else:
return error_return(108)
except:
return error_return(108)
@app.route('/coffee/12/friend/me/add', methods=['POST']) # 加好友
def add_friend():
headers = request.headers
token = headers['Authorization']
token = token[7:]
friend_code = request.form['friend_code']
try:
user_id = auth.token_get_id(token)
friend_id = auth.code_get_id(friend_code)
if user_id is not None and friend_id is not None:
r = setme.arc_add_friend(user_id, friend_id)
if r is not None and r != 602 and r != 604:
return jsonify({
"success": True,
"value": {
"user_id": user_id,
"updatedAt": "2020-09-07T07:32:12.740Z",
"createdAt": "2020-09-06T10:05:18.471Z",
"friends": r
}
})
else:
if r is not None:
return error_return(r)
else:
return error_return(108)
else:
if friend_id is None:
return error_return(401)
else:
return error_return(108)
except:
return error_return(108)
@app.route('/coffee/12/friend/me/delete', methods=['POST']) # 删好友
def delete_friend():
headers = request.headers
token = headers['Authorization']
token = token[7:]
friend_id = int(request.form['friend_id'])
try:
user_id = auth.token_get_id(token)
if user_id is not None and friend_id is not None:
r = setme.arc_delete_friend(user_id, friend_id)
if r is not None:
return jsonify({
"success": True,
"value": {
"user_id": user_id,
"updatedAt": "2020-09-07T07:32:12.740Z",
"createdAt": "2020-09-06T10:05:18.471Z",
"friends": r
}
})
else:
return error_return(108)
else:
if friend_id is None:
return error_return(401)
else:
return error_return(108)
except:
return error_return(108)
@app.route('/coffee/12/score/song/friend', methods=['GET']) # 好友排名默认最多50
def song_score_friend():
song_id = request.args.get('song_id')
difficulty = request.args.get('difficulty')
headers = request.headers
token = headers['Authorization']
token = token[7:]
try:
user_id = auth.token_get_id(token)
if user_id is not None:
r = arcscore.arc_score_friend(user_id, song_id, difficulty)
if r is not None:
return jsonify({
"success": True,
"value": r
})
else:
return error_return(108)
else:
return error_return(108)
except:
return error_return(108)
@app.route('/coffee/12/score/song/me', methods=['GET']) # 我的排名默认最多20
def song_score_me():
song_id = request.args.get('song_id')
difficulty = request.args.get('difficulty')
headers = request.headers
token = headers['Authorization']
token = token[7:]
try:
user_id = auth.token_get_id(token)
if user_id is not None:
r = arcscore.arc_score_me(user_id, song_id, difficulty)
if r is not None:
return jsonify({
"success": True,
"value": r
})
else:
return error_return(108)
else:
return error_return(108)
except:
return error_return(108)
@app.route('/coffee/12/score/song', methods=['GET']) # TOP20
def song_score_top():
song_id = request.args.get('song_id')
difficulty = request.args.get('difficulty')
headers = request.headers
token = headers['Authorization']
token = token[7:]
try:
user_id = auth.token_get_id(token)
if user_id is not None:
r = arcscore.arc_score_top(song_id, difficulty)
if r is not None:
return jsonify({
"success": True,
"value": r
})
else:
return error_return(108)
else:
return error_return(108)
except:
return error_return(108)
@app.route('/coffee/12/score/song', methods=['POST']) # 成绩上传
def song_score_post():
headers = request.headers
token = headers['Authorization']
token = token[7:]
song_id = request.form['song_id']
difficulty = int(request.form['difficulty'])
score = int(request.form['score'])
shiny_perfect_count = int(request.form['shiny_perfect_count'])
perfect_count = int(request.form['perfect_count'])
near_count = int(request.form['near_count'])
miss_count = int(request.form['miss_count'])
health = int(request.form['health'])
modifier = int(request.form['modifier'])
beyond_gauge = int(request.form['beyond_gauge'])
clear_type = int(request.form['clear_type'])
try:
user_id = auth.token_get_id(token)
if user_id is not None:
r = arcscore.arc_score_post(user_id, song_id, difficulty, score, shiny_perfect_count,
perfect_count, near_count, miss_count, health, modifier, beyond_gauge, clear_type)
if r is not None:
return jsonify({
"success": True,
"value": {"user_rating": r}
})
else:
return error_return(108)
else:
return error_return(108)
except:
return error_return(108)
@app.route('/coffee/12/score/token', methods=['GET']) # 成绩上传所需的token显然我不想验证
def score_token():
return jsonify({
"success": True,
"value": {
"token": "1145141919810"
}
})
@app.route('/coffee/12/user/me/save', methods=['GET']) # 从云端同步
def cloud_get():
headers = request.headers
token = headers['Authorization']
token = token[7:]
try:
user_id = auth.token_get_id(token)
if user_id is not None:
r = arcscore.arc_all_get(user_id)
if r is not None:
return jsonify({
"success": True,
"value": r
})
else:
return error_return(108)
else:
return error_return(108)
except:
return error_return(108)
@app.route('/coffee/12/user/me/save', methods=['POST']) # 向云端同步
def cloud_post():
headers = request.headers
token = headers['Authorization']
token = token[7:]
scores_data = request.form['scores_data']
clearlamps_data = request.form['clearlamps_data']
try:
user_id = auth.token_get_id(token)
if user_id is not None:
arcscore.arc_all_post(user_id, scores_data, clearlamps_data)
return jsonify({
"success": True,
"value": {
"user_id": user_id
}
})
else:
return error_return(108)
except:
return error_return(108)
@app.route('/coffee/12/purchase/me/redeem', methods=['POST']) # 兑换码,自然没有用
def redeem():
return error_return(504)
@app.route('/coffee/<path:path>', methods=['POST']) # 三个设置,写在最后降低优先级
def sys_set(path):
set_arg = path[10:]
headers = request.headers
token = headers['Authorization']
token = token[7:]
value = request.form['value']
try:
user_id = auth.token_get_id(token)
if user_id is not None:
setme.arc_sys_set(user_id, value, set_arg)
r = info.arc_aggregate_small(user_id)
r['value'] = r['value'][0]['value']
return jsonify(r)
else:
return error_return(108)
except:
return error_return(108)
def main():
config = configparser.ConfigParser()
path = r'setting.ini'
config.read(path, encoding="utf-8")
HOST = config.get('CONFIG','HOST')
PORT = config.get('CONFIG','PORT')
app.run(HOST, PORT)
if __name__ == '__main__':
main()
## Made By Lost 2020.9.11

View File

@@ -1 +0,0 @@
python main.py

View File

@@ -1,155 +0,0 @@
import sqlite3
import info
def b2int(x):
# int与布尔值转换
if x:
return 1
else:
return 0
def int2b(x):
# int与布尔值转换
if x is None or x == 0:
return False
else:
return True
def change_char(user_id, character_id, skill_sealed):
# 角色改变,包括技能封印的改变,返回成功与否的布尔值
conn = sqlite3.connect('./database/arcaea_database.db')
c = conn.cursor()
c.execute('''select is_uncapped, is_uncapped_override from user_char where user_id = :a and character_id = :b''',
{'a': user_id, 'b': character_id})
x = c.fetchone()
if x is not None:
if skill_sealed == 'false':
skill_sealed = False
else:
skill_sealed = True
c.execute('''update user set is_skill_sealed = :a, character_id = :b, is_char_uncapped = :c, is_char_uncapped_override = :d where user_id = :e''', {
'a': b2int(skill_sealed), 'b': character_id, 'c': x[0], 'd': x[1], 'e': user_id})
conn.commit()
conn.close()
return True
conn.commit()
conn.close()
return False
def change_char_uncap(user_id, character_id):
# 角色觉醒改变,返回字典
conn = sqlite3.connect('./database/arcaea_database.db')
c = conn.cursor()
c.execute('''select is_uncapped, is_uncapped_override from user_char where user_id = :a and character_id = :b''',
{'a': user_id, 'b': character_id})
x = c.fetchone()
r = None
if x is not None and x[0] == 1:
c.execute('''update user set is_char_uncapped_override = :a where user_id = :b''', {
'a': b2int(x[1] == 0), 'b': user_id})
c.execute('''update user_char set is_uncapped_override = :a where user_id = :b and character_id = :c''', {
'a': b2int(x[1] == 0), 'b': user_id, 'c': character_id})
c.execute('''select * from user_char where user_id = :a and character_id = :b''',
{'a': user_id, 'b': character_id})
y = c.fetchone()
c.execute(
'''select name from character where character_id = :x''', {'x': y[1]})
z = c.fetchone()
if z is not None:
char_name = z[0]
if y is not None:
r = {
"is_uncapped_override": int2b(y[14]),
"is_uncapped": int2b(y[13]),
"uncap_cores": [],
"char_type": y[12],
"skill_id_uncap": y[11],
"skill_requires_uncap": int2b(y[10]),
"skill_unlock_level": y[9],
"skill_id": y[8],
"overdrive": y[7],
"prog": y[6],
"frag": y[5],
"level_exp": y[4],
"exp": y[3],
"level": y[2],
"name": char_name,
"character_id": y[1]
}
conn.commit()
conn.close()
return r
def arc_sys_set(user_id, value, set_arg):
# 三个设置PTT隐藏、体力满通知、最爱角色无返回
conn = sqlite3.connect('./database/arcaea_database.db')
c = conn.cursor()
if 'favorite_character' in set_arg:
value = int(value)
c.execute('''update user set favorite_character = :a where user_id = :b''', {
'a': value, 'b': user_id})
else:
if value == 'false':
value = False
else:
value = True
if 'is_hide_rating' in set_arg:
c.execute('''update user set is_hide_rating = :a where user_id = :b''', {
'a': b2int(value), 'b': user_id})
if 'max_stamina_notification_enabled' in set_arg:
c.execute('''update user set max_stamina_notification_enabled = :a where user_id = :b''', {
'a': b2int(value), 'b': user_id})
conn.commit()
conn.close()
return None
def arc_add_friend(user_id, friend_id):
# 加好友返回好友列表或者是错误码602、604
if user_id == friend_id:
return 604
conn = sqlite3.connect('./database/arcaea_database.db')
c = conn.cursor()
c.execute('''select exists(select * from friend where user_id_me = :x and user_id_other = :y)''',
{'x': user_id, 'y': friend_id})
r = None
if c.fetchone() == (0,):
c.execute('''insert into friend values(:a, :b)''',
{'a': user_id, 'b': friend_id})
r = info.get_user_friend(c, user_id)
else:
return 602
conn.commit()
conn.close()
return r
def arc_delete_friend(user_id, friend_id):
# 删好友,返回好友列表
conn = sqlite3.connect('./database/arcaea_database.db')
c = conn.cursor()
c.execute('''select exists(select * from friend where user_id_me = :x and user_id_other = :y)''',
{'x': user_id, 'y': friend_id})
r = None
if c.fetchone() == (1,):
c.execute('''delete from friend where user_id_me = :x and user_id_other = :y''',
{'x': user_id, 'y': friend_id})
r = info.get_user_friend(c, user_id)
conn.commit()
conn.close()
return r

View File

@@ -1,3 +0,0 @@
[CONFIG]
HOST = 192.168.1.113
PORT = 80

Binary file not shown.

Binary file not shown.

View File

@@ -1,233 +0,0 @@
import sqlite3
import hashlib
import time
# 数据库初始化文件删掉arcaea_database.db文件后运行即可谨慎使用
conn = sqlite3.connect('arcaea_database.db')
c = conn.cursor()
c.execute('''create table if not exists user(user_id int primary key,
name text unique,
password text not null,
join_date char(20),
user_code char(10),
rating_ptt int,
character_id int,
is_skill_sealed int,
is_char_uncapped int,
is_char_uncapped_override int,
is_hide_rating int,
song_id text,
difficulty int,
score int,
shiny_perfect_count int,
perfect_count int,
near_count int,
miss_count int,
health int,
modifier int,
time_played int,
clear_type int,
rating real,
favorite_character int,
max_stamina_notification_enabled int
);''')
c.execute('''create table if not exists login(access_token text,
user_id int,
last_login_time int,
last_login_ip text,
last_login_device text
);''')
c.execute('''create table if not exists friend(user_id_me int,
user_id_other int,
primary key (user_id_me, user_id_other)
);''')
c.execute('''create table if not exists best_score(user_id int,
song_id text,
difficulty int,
score int,
shiny_perfect_count int,
perfect_count int,
near_count int,
miss_count int,
health int,
modifier int,
time_played int,
best_clear_type int,
clear_type int,
rating real,
primary key(user_id, song_id, difficulty)
);''')
c.execute('''create table if not exists user_char(user_id int,
character_id int,
level int,
exp real,
level_exp real,
frag int,
prog int,
overdrive int,
skill_id text,
skill_unlock_level int,
skill_requires_uncap int,
skill_id_uncap text,
char_type int,
is_uncapped int,
is_uncapped_override int,
primary key(user_id, character_id)
);''')
c.execute('''create table if not exists character(character_id int primary key,
name text,
level int,
exp real,
level_exp real,
frag int,
prog int,
overdrive int,
skill_id text,
skill_unlock_level int,
skill_requires_uncap int,
skill_id_uncap text,
char_type int,
uncap_cores text,
is_uncapped int,
is_uncapped_override int
);''')
c.execute('''create table if not exists recent30(user_id int primary key,
r0 real,
song_id0 text,
r1 real,
song_id1 text,
r2 real,
song_id2 text,
r3 real,
song_id3 text,
r4 real,
song_id4 text,
r5 real,
song_id5 text,
r6 real,
song_id6 text,
r7 real,
song_id7 text,
r8 real,
song_id8 text,
r9 real,
song_id9 text,
r10 real,
song_id10 text,
r11 real,
song_id11 text,
r12 real,
song_id12 text,
r13 real,
song_id13 text,
r14 real,
song_id14 text,
r15 real,
song_id15 text,
r16 real,
song_id16 text,
r17 real,
song_id17 text,
r18 real,
song_id18 text,
r19 real,
song_id19 text,
r20 real,
song_id20 text,
r21 real,
song_id21 text,
r22 real,
song_id22 text,
r23 real,
song_id23 text,
r24 real,
song_id24 text,
r25 real,
song_id25 text,
r26 real,
song_id26 text,
r27 real,
song_id27 text,
r28 real,
song_id28 text,
r29 real,
song_id29 text
);''')
char = ['Hikari','Tairitsu','Kou','Sapphire','Lethe','','Tairitsu(Axium)'
,'Tairitsu(Grievous Lady)','Stella','Hikari & Fisica','Ilith','Eto','Luna'
,'Shirabe','Hikari(Zero)','Hikari(Fracture)','Hikari(Summer)','Tairitsu(Summer)'
,'Tairitsu&Trin','Ayu','Eto&Luna','Yume','Seine & Hikari','Saya','Tairitsu & Chuni Penguin'
,'Chuni Penguin','Haruna','Nono','MTA-XXX','MDA-21','Kanae','Hikari(Fantasia)','Tairitsu(Sonata)','Sia','DORO*C'
,'Tairitsu(Tempest)','Brillante','Ilith(Summer)']
for i in range(0, 38):
if i in [0, 1, 2, 4, 13, 26, 27, 28, 29, 36, 21]:
sql = 'insert into character values('+str(i)+',"'+char[i]+'''",30,25000,25000,90,90,90,'',0,0,'',0,'',1,1)'''
c.execute(sql)
else:
if i != 5:
sql = 'insert into character values('+str(i)+',"'+char[i]+'''",30,25000,25000,90,90,90,'',0,0,'',0,'',0,0)'''
c.execute(sql)
conn.commit()
conn.close()
def arc_register(name: str, password: str):
def build_user_code(c):
return '123456789'
def build_user_id(c):
return 2000000
## def insert_user_char(c, user_id):
## for i in range(0, 38):
## if i in [0, 1, 2, 4, 13, 26, 27, 28, 29, 36, 21]:
## sql = 'insert into user_char values('+str(user_id)+','+str(
## i)+''',30,25000,25000,90,90,90,'',0,0,'',0,1,1)'''
## c.execute(sql)
## else:
## if i != 5:
## sql = 'insert into user_char values('+str(user_id)+','+str(
## i)+''',30,25000,25000,90,90,90,'',0,0,'',0,0,0)'''
## c.execute(sql)
def insert_user_char(c, user_id):
# 为用户添加所有可用角色
c.execute('''select * from character''')
x = c.fetchall()
if x != []:
for i in x:
c.execute('''insert into user_char values(:a,:b,:c,:d,:e,:f,:g,:h,:i,:j,:k,:l,:m,:n,:o)''', {
'a': user_id, 'b': i[0], 'c': i[2], 'd': i[3], 'e': i[4], 'f': i[5], 'g': i[6], 'h': i[7], 'i': i[8], 'j': i[9], 'k': i[10], 'l': i[11], 'm': i[12], 'n': i[14], 'o': i[15]})
conn = sqlite3.connect('arcaea_database.db')
c = conn.cursor()
hash_pwd = hashlib.sha256(password.encode("utf8")).hexdigest()
c.execute(
'''select exists(select * from user where name = :name)''', {'name': name})
if c.fetchone() == (0,):
user_code = build_user_code(c)
user_id = build_user_id(c)
now = int(time.time() * 1000)
c.execute('''insert into user(user_id, name, password, join_date, user_code, rating_ptt,
character_id, is_skill_sealed, is_char_uncapped, is_char_uncapped_override, is_hide_rating, favorite_character, max_stamina_notification_enabled)
values(:user_id, :name, :password, :join_date, :user_code, 1250, 1, 0, 1, 0, 0, -1, 0)
''', {'user_code': user_code, 'user_id': user_id, 'join_date': now, 'name': name, 'password': hash_pwd})
c.execute('''insert into recent30(user_id) values(:user_id)''', {
'user_id': user_id})
c.execute('''insert into best_score values(2000000,'vexaria',3,10000000,100,0,0,0,100,0,1599667200,3,3,10.8)''')
insert_user_char(c, user_id)
conn.commit()
conn.close()
return None
else:
conn.commit()
conn.close()
return None
arc_register('admin', 'admin123')

View File

@@ -1,425 +0,0 @@
from flask import Flask, request, jsonify, make_response
import configparser
import base64
import server.auth
import server.info
import server.setme
import server.arcscore
import web.login
import web.index
app = Flask(__name__)
wsgi_app = app.wsgi_app
def error_return(error_code): # 错误返回
# 100 无法在此ip地址下登录游戏
# 101 用户名占用
# 102 电子邮箱已注册
# 103 已有一个账号由此设备创建
# 104 用户名密码错误
# 105 24小时内登入两台设备
# 106 账户冻结
# 107 你没有足够的体力
# 401 用户不存在
# 403 无法连接至服务器
# 501 502 此物品目前无法获取
# 504 无效的序列码
# 505 此序列码已被使用
# 506 你已拥有了此物品
# 601 好友列表已满
# 602 此用户已是好友
# 604 你不能加自己为好友
# 其它 发生未知错误
return jsonify({
"success": False,
"error_code": error_code
})
@app.route('/')
def hello():
return "Hello World!"
@app.route('/coffee/12/auth/login', methods=['POST']) # 登录接口
def login():
headers = request.headers
id_pwd = headers['Authorization']
id_pwd = base64.b64decode(id_pwd[6:]).decode()
name, password = id_pwd.split(':', 1)
try:
token = server.auth.arc_login(name, password)
if token is not None:
r = {"success": True, "token_type": "Bearer"}
r['access_token'] = token
return jsonify(r)
else:
return error_return(104) # 用户名或密码错误
except:
return error_return(108)
@app.route('/coffee/12/user/', methods=['POST']) # 注册接口
def register():
name = request.form['name']
password = request.form['password']
try:
user_id, token, error_code = server.auth.arc_register(name, password)
if user_id is not None:
r = {"success": True, "value": {
'user_id': user_id, 'access_token': token}}
return jsonify(r)
else:
return error_return(error_code) # 应该是101用户名被占用毕竟电子邮箱、设备号没记录
except:
return error_return(108)
@app.route('/coffee/12/compose/aggregate', methods=['GET']) # 用户信息获取
def aggregate():
calls = request.args.get('calls')
headers = request.headers
token = headers['Authorization']
token = token[7:]
try:
user_id = server.auth.token_get_id(token)
if user_id is not None:
if calls == '[{ "endpoint": "/user/me", "id": 0 }]': # 极其沙雕的判断我猜get的参数就两种
r = server.info.arc_aggregate_small(user_id)
else:
r = server.info.arc_aggregate_big(user_id)
return jsonify(r)
else:
return error_return(108)
except:
return error_return(108)
@app.route('/coffee/12/user/me/character', methods=['POST']) # 角色切换
def character_change():
headers = request.headers
token = headers['Authorization']
token = token[7:]
character_id = request.form['character']
skill_sealed = request.form['skill_sealed']
try:
user_id = server.auth.token_get_id(token)
if user_id is not None:
flag = server.setme.change_char(
user_id, character_id, skill_sealed)
if flag:
return jsonify({
"success": True,
"value": {
"user_id": user_id,
"character": character_id
}
})
else:
return error_return(108)
else:
return error_return(108)
except:
return error_return(108)
@app.route('/coffee/<path:path>/toggle_uncap', methods=['POST']) # 角色觉醒切换
def character_uncap(path):
character_id = int(path[22:])
headers = request.headers
token = headers['Authorization']
token = token[7:]
try:
user_id = server.auth.token_get_id(token)
if user_id is not None:
r = server.setme.change_char_uncap(user_id, character_id)
if r is not None:
return jsonify({
"success": True,
"value": {
"user_id": user_id,
"character": [r]
}
})
else:
return error_return(108)
else:
return error_return(108)
except:
return error_return(108)
@app.route('/coffee/12/friend/me/add', methods=['POST']) # 加好友
def add_friend():
headers = request.headers
token = headers['Authorization']
token = token[7:]
friend_code = request.form['friend_code']
try:
user_id = server.auth.token_get_id(token)
friend_id = server.auth.code_get_id(friend_code)
if user_id is not None and friend_id is not None:
r = server.setme.arc_add_friend(user_id, friend_id)
if r is not None and r != 602 and r != 604:
return jsonify({
"success": True,
"value": {
"user_id": user_id,
"updatedAt": "2020-09-07T07:32:12.740Z",
"createdAt": "2020-09-06T10:05:18.471Z",
"friends": r
}
})
else:
if r is not None:
return error_return(r)
else:
return error_return(108)
else:
if friend_id is None:
return error_return(401)
else:
return error_return(108)
except:
return error_return(108)
@app.route('/coffee/12/friend/me/delete', methods=['POST']) # 删好友
def delete_friend():
headers = request.headers
token = headers['Authorization']
token = token[7:]
friend_id = int(request.form['friend_id'])
try:
user_id = server.auth.token_get_id(token)
if user_id is not None and friend_id is not None:
r = server.setme.arc_delete_friend(user_id, friend_id)
if r is not None:
return jsonify({
"success": True,
"value": {
"user_id": user_id,
"updatedAt": "2020-09-07T07:32:12.740Z",
"createdAt": "2020-09-06T10:05:18.471Z",
"friends": r
}
})
else:
return error_return(108)
else:
if friend_id is None:
return error_return(401)
else:
return error_return(108)
except:
return error_return(108)
@app.route('/coffee/12/score/song/friend', methods=['GET']) # 好友排名默认最多50
def song_score_friend():
song_id = request.args.get('song_id')
difficulty = request.args.get('difficulty')
headers = request.headers
token = headers['Authorization']
token = token[7:]
try:
user_id = server.auth.token_get_id(token)
if user_id is not None:
r = server.arcscore.arc_score_friend(user_id, song_id, difficulty)
if r is not None:
return jsonify({
"success": True,
"value": r
})
else:
return error_return(108)
else:
return error_return(108)
except:
return error_return(108)
@app.route('/coffee/12/score/song/me', methods=['GET']) # 我的排名默认最多20
def song_score_me():
song_id = request.args.get('song_id')
difficulty = request.args.get('difficulty')
headers = request.headers
token = headers['Authorization']
token = token[7:]
try:
user_id = server.auth.token_get_id(token)
if user_id is not None:
r = server.arcscore.arc_score_me(user_id, song_id, difficulty)
if r is not None:
return jsonify({
"success": True,
"value": r
})
else:
return error_return(108)
else:
return error_return(108)
except:
return error_return(108)
@app.route('/coffee/12/score/song', methods=['GET']) # TOP20
def song_score_top():
song_id = request.args.get('song_id')
difficulty = request.args.get('difficulty')
headers = request.headers
token = headers['Authorization']
token = token[7:]
try:
user_id = server.auth.token_get_id(token)
if user_id is not None:
r = server.arcscore.arc_score_top(song_id, difficulty)
if r is not None:
return jsonify({
"success": True,
"value": r
})
else:
return error_return(108)
else:
return error_return(108)
except:
return error_return(108)
@app.route('/coffee/12/score/song', methods=['POST']) # 成绩上传
def song_score_post():
headers = request.headers
token = headers['Authorization']
token = token[7:]
song_id = request.form['song_id']
difficulty = int(request.form['difficulty'])
score = int(request.form['score'])
shiny_perfect_count = int(request.form['shiny_perfect_count'])
perfect_count = int(request.form['perfect_count'])
near_count = int(request.form['near_count'])
miss_count = int(request.form['miss_count'])
health = int(request.form['health'])
modifier = int(request.form['modifier'])
beyond_gauge = int(request.form['beyond_gauge'])
clear_type = int(request.form['clear_type'])
try:
user_id = server.auth.token_get_id(token)
if user_id is not None:
r = server.arcscore.arc_score_post(user_id, song_id, difficulty, score, shiny_perfect_count,
perfect_count, near_count, miss_count, health, modifier, beyond_gauge, clear_type)
if r is not None:
return jsonify({
"success": True,
"value": {"user_rating": r}
})
else:
return error_return(108)
else:
return error_return(108)
except:
return error_return(108)
@app.route('/coffee/12/score/token', methods=['GET']) # 成绩上传所需的token显然我不想验证
def score_token():
return jsonify({
"success": True,
"value": {
"token": "1145141919810"
}
})
@app.route('/coffee/12/user/me/save', methods=['GET']) # 从云端同步
def cloud_get():
headers = request.headers
token = headers['Authorization']
token = token[7:]
try:
user_id = server.auth.token_get_id(token)
if user_id is not None:
r = server.arcscore.arc_all_get(user_id)
if r is not None:
return jsonify({
"success": True,
"value": r
})
else:
return error_return(108)
else:
return error_return(108)
except:
return error_return(108)
@app.route('/coffee/12/user/me/save', methods=['POST']) # 向云端同步
def cloud_post():
headers = request.headers
token = headers['Authorization']
token = token[7:]
scores_data = request.form['scores_data']
clearlamps_data = request.form['clearlamps_data']
try:
user_id = server.auth.token_get_id(token)
if user_id is not None:
server.arcscore.arc_all_post(user_id, scores_data, clearlamps_data)
return jsonify({
"success": True,
"value": {
"user_id": user_id
}
})
else:
return error_return(108)
except:
return error_return(108)
@app.route('/coffee/12/purchase/me/redeem', methods=['POST']) # 兑换码,自然没有用
def redeem():
return error_return(504)
@app.route('/coffee/<path:path>', methods=['POST']) # 三个设置,写在最后降低优先级
def sys_set(path):
set_arg = path[10:]
headers = request.headers
token = headers['Authorization']
token = token[7:]
value = request.form['value']
try:
user_id = server.auth.token_get_id(token)
if user_id is not None:
server.setme.arc_sys_set(user_id, value, set_arg)
r = server.info.arc_aggregate_small(user_id)
r['value'] = r['value'][0]['value']
return jsonify(r)
else:
return error_return(108)
except:
return error_return(108)
def main():
config = configparser.ConfigParser()
path = r'setting.ini'
config.read(path, encoding="utf-8")
HOST = config.get('CONFIG', 'HOST')
PORT = config.get('CONFIG', 'PORT')
app.config.from_mapping(SECRET_KEY='1145141919810')
app.register_blueprint(web.login.bp)
app.register_blueprint(web.index.bp)
app.run(HOST, PORT)
if __name__ == '__main__':
main()
# Made By Lost 2020.9.11

View File

@@ -1 +0,0 @@
python main.py

File diff suppressed because it is too large Load Diff

View File

@@ -1,149 +0,0 @@
import sqlite3
import hashlib
import time
def arc_login(name: str, password: str) -> str: # 登录判断
# 查询数据库中的user表验证账号密码返回并记录token
# token采用user_id和时间戳连接后hash生成真的是瞎想的没用bear
# 密码和token的加密方式为 SHA-256
conn = sqlite3.connect('./database/arcaea_database.db')
c = conn.cursor()
hash_pwd = hashlib.sha256(password.encode("utf8")).hexdigest()
c.execute('''select user_id from user where name = :name and password = :password''', {
'name': name, 'password': hash_pwd})
x = c.fetchone()
if x is not None:
user_id = str(x[0])
now = int(time.time() * 1000)
token = hashlib.sha256((user_id + str(now)).encode("utf8")).hexdigest()
c.execute(
'''select exists(select * from login where user_id = :user_id)''', {"user_id": user_id})
if c.fetchone() == (1,): # 删掉多余token
c.execute('''delete from login where user_id = :user_id''',
{'user_id': user_id})
c.execute('''insert into login(access_token, user_id) values(:access_token, :user_id)''', {
'user_id': user_id, 'access_token': token})
conn.commit()
conn.close()
return token
conn.commit()
conn.close()
return None
def arc_register(name: str, password: str): # 注册
# 账号注册只记录hash密码和用户名生成user_id和user_code自动登录返回token
# token和密码的处理同登录部分
def build_user_code(c):
# 生成9位的user_code用的自然是随机
import random
flag = True
while flag:
user_code = ''.join([str(random.randint(0, 9)) for i in range(9)])
c.execute('''select exists(select * from user where user_code = :user_code)''',
{'user_code': user_code})
if c.fetchone() == (0,):
flag = False
return user_code
def build_user_id(c):
# 生成user_id往后加1
c.execute('''select max(user_id) from user''')
x = c.fetchone()
if x[0] is not None:
return x[0] + 1
else:
return 2000001
# def insert_user_char(c, user_id):
# # 为用户添加所有可用角色
# for i in range(0, 38):
# if i in [0, 1, 2, 4, 13, 26, 27, 28, 29, 36, 21]:
# sql = 'insert into user_char values('+str(user_id)+','+str(
# i)+''',30,25000,25000,90,90,90,'',0,0,'',0,1,1)'''
# c.execute(sql)
# else:
# if i != 5:
# sql = 'insert into user_char values('+str(user_id)+','+str(
# i)+''',30,25000,25000,90,90,90,'',0,0,'',0,0,0)'''
# c.execute(sql)
def insert_user_char(c, user_id):
# 为用户添加所有可用角色
c.execute('''select * from character''')
x = c.fetchall()
if x != []:
for i in x:
c.execute('''insert into user_char values(:a,:b,:c,:d,:e,:f,:g,:h,:i,:j,:k,:l,:m,:n,:o)''', {
'a': user_id, 'b': i[0], 'c': i[2], 'd': i[3], 'e': i[4], 'f': i[5], 'g': i[6], 'h': i[7], 'i': i[8], 'j': i[9], 'k': i[10], 'l': i[11], 'm': i[12], 'n': i[14], 'o': i[15]})
conn = sqlite3.connect('./database/arcaea_database.db')
c = conn.cursor()
hash_pwd = hashlib.sha256(password.encode("utf8")).hexdigest()
c.execute(
'''select exists(select * from user where name = :name)''', {'name': name})
if c.fetchone() == (0,):
user_code = build_user_code(c)
user_id = build_user_id(c)
now = int(time.time() * 1000)
c.execute('''insert into user(user_id, name, password, join_date, user_code, rating_ptt,
character_id, is_skill_sealed, is_char_uncapped, is_char_uncapped_override, is_hide_rating, favorite_character, max_stamina_notification_enabled)
values(:user_id, :name, :password, :join_date, :user_code, 0, 0, 0, 0, 0, 0, -1, 0)
''', {'user_code': user_code, 'user_id': user_id, 'join_date': now, 'name': name, 'password': hash_pwd})
c.execute('''insert into recent30(user_id) values(:user_id)''', {
'user_id': user_id})
token = hashlib.sha256(
(str(user_id) + str(now)).encode("utf8")).hexdigest()
c.execute('''insert into login(access_token, user_id) values(:access_token, :user_id)''', {
'user_id': user_id, 'access_token': token})
insert_user_char(c, user_id)
conn.commit()
conn.close()
return user_id, token, 0
else:
conn.commit()
conn.close()
return None, None, 101
def token_get_id(token: str):
# 用token获取id没有考虑不同用户token相同情况说不定会有bug
conn = sqlite3.connect('./database/arcaea_database.db')
c = conn.cursor()
c.execute('''select user_id from login where access_token = :token''', {
'token': token})
x = c.fetchone()
if x is not None:
conn.commit()
conn.close()
return x[0]
else:
conn.commit()
conn.close()
return None
def code_get_id(user_code):
# 用user_code获取id
conn = sqlite3.connect('./database/arcaea_database.db')
c = conn.cursor()
c.execute('''select user_id from user where user_code = :a''',
{'a': user_code})
x = c.fetchone()
if x is not None:
conn.commit()
conn.close()
return x[0]
else:
conn.commit()
conn.close()
return None

File diff suppressed because it is too large Load Diff

View File

@@ -1,155 +0,0 @@
import sqlite3
import server.info
def b2int(x):
# int与布尔值转换
if x:
return 1
else:
return 0
def int2b(x):
# int与布尔值转换
if x is None or x == 0:
return False
else:
return True
def change_char(user_id, character_id, skill_sealed):
# 角色改变,包括技能封印的改变,返回成功与否的布尔值
conn = sqlite3.connect('./database/arcaea_database.db')
c = conn.cursor()
c.execute('''select is_uncapped, is_uncapped_override from user_char where user_id = :a and character_id = :b''',
{'a': user_id, 'b': character_id})
x = c.fetchone()
if x is not None:
if skill_sealed == 'false':
skill_sealed = False
else:
skill_sealed = True
c.execute('''update user set is_skill_sealed = :a, character_id = :b, is_char_uncapped = :c, is_char_uncapped_override = :d where user_id = :e''', {
'a': b2int(skill_sealed), 'b': character_id, 'c': x[0], 'd': x[1], 'e': user_id})
conn.commit()
conn.close()
return True
conn.commit()
conn.close()
return False
def change_char_uncap(user_id, character_id):
# 角色觉醒改变,返回字典
conn = sqlite3.connect('./database/arcaea_database.db')
c = conn.cursor()
c.execute('''select is_uncapped, is_uncapped_override from user_char where user_id = :a and character_id = :b''',
{'a': user_id, 'b': character_id})
x = c.fetchone()
r = None
if x is not None and x[0] == 1:
c.execute('''update user set is_char_uncapped_override = :a where user_id = :b''', {
'a': b2int(x[1] == 0), 'b': user_id})
c.execute('''update user_char set is_uncapped_override = :a where user_id = :b and character_id = :c''', {
'a': b2int(x[1] == 0), 'b': user_id, 'c': character_id})
c.execute('''select * from user_char where user_id = :a and character_id = :b''',
{'a': user_id, 'b': character_id})
y = c.fetchone()
c.execute(
'''select name from character where character_id = :x''', {'x': y[1]})
z = c.fetchone()
if z is not None:
char_name = z[0]
if y is not None:
r = {
"is_uncapped_override": int2b(y[14]),
"is_uncapped": int2b(y[13]),
"uncap_cores": [],
"char_type": y[12],
"skill_id_uncap": y[11],
"skill_requires_uncap": int2b(y[10]),
"skill_unlock_level": y[9],
"skill_id": y[8],
"overdrive": y[7],
"prog": y[6],
"frag": y[5],
"level_exp": y[4],
"exp": y[3],
"level": y[2],
"name": char_name,
"character_id": y[1]
}
conn.commit()
conn.close()
return r
def arc_sys_set(user_id, value, set_arg):
# 三个设置PTT隐藏、体力满通知、最爱角色无返回
conn = sqlite3.connect('./database/arcaea_database.db')
c = conn.cursor()
if 'favorite_character' in set_arg:
value = int(value)
c.execute('''update user set favorite_character = :a where user_id = :b''', {
'a': value, 'b': user_id})
else:
if value == 'false':
value = False
else:
value = True
if 'is_hide_rating' in set_arg:
c.execute('''update user set is_hide_rating = :a where user_id = :b''', {
'a': b2int(value), 'b': user_id})
if 'max_stamina_notification_enabled' in set_arg:
c.execute('''update user set max_stamina_notification_enabled = :a where user_id = :b''', {
'a': b2int(value), 'b': user_id})
conn.commit()
conn.close()
return None
def arc_add_friend(user_id, friend_id):
# 加好友返回好友列表或者是错误码602、604
if user_id == friend_id:
return 604
conn = sqlite3.connect('./database/arcaea_database.db')
c = conn.cursor()
c.execute('''select exists(select * from friend where user_id_me = :x and user_id_other = :y)''',
{'x': user_id, 'y': friend_id})
r = None
if c.fetchone() == (0,):
c.execute('''insert into friend values(:a, :b)''',
{'a': user_id, 'b': friend_id})
r = server.info.get_user_friend(c, user_id)
else:
return 602
conn.commit()
conn.close()
return r
def arc_delete_friend(user_id, friend_id):
# 删好友,返回好友列表
conn = sqlite3.connect('./database/arcaea_database.db')
c = conn.cursor()
c.execute('''select exists(select * from friend where user_id_me = :x and user_id_other = :y)''',
{'x': user_id, 'y': friend_id})
r = None
if c.fetchone() == (1,):
c.execute('''delete from friend where user_id_me = :x and user_id_other = :y''',
{'x': user_id, 'y': friend_id})
r = server.info.get_user_friend(c, user_id)
conn.commit()
conn.close()
return r

View File

@@ -1,3 +0,0 @@
[CONFIG]
HOST = 192.168.1.113
PORT = 80

View File

@@ -1,217 +0,0 @@
html {
font-family: sans-serif;
background: #eee;
padding: 1rem;
}
body {
max-width: 960px;
margin: 0 auto;
background: white;
}
h1 {
font-family: serif;
color: #377ba8;
margin: 1rem 0;
}
a {
text-decoration: none;
color: #377ba8;
}
hr {
border: none;
border-top: 1px solid lightgray;
}
nav {
background: lightgray;
display: flex;
align-items: center;
padding: 0 0.5rem;
}
nav h1 {
flex: auto;
margin: 0;
}
nav h1 a {
text-decoration: none;
padding: 0.25rem 0.5rem;
}
nav ul {
display: flex;
list-style: none;
margin: 0;
padding: 0;
}
nav ul li a,
nav ul li span,
header .action {
display: block;
padding: 0.5rem;
}
.content {
padding: 0 1rem 1rem;
}
.content>header {
border-bottom: 1px solid lightgray;
display: flex;
align-items: flex-end;
}
.content>header h1 {
flex: auto;
margin: 1rem 0 0.25rem 0;
}
.flash {
margin: 1em 0;
padding: 1em;
background: #cae6f6;
border: 1px solid #377ba8;
}
.post>header {
display: flex;
align-items: flex-end;
font-size: 0.85em;
}
.post>header>div:first-of-type {
flex: auto;
}
.post>header h1 {
font-size: 1.5em;
margin-bottom: 0;
}
.post .about {
color: slategray;
font-style: italic;
}
.post .body {
white-space: pre-line;
}
.content:last-child {
margin-bottom: 0;
}
.content form {
margin: 1em 0;
display: flex;
flex-direction: column;
}
.content label {
font-weight: bold;
margin-bottom: 0.5em;
}
.content input,
.content textarea {
margin-bottom: 1em;
}
.content textarea {
min-height: 12em;
resize: vertical;
}
input.danger {
color: #cc2f2e;
}
input[type=submit] {
align-self: start;
min-width: 10em;
}
.score-item {
margin-bottom: 10px;
clear: both;
}
.song-title {
font-size: 1.3em;
font-weight: bold;
display: inline-block;
}
.difficulty_pst {
font-size: 0.9em;
background-color: rgb(10, 130, 190);
color: white;
}
.difficulty_prs {
font-size: 0.9em;
background-color: rgb(100, 140, 60);
color: white;
}
.difficulty_ftr {
font-size: 0.9em;
background-color: rgb(80, 25, 75);
color: white;
}
.difficulty_byd {
font-size: 0.9em;
background-color: rgb(130, 35, 40);
color: white;
}
.rank {
font-size: 0.8em;
margin-left: 4px;
padding: 0 4px;
}
.rank_big {
font-size: 1.2em;
margin-left: 4px;
padding: 0 4px;
}
.song-detail {
font-size: 0.8em;
float: right;
text-align: right;
}
.song-score {
font-size: 1.375em;
font-family: Arial;
}
.song-clear-type {
font-size: 0.875em;
padding-left: 0.625em;
}
.title {
font-size: 1.375em;
}
.name {
font-size: 1.12em;
font-weight: bold;
}
.footer{
font-size: 0.5em;
color: grey;
text-align: center;
}

View File

@@ -1,23 +0,0 @@
<!doctype html>
<title>{% block title %}{% endblock %} - Arcaea Server</title>
<link rel="stylesheet" href="{{ url_for('static', filename='style.css') }}">
<nav>
<h1>Arcaea Server</h1>
<ul>
{% if g.user %}
<li><span>{{ g.user['username'] }}</span>
<li><a href="{{ url_for('index.index') }}">Home</a>
<li><a href="{{ url_for('login.logout') }}">Log Out</a> {% else %}
<li><a href="{{ url_for('login.login') }}">Log In</a> {% endif %}
</ul>
</nav>
<section class="content">
<header>
{% block header %}{% endblock %}
</header>
{% for message in get_flashed_messages() %}
<div class="flash">{{ message }}</div>
{% endfor %} {% block content %}{% endblock %}
</section>
<footer id="footer" class="footer">Made by Lost@2020</footer>

View File

@@ -1,81 +0,0 @@
{% extends 'base.html' %}
{% block header %}
<h1>{% block title %}All players{% endblock %}</h1>
{% endblock %}
{% block content %}
{% if posts %}
{% for user in posts %}
<div id="user-info">
<div class="name">{{user['name']}}
<span class="rank">UID: {{user['user_id']}}</span>
<span class="rank">User code: {{user['user_code']}}</span>
</div>
<div class="join-date">注册于 Registered in: {{user['join_date']}}</div>
<div class="ptt">PTT: {{user['rating_ptt']//100 ~ '.' ~ user['rating_ptt']%100}}</div>
<div>
<div>Recent plays: </div>
<div>
<div class="score-item">
<span class="song-title">
{{user['song_id']}}
</span>
{% if user['difficulty'] == 0 %}
<span class="difficulty_pst">PST</span>
{% elif user['difficulty'] == 1 %}
<span class="difficulty_prs">PRS</span>
{% elif user['difficulty'] == 2 %}
<span class="difficulty_ftr">FTR</span>
{% else %}
<span class="difficulty_byd">BYD</span>
{% endif %}
<div class="song-detail">
<br />
<table>
<tbody>
<tr>
<td>PURE: </td>
<td>{{user['perfect_count']}}</td>
<td> {{'(' ~ user['shiny_perfect_count'] ~ ')'}}</td>
</tr>
<tr>
<td>FAR: </td>
<td>{{user['near_count']}}</td>
<td></td>
</tr>
<tr>
<td>LOST: </td>
<td>{{user['miss_count']}}</td>
<td></td>
</tr>
</tbody>
</table>
</div>
<div class="song-score">{{user['score']}}</div>
<div class="song-clear-type">
{% if user['clear_type'] == 3 %}Pure Memory
{% elif user['clear_type'] == 2 %}Full Recall
{% elif user['clear_type'] == 5 %}Hard Clear
{% elif user['clear_type'] == 1 %}Normal Clear
{% elif user['clear_type'] == 4 %}Easy Clear
{% else%}Track Lost
{% endif %}
</div>
<div class="song-rating">成绩评价 Rating: {{user['rating']}}</div>
<div class="song-clear-date">日期 Date:
{{user['time_played']}}
</div>
</div>
</div>
</div>
</div>
{% if not loop.last %}
<hr />
<br />
{% endif %}
{% endfor %}
{% endif %}
{% endblock %}

View File

@@ -1,48 +0,0 @@
{% extends 'base.html' %}
{% block header %}
<h1>{% block title %}All songs{% endblock %}</h1>
{% endblock %}
{% block content %}
{% if posts %}<br />
{% for song in posts %}
<div class="score-item">
<span>Sid: </span>
<span class="song-title">{{song['song_id']}}</span>
<br />
<span>Name_en: </span>
<span class="song-title">{{song['name_en']}}</span>
<br />
<div>铺面定数 Chart const: </div>
{% if song['rating_pst'] %}
<span class="difficulty_pst">PST</span>
<span class="song-rating">{{song['rating_pst']}}</span>
{% endif %}
<br />
{% if song['rating_prs'] %}
<span class="difficulty_prs">PRS</span>
<span class="song-rating">{{song['rating_prs']}}</span>
{% endif %}
<br />
{% if song['rating_ftr'] %}
<span class="difficulty_ftr">FTR</span>
<span class="song-rating">{{song['rating_ftr']}}</span>
{% endif %}
<br />
{% if song['rating_byn'] %}
<span class="difficulty_byd">BYD</span>
<span class="song-rating">{{song['rating_byn']}}</span>
{% endif %}
</div>
{% if not loop.last %}
<br />
<hr />
<br />
{% endif %}
{% endfor %}
{% endif %}
{% endblock %}

View File

@@ -1,35 +0,0 @@
{% extends 'base.html' %}
{% block header %}
<h1>{% block title %}Change the songs{% endblock %}</h1>
{% endblock %}
{% block content %}
<form action="/web/changesong/addsong" method="post">
<div class="title">Add the song</div>
<label for="sid">SID of the song</label>
<input name="sid" id="sid" required>
<label for="name_en">English name of the song(Not important but neccessary)</label>
<input name="name_en" id="name_en" required>
<label for="rating_pst">Past chart const</label>
<input name="rating_pst" id="rating_pst" required>
<label for="rating_prs">Present chart const</label>
<input name="rating_prs" id="rating_prs" required>
<label for="rating_ftr">Future chart const</label>
<input name="rating_ftr" id="rating_ftr" required>
<label for="rating_byd">Beyond chart const</label>
<input name="rating_byd" id="rating_byd" required>
<div class="content">如果没有某个铺面,应该填入-1。</div>
<div class="content">If there is no some chart, fill in -1 please.</div>
<input type="submit" value="Add">
</form>
<br />
<hr />
<form action="/web/changesong/deletesong" method="post">
<div class="title">Delete the song</div>
<label for="sid">SID of the song</label>
<input name="sid" id="sid" required>
<input type="submit" value="Delete">
</form>
{% endblock %}

View File

@@ -1,24 +0,0 @@
{% extends 'base.html' %}
{% block header %}
<h1>{% block title %}Index{% endblock %}</h1>
{% endblock %}
{% block content %}
<h1>说明 Statement</h1>
<p>这是Arcaea Server的后台管理协助系统可以进行一些简单的操作</br>
This is the background management assistance system of Arcaea Server, which can be used to do some simple
operations.
</p>
<h1>游戏方面 Game</h1>
<a href="{{ url_for('index.single_player_score') }}">单个玩家成绩查询 Single player score</a></br></br>
<a href="{{ url_for('index.single_player_ptt') }}">单个玩家PTT详情查询 Single player ptt</a></br></br>
<a href="{{ url_for('index.all_player') }}">所有玩家信息查询 All players</a></br></br>
<a href="{{ url_for('index.all_song') }}">铺面信息查询 All songs</a></br></br>
<a href="{{ url_for('index.single_chart_top') }}">单个铺面排行榜查询 Single song chart tops</a>
<hr>
<h1>系统方面 System</h1>
<a href="{{ url_for('index.update_database') }}">数据库更新 Update databases</a></br></br>
<a href="{{ url_for('index.change_song') }}">歌曲修改 Change the songs</a>
{% endblock %}

View File

@@ -1,14 +0,0 @@
{% extends 'base.html' %}
{% block header %}
<h1>{% block title %}Log In{% endblock %}</h1>
{% endblock %}
{% block content %}
<form method="post">
<label for="username">Username</label>
<input name="username" id="username" required>
<label for="password">Password</label>
<input type="password" name="password" id="password" required>
<input type="submit" value="Log In">
</form>
{% endblock %}

View File

@@ -1,108 +0,0 @@
{% extends 'base.html' %}
{% block header %}
<h1>{% block title %}Single chart top{% endblock %}</h1>
{% endblock %}
{% block content %}
<form method="post">
<label for="sid">SID of the song</label>
<span>模糊查询,只返回第一个</span>
<span>Fuzzy query,and only return the first one.</span>
<input name="sid" id="sid">
<label for="difficulty">Difficulty</label>
<select name='difficulty' id='difficulty'>
<option value="0" selected>Past</option>
<option value="1">Present</option>
<option value="2">Future</option>
<option value="3">Beyond</option>
</select>
<br />
<input type="submit" value="Find">
</br>
</br>
</br>
{% if song_id %}
<div>
<span class="song-title">
{{song_id}}
</span>
{% if difficulty == 0 %}
<span class="difficulty_pst">PST</span>
{% elif difficulty == 1 %}
<span class="difficulty_prs">PRS</span>
{% elif difficulty == 2 %}
<span class="difficulty_ftr">FTR</span>
{% else %}
<span class="difficulty_byd">BYD</span>
{% endif %}
<br />
<div class="song-title">
{{song_name_en}}
</div>
</div>
{% endif %}
<br />
<hr />
{% for post in posts %}
<div class="score-item">
<div>
<span class='rank_big'>{{'#' ~ post['rank']}}</span>
<span class="name">
{{post['name']}}
</span>
<span class="rank">UID: {{post['user_id']}}</span>
</div>
<div class="song-detail">
<table>
<tbody>
<tr>
<td>PURE: </td>
<td>{{post['perfect_count']}}</td>
<td> {{'(' ~ post['shiny_perfect_count'] ~ ')'}}</td>
</tr>
<tr>
<td>FAR: </td>
<td>{{post['near_count']}}</td>
<td></td>
</tr>
<tr>
<td>LOST: </td>
<td>{{post['miss_count']}}</td>
<td></td>
</tr>
</tbody>
</table>
</div>
<div class="song-score">{{post['score']}}</div>
<div class="song-clear-type">
{% if post['clear_type'] == 3 %}Pure Memory
{% elif post['clear_type'] == 2 %}Full Recall
{% elif post['clear_type'] == 5 %}Hard Clear
{% elif post['clear_type'] == 1 %}Normal Clear
{% elif post['clear_type'] == 4 %}Easy Clear
{% else%}Track Lost
{% endif %}
<span class="song-clear-type" style="margin-left: 1.875em;">
{% if post['best_clear_type'] == 3 %}(Pure Memory)
{% elif post['best_clear_type'] == 2 %}(Full Recall)
{% elif post['best_clear_type'] == 5 %}(Hard Clear)
{% elif post['best_clear_type'] == 1 %}(Normal Clear)
{% elif post['best_clear_type'] == 4 %}(Easy Clear)
{% else%}(Track Lost)
{% endif %}
</span>
</div>
<div class="song-clear-date">日期 Date:
{{post['time_played']}}
</div>
</div>
{% if not loop.last %}
</br>
{% endif %}
{% endfor %}
</form>
{% endblock %}

View File

@@ -1,85 +0,0 @@
{% extends 'base.html' %}
{% block header %}
<h1>{% block title %}Single player score{% endblock %}</h1>
{% endblock %}
{% block content %}
<form method="post">
<label for="name">Arcaea Username</label>
<input name="name" id="name">
or<br />
<label for="user_code">Arcaea User Code</label>
<input name="user_code" id="user_code">
<input type="submit" value="Find">
</br>
</br>
</br>
{% for post in posts %}
<div class="score-item">
<span class="song-title">
{{post['song_id']}}
</span>
{% if post['difficulty'] == 0 %}
<span class="difficulty_pst">PST</span>
{% elif post['difficulty'] == 1 %}
<span class="difficulty_prs">PRS</span>
{% elif post['difficulty'] == 2 %}
<span class="difficulty_ftr">FTR</span>
{% else %}
<span class="difficulty_byd">BYD</span>
{% endif %}
<span class='rank'>{{'#' ~ post['rank']}}</span>
<div class="song-detail">
</br>
<table>
<tbody>
<tr>
<td>PURE: </td>
<td>{{post['perfect_count']}}</td>
<td> {{'(' ~ post['shiny_perfect_count'] ~ ')'}}</td>
</tr>
<tr>
<td>FAR: </td>
<td>{{post['near_count']}}</td>
<td></td>
</tr>
<tr>
<td>LOST: </td>
<td>{{post['miss_count']}}</td>
<td></td>
</tr>
</tbody>
</table>
</div>
<div class="song-score">{{post['score']}}</div>
<div class="song-clear-type">
{% if post['clear_type'] == 3 %}Pure Memory
{% elif post['clear_type'] == 2 %}Full Recall
{% elif post['clear_type'] == 5 %}Hard Clear
{% elif post['clear_type'] == 1 %}Normal Clear
{% elif post['clear_type'] == 4 %}Easy Clear
{% else%}Track Lost
{% endif %}
<span class="song-clear-type" style="margin-left: 1.875em;">
{% if post['best_clear_type'] == 3 %}(Pure Memory)
{% elif post['best_clear_type'] == 2 %}(Full Recall)
{% elif post['best_clear_type'] == 5 %}(Hard Clear)
{% elif post['best_clear_type'] == 1 %}(Normal Clear)
{% elif post['best_clear_type'] == 4 %}(Easy Clear)
{% else%}(Track Lost)
{% endif %}
</span>
</div>
<div class="song-rating">成绩评价 Rating: {{post['rating']}}</div>
<div class="song-clear-date">日期 Date:
{{post['time_played']}}
</div>
</div>
{% if not loop.last %}
</br>
{% endif %}
{% endfor %}
</form>
{% endblock %}

View File

@@ -1,191 +0,0 @@
{% extends 'base.html' %}
{% block header %}
<h1>{% block title %}Single player ptt{% endblock %}</h1>
{% endblock %}
{% block content %}
<form method="post">
<label for="name">Arcaea Username</label>
<input name="name" id="name">
or<br />
<label for="user_code">Arcaea User Code</label>
<input name="user_code" id="user_code">
<input type="submit" value="Find">
<br />
<br />
<br />
{% if user %}
<hr />
<div id="user-info">
<div class="title">玩家信息 Player information</div>
<div class="name">{{user['name']}}
<span class="rank">UID: {{user['user_id']}}</span>
<span class="rank">User code: {{user['user_code']}}</span>
</div>
<div class="join-date">注册于 Registered in: {{user['join_date']}}</div>
<div class="ptt">PTT: {{user['rating_ptt']//100 ~ '.' ~ user['rating_ptt']%100}}</div>
<div class="ptt">Best 30 PTT: {{bestptt}}</div>
<div class="ptt">Recent 10 PTT: {{recentptt}}</div>
<div>
<div>Recent plays: </div>
<div>
<div class="score-item">
<span class="song-title">
{{user['song_id']}}
</span>
{% if user['difficulty'] == 0 %}
<span class="difficulty_pst">PST</span>
{% elif user['difficulty'] == 1 %}
<span class="difficulty_prs">PRS</span>
{% elif user['difficulty'] == 2 %}
<span class="difficulty_ftr">FTR</span>
{% else %}
<span class="difficulty_byd">BYD</span>
{% endif %}
<div class="song-detail">
<br />
<table>
<tbody>
<tr>
<td>PURE: </td>
<td>{{user['perfect_count']}}</td>
<td> {{'(' ~ user['shiny_perfect_count'] ~ ')'}}</td>
</tr>
<tr>
<td>FAR: </td>
<td>{{user['near_count']}}</td>
<td></td>
</tr>
<tr>
<td>LOST: </td>
<td>{{user['miss_count']}}</td>
<td></td>
</tr>
</tbody>
</table>
</div>
<div class="song-score">{{user['score']}}</div>
<div class="song-clear-type">
{% if user['clear_type'] == 3 %}Pure Memory
{% elif user['clear_type'] == 2 %}Full Recall
{% elif user['clear_type'] == 5 %}Hard Clear
{% elif user['clear_type'] == 1 %}Normal Clear
{% elif user['clear_type'] == 4 %}Easy Clear
{% else%}Track Lost
{% endif %}
</div>
<div class="song-rating">成绩评价 Rating: {{user['rating']}}</div>
<div class="song-clear-date">日期 Date:
{{user['time_played']}}
</div>
</div>
</div>
</div>
</div>
<hr />
{% if posts %}
<div class="title">Best 30</div>
{% endif %}
{% for post in posts %}
<div class="score-item">
<span class="song-title">
{{post['song_id']}}
</span>
{% if post['difficulty'] == 0 %}
<span class="difficulty_pst">PST</span>
{% elif post['difficulty'] == 1 %}
<span class="difficulty_prs">PRS</span>
{% elif post['difficulty'] == 2 %}
<span class="difficulty_ftr">FTR</span>
{% else %}
<span class="difficulty_byd">BYD</span>
{% endif %}
<span class='rank'>{{'#' ~ post['rank']}}</span>
<div class="song-detail">
<br />
<table>
<tbody>
<tr>
<td>PURE: </td>
<td>{{post['perfect_count']}}</td>
<td> {{'(' ~ post['shiny_perfect_count'] ~ ')'}}</td>
</tr>
<tr>
<td>FAR: </td>
<td>{{post['near_count']}}</td>
<td></td>
</tr>
<tr>
<td>LOST: </td>
<td>{{post['miss_count']}}</td>
<td></td>
</tr>
</tbody>
</table>
</div>
<div class="song-score">{{post['score']}}</div>
<div class="song-clear-type">
{% if post['clear_type'] == 3 %}Pure Memory
{% elif post['clear_type'] == 2 %}Full Recall
{% elif post['clear_type'] == 5 %}Hard Clear
{% elif post['clear_type'] == 1 %}Normal Clear
{% elif post['clear_type'] == 4 %}Easy Clear
{% else%}Track Lost
{% endif %}
<span class="song-clear-type" style="margin-left: 1.875em;">
{% if post['best_clear_type'] == 3 %}(Pure Memory)
{% elif post['best_clear_type'] == 2 %}(Full Recall)
{% elif post['best_clear_type'] == 5 %}(Hard Clear)
{% elif post['best_clear_type'] == 1 %}(Normal Clear)
{% elif post['best_clear_type'] == 4 %}(Easy Clear)
{% else%}(Track Lost)
{% endif %}
</span>
</div>
<div class="song-rating">成绩评价 Rating: {{post['rating']}}</div>
<div class="song-clear-date">日期 Date:
{{post['time_played']}}
</div>
</div>
{% if not loop.last %}
<br />
{% endif %}
{% endfor %}
<hr />
{% if recent %}
<div class="title">Recent 30</div>
{% set rank = 0 %}
{% for i in recent %}
{% if i %}
{% set rank = rank + 1 %}
<div class="score-item">
<span class="song-title">
{{i['song_id']}}
</span>
{% if i['difficulty'] == 0 %}
<span class="difficulty_pst">PST</span>
{% elif i['difficulty'] == 1 %}
<span class="difficulty_prs">PRS</span>
{% elif i['difficulty'] == 2 %}
<span class="difficulty_ftr">FTR</span>
{% else %}
<span class="difficulty_byd">BYD</span>
{% endif %}
<span class='rank'>{{rank}}</span>
<div class="song-rating">成绩评价 Rating: {{i['rating']}}</div>
</div>
{% if not loop.last %}
<br />
{% endif %}
{% endif %}
{% endfor %}
{% endif %}
{% endif %}
</form>
{% endblock %}

View File

@@ -1,21 +0,0 @@
{% extends 'base.html' %}
{% block header %}
<h1>{% block title %}Update databases{% endblock %}</h1>
{% endblock %}
{% block content %}
<form method="post" enctype="multipart/form-data">
<label for="name">Old database</label>
<input type="file" name="file">
<input type="submit" value="Submit">
</form>
<div class="content">
这里可以将旧版本的数据库同步到新版本的数据库,并刷新用户拥有的角色列表。<br />
可上传文件: arcaea_database.db和arcsong.db<br />
新数据库不存在的数据会被添加,存在的数据将不会被改变。<br /><br />
Here you can synchronize the old version of the database to the new version of the database and refresh the list of
characters owned by players.<br />
Uploadable files: arcaea_database.db & arcsong.db<br />
Data that does not exist in the new database will be added and the existing data will not be changed.
</div>
{% endblock %}

View File

@@ -1,357 +0,0 @@
from flask import (
Blueprint, flash, g, redirect, render_template, request, url_for
)
from web.login import login_required
from werkzeug.utils import secure_filename
import sqlite3
import web.webscore
import web.system
import time
import server.arcscore
import os
UPLOAD_FOLDER = 'database'
ALLOWED_EXTENSIONS = {'db'}
bp = Blueprint('index', __name__, url_prefix='/web')
def is_number(s):
try: # 判断字符串s是浮点数
float(s)
return True
except ValueError:
pass
return False
@bp.route('/index')
@bp.route('/')
@login_required
def index():
# 主页
return render_template('web/index.html')
@bp.route('/singleplayer', methods=['POST', 'GET'])
@login_required
def single_player_score():
# 单个玩家分数查询
if request.method == 'POST':
name = request.form['name']
user_code = request.form['user_code']
error = None
if name or user_code:
conn = sqlite3.connect('./database/arcaea_database.db')
c = conn.cursor()
if user_code:
c.execute('''select user_id from user where user_code=:a''', {
'a': user_code})
else:
c.execute(
'''select user_id from user where name=:a''', {'a': name})
user_id = c.fetchone()
posts = []
if user_id:
user_id = user_id[0]
posts = web.webscore.get_user_score(c, user_id)
if not posts:
error = '无成绩 No score.'
else:
error = '玩家不存在 The player does not exist.'
conn.commit()
conn.close()
else:
error = '输入为空 Null Input.'
if error:
flash(error)
else:
return render_template('web/singleplayer.html', posts=posts)
return render_template('web/singleplayer.html')
@bp.route('/singleplayerptt', methods=['POST', 'GET'])
@login_required
def single_player_ptt():
# 单个玩家PTT详情查询
if request.method == 'POST':
name = request.form['name']
user_code = request.form['user_code']
error = None
if name or user_code:
conn = sqlite3.connect('./database/arcaea_database.db')
c = conn.cursor()
if user_code:
c.execute('''select user_id from user where user_code=:a''', {
'a': user_code})
else:
c.execute(
'''select user_id from user where name=:a''', {'a': name})
user_id = c.fetchone()
posts = []
if user_id:
user_id = user_id[0]
user = web.webscore.get_user(c, user_id)
posts = web.webscore.get_user_score(c, user_id, 30)
recent, recentptt = web.webscore.get_user_recent30(c, user_id)
if not posts:
error = '无成绩 No score.'
else:
bestptt = 0
for i in posts:
if i['rating']:
bestptt += i['rating']
bestptt = bestptt / 30
else:
error = '玩家不存在 The player does not exist.'
conn.commit()
conn.close()
else:
error = '输入为空 Null Input.'
if error:
flash(error)
else:
return render_template('web/singleplayerptt.html', posts=posts, user=user, recent=recent, recentptt=recentptt, bestptt=bestptt)
return render_template('web/singleplayerptt.html')
@bp.route('/allplayer', methods=['GET'])
@login_required
def all_player():
# 所有玩家数据按照ptt排序
conn = sqlite3.connect('./database/arcaea_database.db')
c = conn.cursor()
c.execute('''select * from user order by rating_ptt DESC''')
x = c.fetchall()
error = None
if x:
posts = []
for i in x:
join_data = None
time_played = None
if i[3]:
join_date = time.strftime('%Y-%m-%d %H:%M:%S',
time.localtime(int(i[3])//1000))
if i[20]:
time_played = time.strftime('%Y-%m-%d %H:%M:%S',
time.localtime(int(i[20])//1000))
posts.append({'name': i[1],
'user_id': i[0],
'join_date': join_date,
'user_code': i[4],
'rating_ptt': i[5],
'song_id': i[11],
'difficulty': i[12],
'score': i[13],
'shiny_perfect_count': i[14],
'perfect_count': i[15],
'near_count': i[16],
'miss_count': i[17],
'time_played': time_played,
'clear_type': i[21],
'rating': i[22]
})
else:
error = '没有玩家数据 No player data.'
conn.commit()
conn.close()
if error:
flash(error)
return render_template('web/allplayer.html')
else:
return render_template('web/allplayer.html', posts=posts)
@bp.route('/allsong', methods=['GET'])
@login_required
def all_song():
# 所有歌曲数据
def defnum(x):
# 定数转换
if x >= 0:
return x / 10
else:
return None
conn = sqlite3.connect('./database/arcsong.db')
c = conn.cursor()
c.execute('''select * from songs''')
x = c.fetchall()
error = None
if x:
posts = []
for i in x:
posts.append({'song_id': i[0],
'name_en': i[1],
'rating_pst': defnum(i[12]),
'rating_prs': defnum(i[13]),
'rating_ftr': defnum(i[14]),
'rating_byn': defnum(i[15])
})
else:
error = '没有铺面数据 No song data.'
conn.commit()
conn.close()
if error:
flash(error)
return render_template('web/allsong.html')
else:
return render_template('web/allsong.html', posts=posts)
@bp.route('/singlecharttop', methods=['GET', 'POST'])
@login_required
def single_chart_top():
# 歌曲排行榜
if request.method == 'POST':
song_name = request.form['sid']
difficulty = request.form['difficulty']
if difficulty.isdigit():
difficulty = int(difficulty)
error = None
conn = sqlite3.connect('./database/arcsong.db')
c = conn.cursor()
song_name = '%'+song_name+'%'
c.execute('''select sid, name_en from songs where sid like :a limit 1''',
{'a': song_name})
x = c.fetchone()
conn.commit()
conn.close()
print(x)
if x:
song_id = x[0]
posts = server.arcscore.arc_score_top(song_id, difficulty, -1)
for i in posts:
i['time_played'] = time.strftime('%Y-%m-%d %H:%M:%S',
time.localtime(i['time_played']))
else:
error = '查询为空 No song.'
if not error:
return render_template('web/singlecharttop.html', posts=posts, song_name_en=x[1], song_id=song_id, difficulty=difficulty)
else:
flash(error)
return render_template('web/singlecharttop.html')
def allowed_file(filename):
return '.' in filename and \
filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
@bp.route('/updatedatabase', methods=['GET', 'POST'])
@login_required
def update_database():
# 更新数据库
error = None
if request.method == 'POST':
if 'file' not in request.files:
flash('无文件 No file part.')
return redirect(request.url)
file = request.files['file']
if file.filename == '':
flash('未选择文件 No selected file.')
return redirect(request.url)
if file and allowed_file(file.filename):
filename = 'old_' + secure_filename(file.filename)
file.save(os.path.join(UPLOAD_FOLDER, filename))
flash('上传成功 Success upload.')
web.system.update_database()
flash('数据更新成功 Success update data.')
else:
error = '上传失败 Upload error.'
if error:
flash(error)
return render_template('web/updatedatabase.html')
@bp.route('/changesong', methods=['GET'])
@login_required
def change_song():
# 修改歌曲数据
return render_template('web/changesong.html')
@bp.route('/changesong/addsong', methods=['POST'])
@login_required
def add_song():
# 添加歌曲数据
def get_rating(x):
# 换算定数
if is_number(x):
x = float(x)
if x >= 0:
return int(x*10)
else:
return -1
else:
return -1
error = None
song_id = request.form['sid']
name_en = request.form['name_en']
rating_pst = get_rating(request.form['rating_pst'])
rating_prs = get_rating(request.form['rating_prs'])
rating_ftr = get_rating(request.form['rating_ftr'])
rating_byd = get_rating(request.form['rating_byd'])
if len(song_id) >= 256:
song_id = song_id[:200]
if len(name_en) >= 256:
name_en = name_en[:200]
conn = sqlite3.connect('./database/arcsong.db')
c = conn.cursor()
c.execute(
'''select exists(select * from songs where sid=:a)''', {'a': song_id})
if c.fetchone() == (0,):
c.execute('''insert into songs(sid,name_en,rating_pst,rating_prs,rating_ftr,rating_byn) values(:a,:b,:c,:d,:e,:f)''', {
'a': song_id, 'b': name_en, 'c': rating_pst, 'd': rating_prs, 'e': rating_ftr, 'f': rating_byd})
flash('歌曲添加成功 Successfully add the song.')
else:
error = '歌曲已存在 The song exists.'
conn.commit()
conn.close()
if error:
flash(error)
return redirect(url_for('index.change_song'))
@bp.route('/changesong/deletesong', methods=['POST'])
@login_required
def delete_song():
# 删除歌曲数据
error = None
song_id = request.form['sid']
conn = sqlite3.connect('./database/arcsong.db')
c = conn.cursor()
c.execute(
'''select exists(select * from songs where sid=:a)''', {'a': song_id})
if c.fetchone() == (1,):
c.execute('''delete from songs where sid=:a''', {'a': song_id})
flash('歌曲删除成功 Successfully delete the song.')
else:
error = "歌曲不存在 The song doesn't exist."
conn.commit()
conn.close()
if error:
flash(error)
return redirect(url_for('index.change_song'))

View File

@@ -1,50 +0,0 @@
#import sqlite3
from flask import (Blueprint, flash, g, redirect,
render_template, request, session, url_for)
import functools
bp = Blueprint('login', __name__, url_prefix='/web')
@bp.route('/login', methods=('GET', 'POST'))
def login():
# 登录
if request.method == 'POST':
username = request.form['username']
password = request.form['password']
error = None
if username != 'admin' and password != 'admin':
error = '错误的用户名或密码 Incorrect username or password.'
if error is None:
session.clear()
session['user_id'] = 'admin'
return redirect(url_for('index.index'))
flash(error)
return render_template('web/login.html')
@bp.route('/logout')
def logout():
# 登出
session.clear()
flash('成功登出 Successfully log out.')
return redirect(url_for('index.index'))
def login_required(view):
# 登录验证,写成了修饰器
@functools.wraps(view)
def wrapped_view(**kwargs):
x = session.get('user_id')
# 少用户存在验证
if x is None:
return redirect(url_for('login.login'))
g.user = {'user_id': x, 'username': 'admin'}
return view(**kwargs)
return wrapped_view

View File

@@ -1,102 +0,0 @@
import os
import sqlite3
def update_user_char(c):
# 用character数据更新user_char
c.execute('''select * from character''')
x = c.fetchall()
c.execute('''select user_id from user''')
y = c.fetchall()
if x and y:
for j in y:
for i in x:
c.execute('''delete from user_char where user_id=:a and character_id=:b''', {
'a': j[0], 'b': i[0]})
c.execute('''insert into user_char values(:a,:b,:c,:d,:e,:f,:g,:h,:i,:j,:k,:l,:m,:n,:o)''', {
'a': j[0], 'b': i[0], 'c': i[2], 'd': i[3], 'e': i[4], 'f': i[5], 'g': i[6], 'h': i[7], 'i': i[8], 'j': i[9], 'k': i[10], 'l': i[11], 'm': i[12], 'n': i[14], 'o': i[15]})
def update_database():
# 将old数据库不存在数据加入到新数据库上并删除old数据库
# 对于arcaea_datebase.db更新best_scorefriendrecent30user并用character数据更新user_char
# 对于arcsong.db更新songs
if os.path.isfile("database/old_arcaea_database.db") and os.path.isfile("database/arcaea_database.db"):
conn1 = sqlite3.connect('./database/old_arcaea_database.db')
c1 = conn1.cursor()
conn2 = sqlite3.connect('./database/arcaea_database.db')
c2 = conn2.cursor()
# user
c1.execute('''select * from user''')
x = c1.fetchall()
if x:
for i in x:
c2.execute(
'''select exists(select * from user where user_id=:a)''', {'a': i[0]})
if c2.fetchone() == (0,):
c2.execute('''insert into user values(:a0,:a1,:a2,:a3,:a4,:a5,:a6,:a7,:a8,:a9,:a10,:a11,:a12,:a13,:a14,:a15,:a16,:a17,:a18,:a19,:a20,:a21,:a22,:a23,:a24)''', {
'a0': i[0], 'a1': i[1], 'a2': i[2], 'a3': i[3], 'a4': i[4], 'a5': i[5], 'a6': i[6], 'a7': i[7], 'a8': i[8], 'a9': i[9], 'a10': i[10], 'a11': i[11], 'a12': i[12], 'a13': i[13], 'a14': i[14], 'a15': i[15], 'a16': i[16], 'a17': i[17], 'a18': i[18], 'a19': i[19], 'a20': i[20], 'a21': i[21], 'a22': i[22], 'a23': i[23], 'a24': i[24]})
# friend
c1.execute('''select * from friend''')
x = c1.fetchall()
if x:
for i in x:
c2.execute(
'''select exists(select * from friend where user_id_me=:a and user_id_other=:b)''', {'a': i[0], 'b': i[1]})
if c2.fetchone() == (0,):
c2.execute('''insert into friend values(:a,:b)''', {
'a': i[0], 'b': i[1]})
# best_score
c1.execute('''select * from best_score''')
x = c1.fetchall()
if x:
for i in x:
c2.execute('''select exists(select * from best_score where user_id=:a and song_id=:b and difficulty=:c)''', {
'a': i[0], 'b': i[1], 'c': i[2]})
if c2.fetchone() == (0,):
c2.execute('''insert into best_score values(:a0,:a1,:a2,:a3,:a4,:a5,:a6,:a7,:a8,:a9,:a10,:a11,:a12,:a13)''', {
'a0': i[0], 'a1': i[1], 'a2': i[2], 'a3': i[3], 'a4': i[4], 'a5': i[5], 'a6': i[6], 'a7': i[7], 'a8': i[8], 'a9': i[9], 'a10': i[10], 'a11': i[11], 'a12': i[12], 'a13': i[13]})
# recent30
c1.execute('''select * from recent30''')
x = c1.fetchall()
if x:
for i in x:
c2.execute(
'''select exists(select * from recent30 where user_id=:a)''', {'a': i[0]})
if c2.fetchone() == (0,):
c2.execute('''insert into recent30 values(:a0,:a1,:a2,:a3,:a4,:a5,:a6,:a7,:a8,:a9,:a10,:a11,:a12,:a13,:a14,:a15,:a16,:a17,:a18,:a19,:a20,:a21,:a22,:a23,:a24,:a25,:a26,:a27,:a28,:a29,:a30,:a31,:a32,:a33,:a34,:a35,:a36,:a37,:a38,:a39,:a40,:a41,:a42,:a43,:a44,:a45,:a46,:a47,:a48,:a49,:a50,:a51,:a52,:a53,:a54,:a55,:a56,:a57,:a58,:a59,:a60)''', {'a0': i[0], 'a1': i[1], 'a2': i[2], 'a3': i[3], 'a4': i[4], 'a5': i[5], 'a6': i[6], 'a7': i[7], 'a8': i[8], 'a9': i[9], 'a10': i[10], 'a11': i[11], 'a12': i[12], 'a13': i[13], 'a14': i[14], 'a15': i[15], 'a16': i[16], 'a17': i[17], 'a18': i[
18], 'a19': i[19], 'a20': i[20], 'a21': i[21], 'a22': i[22], 'a23': i[23], 'a24': i[24], 'a25': i[25], 'a26': i[26], 'a27': i[27], 'a28': i[28], 'a29': i[29], 'a30': i[30], 'a31': i[31], 'a32': i[32], 'a33': i[33], 'a34': i[34], 'a35': i[35], 'a36': i[36], 'a37': i[37], 'a38': i[38], 'a39': i[39], 'a40': i[40], 'a41': i[41], 'a42': i[42], 'a43': i[43], 'a44': i[44], 'a45': i[45], 'a46': i[46], 'a47': i[47], 'a48': i[48], 'a49': i[49], 'a50': i[50], 'a51': i[51], 'a52': i[52], 'a53': i[53], 'a54': i[54], 'a55': i[55], 'a56': i[56], 'a57': i[57], 'a58': i[58], 'a59': i[59], 'a60': i[60]})
update_user_char(c2)
conn1.commit()
conn1.close()
conn2.commit()
conn2.close()
os.remove('database/old_arcaea_database.db')
# songs
if os.path.isfile("database/old_arcsong.db") and os.path.isfile("database/arcsong.db"):
conn1 = sqlite3.connect('./database/old_arcsong.db')
c1 = conn1.cursor()
conn2 = sqlite3.connect('./database/arcsong.db')
c2 = conn2.cursor()
c1.execute('''select * from songs''')
x = c1.fetchall()
if x:
for i in x:
c2.execute(
'''select exists(select * from songs where sid=:a)''', {'a': i[0]})
if c2.fetchone() == (0,):
c2.execute('''insert into songs values(:a0,:a1,:a2,:a3,:a4,:a5,:a6,:a7,:a8,:a9,:a10,:a11,:a12,:a13,:a14,:a15,:a16,:a17,:a18,:a19,:a20,:a21,:a22,:a23,:a24,:a25,:a26,:a27)''', {'a0': i[0], 'a1': i[1], 'a2': i[2], 'a3': i[3], 'a4': i[4], 'a5': i[5], 'a6': i[6], 'a7': i[7], 'a8': i[
8], 'a9': i[9], 'a10': i[10], 'a11': i[11], 'a12': i[12], 'a13': i[13], 'a14': i[14], 'a15': i[15], 'a16': i[16], 'a17': i[17], 'a18': i[18], 'a19': i[19], 'a20': i[20], 'a21': i[21], 'a22': i[22], 'a23': i[23], 'a24': i[24], 'a25': i[25], 'a26': i[26], 'a27': i[27]})
conn1.commit()
conn1.close()
conn2.commit()
conn2.close()
os.remove('database/old_arcsong.db')

View File

@@ -1,104 +0,0 @@
import time
def get_user_score(c, user_id, limit=-1):
# 返回用户的所有歌曲数据,带排名,返回字典列表
if limit >= 0:
c.execute('''select * from best_score where user_id =:a order by rating DESC limit :b''',
{'a': user_id, 'b': limit})
else:
c.execute(
'''select * from best_score where user_id =:a order by rating DESC''', {'a': user_id})
x = c.fetchall()
r = []
if x:
rank = 0
for i in x:
rank += 1
r.append({
"song_id": i[1],
"difficulty": i[2],
"score": i[3],
"shiny_perfect_count": i[4],
"perfect_count": i[5],
"near_count": i[6],
"miss_count": i[7],
"health": i[8],
"modifier": i[9],
"time_played": time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(i[10])),
"best_clear_type": i[11],
"clear_type": i[12],
"rating": i[13],
"rank": rank
})
return r
def get_user(c, user_id):
# 得到user表部分用户信息返回字典
c.execute('''select * from user where user_id = :a''', {'a': user_id})
x = c.fetchone()
r = None
if x:
join_date = None
time_played = None
if x[3]:
join_date = time.strftime('%Y-%m-%d %H:%M:%S',
time.localtime(int(x[3])//1000))
if x[20]:
time_played = time.strftime('%Y-%m-%d %H:%M:%S',
time.localtime(int(x[20])//1000))
r = {'name': x[1],
'user_id': user_id,
'join_date': join_date,
'user_code': x[4],
'rating_ptt': x[5],
'song_id': x[11],
'difficulty': x[12],
'score': x[13],
'shiny_perfect_count': x[14],
'perfect_count': x[15],
'near_count': x[16],
'miss_count': x[17],
'time_played': time_played,
'clear_type': x[21],
'rating': x[22]
}
return r
def get_user_recent30(c, user_id):
# 获取玩家recent30信息并计算这一部分的ptt返回字典列表和一个值
c.execute('''select * from recent30 where user_id=:a''', {'a': user_id})
sumr = 0
x = c.fetchone()
r = []
if x is not None:
r30 = []
s30 = []
for i in range(1, 61, 2):
if x[i] is not None:
r30.append(float(x[i]))
s30.append(x[i+1])
else:
r30.append(0)
s30.append('')
r30, s30 = (list(t) for t in zip(*sorted(zip(r30, s30), reverse=True)))
songs = []
i = 0
while len(songs) < 10 and i <= 29 and s30[i] != '' and s30[i] is not None:
if s30[i] not in songs:
sumr += r30[i]
songs.append(s30[i])
i += 1
for i in range(0, 30):
if s30[i]:
r.append({
'song_id': s30[i][:-1],
'difficulty': int(s30[i][-1]),
'rating': r30[i]
})
return r, sumr / 10