Improve Redis key handling and spectator sync logic
Enhances Redis key type checks and cleanup in message system, adds periodic cleanup task, and improves error handling for Redis operations. Refines multiplayer and spectator hub logic to better synchronize player states and prevent invalid spectator sessions. Adds more detailed logging for channel/user join/leave events and spectator watch requests.
This commit is contained in:
@@ -92,6 +92,19 @@ async def send_message(
|
|||||||
channel_name = db_channel.name
|
channel_name = db_channel.name
|
||||||
user_id = current_user.id
|
user_id = current_user.id
|
||||||
|
|
||||||
|
# 对于多人游戏房间,在发送消息前进行Redis键检查
|
||||||
|
if channel_type == ChannelType.MULTIPLAYER:
|
||||||
|
try:
|
||||||
|
from app.dependencies.database import get_redis
|
||||||
|
redis = get_redis()
|
||||||
|
key = f"channel:{channel_id}:messages"
|
||||||
|
key_type = await redis.type(key)
|
||||||
|
if key_type not in ["none", "zset"]:
|
||||||
|
logger.warning(f"Fixing Redis key {key} with wrong type: {key_type}")
|
||||||
|
await redis.delete(key)
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"Failed to check/fix Redis key for channel {channel_id}: {e}")
|
||||||
|
|
||||||
# 使用 Redis 消息系统发送消息 - 立即返回
|
# 使用 Redis 消息系统发送消息 - 立即返回
|
||||||
resp = await redis_message_system.send_message(
|
resp = await redis_message_system.send_message(
|
||||||
channel_id=channel_id,
|
channel_id=channel_id,
|
||||||
|
|||||||
@@ -76,6 +76,20 @@ class ChatServer:
|
|||||||
async def broadcast(self, channel_id: int, event: ChatEvent):
|
async def broadcast(self, channel_id: int, event: ChatEvent):
|
||||||
users_in_channel = self.channels.get(channel_id, [])
|
users_in_channel = self.channels.get(channel_id, [])
|
||||||
logger.info(f"Broadcasting to channel {channel_id}, users: {users_in_channel}")
|
logger.info(f"Broadcasting to channel {channel_id}, users: {users_in_channel}")
|
||||||
|
|
||||||
|
# 如果频道中没有用户,检查是否是多人游戏频道
|
||||||
|
if not users_in_channel:
|
||||||
|
try:
|
||||||
|
async with with_db() as session:
|
||||||
|
from sqlmodel import select
|
||||||
|
channel = await session.get(ChatChannel, channel_id)
|
||||||
|
if channel and channel.type == ChannelType.MULTIPLAYER:
|
||||||
|
logger.warning(f"No users in multiplayer channel {channel_id}, message will not be delivered to anyone")
|
||||||
|
# 对于多人游戏房间,这可能是正常的(用户都离开了房间)
|
||||||
|
# 但我们仍然记录这个情况以便调试
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Failed to check channel type for {channel_id}: {e}")
|
||||||
|
|
||||||
for user_id in users_in_channel:
|
for user_id in users_in_channel:
|
||||||
await self.send_event(user_id, event)
|
await self.send_event(user_id, event)
|
||||||
logger.debug(f"Sent event to user {user_id} in channel {channel_id}")
|
logger.debug(f"Sent event to user {user_id} in channel {channel_id}")
|
||||||
@@ -199,12 +213,15 @@ class ChatServer:
|
|||||||
# 使用明确的查询避免延迟加载
|
# 使用明确的查询避免延迟加载
|
||||||
db_channel = (await session.exec(select(ChatChannel).where(ChatChannel.channel_id == channel_id))).first()
|
db_channel = (await session.exec(select(ChatChannel).where(ChatChannel.channel_id == channel_id))).first()
|
||||||
if db_channel is None:
|
if db_channel is None:
|
||||||
|
logger.warning(f"Attempted to join non-existent channel {channel_id} by user {user_id}")
|
||||||
return
|
return
|
||||||
|
|
||||||
user = await session.get(User, user_id)
|
user = await session.get(User, user_id)
|
||||||
if user is None:
|
if user is None:
|
||||||
|
logger.warning(f"Attempted to join channel {channel_id} by non-existent user {user_id}")
|
||||||
return
|
return
|
||||||
|
|
||||||
|
logger.info(f"User {user_id} joining channel {channel_id} (type: {db_channel.type.value})")
|
||||||
await self.join_channel(user, db_channel, session)
|
await self.join_channel(user, db_channel, session)
|
||||||
|
|
||||||
async def leave_room_channel(self, channel_id: int, user_id: int):
|
async def leave_room_channel(self, channel_id: int, user_id: int):
|
||||||
@@ -212,12 +229,15 @@ class ChatServer:
|
|||||||
# 使用明确的查询避免延迟加载
|
# 使用明确的查询避免延迟加载
|
||||||
db_channel = (await session.exec(select(ChatChannel).where(ChatChannel.channel_id == channel_id))).first()
|
db_channel = (await session.exec(select(ChatChannel).where(ChatChannel.channel_id == channel_id))).first()
|
||||||
if db_channel is None:
|
if db_channel is None:
|
||||||
|
logger.warning(f"Attempted to leave non-existent channel {channel_id} by user {user_id}")
|
||||||
return
|
return
|
||||||
|
|
||||||
user = await session.get(User, user_id)
|
user = await session.get(User, user_id)
|
||||||
if user is None:
|
if user is None:
|
||||||
|
logger.warning(f"Attempted to leave channel {channel_id} by non-existent user {user_id}")
|
||||||
return
|
return
|
||||||
|
|
||||||
|
logger.info(f"User {user_id} leaving channel {channel_id} (type: {db_channel.type.value})")
|
||||||
await self.leave_channel(user, db_channel, session)
|
await self.leave_channel(user, db_channel, session)
|
||||||
|
|
||||||
async def new_private_notification(self, detail: NotificationDetail):
|
async def new_private_notification(self, detail: NotificationDetail):
|
||||||
|
|||||||
@@ -244,23 +244,52 @@ class RedisMessageSystem:
|
|||||||
# 清理可能存在的错误类型键,然后添加到频道消息列表(按时间排序)
|
# 清理可能存在的错误类型键,然后添加到频道消息列表(按时间排序)
|
||||||
channel_messages_key = f"channel:{channel_id}:messages"
|
channel_messages_key = f"channel:{channel_id}:messages"
|
||||||
|
|
||||||
# 检查键的类型,如果不是 zset 类型则删除
|
# 更健壮的键类型检查和清理
|
||||||
try:
|
try:
|
||||||
key_type = await self._redis_exec(self.redis.type, channel_messages_key)
|
key_type = await self._redis_exec(self.redis.type, channel_messages_key)
|
||||||
if key_type and key_type != "zset":
|
if key_type == "none":
|
||||||
|
# 键不存在,这是正常的
|
||||||
|
pass
|
||||||
|
elif key_type != "zset":
|
||||||
|
# 键类型错误,需要清理
|
||||||
logger.warning(f"Deleting Redis key {channel_messages_key} with wrong type: {key_type}")
|
logger.warning(f"Deleting Redis key {channel_messages_key} with wrong type: {key_type}")
|
||||||
await self._redis_exec(self.redis.delete, channel_messages_key)
|
await self._redis_exec(self.redis.delete, channel_messages_key)
|
||||||
|
|
||||||
|
# 验证删除是否成功
|
||||||
|
verify_type = await self._redis_exec(self.redis.type, channel_messages_key)
|
||||||
|
if verify_type != "none":
|
||||||
|
logger.error(f"Failed to delete problematic key {channel_messages_key}, type is still {verify_type}")
|
||||||
|
# 强制删除
|
||||||
|
await self._redis_exec(self.redis.unlink, channel_messages_key)
|
||||||
|
|
||||||
except Exception as type_check_error:
|
except Exception as type_check_error:
|
||||||
logger.warning(f"Failed to check key type for {channel_messages_key}: {type_check_error}")
|
logger.warning(f"Failed to check key type for {channel_messages_key}: {type_check_error}")
|
||||||
# 如果检查失败,直接删除键以确保清理
|
# 如果检查失败,尝试强制删除键以确保清理
|
||||||
await self._redis_exec(self.redis.delete, channel_messages_key)
|
try:
|
||||||
|
await self._redis_exec(self.redis.delete, channel_messages_key)
|
||||||
|
except Exception:
|
||||||
|
# 最后的努力:使用unlink
|
||||||
|
try:
|
||||||
|
await self._redis_exec(self.redis.unlink, channel_messages_key)
|
||||||
|
except Exception as final_error:
|
||||||
|
logger.error(f"Critical: Unable to clear problematic key {channel_messages_key}: {final_error}")
|
||||||
|
|
||||||
# 添加到频道消息列表(sorted set)
|
# 添加到频道消息列表(sorted set)
|
||||||
await self._redis_exec(
|
try:
|
||||||
self.redis.zadd,
|
await self._redis_exec(
|
||||||
channel_messages_key,
|
self.redis.zadd,
|
||||||
{f"msg:{channel_id}:{message_id}": message_id},
|
channel_messages_key,
|
||||||
)
|
{f"msg:{channel_id}:{message_id}": message_id},
|
||||||
|
)
|
||||||
|
except Exception as zadd_error:
|
||||||
|
logger.error(f"Failed to add message to sorted set {channel_messages_key}: {zadd_error}")
|
||||||
|
# 如果添加失败,再次尝试清理并重试
|
||||||
|
await self._redis_exec(self.redis.delete, channel_messages_key)
|
||||||
|
await self._redis_exec(
|
||||||
|
self.redis.zadd,
|
||||||
|
channel_messages_key,
|
||||||
|
{f"msg:{channel_id}:{message_id}": message_id},
|
||||||
|
)
|
||||||
|
|
||||||
# 保持频道消息列表大小(最多1000条)
|
# 保持频道消息列表大小(最多1000条)
|
||||||
await self._redis_exec(self.redis.zremrangebyrank, channel_messages_key, 0, -1001)
|
await self._redis_exec(self.redis.zremrangebyrank, channel_messages_key, 0, -1001)
|
||||||
@@ -516,6 +545,8 @@ class RedisMessageSystem:
|
|||||||
self._batch_timer = asyncio.create_task(self._batch_persist_to_database())
|
self._batch_timer = asyncio.create_task(self._batch_persist_to_database())
|
||||||
# 启动时初始化消息ID计数器
|
# 启动时初始化消息ID计数器
|
||||||
bg_tasks.add_task(self._initialize_message_counter)
|
bg_tasks.add_task(self._initialize_message_counter)
|
||||||
|
# 启动定期清理任务
|
||||||
|
bg_tasks.add_task(self._periodic_cleanup)
|
||||||
logger.info("Redis message system started")
|
logger.info("Redis message system started")
|
||||||
|
|
||||||
async def _initialize_message_counter(self):
|
async def _initialize_message_counter(self):
|
||||||
@@ -553,25 +584,67 @@ class RedisMessageSystem:
|
|||||||
keys_pattern = "channel:*:messages"
|
keys_pattern = "channel:*:messages"
|
||||||
keys = await self._redis_exec(self.redis.keys, keys_pattern)
|
keys = await self._redis_exec(self.redis.keys, keys_pattern)
|
||||||
|
|
||||||
|
fixed_count = 0
|
||||||
for key in keys:
|
for key in keys:
|
||||||
if isinstance(key, bytes):
|
if isinstance(key, bytes):
|
||||||
key = key.decode("utf-8")
|
key = key.decode("utf-8")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
key_type = await self._redis_exec(self.redis.type, key)
|
key_type = await self._redis_exec(self.redis.type, key)
|
||||||
if key_type and key_type != "zset":
|
if key_type == "none":
|
||||||
|
# 键不存在,正常情况
|
||||||
|
continue
|
||||||
|
elif key_type != "zset":
|
||||||
logger.warning(f"Cleaning up Redis key {key} with wrong type: {key_type}")
|
logger.warning(f"Cleaning up Redis key {key} with wrong type: {key_type}")
|
||||||
await self._redis_exec(self.redis.delete, key)
|
await self._redis_exec(self.redis.delete, key)
|
||||||
|
|
||||||
|
# 验证删除是否成功
|
||||||
|
verify_type = await self._redis_exec(self.redis.type, key)
|
||||||
|
if verify_type != "none":
|
||||||
|
logger.error(f"Failed to delete problematic key {key}, trying unlink...")
|
||||||
|
await self._redis_exec(self.redis.unlink, key)
|
||||||
|
|
||||||
|
fixed_count += 1
|
||||||
except Exception as cleanup_error:
|
except Exception as cleanup_error:
|
||||||
logger.warning(f"Failed to cleanup key {key}: {cleanup_error}")
|
logger.warning(f"Failed to cleanup key {key}: {cleanup_error}")
|
||||||
# 强制删除问题键
|
# 强制删除问题键
|
||||||
await self._redis_exec(self.redis.delete, key)
|
try:
|
||||||
|
await self._redis_exec(self.redis.delete, key)
|
||||||
|
fixed_count += 1
|
||||||
|
except Exception:
|
||||||
|
try:
|
||||||
|
await self._redis_exec(self.redis.unlink, key)
|
||||||
|
fixed_count += 1
|
||||||
|
except Exception as final_error:
|
||||||
|
logger.error(f"Critical: Unable to clear problematic key {key}: {final_error}")
|
||||||
|
|
||||||
logger.info("Redis keys cleanup completed")
|
if fixed_count > 0:
|
||||||
|
logger.info(f"Redis keys cleanup completed, fixed {fixed_count} keys")
|
||||||
|
else:
|
||||||
|
logger.debug("Redis keys cleanup completed, no issues found")
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Failed to cleanup Redis keys: {e}")
|
logger.error(f"Failed to cleanup Redis keys: {e}")
|
||||||
|
|
||||||
|
async def _periodic_cleanup(self):
|
||||||
|
"""定期清理任务"""
|
||||||
|
while self._running:
|
||||||
|
try:
|
||||||
|
# 每5分钟执行一次清理
|
||||||
|
await asyncio.sleep(300)
|
||||||
|
if not self._running:
|
||||||
|
break
|
||||||
|
|
||||||
|
logger.debug("Running periodic Redis keys cleanup...")
|
||||||
|
await self._cleanup_redis_keys()
|
||||||
|
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
break
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Periodic cleanup error: {e}")
|
||||||
|
# 出错后等待1分钟再重试
|
||||||
|
await asyncio.sleep(60)
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
"""停止系统"""
|
"""停止系统"""
|
||||||
if self._running:
|
if self._running:
|
||||||
|
|||||||
@@ -1018,6 +1018,18 @@ class MultiplayerHub(Hub[MultiplayerClientState]):
|
|||||||
played_user,
|
played_user,
|
||||||
ex=3600,
|
ex=3600,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Ensure spectator hub is aware of all active players for the new game.
|
||||||
|
# This helps spectators receive score data for every participant,
|
||||||
|
# especially in subsequent rounds where state may get out of sync.
|
||||||
|
for room_user in room.room.users:
|
||||||
|
if (client := self.get_client_by_id(str(room_user.user_id))) is not None:
|
||||||
|
try:
|
||||||
|
await self._sync_with_spectator_hub(client, room)
|
||||||
|
except Exception as e:
|
||||||
|
logger.debug(
|
||||||
|
f"[MultiplayerHub] Failed to resync spectator hub for user {room_user.user_id}: {e}"
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
await room.queue.finish_current_item()
|
await room.queue.finish_current_item()
|
||||||
|
|
||||||
|
|||||||
@@ -278,7 +278,19 @@ class SpectatorHub(Hub[StoreClientState]):
|
|||||||
user_id = int(client.connection_id)
|
user_id = int(client.connection_id)
|
||||||
store = self.get_or_create_state(client)
|
store = self.get_or_create_state(client)
|
||||||
if store.state is not None:
|
if store.state is not None:
|
||||||
return
|
logger.warning(f"[SpectatorHub] User {user_id} began new session without ending previous one; cleaning up")
|
||||||
|
try:
|
||||||
|
await self._end_session(user_id, store.state, store)
|
||||||
|
from app.router.private.stats import remove_playing_user
|
||||||
|
|
||||||
|
bg_tasks.add_task(remove_playing_user, user_id)
|
||||||
|
finally:
|
||||||
|
store.state = None
|
||||||
|
store.beatmap_status = None
|
||||||
|
store.checksum = None
|
||||||
|
store.ruleset_id = None
|
||||||
|
store.score_token = None
|
||||||
|
store.score = None
|
||||||
if state.beatmap_id is None or state.ruleset_id is None:
|
if state.beatmap_id is None or state.ruleset_id is None:
|
||||||
return
|
return
|
||||||
|
|
||||||
@@ -540,27 +552,33 @@ class SpectatorHub(Hub[StoreClientState]):
|
|||||||
try:
|
try:
|
||||||
# Get target user's current state if it exists
|
# Get target user's current state if it exists
|
||||||
target_store = self.state.get(target_id)
|
target_store = self.state.get(target_id)
|
||||||
if target_store and target_store.state:
|
if not target_store or not target_store.state:
|
||||||
# CRITICAL FIX: Only send state if user is actually playing
|
logger.info(f"[SpectatorHub] Rejecting watch request for {target_id}: user not playing")
|
||||||
# Don't send state for finished/quit games
|
raise InvokeException("Target user is not currently playing")
|
||||||
if target_store.state.state == SpectatedUserState.Playing:
|
|
||||||
logger.debug(f"[SpectatorHub] {target_id} is currently playing, sending state")
|
if target_store.state.state != SpectatedUserState.Playing:
|
||||||
# Send current state to the watcher immediately
|
logger.info(
|
||||||
await self.call_noblock(
|
f"[SpectatorHub] Rejecting watch request for {target_id}: state is {target_store.state.state}"
|
||||||
client,
|
)
|
||||||
"UserBeganPlaying",
|
raise InvokeException("Target user is not currently playing")
|
||||||
target_id,
|
|
||||||
target_store.state,
|
logger.debug(f"[SpectatorHub] {target_id} is currently playing, sending state")
|
||||||
)
|
# Send current state to the watcher immediately
|
||||||
else:
|
await self.call_noblock(
|
||||||
logger.debug(
|
client,
|
||||||
f"[SpectatorHub] {target_id} state is {target_store.state.state}, not sending to watcher"
|
"UserBeganPlaying",
|
||||||
)
|
target_id,
|
||||||
|
target_store.state,
|
||||||
|
)
|
||||||
|
except InvokeException:
|
||||||
|
# Re-raise to inform caller without adding to group
|
||||||
|
raise
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
# User isn't tracked or error occurred - this is not critical
|
# User isn't tracked or error occurred - this is not critical
|
||||||
logger.debug(f"[SpectatorHub] Could not get state for {target_id}: {e}")
|
logger.debug(f"[SpectatorHub] Could not get state for {target_id}: {e}")
|
||||||
|
raise InvokeException("Target user is not currently playing") from e
|
||||||
|
|
||||||
# Add watcher to our tracked users
|
# Add watcher to our tracked users only after validation
|
||||||
store = self.get_or_create_state(client)
|
store = self.get_or_create_state(client)
|
||||||
store.watched_user.add(target_id)
|
store.watched_user.add(target_id)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user