[Enhance] Restrict download by user's purchase

- An option `DOWNLOAD_FORBID_WHEN_NO_ITEM` has been added to the config file to make that users cannot download the songs' files if they has not bought them when the `songlist` file exists. (Experimental)

#128
This commit is contained in:
Lost-MSth
2023-09-08 11:38:46 +08:00
parent 04c6d1a0e0
commit f373c7b052
6 changed files with 108 additions and 21 deletions

View File

@@ -48,6 +48,8 @@ class Config:
DOWNLOAD_TIMES_LIMIT = 3000 DOWNLOAD_TIMES_LIMIT = 3000
DOWNLOAD_TIME_GAP_LIMIT = 1000 DOWNLOAD_TIME_GAP_LIMIT = 1000
DOWNLOAD_FORBID_WHEN_NO_ITEM = False
LOGIN_DEVICE_NUMBER_LIMIT = 1 LOGIN_DEVICE_NUMBER_LIMIT = 1
ALLOW_LOGIN_SAME_DEVICE = False ALLOW_LOGIN_SAME_DEVICE = False
ALLOW_BAN_MULTIDEVICE_USER_AUTO = True ALLOW_BAN_MULTIDEVICE_USER_AUTO = True

View File

@@ -21,6 +21,9 @@ class Constant:
WORLD_VALUE_NAME_ENUM = ['frag', 'prog', 'over'] WORLD_VALUE_NAME_ENUM = ['frag', 'prog', 'over']
FREE_PACK_NAME = 'base'
SINGLE_PACK_NAME = 'single'
ETO_UNCAP_BONUS_PROGRESS = 7 ETO_UNCAP_BONUS_PROGRESS = 7
LUNA_UNCAP_BONUS_PROGRESS = 7 LUNA_UNCAP_BONUS_PROGRESS = 7
AYU_UNCAP_BONUS_PROGRESS = 5 AYU_UNCAP_BONUS_PROGRESS = 5

View File

