feat(im): 修一批 WS 健壮性与跨账号防御

- WS 重连改指数退避(1→2→4→8→16→30s + jitter),频率封顶不再固定 3s 形成惊群
- onerror 不再调 reconnect,主动 close 让 onclose 成为唯一重连入口,避免双触计数 +2
- 私聊 / 群消息入口加防御层,senderId / receiverId / 定向 receiverUserIds 不含当前用户的帧直接丢弃
- useMessagePuller 引入 epoch + userId 双重快照,离开 IM / 切账号时旧 pull 写入前自检跳出
- cancelPull 同步清 WS messageBuffer,防止下次进 IM 把旧 session 缓冲帧回放进新 store
im
YunaiV 2026-05-21 20:02:44 +08:00
parent 5a983bb1eb
commit 9893aedbb2
4 changed files with 224 additions and 57 deletions

View File

@ -29,9 +29,9 @@ import {
MESSAGE_GROUP_PULL_SIZE,
MESSAGE_PRIVATE_READ_ENABLED
} from '../../utils/config'
import { useUserStore } from '@/store/modules/user'
import { buildChannelConversationStub } from '../../utils/channel'
import { getPrivateMessagePeerId } from '../../utils/message'
import { getCurrentUserId } from '../../utils/storage'
import type { Message } from '../types'
/**
@ -48,10 +48,9 @@ import type { Message } from '../types'
export const useMessagePuller = () => {
const conversationStore = useConversationStore()
const wsStore = useImWebSocketStore()
const userStore = useUserStore()
const friendStore = useFriendStore()
const groupStore = useGroupStore()
const currentUserId = Number(userStore.getUser?.id) || 0
const currentUserId = getCurrentUserId()
/** 私聊会话归属:自己发的算"发给 receiverId 的会话",否则算"发送方的会话"curry currentUserId 进闭包减少 3 处调用方的样板 */
const getPrivatePeerId = (message: ImPrivateMessageRespVO) =>
@ -136,14 +135,30 @@ export const useMessagePuller = () => {
}
}
/** 循环拉取指定会话类型的消息:以本批最大 id 作为下次 minId直到接口返回空列表或游标不再前进 */
const pullByType = async (conversationType: number, startMinId: number) => {
/**
* id minId
*
*
* 1. startEpochcancelPull() pullEpoch IM /
* 2. startUserId await userId logout / tab cancelPull
* session store
*/
const pullByType = async (
conversationType: number,
startMinId: number,
startEpoch: number,
startUserId: number
) => {
// 私聊 / 群聊 / 频道各自一套接口;按 conversationType 在循环内分支调度
let minId = startMinId || 0
const isPrivate = conversationType === ImConversationType.PRIVATE
const isChannel = conversationType === ImConversationType.CHANNEL
const size = isPrivate ? MESSAGE_PRIVATE_PULL_SIZE : MESSAGE_GROUP_PULL_SIZE
const isStillValid = () => pullEpoch === startEpoch && getCurrentUserId() === startUserId
while (true) {
if (!isStillValid()) {
return
}
let list: any[] | undefined
if (isPrivate) {
list = await apiPullPrivateMessages({ minId, size })
@ -152,6 +167,10 @@ export const useMessagePuller = () => {
} else {
list = await apiPullGroupMessages({ minId, size })
}
// 接口返回期间发生 cancel / 切账号:丢弃本批不入库,也不再翻页
if (!isStillValid()) {
return
}
if (!list || list.length === 0) {
break
}
@ -234,6 +253,24 @@ export const useMessagePuller = () => {
*/
let initialPulled = false
/**
* pull / IM cancelPull() pullByType epoch
* session session
*
* WS pull /
* initialPulled false watcher
*/
let pullEpoch = 0
/** 显式取消:仅由 Index.vue onUnmounted离开 IM / 切账号 / 路由跳出)调用 */
const cancelPull = () => {
pullEpoch++
// 旧 promise 仍在 finally 阶段跑,但 epoch 守卫已阻断后续副作用;这里立刻让 pullPromise = null 让新一轮可重入
pullPromise = null
// 同步丢弃 WS 缓冲帧;旧 pull 已不会 flushBuffer若不清下次进 IM 第一次 pullOnce 会把旧 session 的帧回放进新 store
wsStore.discardBuffer()
}
/** 执行一次全量增量拉取(重入安全:进行中再次调用复用同一个 promise */
const pullOnce = (): Promise<void> => {
if (!currentUserId) {
@ -242,21 +279,52 @@ export const useMessagePuller = () => {
if (pullPromise) {
return pullPromise
}
const startEpoch = pullEpoch
// 启动时的用户快照pullByType 每批 await 后比对当前登录用户,账号变了立刻丢弃
const startUserId = currentUserId
// 本轮 pull 仍属于当前 sessionepoch 未漂 + 用户未切;任何动新 store 状态的副作用都要先过这道关
const isCurrentPull = () => pullEpoch === startEpoch && getCurrentUserId() === startUserId
pullPromise = (async () => {
try {
// 旧 puller 在 cancelPull 未触发的异常路径上再进来时,先于任何副作用退出,避免污染新 session 的 loading
if (!isCurrentPull()) {
return
}
conversationStore.loading = true
try {
// 并发拉取私聊 + 群聊 + 频道,降低初始加载耗时
await Promise.all([
pullByType(ImConversationType.PRIVATE, conversationStore.privateMessageMaxId),
pullByType(ImConversationType.GROUP, conversationStore.groupMessageMaxId),
pullByType(ImConversationType.CHANNEL, conversationStore.channelMessageMaxId)
pullByType(
ImConversationType.PRIVATE,
conversationStore.privateMessageMaxId,
startEpoch,
startUserId
),
pullByType(
ImConversationType.GROUP,
conversationStore.groupMessageMaxId,
startEpoch,
startUserId
),
pullByType(
ImConversationType.CHANNEL,
conversationStore.channelMessageMaxId,
startEpoch,
startUserId
)
])
} catch (e) {
console.error('[IM] 拉取离线消息失败:', e)
} finally {
// 关闭 buffer 模式必须早于 flushBuffer否则 handler 看到 loading=true 会把消息又 push 回 buffer
conversationStore.loading = false
// 仍属本轮才复位 loading旧轮被 cancel / 切账号时由新一轮自管,避免覆盖新 session 的 true
if (isCurrentPull()) {
conversationStore.loading = false
}
}
// 取消 / 切账号后跳过 flushBuffer / 排序 / 已读位置补齐
if (!isCurrentPull()) {
return
}
// 回放 WebSocket 在 loading 期间收到的缓冲消息(此刻走正常 insertMessage 路径)
@ -278,12 +346,12 @@ export const useMessagePuller = () => {
// 离线期间错过的 RECEIPT 推送会被这里补回;其他私聊会话等用户点开时由 Index.vue 的 watch 触发
// 私聊已读关闭时跳过,避免打到已禁用接口触发错误日志
const active = conversationStore.activeConversation
if (
MESSAGE_PRIVATE_READ_ENABLED
&& active && active.type === ImConversationType.PRIVATE
) {
if (MESSAGE_PRIVATE_READ_ENABLED && active && active.type === ImConversationType.PRIVATE) {
try {
const maxReadId = await apiGetPrivateMaxReadMessageId(active.targetId)
if (!isCurrentPull()) {
return
}
if (maxReadId) {
conversationStore.applyReadReceipt({
conversationType: ImConversationType.PRIVATE,
@ -296,9 +364,13 @@ export const useMessagePuller = () => {
}
}
} finally {
// 整个 IIFE 全部完成(含已读位置补齐)后才允许下一次 pullOnce 重入
pullPromise = null
initialPulled = true
// 仍属本轮正常完成首拉epoch 等但 userId 切了:清 pullPromise 防卡死、不标首拉epoch 漂cancelPull 已清no-op
if (isCurrentPull()) {
pullPromise = null
initialPulled = true
} else if (pullEpoch === startEpoch) {
pullPromise = null
}
}
})()
return pullPromise
@ -317,5 +389,5 @@ export const useMessagePuller = () => {
}
)
return { pullOnce, convertPrivateMessage, convertGroupMessage }
return { pullOnce, cancelPull, convertPrivateMessage, convertGroupMessage }
}

View File

@ -65,7 +65,7 @@ const groupRequestStore = useGroupRequestStore()
const draftStore = useDraftStore()
const faceStore = useFaceStore()
const channelStore = useChannelStore()
const { pullOnce } = useMessagePuller()
const { pullOnce, cancelPull } = useMessagePuller()
const { readActive, syncPrivateReadStatus } = useMessageSender()
const voicePlayer = useVoicePlayer()
@ -153,8 +153,9 @@ function onBeforeUnload() {
}
window.addEventListener('beforeunload', onBeforeUnload)
/** 离开 IM 主壳:主动断 WebSocketdisconnect 内部已清掉 onclose 防自动重连)+ flush 草稿 + 表情缓存 reset + 解绑 unload + 停语音 */
/** 离开 IM 主壳:取消在飞的 pull防止旧响应写新 session+ 主动断 WebSocket + flush 草稿 + 表情缓存 reset + 解绑 unload + 停语音 */
onUnmounted(() => {
cancelPull()
webSocketStore.disconnect()
draftStore.flushPersist()
faceStore.reset()

View File

@ -19,7 +19,13 @@ import {
playAudioTip,
resolveCallEndReasonText
} from '../../utils/message'
import { MESSAGE_PRIVATE_READ_ENABLED, MESSAGE_GROUP_READ_ENABLED } from '../../utils/config'
import {
MESSAGE_PRIVATE_READ_ENABLED,
MESSAGE_GROUP_READ_ENABLED,
WS_RECONNECT_BASE_MS,
WS_RECONNECT_MAX_MS,
WS_RECONNECT_JITTER_MS
} from '../../utils/config'
import { useConversationStore } from './conversationStore'
import { useFriendStore, type FriendNotificationPayload } from './friendStore'
import { getFriendDisplayName } from '../../utils/user'
@ -120,6 +126,8 @@ export const useImWebSocketStore = defineStore('imWebSocketStore', {
socket: null as WebSocket | null,
isConnected: false,
reconnectTimer: null as ReturnType<typeof setTimeout> | null,
/** 连续重连失败次数onopen 成功 / disconnect 主动断开后清零,用于指数退避 */
reconnectAttempts: 0,
heartbeatTimer: null as ReturnType<typeof setInterval> | null,
messageBuffer: [] as Array<
| { conversationType: typeof ImConversationType.PRIVATE; payload: ImPrivateMessageDTO }
@ -139,6 +147,11 @@ export const useImWebSocketStore = defineStore('imWebSocketStore', {
return msgs
},
/** 直接丢弃缓冲帧不回放cancelPull / 离开 IM 调用,防止下次进 IM 把旧 session 帧回放进新 store */
discardBuffer() {
this.messageBuffer = []
},
/**
* WebSocket
* yudao /infra/ws sendObject(type, content)
@ -173,9 +186,10 @@ export const useImWebSocketStore = defineStore('imWebSocketStore', {
const url = `${this.buildWsUrl()}/infra/ws?token=${refreshToken}`
this.socket = new WebSocket(url)
// 连接建立:标记上线 + 启动心跳保活
// 连接建立:标记上线 + 启动心跳保活;重连退避计数归零
this.socket.onopen = () => {
this.isConnected = true
this.reconnectAttempts = 0
console.log('[IM WS] connected')
this.startHeartbeat()
}
@ -193,18 +207,20 @@ export const useImWebSocketStore = defineStore('imWebSocketStore', {
}
}
// 服务端关闭 / 网络断:标记下线,3 秒后自动重连
// 服务端关闭 / 网络断:标记下线,按指数退避自动重连
this.socket.onclose = () => {
this.isConnected = false
console.log('[IM WS] disconnected')
this.reconnect()
}
// 异常同样走重连onerror 后通常 onclose 也会触发reconnect 内部已防重)
// 异常时不主动 reconnect主动 close() 让 onclose 成为唯一重连入口:
// 1避免 onerror / onclose 双触把 reconnectAttempts 一次断连 +2
// 2兜底某些平台 onerror 后 onclose 延迟 / 丢失导致重连卡住
this.socket.onerror = (error) => {
console.error('[IM WS] error:', error)
this.isConnected = false
this.reconnect()
this.socket?.close()
}
},
@ -287,29 +303,26 @@ export const useImWebSocketStore = defineStore('imWebSocketStore', {
typeof websocketMessage.sendTime === 'number'
? websocketMessage.sendTime
: new Date(websocketMessage.sendTime).getTime()
conversationStore.insertMessage(
buildChannelConversationStub(websocketMessage.channelId),
{
id: websocketMessage.id,
clientMessageId: '',
type: websocketMessage.type,
content: websocketMessage.content,
status: ImMessageStatus.UNREAD,
sendTime: sendTimeMs,
senderId: 0,
targetId: websocketMessage.channelId,
selfSend: false,
materialId: websocketMessage.materialId
}
)
conversationStore.insertMessage(buildChannelConversationStub(websocketMessage.channelId), {
id: websocketMessage.id,
clientMessageId: '',
type: websocketMessage.type,
content: websocketMessage.content,
status: ImMessageStatus.UNREAD,
sendTime: sendTimeMs,
senderId: 0,
targetId: websocketMessage.channelId,
selfSend: false,
materialId: websocketMessage.materialId
})
// 非当前会话 + 未免打扰:响一下提示音
const conversation = conversationStore.getConversation(
ImConversationType.CHANNEL,
websocketMessage.channelId
)
const isActive =
conversationStore.activeConversation?.type === ImConversationType.CHANNEL
&& conversationStore.activeConversation?.targetId === websocketMessage.channelId
conversationStore.activeConversation?.type === ImConversationType.CHANNEL &&
conversationStore.activeConversation?.targetId === websocketMessage.channelId
if (!isActive && !conversation?.silent && isNormalMessage(websocketMessage.type)) {
playAudioTip()
}
@ -432,6 +445,21 @@ export const useImWebSocketStore = defineStore('imWebSocketStore', {
*/
handlePrivateMessage(websocketMessage: ImPrivateMessageDTO) {
const conversationStore = useConversationStore()
const userStore = useUserStore()
const friendStore = useFriendStore()
const currentUserId = Number(userStore.getUser?.id) || 0
// 0. 防御层senderId / receiverId 均不含当前用户的私聊帧直接丢弃,避免后端路由 / 多端串号污染会话
// FRIEND_* 等系统通知也走这条通道,但 fromUserId=senderId、toUserId=receiverId 仍是当前用户视角)
if (
currentUserId &&
websocketMessage.senderId !== currentUserId &&
websocketMessage.receiverId !== currentUserId
) {
console.warn('[IM WS] 丢弃不属于当前用户的私聊帧', websocketMessage)
return
}
// 1. 离线加载期间先缓冲,等 pull 完成后再统一回放,避免重复或顺序错乱
if (conversationStore.loading) {
this.messageBuffer.push({
@ -442,9 +470,6 @@ export const useImWebSocketStore = defineStore('imWebSocketStore', {
}
// 2. selfSend / peerId自己发的消息属于「发给 receiverId 的会话」,别人发的属于「发送者的会话」
const userStore = useUserStore()
const friendStore = useFriendStore()
const currentUserId = Number(userStore.getUser?.id) || 0
const selfSend = websocketMessage.senderId === currentUserId
const peerId = getPrivateMessagePeerId(websocketMessage, currentUserId)
// 未知对端(陌生人加好友前先收到消息等场景):异步补拉一次,下次再渲染就有 name/avatar
@ -507,7 +532,10 @@ export const useImWebSocketStore = defineStore('imWebSocketStore', {
return
}
const conversationStore = useConversationStore()
conversationStore.markConversationAsRead(ImConversationType.PRIVATE, websocketMessage.receiverId)
conversationStore.markConversationAsRead(
ImConversationType.PRIVATE,
websocketMessage.receiverId
)
},
/**
@ -542,6 +570,25 @@ export const useImWebSocketStore = defineStore('imWebSocketStore', {
*/
handleGroupMessage(websocketMessage: ImGroupMessageDTO) {
const conversationStore = useConversationStore()
const userStore = useUserStore()
const groupStore = useGroupStore()
const currentUserId = Number(userStore.getUser?.id) || 0
const selfSend = websocketMessage.senderId === currentUserId
// 0. 防御层:定向群消息 receiverUserIds 非空且未包含当前用户时丢弃
// 自己发的selfSend始终通过全员可见receiverUserIds 为空 / 缺失)也通过
const receiverUserIds = websocketMessage.receiverUserIds
if (
currentUserId &&
!selfSend &&
Array.isArray(receiverUserIds) &&
receiverUserIds.length > 0 &&
!receiverUserIds.includes(currentUserId)
) {
console.warn('[IM WS] 丢弃不属于当前用户的定向群消息', websocketMessage)
return
}
// 1. 离线加载期缓冲(与私聊对称)
if (conversationStore.loading) {
this.messageBuffer.push({
@ -550,10 +597,6 @@ export const useImWebSocketStore = defineStore('imWebSocketStore', {
})
return
}
const userStore = useUserStore()
const groupStore = useGroupStore()
const currentUserId = Number(userStore.getUser?.id) || 0
const selfSend = websocketMessage.senderId === currentUserId
// 2. 未知群时自动拉群详情 + 成员(被拉入群但还没收到 GROUP_CREATE 时的兜底)
const group = groupStore.getGroup(websocketMessage.groupId)
@ -668,10 +711,16 @@ export const useImWebSocketStore = defineStore('imWebSocketStore', {
friendStore.applyFriendRequestRejectedNotification(payload)
break
case ImMessageType.FRIEND_ADD:
friendStore.applyFriendAddNotification(payload, this.computeFriendPeerId(websocketMessage))
friendStore.applyFriendAddNotification(
payload,
this.computeFriendPeerId(websocketMessage)
)
break
case ImMessageType.FRIEND_DELETE:
friendStore.applyFriendDeleteNotification(payload, this.computeFriendPeerId(websocketMessage))
friendStore.applyFriendDeleteNotification(
payload,
this.computeFriendPeerId(websocketMessage)
)
break
case ImMessageType.FRIEND_BLOCK:
friendStore.applyFriendBlockNotification(payload)
@ -777,18 +826,32 @@ export const useImWebSocketStore = defineStore('imWebSocketStore', {
clearTimeout(this.reconnectTimer)
this.reconnectTimer = null
}
// 主动断开(切账号 / 退出):清零退避计数,下次 connect 重新从最短间隔起算
this.reconnectAttempts = 0
},
/** 自动重连3 秒后再试onclose / onerror 都会进来,靠 reconnectTimer 自身防重) */
/**
* 退 base * 2^attempt max+ 0~jitter ms
*
* onclose onerror +2
* WS_RECONNECT_MAX_MS 30s disconnect
*/
reconnect() {
this.stopHeartbeat()
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer)
this.reconnectTimer = null
}
const backoff = Math.min(
WS_RECONNECT_BASE_MS * 2 ** this.reconnectAttempts,
WS_RECONNECT_MAX_MS
)
const delay = backoff + Math.floor(Math.random() * WS_RECONNECT_JITTER_MS)
this.reconnectAttempts++
console.log(`[IM WS] reconnecting in ${delay}ms (attempt ${this.reconnectAttempts})`)
this.reconnectTimer = setTimeout(() => {
console.log('[IM WS] reconnecting...')
this.connect()
}, 3000)
}, delay)
},
/** 心跳 5 秒一次,保活 + 探活(链路断了 onclose 会触发,由 reconnect 兜底) */

View File

@ -71,8 +71,26 @@ export const MESSAGE_VOICE_MAX_MB = 5
/** 可执行 / 脚本类扩展名黑名单接收端点击下载后本地双击就跑html 本地打开还能执行脚本 */
export const DANGEROUS_FILE_EXTENSIONS = [
'exe', 'bat', 'cmd', 'com', 'msi', 'scr', 'pif', 'vbs', 'vbe', 'wsf', 'ws',
'js', 'jse', 'jar', 'sh', 'app', 'ps1', 'reg', 'html', 'htm'
'exe',
'bat',
'cmd',
'com',
'msi',
'scr',
'pif',
'vbs',
'vbe',
'wsf',
'ws',
'js',
'jse',
'jar',
'sh',
'app',
'ps1',
'reg',
'html',
'htm'
]
// ==================== 前端独有UI 阈值 ====================
@ -100,3 +118,16 @@ export const CONVERSATION_RECENT_FORWARD_MAX = 12
* yudao.im.rtc.invite-timeout-minutes
*/
export const RTC_NO_ANSWER_CALL_CHECK_INTERVAL_MS = 60 * 1000
// ==================== 前端独有WebSocket 自动重连 ====================
// 指数退避base * 2^attempt上限封顶每次再叠加 0~jitter ms 随机偏移
// 避免服务端重启时全量客户端在同一秒打过来形成「惊群」;不设次数上限,持续重连直到链路恢复
/** 首次重连等待,单位 ms */
export const WS_RECONNECT_BASE_MS = 1000
/** 退避上限,单位 ms连续失败 5 次后稳定在 30s 不再增长 */
export const WS_RECONNECT_MAX_MS = 30 * 1000
/** 退避叠加的随机抖动上限,单位 ms */
export const WS_RECONNECT_JITTER_MS = 1000