[chunithm] Support LUMINOUS

This commit is contained in:
beerpsi
2024-06-20 08:11:24 +07:00
parent 68bf3843ec
commit 5378655c52
9 changed files with 467 additions and 8 deletions

View File

@@ -243,6 +243,36 @@ matching = Table(
mysql_charset="utf8mb4",
)
cmission = Table(
"chuni_item_cmission",
metadata,
Column("id", Integer, primary_key=True, nullable=False),
Column(
"user",
ForeignKey("aime_user.id", ondelete="cascade", onupdate="cascade"),
nullable=False,
),
Column("missionId", Integer, nullable=False),
Column("point", Integer),
UniqueConstraint("user", "missionId", name="chuni_item_cmission_uk"),
mysql_charset="utf8mb4",
)
cmission_progress = Table(
"chuni_item_cmission_progress",
metadata,
Column("id", Integer, primary_key=True, nullable=False),
Column("user", ForeignKey("aime_user.id", ondelete="cascade"), nullable=False),
Column("missionId", Integer, nullable=False),
Column("order", Integer),
Column("stage", Integer),
Column("progress", Integer),
UniqueConstraint(
"user", "missionId", "order", name="chuni_item_cmission_progress_uk"
),
mysql_charset="utf8mb4",
)
class ChuniItemData(BaseData):
async def get_oldest_free_matching(self, version: int) -> Optional[Row]:
@@ -594,3 +624,66 @@ class ChuniItemData(BaseData):
)
return None
return result.lastrowid
async def put_cmission_progress(
self, user_id: int, mission_id: int, progress_data: Dict
) -> Optional[int]:
progress_data["user"] = user_id
progress_data["missionId"] = mission_id
sql = insert(cmission_progress).values(**progress_data)
conflict = sql.on_duplicate_key_update(**progress_data)
result = await self.execute(conflict)
if result is None:
return None
return result.lastrowid
async def get_cmission_progress(
self, user_id: int, mission_id: int
) -> Optional[List[Row]]:
sql = cmission_progress.select(
and_(
cmission_progress.c.user == user_id,
cmission_progress.c.missionId == mission_id,
)
).order_by(cmission_progress.c.order.asc())
result = await self.execute(sql)
if result is None:
return None
return result.fetchall()
async def get_cmission(self, user_id: int, mission_id: int) -> Optional[Row]:
sql = cmission.select(
and_(cmission.c.user == user_id, cmission.c.missionId == mission_id)
)
result = await self.execute(sql)
if result is None:
return None
return result.fetchone()
async def put_cmission(self, user_id: int, mission_data: Dict) -> Optional[int]:
mission_data["user"] = user_id
sql = insert(cmission).values(**mission_data)
conflict = sql.on_duplicate_key_update(**mission_data)
result = await self.execute(conflict)
if result is None:
return None
return result.lastrowid
async def get_cmissions(self, user_id: int) -> Optional[List[Row]]:
sql = cmission.select(cmission.c.user == user_id)
result = await self.execute(sql)
if result is None:
return None
return result.fetchall()