@@ -26,6 +26,8 @@ class SonglistParser:
FILE_NAMES = ['0.aff', '1.aff', '2.aff', '3.aff', FILE_NAMES = ['0.aff', '1.aff', '2.aff', '3.aff',
'base.ogg', '3.ogg', 'video.mp4', 'video_audio.ogg'] 'base.ogg', '3.ogg', 'video.mp4', 'video_audio.ogg']
has_songlist = False
songs: dict = {} # {song_id: value, ...} songs: dict = {} # {song_id: value, ...}
# value: bit 76543210 # value: bit 76543210
# 7: video_audio.ogg # 7: video_audio.ogg
@@ -37,6 +39,10 @@ class SonglistParser:
# 1: 1.aff # 1: 1.aff
# 0: 0.aff # 0: 0.aff
pack_info: 'dict[str, set]' = {} # {pack_id: {song_id, ...}, ...}
free_songs: set = set() # {song_id, ...}
world_songs: set = set() # {world_song_id, ...}
def __init__(self, path=Constant.SONGLIST_FILE_PATH) -> None: def __init__(self, path=Constant.SONGLIST_FILE_PATH) -> None:
self.path = path self.path = path
self.data: list = [] self.data: list = []
@@ -54,10 +60,31 @@ class SonglistParser:
return True return True
return False return False
@staticmethod
def get_user_unlocks(user) -> set:
'''user: UserInfo类或子类的实例'''
x = SonglistParser
if user is None:
return set()
r = set()
for i in user.packs:
if i in x.pack_info:
r.update(x.pack_info[i])
if Constant.SINGLE_PACK_NAME in x.pack_info:
r.update(x.pack_info[Constant.SINGLE_PACK_NAME]
& set(user.singles))
r.update(set(i if i[-1] != '3' else i[:-1]
for i in (x.world_songs & set(user.world_songs))))
r.update(x.free_songs)
return r
def parse_one(self, song: dict) -> dict: def parse_one(self, song: dict) -> dict:
'''解析单个歌曲''' '''解析单个歌曲'''
if not 'id' in song: if not 'id' in song:
return None return {}
r = 0 r = 0
if 'remote_dl' in song and song['remote_dl']: if 'remote_dl' in song and song['remote_dl']:
r |= 16 r |= 16
@@ -76,21 +103,42 @@ class SonglistParser:
r |= 128 r |= 128
return {song['id']: r} return {song['id']: r}
def parse_one_unlock(self, song: dict) -> None:
'''解析单个歌曲解锁方式'''
if not 'id' in song or not 'set' in song or not 'purchase' in song:
return {}
x = SonglistParser
if Constant.FREE_PACK_NAME == song['set']:
if any(i['ratingClass'] == 3 for i in song.get('difficulties', [])):
x.world_songs.add(song['id'] + '3')
x.free_songs.add(song['id'])
return None
if song.get('world_unlock', False):
x.world_songs.add(song['id'])
if song['purchase'] == '':
return None
x.pack_info.setdefault(song['set'], set()).add(song['id'])
def parse(self) -> None: def parse(self) -> None:
'''解析songlist文件''' '''解析songlist文件'''
if not os.path.isfile(self.path): if not os.path.isfile(self.path):
return return
with open(self.path, 'r', encoding='utf-8') as f: with open(self.path, 'r', encoding='utf-8') as f:
self.data = loads(f.read()).get('songs', []) self.data = loads(f.read()).get('songs', [])
self.has_songlist = True
for x in self.data: for x in self.data:
self.songs.update(self.parse_one(x)) self.songs.update(self.parse_one(x))
self.parse_one_unlock(x)
class UserDownload: class UserDownload:
''' '''
用户下载类 用户下载类
properties: `user` - `User`类或子类的实例 properties: `user` - `UserInfo`类或子类的实例
''' '''
limiter = ArcLimiter( limiter = ArcLimiter(
@@ -198,6 +246,10 @@ class DownloadList(UserDownload):
DownloadList.get_one_song_file_names.cache_clear() DownloadList.get_one_song_file_names.cache_clear()
DownloadList.get_all_song_ids.cache_clear() DownloadList.get_all_song_ids.cache_clear()
SonglistParser.songs = {} SonglistParser.songs = {}
SonglistParser.pack_info = {}
SonglistParser.free_songs = set()
SonglistParser.world_songs = set()
SonglistParser.has_songlist = False
def clear_download_token(self) -> None: def clear_download_token(self) -> None:
'''清除过期下载链接''' '''清除过期下载链接'''
@@ -277,9 +329,19 @@ class DownloadList(UserDownload):
if not self.song_ids: if not self.song_ids:
self.song_ids = self.get_all_song_ids() self.song_ids = self.get_all_song_ids()
if Config.DOWNLOAD_FORBID_WHEN_NO_ITEM and SonglistParser.has_songlist:
# 没有歌曲时不允许下载
self.song_ids = list(SonglistParser.get_user_unlocks(
self.user) & set(self.song_ids))
for i in self.song_ids: for i in self.song_ids:
self.add_one_song(i) self.add_one_song(i)
else: else:
if Config.DOWNLOAD_FORBID_WHEN_NO_ITEM and SonglistParser.has_songlist:
# 没有歌曲时不允许下载
self.song_ids = list(SonglistParser.get_user_unlocks(
self.user) & set(self.song_ids))
for i in self.song_ids: for i in self.song_ids:
if os.path.isdir(os.path.join(Constant.SONG_FILE_FOLDER_PATH, i)): if os.path.isdir(os.path.join(Constant.SONG_FILE_FOLDER_PATH, i)):
self.add_one_song(i) self.add_one_song(i)

View File

@@ -1,12 +1,27 @@
{ {
"songs": [ "songs": [
{
"id": "dement",
"difficulties": [
{ {
"ratingClass": 3 "id": "dement",
"set": "base",
"purchase": "",
"difficulties": [
{
"ratingClass": 0,
"rating": 3
},
{
"ratingClass": 1,
"rating": 6
},
{
"ratingClass": 2,
"rating": 7
},
{
"ratingClass": 3,
"rating": 9
}
]
} }
] ]
}
]
} }

View File

@@ -1,4 +1,4 @@
import binascii # import binascii
import logging import logging
import socketserver import socketserver
import threading import threading
@@ -56,9 +56,13 @@ class UDP_handler(socketserver.BaseRequestHandler):
class TCP_handler(socketserver.StreamRequestHandler): class TCP_handler(socketserver.StreamRequestHandler):
def handle(self): def handle(self):
self.data = self.rfile.readline().strip() try:
self.data = self.rfile.readline().strip()
message = self.data.decode('utf-8')
except Exception as e:
logging.error(e)
return None
message = self.data.decode('utf-8')
if Config.DEBUG: if Config.DEBUG:
logging.info(f'TCP-From-{self.client_address[0]}-{message}') logging.info(f'TCP-From-{self.client_address[0]}-{message}')
data = message.split('|') data = message.split('|')

View File

@@ -30,15 +30,16 @@ def game_info():
@auth_required(request) @auth_required(request)
@arc_try @arc_try
def download_song(user_id): def download_song(user_id):
with Connect(in_memory=True) as c: with Connect(in_memory=True) as c_m:
x = DownloadList(c, UserOnline(None, user_id)) with Connect() as c:
x.song_ids = request.args.getlist('sid') x = DownloadList(c_m, UserOnline(c, user_id))
x.url_flag = json.loads(request.args.get('url', 'true')) x.song_ids = request.args.getlist('sid')
if x.url_flag and x.is_limited: x.url_flag = json.loads(request.args.get('url', 'true'))
raise RateLimit('You have reached the download limit.', 903) if x.url_flag and x.is_limited:
raise RateLimit('You have reached the download limit.', 903)
x.add_songs() x.add_songs()
return success_return(x.urls) return success_return(x.urls)
@bp.route('/finale/progress', methods=['GET']) @bp.route('/finale/progress', methods=['GET'])