diff --git a/app/router/lio.py b/app/router/lio.py index 17952f8..4fb3261 100644 --- a/app/router/lio.py +++ b/app/router/lio.py @@ -20,10 +20,62 @@ from app.fetcher import Fetcher from app.models.multiplayer_hub import PlaylistItem as HubPlaylistItem from app.models.room import MatchType, QueueMode, RoomStatus from app.utils import utcnow +from app.database.chat import ChatChannel, ChannelType # ChatChannel 模型 & 枚举 +from .notification.server import server router = APIRouter(prefix="/_lio", tags=["LIO"]) +async def _ensure_room_chat_channel( + db: Database, + room: Room, + host_user_id: int, +) -> ChatChannel: + """ + 为房间创建/确保存在对应的聊天频道,channel_id 与 room.channel_id 保持一致, + 名称使用 mp_{room.id}(可按需调整)。 + """ + # 1) 按 channel_id 查是否已存在 + ch = (await db.exec( + select(ChatChannel).where(ChatChannel.channel_id == room.channel_id) + )).first() + + if ch is None: + # 确保为房间分配一个有效的 channel_id(ChatChannel.channel_id 需要 int) + if room.channel_id is None: + channel_id_value = await _alloc_channel_id(db) + # 同步回写到房间以保证二者一致 + room.channel_id = channel_id_value + db.add(room) + else: + channel_id_value = int(room.channel_id) + + ch = ChatChannel( + channel_id=channel_id_value, # 与房间绑定的同一 channel_id(确保为 int) + name=f"mp_{room.id}", # 频道名可自定义(注意唯一性) + description=f"Multiplayer room {room.id} chat", + type=ChannelType.MULTIPLAYER, + ) + db.add(ch) + await db.commit() + await db.refresh(ch) + + # 2) (可选)把房主加入频道 & 触发在线侧同步 + # 如果你有 server 并希望立即让房主加入聊天频道,可取消注释以下代码 + """ + try: + from app.router.v2.chat import server # 视你的项目实际路径调整 + host_user = await db.get(User, host_user_id) + # server.batch_join_channel 接口签名:([users], channel, session) + await server.batch_join_channel([host_user], ch, db) + await db.commit() + except Exception as e: + # 不中断主流程,打日志即可 + print(f"Warning: failed to join host {host_user_id} to chat channel {ch.channel_id}: {e}") + """ + + return ch + async def _alloc_channel_id(db: Database) -> int: """ @@ -396,6 +448,12 @@ async def create_multiplayer_room( room_id = room.id try: + channel = await _ensure_room_chat_channel(db, room, host_user_id) + + # 让房主加入频道 + host_user = await db.get(User, host_user_id) + if host_user: + await server.batch_join_channel([host_user], channel, db) # Add playlist items await _add_playlist_items(db, room_id, room_data, host_user_id) @@ -531,7 +589,7 @@ async def remove_user_from_room( detail="Room not found" ) - room_owner_id, room_status, current_participant_count, ends_at = room_query + room_owner_id, room_status, current_participant_count, ends_at, channel_id = room_query # 如果房间已经结束,直接返回 if ends_at is not None: @@ -553,6 +611,15 @@ async def remove_user_from_room( # 用户不在房间中,检查房间是否需要结束(幂等操作) room_ended = await _end_room_if_empty(db, room_id) await db.commit() + + try: + if channel_id: + await server.leave_room_channel(int(channel_id), int(user_id)) + if room_ended: + server.channels.pop(int(channel_id), None) + except Exception as e: + print(f"[warn] failed to leave user {user_id} from channel {channel_id}: {e}") + return {"success": True, "room_ended": room_ended} # 标记用户离开房间 @@ -578,6 +645,16 @@ async def remove_user_from_room( await db.commit() print(f"Successfully removed user {user_id} from room {room_id}, room_ended: {room_ended}") + + # ===== 新增:提交后,把用户从聊天频道移除;若房间已结束,清理内存频道 ===== + try: + if channel_id: + await server.leave_room_channel(int(channel_id), int(user_id)) + if room_ended: + server.channels.pop(int(channel_id), None) + except Exception as e: + print(f"[warn] failed to leave user {user_id} from channel {channel_id}: {e}") + return {"success": True, "room_ended": room_ended} except HTTPException: @@ -622,37 +699,53 @@ async def add_user_to_room( # 检查房间是否已结束 room_result = await db.execute( - select(Room.id, Room.ends_at) + select(Room.id, Room.ends_at, Room.channel_id, Room.host_id) .where(col(Room.id) == room_id) ) - room_query = room_result.first() - - if not room_query: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail="Room not found" - ) - - if room_query[1] is not None: # ends_at is not None - print(f"User {user_id} attempted to join ended room {room_id}") - raise HTTPException( - status_code=status.HTTP_410_GONE, - detail="Room has ended and cannot accept new participants" - ) + room_row = room_result.first() + if not room_row: + raise HTTPException(status_code=404, detail="Room not found") - # Verify room exists and check password + _, ends_at, channel_id, host_user_id = room_row + if ends_at is not None: + print(f"User {user_id} attempted to join ended room {room_id}") + raise HTTPException(status_code=410, detail="Room has ended and cannot accept new participants") + + # Verify room password provided_password = user_data.get("password") if user_data else None print(f"Verifying room {room_id} with password: {provided_password}") await _verify_room_password(db, room_id, provided_password) # Add or update participant await _add_or_update_participant(db, room_id, user_id) - # Update participant count await _update_room_participant_count(db, room_id) - + + # 先提交 DB 状态,确保参与关系已生效 await db.commit() print(f"Successfully added user {user_id} to room {room_id}") + + # ===== 新增:确保有聊天频道并把用户加入 ===== + try: + # 若房间还没分配/创建频道,补建并同步回写 + if not channel_id: + room = await db.get(Room, room_id) + if room is None: + raise HTTPException(status_code=404, detail="Room not found") + await _ensure_room_chat_channel(db, room, host_user_id) + await db.refresh(room) + channel_id = room.channel_id + + if channel_id: + # 加入聊天频道 → 内存注册 + 给在线客户端发 chat.channel.join + await server.join_room_channel(int(channel_id), int(user_id)) + else: + # 理论上不会发生;留日志以便排查 + print(f"[warn] Room {room_id} has no channel_id after ensure.") + except Exception as e: + # 不影响加入房间主流程,仅记录 + print(f"[warn] failed to join user {user_id} to channel of room {room_id}: {e}") + return {"success": True} diff --git a/app/router/notification/message.py b/app/router/notification/message.py index 187203e..006dfcf 100644 --- a/app/router/notification/message.py +++ b/app/router/notification/message.py @@ -149,49 +149,77 @@ async def send_message( "/chat/channels/{channel}/messages", response_model=list[ChatMessageResp], name="获取消息", - description="获取指定频道的消息列表。", + description="获取指定频道的消息列表(统一按时间正序返回)。", tags=["聊天"], ) async def get_message( session: Database, channel: str, limit: int = Query(50, ge=1, le=50, description="获取消息的数量"), - since: int = Query(default=0, ge=0, description="获取自此消息 ID 之后的消息记录"), - until: int | None = Query(None, description="获取自此消息 ID 之前的消息记录"), + since: int = Query(0, ge=0, description="获取自此消息 ID 之后的消息(向前加载新消息)"), + until: int | None = Query(None, description="获取自此消息 ID 之前的消息(向后翻历史)"), current_user: User = Security(get_current_user, scopes=["chat.read"]), ): - # 使用明确的查询获取 channel,避免延迟加载 + # 1) 查频道 if channel.isdigit(): - db_channel = (await session.exec(select(ChatChannel).where(ChatChannel.channel_id == int(channel)))).first() + db_channel = (await session.exec( + select(ChatChannel).where(ChatChannel.channel_id == int(channel)) + )).first() else: - db_channel = (await session.exec(select(ChatChannel).where(ChatChannel.name == channel))).first() + db_channel = (await session.exec( + select(ChatChannel).where(ChatChannel.name == channel) + )).first() if db_channel is None: raise HTTPException(status_code=404, detail="Channel not found") - # 提取必要的属性避免惰性加载 channel_id = db_channel.channel_id - # 使用 Redis 消息系统获取消息 try: + messages = await redis_message_system.get_messages(channel_id, limit, since) + if len(messages) >= 2 and messages[0].message_id > messages[-1].message_id: + messages.reverse() return messages except Exception as e: logger.warning(f"Failed to get messages from Redis system: {e}") - # 回退到传统数据库查询 - pass - # 回退到数据库查询 - query = select(ChatMessage).where(ChatMessage.channel_id == channel_id) - if since > 0: - query = query.where(col(ChatMessage.message_id) > since) + base = select(ChatMessage).where(ChatMessage.channel_id == channel_id) + + if since > 0 and until is None: + # 向前加载新消息 → 直接 ASC + query = ( + base.where(col(ChatMessage.message_id) > since) + .order_by(col(ChatMessage.message_id).asc()) + .limit(limit) + ) + rows = (await session.exec(query)).all() + resp = [await ChatMessageResp.from_db(m, session) for m in rows] + # 已经 ASC,无需反转 + return resp + + # until 分支(向后翻历史) if until is not None: - query = query.where(col(ChatMessage.message_id) < until) + # 用 DESC 取最近的更早消息,再反转为 ASC + query = ( + base.where(col(ChatMessage.message_id) < until) + .order_by(col(ChatMessage.message_id).desc()) + .limit(limit) + ) + rows = (await session.exec(query)).all() + rows = list(rows) + rows.reverse() # 反转为 ASC + resp = [await ChatMessageResp.from_db(m, session) for m in rows] + return resp - query = query.order_by(col(ChatMessage.message_id).desc()).limit(limit) - messages = (await session.exec(query)).all() - resp = [await ChatMessageResp.from_db(msg, session) for msg in messages] + query = base.order_by(col(ChatMessage.message_id).desc()).limit(limit) + rows = (await session.exec(query)).all() + rows = list(rows) + rows.reverse() # 反转为 ASC + resp = [await ChatMessageResp.from_db(m, session) for m in rows] return resp + return resp + @router.put(