From 9893aedbb217dc9b81fc2dad99dd8292a153c349 Mon Sep 17 00:00:00 2001 From: YunaiV Date: Thu, 21 May 2026 20:02:44 +0800 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20feat(im):=20=E4=BF=AE=E4=B8=80?= =?UTF-8?q?=E6=89=B9=20WS=20=E5=81=A5=E5=A3=AE=E6=80=A7=E4=B8=8E=E8=B7=A8?= =?UTF-8?q?=E8=B4=A6=E5=8F=B7=E9=98=B2=E5=BE=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - WS 重连改指数退避(1→2→4→8→16→30s + jitter),频率封顶不再固定 3s 形成惊群 - onerror 不再调 reconnect,主动 close 让 onclose 成为唯一重连入口,避免双触计数 +2 - 私聊 / 群消息入口加防御层,senderId / receiverId / 定向 receiverUserIds 不含当前用户的帧直接丢弃 - useMessagePuller 引入 epoch + userId 双重快照,离开 IM / 切账号时旧 pull 写入前自检跳出 - cancelPull 同步清 WS messageBuffer,防止下次进 IM 把旧 session 缓冲帧回放进新 store --- .../im/home/composables/useMessagePuller.ts | 108 +++++++++++--- src/views/im/home/index.vue | 5 +- src/views/im/home/store/websocketStore.ts | 133 +++++++++++++----- src/views/im/utils/config.ts | 35 ++++- 4 files changed, 224 insertions(+), 57 deletions(-) diff --git a/src/views/im/home/composables/useMessagePuller.ts b/src/views/im/home/composables/useMessagePuller.ts index 707244fba..7ce8fac25 100644 --- a/src/views/im/home/composables/useMessagePuller.ts +++ b/src/views/im/home/composables/useMessagePuller.ts @@ -29,9 +29,9 @@ import { MESSAGE_GROUP_PULL_SIZE, MESSAGE_PRIVATE_READ_ENABLED } from '../../utils/config' -import { useUserStore } from '@/store/modules/user' import { buildChannelConversationStub } from '../../utils/channel' import { getPrivateMessagePeerId } from '../../utils/message' +import { getCurrentUserId } from '../../utils/storage' import type { Message } from '../types' /** @@ -48,10 +48,9 @@ import type { Message } from '../types' export const useMessagePuller = () => { const conversationStore = useConversationStore() const wsStore = useImWebSocketStore() - const userStore = useUserStore() const friendStore = useFriendStore() const groupStore = useGroupStore() - const currentUserId = Number(userStore.getUser?.id) || 0 + const currentUserId = getCurrentUserId() /** 私聊会话归属:自己发的算"发给 receiverId 的会话",否则算"发送方的会话";curry currentUserId 进闭包减少 3 处调用方的样板 */ const getPrivatePeerId = (message: ImPrivateMessageRespVO) => @@ -136,14 +135,30 @@ export const useMessagePuller = () => { } } - /** 循环拉取指定会话类型的消息:以本批最大 id 作为下次 minId,直到接口返回空列表或游标不再前进 */ - const pullByType = async (conversationType: number, startMinId: number) => { + /** + * 循环拉取指定会话类型的消息:以本批最大 id 作为下次 minId,直到接口返回空列表或游标不再前进 + * + * 取消语义两层守卫: + * 1. startEpoch:cancelPull() 递增 pullEpoch;离开 IM / 切账号时旧循环检测到漂移即跳出 + * 2. startUserId:每批 await 后比对当前登录 userId;防御 logout / 多 tab 异常下用户已切但 cancelPull 未触发 + * 两者任一不等都丢弃本批不入库,避免旧 session 的接口响应在新 store 落地 + */ + const pullByType = async ( + conversationType: number, + startMinId: number, + startEpoch: number, + startUserId: number + ) => { // 私聊 / 群聊 / 频道各自一套接口;按 conversationType 在循环内分支调度 let minId = startMinId || 0 const isPrivate = conversationType === ImConversationType.PRIVATE const isChannel = conversationType === ImConversationType.CHANNEL const size = isPrivate ? MESSAGE_PRIVATE_PULL_SIZE : MESSAGE_GROUP_PULL_SIZE + const isStillValid = () => pullEpoch === startEpoch && getCurrentUserId() === startUserId while (true) { + if (!isStillValid()) { + return + } let list: any[] | undefined if (isPrivate) { list = await apiPullPrivateMessages({ minId, size }) @@ -152,6 +167,10 @@ export const useMessagePuller = () => { } else { list = await apiPullGroupMessages({ minId, size }) } + // 接口返回期间发生 cancel / 切账号:丢弃本批不入库,也不再翻页 + if (!isStillValid()) { + return + } if (!list || list.length === 0) { break } @@ -234,6 +253,24 @@ export const useMessagePuller = () => { */ let initialPulled = false + /** + * pull 轮次计数;切账号 / 离开 IM 时 cancelPull() 递增,旧 pullByType 循环按 epoch 自检后跳出 + * 避免旧 session 的接口响应在新 session 落地,造成跨账号消息泄漏 + * + * 注意:普通断连(WS 短断)不取消 pull——网络抖动 / 服务端重启都属于本账号正常生命周期, + * 取消会导致首拉被中断后 initialPulled 永远停在 false,后续重连 watcher 不再补拉 + */ + let pullEpoch = 0 + + /** 显式取消:仅由 Index.vue onUnmounted(离开 IM / 切账号 / 路由跳出)调用 */ + const cancelPull = () => { + pullEpoch++ + // 旧 promise 仍在 finally 阶段跑,但 epoch 守卫已阻断后续副作用;这里立刻让 pullPromise = null 让新一轮可重入 + pullPromise = null + // 同步丢弃 WS 缓冲帧;旧 pull 已不会 flushBuffer,若不清下次进 IM 第一次 pullOnce 会把旧 session 的帧回放进新 store + wsStore.discardBuffer() + } + /** 执行一次全量增量拉取(重入安全:进行中再次调用复用同一个 promise) */ const pullOnce = (): Promise => { if (!currentUserId) { @@ -242,21 +279,52 @@ export const useMessagePuller = () => { if (pullPromise) { return pullPromise } + const startEpoch = pullEpoch + // 启动时的用户快照;pullByType 每批 await 后比对当前登录用户,账号变了立刻丢弃 + const startUserId = currentUserId + // 本轮 pull 仍属于当前 session:epoch 未漂 + 用户未切;任何动新 store 状态的副作用都要先过这道关 + const isCurrentPull = () => pullEpoch === startEpoch && getCurrentUserId() === startUserId pullPromise = (async () => { try { + // 旧 puller 在 cancelPull 未触发的异常路径上再进来时,先于任何副作用退出,避免污染新 session 的 loading + if (!isCurrentPull()) { + return + } conversationStore.loading = true try { // 并发拉取私聊 + 群聊 + 频道,降低初始加载耗时 await Promise.all([ - pullByType(ImConversationType.PRIVATE, conversationStore.privateMessageMaxId), - pullByType(ImConversationType.GROUP, conversationStore.groupMessageMaxId), - pullByType(ImConversationType.CHANNEL, conversationStore.channelMessageMaxId) + pullByType( + ImConversationType.PRIVATE, + conversationStore.privateMessageMaxId, + startEpoch, + startUserId + ), + pullByType( + ImConversationType.GROUP, + conversationStore.groupMessageMaxId, + startEpoch, + startUserId + ), + pullByType( + ImConversationType.CHANNEL, + conversationStore.channelMessageMaxId, + startEpoch, + startUserId + ) ]) } catch (e) { console.error('[IM] 拉取离线消息失败:', e) } finally { - // 关闭 buffer 模式必须早于 flushBuffer,否则 handler 看到 loading=true 会把消息又 push 回 buffer - conversationStore.loading = false + // 仍属本轮才复位 loading;旧轮被 cancel / 切账号时由新一轮自管,避免覆盖新 session 的 true + if (isCurrentPull()) { + conversationStore.loading = false + } + } + + // 取消 / 切账号后跳过 flushBuffer / 排序 / 已读位置补齐 + if (!isCurrentPull()) { + return } // 回放 WebSocket 在 loading 期间收到的缓冲消息(此刻走正常 insertMessage 路径) @@ -278,12 +346,12 @@ export const useMessagePuller = () => { // 离线期间错过的 RECEIPT 推送会被这里补回;其他私聊会话等用户点开时由 Index.vue 的 watch 触发 // 私聊已读关闭时跳过,避免打到已禁用接口触发错误日志 const active = conversationStore.activeConversation - if ( - MESSAGE_PRIVATE_READ_ENABLED - && active && active.type === ImConversationType.PRIVATE - ) { + if (MESSAGE_PRIVATE_READ_ENABLED && active && active.type === ImConversationType.PRIVATE) { try { const maxReadId = await apiGetPrivateMaxReadMessageId(active.targetId) + if (!isCurrentPull()) { + return + } if (maxReadId) { conversationStore.applyReadReceipt({ conversationType: ImConversationType.PRIVATE, @@ -296,9 +364,13 @@ export const useMessagePuller = () => { } } } finally { - // 整个 IIFE 全部完成(含已读位置补齐)后才允许下一次 pullOnce 重入 - pullPromise = null - initialPulled = true + // 仍属本轮:正常完成首拉;epoch 等但 userId 切了:清 pullPromise 防卡死、不标首拉;epoch 漂:cancelPull 已清,no-op + if (isCurrentPull()) { + pullPromise = null + initialPulled = true + } else if (pullEpoch === startEpoch) { + pullPromise = null + } } })() return pullPromise @@ -317,5 +389,5 @@ export const useMessagePuller = () => { } ) - return { pullOnce, convertPrivateMessage, convertGroupMessage } + return { pullOnce, cancelPull, convertPrivateMessage, convertGroupMessage } } diff --git a/src/views/im/home/index.vue b/src/views/im/home/index.vue index 94485666f..c38166548 100644 --- a/src/views/im/home/index.vue +++ b/src/views/im/home/index.vue @@ -65,7 +65,7 @@ const groupRequestStore = useGroupRequestStore() const draftStore = useDraftStore() const faceStore = useFaceStore() const channelStore = useChannelStore() -const { pullOnce } = useMessagePuller() +const { pullOnce, cancelPull } = useMessagePuller() const { readActive, syncPrivateReadStatus } = useMessageSender() const voicePlayer = useVoicePlayer() @@ -153,8 +153,9 @@ function onBeforeUnload() { } window.addEventListener('beforeunload', onBeforeUnload) -/** 离开 IM 主壳:主动断 WebSocket(disconnect 内部已清掉 onclose 防自动重连)+ flush 草稿 + 表情缓存 reset + 解绑 unload + 停语音 */ +/** 离开 IM 主壳:取消在飞的 pull(防止旧响应写新 session)+ 主动断 WebSocket + flush 草稿 + 表情缓存 reset + 解绑 unload + 停语音 */ onUnmounted(() => { + cancelPull() webSocketStore.disconnect() draftStore.flushPersist() faceStore.reset() diff --git a/src/views/im/home/store/websocketStore.ts b/src/views/im/home/store/websocketStore.ts index 0792d1aed..a0df7f7ed 100644 --- a/src/views/im/home/store/websocketStore.ts +++ b/src/views/im/home/store/websocketStore.ts @@ -19,7 +19,13 @@ import { playAudioTip, resolveCallEndReasonText } from '../../utils/message' -import { MESSAGE_PRIVATE_READ_ENABLED, MESSAGE_GROUP_READ_ENABLED } from '../../utils/config' +import { + MESSAGE_PRIVATE_READ_ENABLED, + MESSAGE_GROUP_READ_ENABLED, + WS_RECONNECT_BASE_MS, + WS_RECONNECT_MAX_MS, + WS_RECONNECT_JITTER_MS +} from '../../utils/config' import { useConversationStore } from './conversationStore' import { useFriendStore, type FriendNotificationPayload } from './friendStore' import { getFriendDisplayName } from '../../utils/user' @@ -120,6 +126,8 @@ export const useImWebSocketStore = defineStore('imWebSocketStore', { socket: null as WebSocket | null, isConnected: false, reconnectTimer: null as ReturnType | null, + /** 连续重连失败次数;onopen 成功 / disconnect 主动断开后清零,用于指数退避 */ + reconnectAttempts: 0, heartbeatTimer: null as ReturnType | null, messageBuffer: [] as Array< | { conversationType: typeof ImConversationType.PRIVATE; payload: ImPrivateMessageDTO } @@ -139,6 +147,11 @@ export const useImWebSocketStore = defineStore('imWebSocketStore', { return msgs }, + /** 直接丢弃缓冲帧不回放(cancelPull / 离开 IM 调用,防止下次进 IM 把旧 session 帧回放进新 store) */ + discardBuffer() { + this.messageBuffer = [] + }, + /** * 连接 WebSocket * 复用 yudao 内置 /infra/ws 通道,后端通过 sendObject(type, content) 下发 @@ -173,9 +186,10 @@ export const useImWebSocketStore = defineStore('imWebSocketStore', { const url = `${this.buildWsUrl()}/infra/ws?token=${refreshToken}` this.socket = new WebSocket(url) - // 连接建立:标记上线 + 启动心跳保活 + // 连接建立:标记上线 + 启动心跳保活;重连退避计数归零 this.socket.onopen = () => { this.isConnected = true + this.reconnectAttempts = 0 console.log('[IM WS] connected') this.startHeartbeat() } @@ -193,18 +207,20 @@ export const useImWebSocketStore = defineStore('imWebSocketStore', { } } - // 服务端关闭 / 网络断:标记下线,3 秒后自动重连 + // 服务端关闭 / 网络断:标记下线,按指数退避自动重连 this.socket.onclose = () => { this.isConnected = false console.log('[IM WS] disconnected') this.reconnect() } - // 异常同样走重连(onerror 后通常 onclose 也会触发,reconnect 内部已防重) + // 异常时不主动 reconnect,主动 close() 让 onclose 成为唯一重连入口: + // 1)避免 onerror / onclose 双触把 reconnectAttempts 一次断连 +2 + // 2)兜底某些平台 onerror 后 onclose 延迟 / 丢失导致重连卡住 this.socket.onerror = (error) => { console.error('[IM WS] error:', error) this.isConnected = false - this.reconnect() + this.socket?.close() } }, @@ -287,29 +303,26 @@ export const useImWebSocketStore = defineStore('imWebSocketStore', { typeof websocketMessage.sendTime === 'number' ? websocketMessage.sendTime : new Date(websocketMessage.sendTime).getTime() - conversationStore.insertMessage( - buildChannelConversationStub(websocketMessage.channelId), - { - id: websocketMessage.id, - clientMessageId: '', - type: websocketMessage.type, - content: websocketMessage.content, - status: ImMessageStatus.UNREAD, - sendTime: sendTimeMs, - senderId: 0, - targetId: websocketMessage.channelId, - selfSend: false, - materialId: websocketMessage.materialId - } - ) + conversationStore.insertMessage(buildChannelConversationStub(websocketMessage.channelId), { + id: websocketMessage.id, + clientMessageId: '', + type: websocketMessage.type, + content: websocketMessage.content, + status: ImMessageStatus.UNREAD, + sendTime: sendTimeMs, + senderId: 0, + targetId: websocketMessage.channelId, + selfSend: false, + materialId: websocketMessage.materialId + }) // 非当前会话 + 未免打扰:响一下提示音 const conversation = conversationStore.getConversation( ImConversationType.CHANNEL, websocketMessage.channelId ) const isActive = - conversationStore.activeConversation?.type === ImConversationType.CHANNEL - && conversationStore.activeConversation?.targetId === websocketMessage.channelId + conversationStore.activeConversation?.type === ImConversationType.CHANNEL && + conversationStore.activeConversation?.targetId === websocketMessage.channelId if (!isActive && !conversation?.silent && isNormalMessage(websocketMessage.type)) { playAudioTip() } @@ -432,6 +445,21 @@ export const useImWebSocketStore = defineStore('imWebSocketStore', { */ handlePrivateMessage(websocketMessage: ImPrivateMessageDTO) { const conversationStore = useConversationStore() + const userStore = useUserStore() + const friendStore = useFriendStore() + const currentUserId = Number(userStore.getUser?.id) || 0 + + // 0. 防御层:senderId / receiverId 均不含当前用户的私聊帧直接丢弃,避免后端路由 / 多端串号污染会话 + // (FRIEND_* 等系统通知也走这条通道,但 fromUserId=senderId、toUserId=receiverId 仍是当前用户视角) + if ( + currentUserId && + websocketMessage.senderId !== currentUserId && + websocketMessage.receiverId !== currentUserId + ) { + console.warn('[IM WS] 丢弃不属于当前用户的私聊帧', websocketMessage) + return + } + // 1. 离线加载期间先缓冲,等 pull 完成后再统一回放,避免重复或顺序错乱 if (conversationStore.loading) { this.messageBuffer.push({ @@ -442,9 +470,6 @@ export const useImWebSocketStore = defineStore('imWebSocketStore', { } // 2. selfSend / peerId:自己发的消息属于「发给 receiverId 的会话」,别人发的属于「发送者的会话」 - const userStore = useUserStore() - const friendStore = useFriendStore() - const currentUserId = Number(userStore.getUser?.id) || 0 const selfSend = websocketMessage.senderId === currentUserId const peerId = getPrivateMessagePeerId(websocketMessage, currentUserId) // 未知对端(陌生人加好友前先收到消息等场景):异步补拉一次,下次再渲染就有 name/avatar @@ -507,7 +532,10 @@ export const useImWebSocketStore = defineStore('imWebSocketStore', { return } const conversationStore = useConversationStore() - conversationStore.markConversationAsRead(ImConversationType.PRIVATE, websocketMessage.receiverId) + conversationStore.markConversationAsRead( + ImConversationType.PRIVATE, + websocketMessage.receiverId + ) }, /** @@ -542,6 +570,25 @@ export const useImWebSocketStore = defineStore('imWebSocketStore', { */ handleGroupMessage(websocketMessage: ImGroupMessageDTO) { const conversationStore = useConversationStore() + const userStore = useUserStore() + const groupStore = useGroupStore() + const currentUserId = Number(userStore.getUser?.id) || 0 + const selfSend = websocketMessage.senderId === currentUserId + + // 0. 防御层:定向群消息 receiverUserIds 非空且未包含当前用户时丢弃 + // 自己发的(selfSend)始终通过;全员可见(receiverUserIds 为空 / 缺失)也通过 + const receiverUserIds = websocketMessage.receiverUserIds + if ( + currentUserId && + !selfSend && + Array.isArray(receiverUserIds) && + receiverUserIds.length > 0 && + !receiverUserIds.includes(currentUserId) + ) { + console.warn('[IM WS] 丢弃不属于当前用户的定向群消息', websocketMessage) + return + } + // 1. 离线加载期缓冲(与私聊对称) if (conversationStore.loading) { this.messageBuffer.push({ @@ -550,10 +597,6 @@ export const useImWebSocketStore = defineStore('imWebSocketStore', { }) return } - const userStore = useUserStore() - const groupStore = useGroupStore() - const currentUserId = Number(userStore.getUser?.id) || 0 - const selfSend = websocketMessage.senderId === currentUserId // 2. 未知群时自动拉群详情 + 成员(被拉入群但还没收到 GROUP_CREATE 时的兜底) const group = groupStore.getGroup(websocketMessage.groupId) @@ -668,10 +711,16 @@ export const useImWebSocketStore = defineStore('imWebSocketStore', { friendStore.applyFriendRequestRejectedNotification(payload) break case ImMessageType.FRIEND_ADD: - friendStore.applyFriendAddNotification(payload, this.computeFriendPeerId(websocketMessage)) + friendStore.applyFriendAddNotification( + payload, + this.computeFriendPeerId(websocketMessage) + ) break case ImMessageType.FRIEND_DELETE: - friendStore.applyFriendDeleteNotification(payload, this.computeFriendPeerId(websocketMessage)) + friendStore.applyFriendDeleteNotification( + payload, + this.computeFriendPeerId(websocketMessage) + ) break case ImMessageType.FRIEND_BLOCK: friendStore.applyFriendBlockNotification(payload) @@ -777,18 +826,32 @@ export const useImWebSocketStore = defineStore('imWebSocketStore', { clearTimeout(this.reconnectTimer) this.reconnectTimer = null } + // 主动断开(切账号 / 退出):清零退避计数,下次 connect 重新从最短间隔起算 + this.reconnectAttempts = 0 }, - /** 自动重连,3 秒后再试(onclose / onerror 都会进来,靠 reconnectTimer 自身防重) */ + /** + * 自动重连:指数退避 base * 2^attempt(封顶 max)+ 0~jitter ms 随机偏移 + * + * onclose 是唯一入口;onerror 不再调本方法(浏览器规范两者必同时触发,避免计数 +2) + * 不设次数上限,频率封顶在 WS_RECONNECT_MAX_MS(约 30s)持续重试,直到链路恢复或主动 disconnect + */ reconnect() { this.stopHeartbeat() if (this.reconnectTimer) { clearTimeout(this.reconnectTimer) + this.reconnectTimer = null } + const backoff = Math.min( + WS_RECONNECT_BASE_MS * 2 ** this.reconnectAttempts, + WS_RECONNECT_MAX_MS + ) + const delay = backoff + Math.floor(Math.random() * WS_RECONNECT_JITTER_MS) + this.reconnectAttempts++ + console.log(`[IM WS] reconnecting in ${delay}ms (attempt ${this.reconnectAttempts})`) this.reconnectTimer = setTimeout(() => { - console.log('[IM WS] reconnecting...') this.connect() - }, 3000) + }, delay) }, /** 心跳 5 秒一次,保活 + 探活(链路断了 onclose 会触发,由 reconnect 兜底) */ diff --git a/src/views/im/utils/config.ts b/src/views/im/utils/config.ts index 6c689961e..6d153f006 100644 --- a/src/views/im/utils/config.ts +++ b/src/views/im/utils/config.ts @@ -71,8 +71,26 @@ export const MESSAGE_VOICE_MAX_MB = 5 /** 可执行 / 脚本类扩展名黑名单;接收端点击下载后本地双击就跑,html 本地打开还能执行脚本 */ export const DANGEROUS_FILE_EXTENSIONS = [ - 'exe', 'bat', 'cmd', 'com', 'msi', 'scr', 'pif', 'vbs', 'vbe', 'wsf', 'ws', - 'js', 'jse', 'jar', 'sh', 'app', 'ps1', 'reg', 'html', 'htm' + 'exe', + 'bat', + 'cmd', + 'com', + 'msi', + 'scr', + 'pif', + 'vbs', + 'vbe', + 'wsf', + 'ws', + 'js', + 'jse', + 'jar', + 'sh', + 'app', + 'ps1', + 'reg', + 'html', + 'htm' ] // ==================== 前端独有:UI 阈值 ==================== @@ -100,3 +118,16 @@ export const CONVERSATION_RECENT_FORWARD_MAX = 12 * 实际超时阈值由后端 yudao.im.rtc.invite-timeout-minutes 决定,前端仅决定触发频率 */ export const RTC_NO_ANSWER_CALL_CHECK_INTERVAL_MS = 60 * 1000 + +// ==================== 前端独有:WebSocket 自动重连 ==================== +// 指数退避:base * 2^attempt,上限封顶;每次再叠加 0~jitter ms 随机偏移 +// 避免服务端重启时全量客户端在同一秒打过来形成「惊群」;不设次数上限,持续重连直到链路恢复 + +/** 首次重连等待,单位 ms */ +export const WS_RECONNECT_BASE_MS = 1000 + +/** 退避上限,单位 ms;连续失败 5 次后稳定在 30s 不再增长 */ +export const WS_RECONNECT_MAX_MS = 30 * 1000 + +/** 退避叠加的随机抖动上限,单位 ms */ +export const WS_RECONNECT_JITTER_MS = 1000