Files
Arcaea-server/latest version/core/sql.py
Lost-MSth 8f1c28eb5c [Enhance] API & LinkPlay & something else
- Add song names for getting the score list (For API)
- Try to ensure thread safety in Link Play (twice again)
- Small change about database timeout
- Have a try to use `nginx` and `gevent` (Yes. I ensure it works well. This will not be viewed as a formal and necessary update.)
2022-10-03 23:46:52 +08:00

220 lines
6.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import sqlite3
import traceback
from flask import current_app
from .constant import Constant
from .error import ArcError, InputError
class Connect:
# 数据库连接类,上下文管理
def __init__(self, file_path=Constant.SQLITE_DATABASE_PATH):
"""
数据库连接默认连接arcaea_database.db\
接受:文件路径\
返回sqlite3连接操作对象
"""
self.file_path = file_path
def __enter__(self):
self.conn = sqlite3.connect(self.file_path, timeout=10)
self.c = self.conn.cursor()
return self.c
def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
flag = True
if exc_type is not None:
if issubclass(exc_type, ArcError):
flag = False
else:
if self.conn:
self.conn.rollback()
current_app.logger.error(
traceback.format_exception(exc_type, exc_val, exc_tb))
if self.conn:
self.conn.commit()
self.conn.close()
return flag
class Query:
'''查询参数类'''
def __init__(self, query_able: list = None, quzzy_query_able: list = None, sort_able: list = None) -> None:
self.query_able: list = query_able
self.quzzy_query_able: list = quzzy_query_able
self.sort_able: list = sort_able
self.__limit: int = -1
self.__offset: int = 0
self.__query: dict = {} # {'name': 'admin'}
self.__fuzzy_query: dict = {} # {'name': 'dmi'}
# [{'column': 'user_id', 'order': 'ASC'}, ...]
self.__sort: list = []
@property
def limit(self) -> int:
return self.__limit
@limit.setter
def limit(self, limit: int) -> None:
if not isinstance(limit, int):
raise InputError(api_error_code=-101)
self.__limit = limit
@property
def offset(self) -> int:
return self.__offset
@offset.setter
def offset(self, offset: int) -> None:
if not isinstance(offset, int):
raise InputError(api_error_code=-101)
self.__offset = offset
@property
def query(self) -> dict:
return self.__query
@query.setter
def query(self, query: dict) -> None:
self.__query = {}
self.query_append(query)
def query_append(self, query: dict) -> None:
if not isinstance(query, dict):
raise InputError(api_error_code=-101)
if self.query_able is not None and query and not set(query).issubset(set(self.query_able)):
raise InputError(api_error_code=-102)
if not self.__query:
self.__query = query
else:
self.__query.update(query)
@property
def fuzzy_query(self) -> dict:
return self.__fuzzy_query
@fuzzy_query.setter
def fuzzy_query(self, fuzzy_query: dict) -> None:
self.__fuzzy_query = {}
self.fuzzy_query_append(fuzzy_query)
def fuzzy_query_append(self, fuzzy_query: dict) -> None:
if not isinstance(fuzzy_query, dict):
raise InputError(api_error_code=-101)
if self.quzzy_query_able is not None and fuzzy_query and not set(fuzzy_query).issubset(set(self.quzzy_query_able)):
raise InputError(api_error_code=-102)
if not self.__fuzzy_query:
self.__fuzzy_query = fuzzy_query
else:
self.__fuzzy_query.update(fuzzy_query)
@property
def sort(self) -> list:
return self.__sort
@sort.setter
def sort(self, sort: list) -> None:
if not isinstance(sort, list):
raise InputError(api_error_code=-101)
if self.sort_able is not None and sort:
for x in sort:
if not isinstance(x, dict):
raise InputError(api_error_code=-101)
if 'column' not in x or x['column'] not in self.sort_able:
raise InputError(api_error_code=-103)
if 'order' not in x:
x['order'] = 'ASC'
else:
if x['order'] not in ['ASC', 'DESC']:
raise InputError(api_error_code=-104)
self.__sort = sort
def set_value(self, limit=-1, offset=0, query={}, fuzzy_query={}, sort=[]) -> None:
self.limit = limit
self.offset = offset
self.query = query
self.fuzzy_query = fuzzy_query
self.sort = sort
def from_data(self, d: dict) -> 'Query':
self.set_value(d.get('limit', -1), d.get('offset', 0),
d.get('query', {}), d.get('fuzzy_query', {}), d.get('sort', []))
return self
class Sql:
'''
数据库增查删改类
'''
def __init__(self, c=None) -> None:
self.c = c
def select(self, table_name: str, target_column: 'list' = [], query: 'Query' = None) -> list:
'''单表内行查询单句sql语句返回fetchall数据'''
sql = 'select '
sql_list = []
if len(target_column) >= 2:
sql += target_column[0]
for i in range(1, len(target_column)):
sql += ',' + target_column[i]
sql += ' from ' + table_name
elif len(target_column) == 1:
sql += target_column[0] + ' from ' + table_name
else:
sql += '* from ' + table_name
if query is None:
self.c.execute(sql)
return self.c.fetchall()
where_key = []
where_like_key = []
for i in query.query:
where_key.append(i)
sql_list.append(query.query[i])
for i in query.fuzzy_query:
where_like_key.append(i)
sql_list.append('%' + query.fuzzy_query[i] + '%')
if where_key or where_like_key:
sql += ' where '
if where_key:
sql += where_key[0] + '=?'
if len(where_key) >= 2:
for i in range(1, len(where_key)):
sql += ' and ' + where_key[i] + '=?'
if where_like_key:
for i in where_like_key:
sql += ' and ' + i + ' like ?'
else:
sql += where_like_key[0] + ' like ?'
if len(where_like_key) >= 2:
for i in range(1, len(where_key)):
sql += ' and ' + where_key[i] + ' like ?'
if query.sort:
sql += ' order by ' + \
query.sort[0]['column'] + ' ' + query.sort[0]['order']
for i in range(1, len(query.sort)):
sql += ', ' + query.sort[i]['column'] + \
' ' + query.sort[i]['order']
if query.limit >= 0:
sql += ' limit ? offset ?'
sql_list.append(query.limit)
sql_list.append(query.offset)
self.c.execute(sql, sql_list)
return self.c.fetchall()