Integrate chat channels with multiplayer rooms
Added logic to create and manage chat channels for multiplayer rooms, ensuring room owners and participants are joined/removed from channels on room events. Improved chat message retrieval to consistently return messages in ascending order, supporting both forward and backward pagination.
This commit is contained in:
@@ -20,10 +20,62 @@ from app.fetcher import Fetcher
|
|||||||
from app.models.multiplayer_hub import PlaylistItem as HubPlaylistItem
|
from app.models.multiplayer_hub import PlaylistItem as HubPlaylistItem
|
||||||
from app.models.room import MatchType, QueueMode, RoomStatus
|
from app.models.room import MatchType, QueueMode, RoomStatus
|
||||||
from app.utils import utcnow
|
from app.utils import utcnow
|
||||||
|
from app.database.chat import ChatChannel, ChannelType # ChatChannel 模型 & 枚举
|
||||||
|
from .notification.server import server
|
||||||
|
|
||||||
router = APIRouter(prefix="/_lio", tags=["LIO"])
|
router = APIRouter(prefix="/_lio", tags=["LIO"])
|
||||||
|
|
||||||
|
|
||||||
|
async def _ensure_room_chat_channel(
|
||||||
|
db: Database,
|
||||||
|
room: Room,
|
||||||
|
host_user_id: int,
|
||||||
|
) -> ChatChannel:
|
||||||
|
"""
|
||||||
|
为房间创建/确保存在对应的聊天频道,channel_id 与 room.channel_id 保持一致,
|
||||||
|
名称使用 mp_{room.id}(可按需调整)。
|
||||||
|
"""
|
||||||
|
# 1) 按 channel_id 查是否已存在
|
||||||
|
ch = (await db.exec(
|
||||||
|
select(ChatChannel).where(ChatChannel.channel_id == room.channel_id)
|
||||||
|
)).first()
|
||||||
|
|
||||||
|
if ch is None:
|
||||||
|
# 确保为房间分配一个有效的 channel_id(ChatChannel.channel_id 需要 int)
|
||||||
|
if room.channel_id is None:
|
||||||
|
channel_id_value = await _alloc_channel_id(db)
|
||||||
|
# 同步回写到房间以保证二者一致
|
||||||
|
room.channel_id = channel_id_value
|
||||||
|
db.add(room)
|
||||||
|
else:
|
||||||
|
channel_id_value = int(room.channel_id)
|
||||||
|
|
||||||
|
ch = ChatChannel(
|
||||||
|
channel_id=channel_id_value, # 与房间绑定的同一 channel_id(确保为 int)
|
||||||
|
name=f"mp_{room.id}", # 频道名可自定义(注意唯一性)
|
||||||
|
description=f"Multiplayer room {room.id} chat",
|
||||||
|
type=ChannelType.MULTIPLAYER,
|
||||||
|
)
|
||||||
|
db.add(ch)
|
||||||
|
await db.commit()
|
||||||
|
await db.refresh(ch)
|
||||||
|
|
||||||
|
# 2) (可选)把房主加入频道 & 触发在线侧同步
|
||||||
|
# 如果你有 server 并希望立即让房主加入聊天频道,可取消注释以下代码
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
from app.router.v2.chat import server # 视你的项目实际路径调整
|
||||||
|
host_user = await db.get(User, host_user_id)
|
||||||
|
# server.batch_join_channel 接口签名:([users], channel, session)
|
||||||
|
await server.batch_join_channel([host_user], ch, db)
|
||||||
|
await db.commit()
|
||||||
|
except Exception as e:
|
||||||
|
# 不中断主流程,打日志即可
|
||||||
|
print(f"Warning: failed to join host {host_user_id} to chat channel {ch.channel_id}: {e}")
|
||||||
|
"""
|
||||||
|
|
||||||
|
return ch
|
||||||
|
|
||||||
|
|
||||||
async def _alloc_channel_id(db: Database) -> int:
|
async def _alloc_channel_id(db: Database) -> int:
|
||||||
"""
|
"""
|
||||||
@@ -396,6 +448,12 @@ async def create_multiplayer_room(
|
|||||||
room_id = room.id
|
room_id = room.id
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
channel = await _ensure_room_chat_channel(db, room, host_user_id)
|
||||||
|
|
||||||
|
# 让房主加入频道
|
||||||
|
host_user = await db.get(User, host_user_id)
|
||||||
|
if host_user:
|
||||||
|
await server.batch_join_channel([host_user], channel, db)
|
||||||
# Add playlist items
|
# Add playlist items
|
||||||
await _add_playlist_items(db, room_id, room_data, host_user_id)
|
await _add_playlist_items(db, room_id, room_data, host_user_id)
|
||||||
|
|
||||||
@@ -531,7 +589,7 @@ async def remove_user_from_room(
|
|||||||
detail="Room not found"
|
detail="Room not found"
|
||||||
)
|
)
|
||||||
|
|
||||||
room_owner_id, room_status, current_participant_count, ends_at = room_query
|
room_owner_id, room_status, current_participant_count, ends_at, channel_id = room_query
|
||||||
|
|
||||||
# 如果房间已经结束,直接返回
|
# 如果房间已经结束,直接返回
|
||||||
if ends_at is not None:
|
if ends_at is not None:
|
||||||
@@ -553,6 +611,15 @@ async def remove_user_from_room(
|
|||||||
# 用户不在房间中,检查房间是否需要结束(幂等操作)
|
# 用户不在房间中,检查房间是否需要结束(幂等操作)
|
||||||
room_ended = await _end_room_if_empty(db, room_id)
|
room_ended = await _end_room_if_empty(db, room_id)
|
||||||
await db.commit()
|
await db.commit()
|
||||||
|
|
||||||
|
try:
|
||||||
|
if channel_id:
|
||||||
|
await server.leave_room_channel(int(channel_id), int(user_id))
|
||||||
|
if room_ended:
|
||||||
|
server.channels.pop(int(channel_id), None)
|
||||||
|
except Exception as e:
|
||||||
|
print(f"[warn] failed to leave user {user_id} from channel {channel_id}: {e}")
|
||||||
|
|
||||||
return {"success": True, "room_ended": room_ended}
|
return {"success": True, "room_ended": room_ended}
|
||||||
|
|
||||||
# 标记用户离开房间
|
# 标记用户离开房间
|
||||||
@@ -578,6 +645,16 @@ async def remove_user_from_room(
|
|||||||
|
|
||||||
await db.commit()
|
await db.commit()
|
||||||
print(f"Successfully removed user {user_id} from room {room_id}, room_ended: {room_ended}")
|
print(f"Successfully removed user {user_id} from room {room_id}, room_ended: {room_ended}")
|
||||||
|
|
||||||
|
# ===== 新增:提交后,把用户从聊天频道移除;若房间已结束,清理内存频道 =====
|
||||||
|
try:
|
||||||
|
if channel_id:
|
||||||
|
await server.leave_room_channel(int(channel_id), int(user_id))
|
||||||
|
if room_ended:
|
||||||
|
server.channels.pop(int(channel_id), None)
|
||||||
|
except Exception as e:
|
||||||
|
print(f"[warn] failed to leave user {user_id} from channel {channel_id}: {e}")
|
||||||
|
|
||||||
return {"success": True, "room_ended": room_ended}
|
return {"success": True, "room_ended": room_ended}
|
||||||
|
|
||||||
except HTTPException:
|
except HTTPException:
|
||||||
@@ -622,37 +699,53 @@ async def add_user_to_room(
|
|||||||
|
|
||||||
# 检查房间是否已结束
|
# 检查房间是否已结束
|
||||||
room_result = await db.execute(
|
room_result = await db.execute(
|
||||||
select(Room.id, Room.ends_at)
|
select(Room.id, Room.ends_at, Room.channel_id, Room.host_id)
|
||||||
.where(col(Room.id) == room_id)
|
.where(col(Room.id) == room_id)
|
||||||
)
|
)
|
||||||
room_query = room_result.first()
|
room_row = room_result.first()
|
||||||
|
if not room_row:
|
||||||
if not room_query:
|
raise HTTPException(status_code=404, detail="Room not found")
|
||||||
raise HTTPException(
|
|
||||||
status_code=status.HTTP_404_NOT_FOUND,
|
|
||||||
detail="Room not found"
|
|
||||||
)
|
|
||||||
|
|
||||||
if room_query[1] is not None: # ends_at is not None
|
|
||||||
print(f"User {user_id} attempted to join ended room {room_id}")
|
|
||||||
raise HTTPException(
|
|
||||||
status_code=status.HTTP_410_GONE,
|
|
||||||
detail="Room has ended and cannot accept new participants"
|
|
||||||
)
|
|
||||||
|
|
||||||
# Verify room exists and check password
|
_, ends_at, channel_id, host_user_id = room_row
|
||||||
|
if ends_at is not None:
|
||||||
|
print(f"User {user_id} attempted to join ended room {room_id}")
|
||||||
|
raise HTTPException(status_code=410, detail="Room has ended and cannot accept new participants")
|
||||||
|
|
||||||
|
# Verify room password
|
||||||
provided_password = user_data.get("password") if user_data else None
|
provided_password = user_data.get("password") if user_data else None
|
||||||
print(f"Verifying room {room_id} with password: {provided_password}")
|
print(f"Verifying room {room_id} with password: {provided_password}")
|
||||||
await _verify_room_password(db, room_id, provided_password)
|
await _verify_room_password(db, room_id, provided_password)
|
||||||
|
|
||||||
# Add or update participant
|
# Add or update participant
|
||||||
await _add_or_update_participant(db, room_id, user_id)
|
await _add_or_update_participant(db, room_id, user_id)
|
||||||
|
|
||||||
# Update participant count
|
# Update participant count
|
||||||
await _update_room_participant_count(db, room_id)
|
await _update_room_participant_count(db, room_id)
|
||||||
|
|
||||||
|
# 先提交 DB 状态,确保参与关系已生效
|
||||||
await db.commit()
|
await db.commit()
|
||||||
print(f"Successfully added user {user_id} to room {room_id}")
|
print(f"Successfully added user {user_id} to room {room_id}")
|
||||||
|
|
||||||
|
# ===== 新增:确保有聊天频道并把用户加入 =====
|
||||||
|
try:
|
||||||
|
# 若房间还没分配/创建频道,补建并同步回写
|
||||||
|
if not channel_id:
|
||||||
|
room = await db.get(Room, room_id)
|
||||||
|
if room is None:
|
||||||
|
raise HTTPException(status_code=404, detail="Room not found")
|
||||||
|
await _ensure_room_chat_channel(db, room, host_user_id)
|
||||||
|
await db.refresh(room)
|
||||||
|
channel_id = room.channel_id
|
||||||
|
|
||||||
|
if channel_id:
|
||||||
|
# 加入聊天频道 → 内存注册 + 给在线客户端发 chat.channel.join
|
||||||
|
await server.join_room_channel(int(channel_id), int(user_id))
|
||||||
|
else:
|
||||||
|
# 理论上不会发生;留日志以便排查
|
||||||
|
print(f"[warn] Room {room_id} has no channel_id after ensure.")
|
||||||
|
except Exception as e:
|
||||||
|
# 不影响加入房间主流程,仅记录
|
||||||
|
print(f"[warn] failed to join user {user_id} to channel of room {room_id}: {e}")
|
||||||
|
|
||||||
return {"success": True}
|
return {"success": True}
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -149,49 +149,77 @@ async def send_message(
|
|||||||
"/chat/channels/{channel}/messages",
|
"/chat/channels/{channel}/messages",
|
||||||
response_model=list[ChatMessageResp],
|
response_model=list[ChatMessageResp],
|
||||||
name="获取消息",
|
name="获取消息",
|
||||||
description="获取指定频道的消息列表。",
|
description="获取指定频道的消息列表(统一按时间正序返回)。",
|
||||||
tags=["聊天"],
|
tags=["聊天"],
|
||||||
)
|
)
|
||||||
async def get_message(
|
async def get_message(
|
||||||
session: Database,
|
session: Database,
|
||||||
channel: str,
|
channel: str,
|
||||||
limit: int = Query(50, ge=1, le=50, description="获取消息的数量"),
|
limit: int = Query(50, ge=1, le=50, description="获取消息的数量"),
|
||||||
since: int = Query(default=0, ge=0, description="获取自此消息 ID 之后的消息记录"),
|
since: int = Query(0, ge=0, description="获取自此消息 ID 之后的消息(向前加载新消息)"),
|
||||||
until: int | None = Query(None, description="获取自此消息 ID 之前的消息记录"),
|
until: int | None = Query(None, description="获取自此消息 ID 之前的消息(向后翻历史)"),
|
||||||
current_user: User = Security(get_current_user, scopes=["chat.read"]),
|
current_user: User = Security(get_current_user, scopes=["chat.read"]),
|
||||||
):
|
):
|
||||||
# 使用明确的查询获取 channel,避免延迟加载
|
# 1) 查频道
|
||||||
if channel.isdigit():
|
if channel.isdigit():
|
||||||
db_channel = (await session.exec(select(ChatChannel).where(ChatChannel.channel_id == int(channel)))).first()
|
db_channel = (await session.exec(
|
||||||
|
select(ChatChannel).where(ChatChannel.channel_id == int(channel))
|
||||||
|
)).first()
|
||||||
else:
|
else:
|
||||||
db_channel = (await session.exec(select(ChatChannel).where(ChatChannel.name == channel))).first()
|
db_channel = (await session.exec(
|
||||||
|
select(ChatChannel).where(ChatChannel.name == channel)
|
||||||
|
)).first()
|
||||||
|
|
||||||
if db_channel is None:
|
if db_channel is None:
|
||||||
raise HTTPException(status_code=404, detail="Channel not found")
|
raise HTTPException(status_code=404, detail="Channel not found")
|
||||||
|
|
||||||
# 提取必要的属性避免惰性加载
|
|
||||||
channel_id = db_channel.channel_id
|
channel_id = db_channel.channel_id
|
||||||
|
|
||||||
# 使用 Redis 消息系统获取消息
|
|
||||||
try:
|
try:
|
||||||
|
|
||||||
messages = await redis_message_system.get_messages(channel_id, limit, since)
|
messages = await redis_message_system.get_messages(channel_id, limit, since)
|
||||||
|
if len(messages) >= 2 and messages[0].message_id > messages[-1].message_id:
|
||||||
|
messages.reverse()
|
||||||
return messages
|
return messages
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning(f"Failed to get messages from Redis system: {e}")
|
logger.warning(f"Failed to get messages from Redis system: {e}")
|
||||||
# 回退到传统数据库查询
|
|
||||||
pass
|
|
||||||
|
|
||||||
# 回退到数据库查询
|
base = select(ChatMessage).where(ChatMessage.channel_id == channel_id)
|
||||||
query = select(ChatMessage).where(ChatMessage.channel_id == channel_id)
|
|
||||||
if since > 0:
|
if since > 0 and until is None:
|
||||||
query = query.where(col(ChatMessage.message_id) > since)
|
# 向前加载新消息 → 直接 ASC
|
||||||
|
query = (
|
||||||
|
base.where(col(ChatMessage.message_id) > since)
|
||||||
|
.order_by(col(ChatMessage.message_id).asc())
|
||||||
|
.limit(limit)
|
||||||
|
)
|
||||||
|
rows = (await session.exec(query)).all()
|
||||||
|
resp = [await ChatMessageResp.from_db(m, session) for m in rows]
|
||||||
|
# 已经 ASC,无需反转
|
||||||
|
return resp
|
||||||
|
|
||||||
|
# until 分支(向后翻历史)
|
||||||
if until is not None:
|
if until is not None:
|
||||||
query = query.where(col(ChatMessage.message_id) < until)
|
# 用 DESC 取最近的更早消息,再反转为 ASC
|
||||||
|
query = (
|
||||||
|
base.where(col(ChatMessage.message_id) < until)
|
||||||
|
.order_by(col(ChatMessage.message_id).desc())
|
||||||
|
.limit(limit)
|
||||||
|
)
|
||||||
|
rows = (await session.exec(query)).all()
|
||||||
|
rows = list(rows)
|
||||||
|
rows.reverse() # 反转为 ASC
|
||||||
|
resp = [await ChatMessageResp.from_db(m, session) for m in rows]
|
||||||
|
return resp
|
||||||
|
|
||||||
query = query.order_by(col(ChatMessage.message_id).desc()).limit(limit)
|
query = base.order_by(col(ChatMessage.message_id).desc()).limit(limit)
|
||||||
messages = (await session.exec(query)).all()
|
rows = (await session.exec(query)).all()
|
||||||
resp = [await ChatMessageResp.from_db(msg, session) for msg in messages]
|
rows = list(rows)
|
||||||
|
rows.reverse() # 反转为 ASC
|
||||||
|
resp = [await ChatMessageResp.from_db(m, session) for m in rows]
|
||||||
return resp
|
return resp
|
||||||
|
return resp
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@router.put(
|
@router.put(
|
||||||
|
|||||||
Reference in New Issue
Block a user