From 2685bc357f29540f1fe64f784ad3b69223287d15 Mon Sep 17 00:00:00 2001 From: YunaiV Date: Mon, 15 Jun 2026 08:26:32 +0800 Subject: [PATCH] =?UTF-8?q?feat(im):=20=E5=A2=9E=E5=BC=BA=E6=B6=88?= =?UTF-8?q?=E6=81=AF=E6=8B=89=E5=8F=96=E4=B8=8E=E7=8A=B6=E6=80=81=E8=A1=A5?= =?UTF-8?q?=E5=81=BF=E5=8F=AF=E9=9D=A0=E6=80=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增会话读位置持久化接口与前端同步逻辑 - 增加好友、好友申请、加群申请的增量拉取补偿 - 统一前端 pull 编排,增加回扫窗口、落库等待和账号切换守卫 - 调整群成员为按群懒加载缓存,并移除全局成员增量链路 - 修复消息落库、读位置补偿、READ 事件乱序下的未读状态一致性 - 完善群申请红点快照刷新和管理员角色变化补偿 - 更新消息存储设计与修复记录文档 --- src/api/im/conversation/read/index.ts | 19 ++ src/api/im/friend/index.ts | 6 + src/api/im/friend/request/index.ts | 10 + src/api/im/group/request/index.ts | 6 + .../im/home/components/group/GroupInfo.vue | 15 +- .../im/home/composables/useMediaUploader.ts | 4 +- .../im/home/composables/useMessagePuller.ts | 240 +++++++++++------- .../im/home/composables/useMessageSender.ts | 44 ++-- src/views/im/home/index.vue | 24 +- .../components/message/MessagePanel.vue | 10 +- src/views/im/home/store/friendStore.ts | 214 ++++++++++++---- src/views/im/home/store/groupRequestStore.ts | 121 ++++++++- src/views/im/home/store/groupStore.ts | 131 ++++++++-- src/views/im/home/store/messageStore.ts | 227 ++++++++++++++--- src/views/im/home/store/websocketStore.ts | 97 ++++--- src/views/im/home/types/index.ts | 3 +- src/views/im/utils/db.ts | 20 +- src/views/im/utils/pull.ts | 152 +++++++++++ 18 files changed, 1051 insertions(+), 292 deletions(-) create mode 100644 src/api/im/conversation/read/index.ts create mode 100644 src/views/im/utils/pull.ts diff --git a/src/api/im/conversation/read/index.ts b/src/api/im/conversation/read/index.ts new file mode 100644 index 000000000..123b33836 --- /dev/null +++ b/src/api/im/conversation/read/index.ts @@ -0,0 +1,19 @@ +import request from '@/config/axios' + +// IM 会话读位置 Response VO +export interface ImConversationReadRespVO { + id: number // 读位置编号(增量拉取游标用) + conversationType: number // 会话类型,参见 ImConversationType + targetId: number // 会话目标编号 + messageId: number // 最大已读消息编号 + updateTime?: number // 最近更新时间(毫秒时间戳,增量拉取游标用) +} + +// 增量拉取当前用户的会话读位置(重连 / 离线补偿) +export const pullMyConversationReadList = (params: { + lastUpdateTime?: number + lastId?: number + limit: number +}) => { + return request.get({ url: '/im/conversation-read/pull', params }) +} diff --git a/src/api/im/friend/index.ts b/src/api/im/friend/index.ts index c16e478e8..9cd35d121 100644 --- a/src/api/im/friend/index.ts +++ b/src/api/im/friend/index.ts @@ -13,6 +13,7 @@ export interface ImFriendRespVO { status?: number // 好友状态(0=正常,1=已删除) addTime?: string // 添加好友时间 deleteTime?: string // 删除好友时间 + updateTime?: number // 最近更新时间(毫秒时间戳,增量拉取游标用) // 聚合字段(自 AdminUser) nickname?: string // 好友昵称 nicknamePinyin?: string // 昵称的拼音(小写无空格,前端按首字母分桶 / 拼音搜索) @@ -32,6 +33,11 @@ export const getMyFriendList = () => { return request.get({ url: '/im/friend/list' }) } +// 增量拉取当前用户的好友关系(重连 / 离线补偿) +export const pullMyFriendList = (params: { lastUpdateTime?: number; lastId?: number; limit: number }) => { + return request.get({ url: '/im/friend/pull', params }) +} + // 获得好友详情 export const getFriend = (friendUserId: number | string) => { return request.get({ url: '/im/friend/get', params: { friendUserId } }) diff --git a/src/api/im/friend/request/index.ts b/src/api/im/friend/request/index.ts index 63072b57e..660ff5297 100644 --- a/src/api/im/friend/request/index.ts +++ b/src/api/im/friend/request/index.ts @@ -11,6 +11,7 @@ export interface ImFriendRequestRespVO { addSource?: number // 添加来源;参见 ImFriendAddSourceEnum handleTime?: string // 处理时间 createTime: string // 申请创建时间 + updateTime?: number // 最近更新时间(毫秒时间戳,增量拉取游标用) // 聚合字段(自 AdminUser) fromNickname?: string // 发起方昵称 fromAvatar?: string // 发起方头像 @@ -56,6 +57,15 @@ export const getMyFriendRequestList = (limit: number, maxId?: number) => { }) } +// 增量拉取「我相关」的好友申请变更(重连 / 离线补偿) +export const pullMyFriendRequestList = (params: { + lastUpdateTime?: number + lastId?: number + limit: number +}) => { + return request.get({ url: '/im/friend-request/pull', params }) +} + // 按 id 单查「我相关」的申请记录(带越权过滤;WebSocket 通知到达后用) export const getMyFriendRequest = (id: number) => { return request.get({ diff --git a/src/api/im/group/request/index.ts b/src/api/im/group/request/index.ts index 14aa41a50..d9ced2b6f 100644 --- a/src/api/im/group/request/index.ts +++ b/src/api/im/group/request/index.ts @@ -13,6 +13,7 @@ export interface ImGroupRequestRespVO { addSource?: number // 加入来源;参见 ImGroupAddSourceEnum handleTime?: string // 处理时间 createTime: string // 申请创建时间 + updateTime?: number // 最近更新时间(毫秒时间戳,增量拉取游标用) // 聚合字段 userNickname?: string // 申请人 / 被邀请人昵称 userAvatar?: string // 申请人 / 被邀请人头像 @@ -69,3 +70,8 @@ export const getMyGroupRequest = (id: number) => { params: { id } }) } + +// 增量拉取我管理的所有群下加群申请变更(重连 / 离线补偿) +export const pullMyGroupRequestList = (params: { lastUpdateTime?: number; lastId?: number; limit: number }) => { + return request.get({ url: '/im/group-request/pull', params }) +} diff --git a/src/views/im/home/components/group/GroupInfo.vue b/src/views/im/home/components/group/GroupInfo.vue index 40bd40993..3e9389efe 100644 --- a/src/views/im/home/components/group/GroupInfo.vue +++ b/src/views/im/home/components/group/GroupInfo.vue @@ -24,7 +24,10 @@ -
+
{ } if (cached.membersLoaded && cached.members) { const myId = getCurrentUserId() - return cached.members.some( - (m) => m.userId === myId && m.status === CommonStatusEnum.ENABLE - ) + return cached.members.some((m) => m.userId === myId && m.status === CommonStatusEnum.ENABLE) } return true }) @@ -125,13 +126,11 @@ watch( if (!id || !member) { return } - const list = await groupStore.fetchGroupMemberList(id) + const list = await groupStore.fetchGroupMemberList(id, true) if (props.group?.id !== id) { return } - members.value = list.map((m) => - convertGroupMemberLite(m, friendStore.getFriend(m.userId)) - ) + members.value = list.map((m) => convertGroupMemberLite(m, friendStore.getFriend(m.userId))) }, { immediate: true } ) diff --git a/src/views/im/home/composables/useMediaUploader.ts b/src/views/im/home/composables/useMediaUploader.ts index 2b3ab0124..cac807995 100644 --- a/src/views/im/home/composables/useMediaUploader.ts +++ b/src/views/im/home/composables/useMediaUploader.ts @@ -198,7 +198,7 @@ export const useMediaUploader = () => { uploadProgress: 0, _localFile: opts.file } - messageStore.insertMessage( + void messageStore.insertMessage( { type: conversation.type, targetId: conversation.targetId, @@ -206,7 +206,7 @@ export const useMediaUploader = () => { avatar: conversation.avatar || '' }, placeholder - ) + ).catch(() => undefined) return { clientMessageId, blobUrl } } diff --git a/src/views/im/home/composables/useMessagePuller.ts b/src/views/im/home/composables/useMessagePuller.ts index 5adf59eeb..85b778c18 100644 --- a/src/views/im/home/composables/useMessagePuller.ts +++ b/src/views/im/home/composables/useMessagePuller.ts @@ -5,6 +5,7 @@ import { useImWebSocketStore } from '../store/websocketStore' import { useFriendStore } from '../store/friendStore' import { getFriendDisplayName } from '../../utils/user' import { useGroupStore } from '../store/groupStore' +import { useGroupRequestStore } from '../store/groupRequestStore' import { pullPrivateMessages as apiPullPrivateMessages, getPrivateMaxReadMessageId as apiGetPrivateMaxReadMessageId, @@ -18,6 +19,7 @@ import { pullChannelMessages as apiPullChannelMessages, type ImChannelMessageRespVO } from '@/api/im/message/channel' +import { pullMyConversationReadList as apiPullMyConversationReadList } from '@/api/im/conversation/read' import { ImConversationType, ImMessageStatus, @@ -32,9 +34,14 @@ import { } from '../../utils/config' import { buildChannelConversationStub } from '../../utils/channel' import { generateClientMessageId, getPrivateMessagePeerId } from '../../utils/message' +import { runIncrementalPull, runMinIdPull } from '../../utils/pull' +import { StorageKeys } from '../../utils/db' import { getCurrentUserId } from '@/utils/auth' import type { Message } from '../types' +/** 三类消息 pull 接口返回的原始 VO 联合类型;runMinIdPull 只需 id 推进游标,具体分发在 applyPage 内按类型 cast */ +type PulledRawMessage = ImPrivateMessageRespVO | ImGroupMessageRespVO | ImChannelMessageRespVO + /** * 消息增量拉取:登录后分页拉取离线期间的新消息 * @@ -52,6 +59,7 @@ export const useMessagePuller = () => { const wsStore = useImWebSocketStore() const friendStore = useFriendStore() const groupStore = useGroupStore() + const groupRequestStore = useGroupRequestStore() const currentUserId = getCurrentUserId() /** 判断请求是否被主动取消 */ @@ -150,12 +158,12 @@ export const useMessagePuller = () => { } /** - * 循环拉取指定会话类型的消息:以本批最大 id 作为下次 minId,直到接口返回空列表或游标不再前进 + * 分类型拉取离线消息:翻页 / minId 游标推进 / 空页停由 runMinIdPull 负责,这里只做接口分支 + 逐条业务分发 + * (撤回 / 好友通知 / 普通消息)+ 入库。 * - * 取消语义两层守卫: - * 1. startEpoch:cancelPull() 递增 pullEpoch;离开 IM / 切账号时旧循环检测到漂移即跳出 - * 2. startUserId:每批 await 后比对当前登录 userId;防御 logout / 多 tab 异常下用户已切但 cancelPull 未触发 - * 两者任一不等都丢弃本批不入库,避免旧 session 的接口响应在新 store 落地 + * 取消语义两层守卫,经 isActive 传入 runMinIdPull,任一不等即丢弃本批不入库、停止翻页,避免旧 session 响应落到新 store: + * 1. startEpoch:cancelPull() 递增 pullEpoch;离开 IM / 切账号时跳出 + * 2. startUserId:每批 await 后比对当前登录 userId;防御 logout / 多 tab 下用户已切但 cancelPull 未触发 */ const pullByType = async ( conversationType: number, @@ -164,108 +172,90 @@ export const useMessagePuller = () => { startUserId: number, signal: AbortSignal ) => { - // 私聊 / 群聊 / 频道各自一套接口;按 conversationType 在循环内分支调度 - let minId = startMinId || 0 + // 私聊 / 群聊 / 频道各自一套接口;按 conversationType 分支调度。翻页机制(minId 游标 / 空页判断 / 防死翻)交给 runMinIdPull const isPrivate = conversationType === ImConversationType.PRIVATE const isChannel = conversationType === ImConversationType.CHANNEL const size = isPrivate ? MESSAGE_PRIVATE_PULL_SIZE : MESSAGE_GROUP_PULL_SIZE const isStillValid = () => !signal.aborted && pullEpoch === startEpoch && getCurrentUserId() === startUserId - while (true) { - if (!isStillValid()) { - return - } - let list: any[] | undefined - if (isPrivate) { - list = await apiPullPrivateMessages({ minId, size }, signal) - } else if (isChannel) { - list = await apiPullChannelMessages({ minId, size }, signal) - } else { - list = await apiPullGroupMessages({ minId, size }, signal) - } - // 接口返回期间发生 cancel / 切账号:丢弃本批不入库,也不再翻页 - if (!isStillValid()) { - return - } - if (!list || list.length === 0) { - break - } - - const pulledMessages: PulledMessage[] = [] - // 逐条 dispatch:原消息走批量 insert;RECALL 信号走批量 recall 把同批内已 insert 的原消息更新为撤回提示。 - // 后端按 id 升序返回,且信号 id 一定 > 原消息 id(先更新 status 再插信号),所以原消息一定先到、recallMessage 找得到 - for (const raw of list) { - if (isChannel) { - const message = raw as ImChannelMessageRespVO - pulledMessages.push({ - kind: 'insert', - conversationInfo: convertChannelConversation(message), - message: convertChannelMessage(message) - }) - continue - } + await runMinIdPull({ + initialMinId: startMinId, + pageSize: size, + isActive: isStillValid, + fetchPage: ({ minId, size }) => { if (isPrivate) { - const message = raw as ImPrivateMessageRespVO - // 特殊:撤回消息的处理 - if (message.type === ImMessageType.RECALL) { + return apiPullPrivateMessages({ minId, size }, signal) + } + if (isChannel) { + return apiPullChannelMessages({ minId, size }, signal) + } + return apiPullGroupMessages({ minId, size }, signal) + }, + applyPage: async (list, nextMinId) => { + const pulledMessages: PulledMessage[] = [] + // 逐条 dispatch:原消息走批量 insert;RECALL 信号走批量 recall 把同批内已 insert 的原消息更新为撤回提示。 + // 后端按 id 升序返回,且信号 id 一定 > 原消息 id(先更新 status 再插信号),所以原消息一定先到、recallMessage 找得到 + for (const raw of list) { + if (isChannel) { + const message = raw as ImChannelMessageRespVO pulledMessages.push({ - kind: 'recall', - conversationType: ImConversationType.PRIVATE, - targetId: getPrivatePeerId(message), - recallSignalContent: message.content + kind: 'insert', + conversationInfo: convertChannelConversation(message), + message: convertChannelMessage(message) }) continue } - // 特殊:离线 pull 期间入库的 FRIEND_* 帧(目前仅 FRIEND_ADD persistent=true)也要走好友数据分发, - // 否则断线期间的好友列表更新会丢失;与 WebSocket 路径 dispatchPrivateFrame 保持对称 - if (isFriendNotification(message.type)) { - wsStore.handleFriendNotification(message) - // 仅 FRIEND_ADD / FRIEND_DELETE 才作为会话气泡入消息列表 - if (!isFriendChatTip(message.type)) { + if (isPrivate) { + const message = raw as ImPrivateMessageRespVO + // 特殊:撤回消息的处理 + if (message.type === ImMessageType.RECALL) { + pulledMessages.push({ + kind: 'recall', + conversationType: ImConversationType.PRIVATE, + targetId: getPrivatePeerId(message), + recallSignalContent: message.content + }) continue } - } - // 其它消息正常入会话消息列表 - pulledMessages.push({ - kind: 'insert', - conversationInfo: convertPrivateConversation(message), - message: convertPrivateMessage(message) - }) - } else { - const message = raw as ImGroupMessageRespVO - // 特殊:撤回消息的处理 - if (message.type === ImMessageType.RECALL) { + // 特殊:离线 pull 期间入库的 FRIEND_* 帧(目前仅 FRIEND_ADD persistent=true)也要走好友数据分发, + // 否则断线期间的好友列表更新会丢失;与 WebSocket 路径 dispatchPrivateFrame 保持对称 + if (isFriendNotification(message.type)) { + wsStore.handleFriendNotification(message) + // 仅 FRIEND_ADD / FRIEND_DELETE 才作为会话气泡入消息列表 + if (!isFriendChatTip(message.type)) { + continue + } + } + // 其它消息正常入会话消息列表 pulledMessages.push({ - kind: 'recall', - conversationType: ImConversationType.GROUP, - targetId: message.groupId, - recallSignalContent: message.content + kind: 'insert', + conversationInfo: convertPrivateConversation(message), + message: convertPrivateMessage(message) + }) + } else { + const message = raw as ImGroupMessageRespVO + // 特殊:撤回消息的处理 + if (message.type === ImMessageType.RECALL) { + pulledMessages.push({ + kind: 'recall', + conversationType: ImConversationType.GROUP, + targetId: message.groupId, + recallSignalContent: message.content + }) + continue + } + // 其它消息正常入会话消息列表 + pulledMessages.push({ + kind: 'insert', + conversationInfo: convertGroupConversation(message), + message: convertGroupMessage(message) }) - continue } - // 其它消息正常入会话消息列表 - pulledMessages.push({ - kind: 'insert', - conversationInfo: convertGroupConversation(message), - message: convertGroupMessage(message) - }) } + // 入库 + 推进 messageMaxId;nextMinId 为空(本批无有效 id)时不推进游标,与旧逻辑一致 + await messageStore.applyPulledMessageList(pulledMessages, conversationType, nextMinId) } - - // 游标推进到本批最大消息编号 - const validIds = list.map((message) => message.id).filter((id): id is number => id != null) - if (validIds.length === 0) { - await messageStore.applyPulledMessageList(pulledMessages, conversationType) - break - } - const nextMinId = Math.max(...validIds) - await messageStore.applyPulledMessageList(pulledMessages, conversationType, nextMinId) - // 游标没前进就停:当前后端契约是 id > minId,理论不会出现;防御后端契约变更或边界数据死翻 - if (nextMinId <= minId) { - break - } - minId = nextMinId - } + }) } /** 同一时刻只允许一次 pull:Index.vue 的手动调用与重连 watch 触发可能并发,共用同一个 promise 即可去重 */ @@ -298,6 +288,47 @@ export const useMessagePuller = () => { wsStore.discardBuffer() } + /** 增量拉取我的会话读位置并合并到本地展示态 */ + const pullConversationReads = async (isActive: () => boolean): Promise => { + await runIncrementalPull( + StorageKeys.settings.conversationReadPullCursor, + apiPullMyConversationReadList, + async (records) => { + if (!isActive()) { + return false + } + await messageStore.applyConversationReadList(records, isActive) + if (!isActive()) { + return false + } + return true + }, + isActive + ) + } + + /** + * 状态事件补偿:好友 / 好友申请走增量;群列表和群申请红点走快照刷新 + * + * 首登主数据由 index.vue 驱动,重连时各 store 已就位,多路 allSettled 并发互不影响,单路失败仅记日志。 + * 群成员不做全局增量同步,重连只标记本地群成员 cache 过期,进入群会话或成员列表时再按 groupId 刷新。 + */ + const pullStateEvents = async (): Promise => { + groupStore.markAllGroupMembersExpired() + const results = await Promise.allSettled([ + friendStore.pullFriends(), + friendStore.pullFriendRequests(), + groupStore.fetchGroupList(true), + groupRequestStore.pullGroupRequests(), + groupRequestStore.fetchUnhandledGroupRequestList() + ]) + for (const result of results) { + if (result.status === 'rejected') { + console.warn('[IM] 状态事件增量补偿失败', result.reason) + } + } + } + /** 执行一次全量增量拉取(重入安全:进行中再次调用复用同一个 promise) */ const pullOnce = (): Promise => { if (!currentUserId) { @@ -323,8 +354,9 @@ export const useMessagePuller = () => { return } conversationStore.loading = true + let messagePullSucceeded = false try { - // 并发拉取私聊 + 群聊 + 频道,降低初始加载耗时 + // 并发拉取私聊 + 群聊 + 频道消息,降低初始加载耗时 await Promise.all([ pullByType( ImConversationType.PRIVATE, @@ -348,6 +380,7 @@ export const useMessagePuller = () => { abortController.signal ) ]) + messagePullSucceeded = true } catch (e) { if (isAbortError(e)) { return @@ -364,22 +397,33 @@ export const useMessagePuller = () => { if (!isCurrentPull()) { return } + if (!messagePullSucceeded) { + return + } - // 回放 WebSocket 在 loading 期间收到的缓冲消息(此刻走正常 insertMessage 路径) + // 回放 WebSocket 在 loading 期间收到的缓冲消息 const buffered = wsStore.flushBuffer() + const replayPersistPromises: Promise[] = [] for (const item of buffered) { if (item.conversationType === ImConversationType.PRIVATE) { - wsStore.handlePrivateMessage(item.payload) + replayPersistPromises.push(wsStore.handlePrivateMessage(item.payload)) } else if (item.conversationType === ImConversationType.CHANNEL) { - wsStore.handleChannelMessage(item.payload) + replayPersistPromises.push(wsStore.handleChannelMessage(item.payload)) } else { - wsStore.handleGroupMessage(item.payload) + replayPersistPromises.push(wsStore.handleGroupMessage(item.payload)) } } + await Promise.all(replayPersistPromises) // pull + replay 都完成后再排序,避免回放消息打乱顺序 conversationStore.sortConversationList() + // 消息和缓冲帧落库后再补读位置,避免读位置游标先推进导致新消息展示态漏更新 + await pullConversationReads(isCurrentPull) + if (!isCurrentPull()) { + return + } + // 重连 / 冷启动后补齐当前激活私聊会话的「对方已读位置」 // 离线期间错过的 RECEIPT 推送会被这里补回;其他私聊会话等用户点开时由 Index.vue 的 watch 触发 // 私聊已读关闭时跳过,避免打到已禁用接口触发错误日志 @@ -427,14 +471,16 @@ export const useMessagePuller = () => { } /** - * 断网期间 WS 收不到推送,期间产生的消息只能靠拉取接口按 minId 游标补齐; - * 首次连接由 Index.vue 显式调 pullOnce 完成首拉,这里仅覆盖之后的重连 + * 断网期间 WS 收不到推送:重连后既要按 minId 补齐消息,也要按 update_time + id 补齐好友 / 群 / 群申请状态。 + * 首次连接由 Index.vue 显式驱动(pullOnce 拉消息 + 各 store 首拉),这里仅覆盖之后的重连。 + * 重连时 store 已就位,pullStateEvents 与 pullOnce 并发即可,无需「先就位再拉消息」的首登顺序约束。 */ watch( () => wsStore.isConnected, (isConnected) => { if (isConnected && initialPulled) { void pullOnce() + void pullStateEvents() } } ) diff --git a/src/views/im/home/composables/useMessageSender.ts b/src/views/im/home/composables/useMessageSender.ts index 5c1acb398..2d5251611 100644 --- a/src/views/im/home/composables/useMessageSender.ts +++ b/src/views/im/home/composables/useMessageSender.ts @@ -131,7 +131,7 @@ export const useMessageSender = () => { name: conversation.name || String(realTarget), avatar: conversation.avatar || '' } - messageStore.insertMessage(conversationInfo, message) + void messageStore.insertMessage(conversationInfo, message).catch(() => undefined) } // 3. 发送请求:按会话类型分发到不同接口;成功后 ackMessage 更新为 NORMAL,失败更新为 FAILED @@ -143,13 +143,15 @@ export const useMessageSender = () => { type, content }) - void messageStore.ackMessage(conversation.type, realTarget, clientMessageId, { - id: data.id, - sendTime: new Date(data.sendTime).getTime(), - status: data.status, - receiptStatus: data.receiptStatus, - content: data.content - }) + void messageStore + .ackMessage(conversation.type, realTarget, clientMessageId, { + id: data.id, + sendTime: new Date(data.sendTime).getTime(), + status: data.status, + receiptStatus: data.receiptStatus, + content: data.content + }) + .catch(() => undefined) } else if (conversation.type === ImConversationType.GROUP) { const data = await apiSendGroupMessage({ clientMessageId, @@ -159,21 +161,25 @@ export const useMessageSender = () => { atUserIds: options?.atUserIds, receipt: options?.receipt }) - void messageStore.ackMessage(conversation.type, realTarget, clientMessageId, { - id: data.id, - sendTime: new Date(data.sendTime).getTime(), - status: data.status, - receiptStatus: data.receiptStatus, - readCount: data.readCount, - content: data.content - }) + void messageStore + .ackMessage(conversation.type, realTarget, clientMessageId, { + id: data.id, + sendTime: new Date(data.sendTime).getTime(), + status: data.status, + receiptStatus: data.receiptStatus, + readCount: data.readCount, + content: data.content + }) + .catch(() => undefined) } return true } catch (e) { console.error('[IM] 消息发送失败', { type, realTarget, clientMessageId }, e) - void messageStore.ackMessage(conversation.type, realTarget, clientMessageId, { - status: ImMessageStatus.FAILED - }) + void messageStore + .ackMessage(conversation.type, realTarget, clientMessageId, { + status: ImMessageStatus.FAILED + }) + .catch(() => undefined) return false } } diff --git a/src/views/im/home/index.vue b/src/views/im/home/index.vue index fc7a216f5..a84e8c7a7 100644 --- a/src/views/im/home/index.vue +++ b/src/views/im/home/index.vue @@ -87,34 +87,44 @@ onMounted(async () => { channelStore.loadChannelList(), groupRequestStore.loadGroupRequestList() ]) - // 1.4 我管理的群下未处理加群申请后台刷新 + groupStore.markAllGroupMembersExpired() + // 1.4 我管理的群下未处理加群申请红点:首登用 unhandled-list(服务端直接过滤未处理,语义精准、启动轻); + // pullGroupRequests 只在重连 / 后续补偿时跑(见 useMessagePuller.pullStateEvents),不进首登主链路 void groupRequestStore .fetchUnhandledGroupRequestList() .catch((e) => console.warn('[IM] 拉取未处理加群申请失败', e)) - // 2.1 有缓存:异步背景刷新,失败仅记日志(IDB 数据已经够撑首屏,pullOnce 也能正常入库) + // 2. 好友主数据恢复走 pull;群列表用快照刷新,覆盖离线期间自己的入群 / 退群状态变化 + // 2.1 有缓存:异步背景增量刷新,失败仅记日志(IDB 数据已经够撑首屏,pullOnce 也能正常入库) // 2.2 无缓存(首登 / 切账号回切):必须 await + 失败抛出中断本轮 onMounted, - // 否则 pullOnce 会用 senderId 数字给会话起名落到 IDB 后续基本无法自愈;无缓存分支两个 fetch 并发 Promise.all 省一个 RTT + // 否则 pullOnce 会用 senderId 数字给会话起名落到 IDB 后续基本无法自愈;无缓存分支并发 Promise.all 省一个 RTT const requiredFetches: Promise[] = [] if (hasCachedFriends) { - void friendStore.fetchFriendList().catch((e) => console.warn('[IM] 后台刷好友失败', e)) + void friendStore.pullFriends().catch((e) => console.warn('[IM] 后台增量拉好友失败', e)) } else { - requiredFetches.push(friendStore.fetchFriendList()) + requiredFetches.push(friendStore.pullFriends()) } if (hasCachedGroups) { - void groupStore.fetchGroupList().catch((e) => console.warn('[IM] 后台刷群列表失败', e)) + void groupStore.fetchGroupList(true).catch((e) => console.warn('[IM] 后台刷新群列表失败', e)) } else { - requiredFetches.push(groupStore.fetchGroupList()) + requiredFetches.push(groupStore.fetchGroupList(true)) } + // 2.3 频道无增量 pull 接口,继续走 list if (hasCachedChannels) { void channelStore.fetchChannelList().catch((e) => console.warn('[IM] 后台刷频道列表失败', e)) } else { requiredFetches.push(channelStore.fetchChannelList()) } + // 2.4 执行加载 if (requiredFetches.length > 0) { await Promise.all(requiredFetches) } + // 2.5 好友申请增量补偿:首登也要跑,离线期间好友申请变更不会影响好友主表 + void friendStore + .pullFriendRequests() + .catch((e) => console.warn('[IM] 后台增量拉好友申请失败', e)) + // 3. 实时通信:建 WebSocket 长连接 + 拉离线消息(pullOnce finally 把 loading 归位) webSocketStore.connect() await pullOnce() diff --git a/src/views/im/home/pages/conversation/components/message/MessagePanel.vue b/src/views/im/home/pages/conversation/components/message/MessagePanel.vue index 32db1f631..1bf687fa9 100644 --- a/src/views/im/home/pages/conversation/components/message/MessagePanel.vue +++ b/src/views/im/home/pages/conversation/components/message/MessagePanel.vue @@ -449,10 +449,12 @@ async function ensureGroupData(groupId: number) { console.warn('[IM MessagePanel] loadGroupMemberList 失败', { groupId }, error) return null }) - // 再从远程异步拉成员,强刷以跳过 in-memory 缓存,每次进群都能拿到最新成员状态 - groupStore.fetchGroupMemberList(groupId, true).catch((error) => { - console.warn('[IM MessagePanel] fetchGroupMemberList 失败', { groupId }, error) - }) + const group = groupStore.getGroup(groupId) + if (!group?.membersLoaded || group.membersExpired) { + groupStore.fetchGroupMemberList(groupId, true).catch((error) => { + console.warn('[IM MessagePanel] fetchGroupMemberList 失败', { groupId }, error) + }) + } } /** 群信息抽屉里点"刷新":强拉一次最新群元数据 + 群成员 */ diff --git a/src/views/im/home/store/friendStore.ts b/src/views/im/home/store/friendStore.ts index 6983328e1..820bd0653 100644 --- a/src/views/im/home/store/friendStore.ts +++ b/src/views/im/home/store/friendStore.ts @@ -4,6 +4,7 @@ import { store } from '@/store' import { CommonStatusEnum } from '@/utils/constants' import { getMyFriendList as apiGetMyFriendList, + pullMyFriendList as apiPullMyFriendList, getFriend as apiGetFriend, deleteFriend as apiDeleteFriend, updateFriend as apiUpdateFriend, @@ -16,6 +17,7 @@ import { agreeFriendRequest as apiAgreeFriendRequest, refuseFriendRequest as apiRefuseFriendRequest, getMyFriendRequestList as apiGetMyFriendRequestList, + pullMyFriendRequestList as apiPullMyFriendRequestList, getMyFriendRequest as apiGetMyFriendRequest, type ImFriendRequestApplyReqVO, type ImFriendRequestRespVO @@ -23,17 +25,20 @@ import { import { useConversationStore } from './conversationStore' import { ImConversationType, ImFriendRequestHandleResult } from '../../utils/constants' import { FRIEND_REQUEST_PAGE_SIZE } from '../../utils/config' -import { getDb } from '../../utils/db' +import { getDb, StorageKeys } from '../../utils/db' +import { runIncrementalPull } from '../../utils/pull' import { getCurrentUserId } from '@/utils/auth' import { getFriendDisplayName } from '../../utils/user' import type { Friend, FriendDO, FriendLite, FriendRequest, FriendRequestDO } from '../types' +type PendingRequest = { epoch: number; userId: number; promise: Promise } + /** 当前正在进行的好友列表拉取;多 dispatcher 同时触发时复用同一 Promise,避免雪崩重拉 */ -let pendingFetchFriends: Promise | null = null +let pendingFetchFriends: PendingRequest | null = null /** 当前正在进行的好友申请列表拉取;多端连续多条申请到达时复用同一 Promise,避免雪崩重拉 */ -let pendingFetchRequests: Promise | null = null +let pendingFetchRequests: PendingRequest | null = null /** 当前正在进行的「加载更多申请」请求 */ -let pendingLoadMoreRequests: Promise | null = null +let pendingLoadMoreRequests: PendingRequest | null = null /** clear() 时递增;旧账号那次还没返回的请求 resolve 后比对一致才写 store,防跨账号数据泄漏 */ let storeEpoch = 0 @@ -176,13 +181,18 @@ export const useFriendStore = defineStore('imFriendStore', { }, /** 保存单个好友 */ - saveFriend(friend: Friend | undefined): void { + async saveFriendRecord(friend: Friend | undefined): Promise { if (!friend?.id) { return } - void getDb() - .put('friends', friend) - .catch((e) => console.warn('[IM friendStore] 本地好友写入失败', e)) + await getDb().put('friends', friend) + }, + + /** 保存单个好友 */ + saveFriend(friend: Friend | undefined): void { + void this.saveFriendRecord(friend).catch((e) => + console.warn('[IM friendStore] 本地好友写入失败', e) + ) }, /** 保存好友申请列表 */ @@ -199,13 +209,18 @@ export const useFriendStore = defineStore('imFriendStore', { }, /** 保存单条好友申请 */ - saveFriendRequest(request: FriendRequest | undefined): void { + async saveFriendRequestRecord(request: FriendRequest | undefined): Promise { if (!request) { return } - void getDb() - .put('friendRequests', request) - .catch((e) => console.warn('[IM friendStore] 本地好友申请写入失败', e)) + await getDb().put('friendRequests', request) + }, + + /** 保存单条好友申请 */ + saveFriendRequest(request: FriendRequest | undefined): void { + void this.saveFriendRequestRecord(request).catch((e) => + console.warn('[IM friendStore] 本地好友申请写入失败', e) + ) }, // ==================== 远端拉取 ==================== @@ -215,14 +230,18 @@ export const useFriendStore = defineStore('imFriendStore', { if (this.loaded && !force) { return } - if (pendingFetchFriends) { - return pendingFetchFriends - } // 快照 epoch;clear() 之后到 .then 之间触发的 epoch++ 表示账号已切,旧结果不能写入新 store const requestEpoch = storeEpoch - pendingFetchFriends = apiGetMyFriendList() + const requestUserId = getCurrentUserId() + if ( + pendingFetchFriends?.epoch === requestEpoch && + pendingFetchFriends.userId === requestUserId + ) { + return pendingFetchFriends.promise + } + const promise = apiGetMyFriendList() .then((list) => { - if (requestEpoch !== storeEpoch) { + if (requestEpoch !== storeEpoch || getCurrentUserId() !== requestUserId) { return } this.friends = (list || []).map(convertFriend) @@ -241,23 +260,56 @@ export const useFriendStore = defineStore('imFriendStore', { this.saveFriendList() }) .finally(() => { - if (requestEpoch === storeEpoch) { + if ( + pendingFetchFriends?.epoch === requestEpoch && + pendingFetchFriends.userId === requestUserId + ) { pendingFetchFriends = null } }) - return pendingFetchFriends + pendingFetchFriends = { epoch: requestEpoch, userId: requestUserId, promise } + return promise + }, + + /** + * 增量拉取好友变更并合并:进入 IM 首屏主数据恢复(空游标 = 首次全量增量拉)+ 重连 / 离线补偿 + * + * 含已删除好友,按 status 走 upsert + */ + async pullFriends() { + // 快照 epoch;账号在拉取途中切换(clear() → epoch++)时丢弃旧账号那几页结果,防跨账号数据泄漏 + const requestEpoch = storeEpoch + const requestUserId = getCurrentUserId() + const isActive = () => requestEpoch === storeEpoch && getCurrentUserId() === requestUserId + await runIncrementalPull( + StorageKeys.settings.friendPullCursor, + apiPullMyFriendList, + async (records) => { + if (!isActive()) { + return false + } + await Promise.all(records.map((vo) => this.upsertFriendForPull(convertFriend(vo)))) + return true + }, + isActive + ) + // 置 loaded,供通讯录页 fetchFriendList(force=false) 复用缓存而非重复全量拉 + if (isActive()) { + this.loaded = true + } }, /** 按 friendUserId 获取详情并合并到本地(保证 nickname / avatar 最新) */ async fetchFriendInfo(friendUserId: number) { const requestEpoch = storeEpoch + const requestUserId = getCurrentUserId() try { const data = await apiGetFriend(friendUserId) if (!data) { return } // clear() 已切账号:旧请求的好友详情不能再 upsert 进新账号的 friends - if (requestEpoch !== storeEpoch) { + if (requestEpoch !== storeEpoch || getCurrentUserId() !== requestUserId) { return } this.upsertFriend(convertFriend(data)) @@ -307,12 +359,19 @@ export const useFriendStore = defineStore('imFriendStore', { /** 拉取「我相关」的好友申请列表首页(页面打开 / 收到 FRIEND_REQUEST_RECEIVED 时刷新);pending 期间复用同一 Promise */ async fetchFriendRequestList() { if (pendingFetchRequests) { - return pendingFetchRequests + const currentUserId = getCurrentUserId() + if ( + pendingFetchRequests.epoch === storeEpoch && + pendingFetchRequests.userId === currentUserId + ) { + return pendingFetchRequests.promise + } } const requestEpoch = storeEpoch - pendingFetchRequests = apiGetMyFriendRequestList(FRIEND_REQUEST_PAGE_SIZE) + const requestUserId = getCurrentUserId() + const promise = apiGetMyFriendRequestList(FRIEND_REQUEST_PAGE_SIZE) .then((list) => { - if (requestEpoch !== storeEpoch) { + if (requestEpoch !== storeEpoch || getCurrentUserId() !== requestUserId) { return } const items = (list || []).map(convertFriendRequest) @@ -322,26 +381,39 @@ export const useFriendStore = defineStore('imFriendStore', { this.saveFriendRequestList() }) .finally(() => { - if (requestEpoch === storeEpoch) { + if ( + pendingFetchRequests?.epoch === requestEpoch && + pendingFetchRequests.userId === requestUserId + ) { pendingFetchRequests = null } }) - return pendingFetchRequests + pendingFetchRequests = { epoch: requestEpoch, userId: requestUserId, promise } + return promise }, /** 加载更多申请(按本地最旧 requestId 游标分页);无更多 / pending 中直接返回 */ async loadMoreFriendRequestList() { - if (!this.hasMoreFriendRequests || pendingLoadMoreRequests || pendingFetchRequests) { + const requestUserId = getCurrentUserId() + const hasSameFetchPending = + pendingFetchRequests?.epoch === storeEpoch && pendingFetchRequests.userId === requestUserId + if (!this.hasMoreFriendRequests || hasSameFetchPending) { return } + if ( + pendingLoadMoreRequests?.epoch === storeEpoch && + pendingLoadMoreRequests.userId === requestUserId + ) { + return pendingLoadMoreRequests.promise + } const oldest = this.friendRequests[this.friendRequests.length - 1] if (!oldest) { return this.fetchFriendRequestList() } const requestEpoch = storeEpoch - pendingLoadMoreRequests = apiGetMyFriendRequestList(FRIEND_REQUEST_PAGE_SIZE, oldest.id) + const promise = apiGetMyFriendRequestList(FRIEND_REQUEST_PAGE_SIZE, oldest.id) .then((list) => { - if (requestEpoch !== storeEpoch) { + if (requestEpoch !== storeEpoch || getCurrentUserId() !== requestUserId) { return } const items = (list || []).map(convertFriendRequest) @@ -350,11 +422,15 @@ export const useFriendStore = defineStore('imFriendStore', { this.saveFriendRequestList() }) .finally(() => { - if (requestEpoch === storeEpoch) { + if ( + pendingLoadMoreRequests?.epoch === requestEpoch && + pendingLoadMoreRequests.userId === requestUserId + ) { pendingLoadMoreRequests = null } }) - return pendingLoadMoreRequests + pendingLoadMoreRequests = { epoch: requestEpoch, userId: requestUserId, promise } + return promise }, /** 按 id 查申请记录;列表是按 id 倒序的小列表,O(n) find 即可,不再维护 Map 索引 */ @@ -365,34 +441,67 @@ export const useFriendStore = defineStore('imFriendStore', { /** 按 id 从后端单查并 upsert 到本地(dispatcher 兜底用,避免全量重拉);后端带越权过滤 */ async fetchFriendRequest(requestId: number) { const requestEpoch = storeEpoch + const requestUserId = getCurrentUserId() const data = await apiGetMyFriendRequest(requestId) if (!data) { return } // clear() 已切账号:旧请求的申请记录不能再写进新账号的 friendRequests - if (requestEpoch !== storeEpoch) { + if (requestEpoch !== storeEpoch || getCurrentUserId() !== requestUserId) { return } - const next = convertFriendRequest(data) - const existing = this.getFriendRequest(requestId) + this.upsertFriendRequest(convertFriendRequest(data)) + }, + + /** 合并单条好友申请:已有则按 id 覆盖;新记录按 id 倒序插入(比本地最旧还老则跳过,留给 loadMore 带回) */ + upsertFriendRequest(next: FriendRequest) { + void this.upsertFriendRequestForPull(next).catch((e) => + console.warn('[IM friendStore] 本地好友申请写入失败', e) + ) + }, + + /** 合并单条好友申请 */ + async upsertFriendRequestForPull(next: FriendRequest): Promise { + const existing = this.getFriendRequest(next.id) if (existing) { Object.assign(existing, next) - this.saveFriendRequest(existing) + await this.saveFriendRequestRecord(existing) return } // 比本地最旧 id 还老:不入列表,让 loadMore 自然带回,避免破坏 id 倒序 / 后续 loadMore 重复 push const oldest = this.friendRequests[this.friendRequests.length - 1] - if (oldest && requestId < oldest.id) { + if (oldest && next.id < oldest.id) { return } // 按 id 倒序找首个比自己小的位置插入;找不到则追加末尾 - const insertIndex = this.friendRequests.findIndex((request) => request.id < requestId) + const insertIndex = this.friendRequests.findIndex((request) => request.id < next.id) if (insertIndex < 0) { this.friendRequests.push(next) } else { this.friendRequests.splice(insertIndex, 0, next) } - this.saveFriendRequest(next) + await this.saveFriendRequestRecord(next) + }, + + /** 增量拉取好友申请变更并合并(重连 / 离线补偿);按 update_time + id 游标,已处理的按 handleResult 覆盖 */ + async pullFriendRequests() { + const requestEpoch = storeEpoch + const requestUserId = getCurrentUserId() + const isActive = () => requestEpoch === storeEpoch && getCurrentUserId() === requestUserId + await runIncrementalPull( + StorageKeys.settings.friendRequestPullCursor, + apiPullMyFriendRequestList, + async (records) => { + if (!isActive()) { + return false + } + await Promise.all( + records.map((vo) => this.upsertFriendRequestForPull(convertFriendRequest(vo))) + ) + return true + }, + isActive + ) }, // ==================== 好友关系操作 ==================== @@ -400,8 +509,9 @@ export const useFriendStore = defineStore('imFriendStore', { /** 删除好友(单向软删,本端置 DISABLE);clear=true 时级联清理本地相关数据(如私聊会话),并透传后端给多端同步 */ async deleteFriend(friendUserId: number, clear: boolean = true) { const requestEpoch = storeEpoch + const requestUserId = getCurrentUserId() await apiDeleteFriend(friendUserId, clear) - if (requestEpoch !== storeEpoch) { + if (requestEpoch !== storeEpoch || getCurrentUserId() !== requestUserId) { return } this.removeFriend(friendUserId, clear) @@ -410,8 +520,9 @@ export const useFriendStore = defineStore('imFriendStore', { /** 切换免打扰:同步会话的 silent 字段,避免会话列表 silent 图标等 1210 推到才更新 */ async setFriendSilent(friendUserId: number, silent: boolean) { const requestEpoch = storeEpoch + const requestUserId = getCurrentUserId() await apiUpdateFriend({ friendUserId, silent }) - if (requestEpoch !== storeEpoch) { + if (requestEpoch !== storeEpoch || getCurrentUserId() !== requestUserId) { return } const friend = this.getFriend(friendUserId) @@ -426,8 +537,9 @@ export const useFriendStore = defineStore('imFriendStore', { /** 切换联系人置顶 */ async setFriendPinned(friendUserId: number, pinned: boolean) { const requestEpoch = storeEpoch + const requestUserId = getCurrentUserId() await apiUpdateFriend({ friendUserId, pinned }) - if (requestEpoch !== storeEpoch) { + if (requestEpoch !== storeEpoch || getCurrentUserId() !== requestUserId) { return } const friend = this.getFriend(friendUserId) @@ -440,8 +552,9 @@ export const useFriendStore = defineStore('imFriendStore', { /** 拉黑好友:本端乐观更新 + 调接口;后端 FRIEND_BLOCK 推到时由 dispatcher 兜底同步多端 */ async blockFriend(friendUserId: number) { const requestEpoch = storeEpoch + const requestUserId = getCurrentUserId() await apiBlockFriend(friendUserId) - if (requestEpoch !== storeEpoch) { + if (requestEpoch !== storeEpoch || getCurrentUserId() !== requestUserId) { return } const friend = this.getFriend(friendUserId) @@ -454,8 +567,9 @@ export const useFriendStore = defineStore('imFriendStore', { /** 移出黑名单:本端乐观更新 + 调接口;后端 FRIEND_UNBLOCK 推到时由 dispatcher 兜底同步多端 */ async unblockFriend(friendUserId: number) { const requestEpoch = storeEpoch + const requestUserId = getCurrentUserId() await apiUnblockFriend(friendUserId) - if (requestEpoch !== storeEpoch) { + if (requestEpoch !== storeEpoch || getCurrentUserId() !== requestUserId) { return } const friend = this.getFriend(friendUserId) @@ -468,10 +582,11 @@ export const useFriendStore = defineStore('imFriendStore', { /** 修改好友展示备注(仅自己可见) */ async setFriendDisplayName(friendUserId: number, displayName: string) { const requestEpoch = storeEpoch + const requestUserId = getCurrentUserId() const value = displayName.trim() // 后端 displayName 语义:null/undefined = 不改,"" = 清空,所以这里直接传 value(可能是空串) await apiUpdateFriend({ friendUserId, displayName: value }) - if (requestEpoch !== storeEpoch) { + if (requestEpoch !== storeEpoch || getCurrentUserId() !== requestUserId) { return } const friend = this.getFriend(friendUserId) @@ -487,7 +602,16 @@ export const useFriendStore = defineStore('imFriendStore', { /** 本地合并 / 新增某个好友(WebSocket 事件 & 手动刷新都用) */ upsertFriend(friend: Friend) { - const index = this.friends.findIndex((existing) => existing.friendUserId === friend.friendUserId) + void this.upsertFriendForPull(friend).catch((e) => + console.warn('[IM friendStore] 本地好友写入失败', e) + ) + }, + + /** 本地合并 / 新增某个好友 */ + async upsertFriendForPull(friend: Friend): Promise { + const index = this.friends.findIndex( + (existing) => existing.friendUserId === friend.friendUserId + ) if (index >= 0) { this.friends[index] = { ...this.friends[index], @@ -507,7 +631,7 @@ export const useFriendStore = defineStore('imFriendStore', { avatar: friend.avatar, silent: friend.silent }) - this.saveFriend(merged) + await this.saveFriendRecord(merged) }, /** 本地标记删除(WebSocket FRIEND_DELETE 事件触发;clear=true 时级联清相关数据如私聊会话) */ diff --git a/src/views/im/home/store/groupRequestStore.ts b/src/views/im/home/store/groupRequestStore.ts index 5721c9624..25366632d 100644 --- a/src/views/im/home/store/groupRequestStore.ts +++ b/src/views/im/home/store/groupRequestStore.ts @@ -5,13 +5,22 @@ import { agreeGroupRequest as apiAgreeGroupRequest, getMyGroupRequest as apiGetMyGroupRequest, getUnhandledRequestList as apiGetUnhandledRequestList, + pullMyGroupRequestList as apiPullMyGroupRequestList, refuseGroupRequest as apiRefuseGroupRequest, type ImGroupRequestRespVO } from '@/api/im/group/request' import { ImGroupRequestHandleResult } from '@/views/im/utils/constants' -import { getDb } from '../../utils/db' +import { getDb, StorageKeys } from '../../utils/db' +import { runIncrementalPull } from '../../utils/pull' +import { getCurrentUserId } from '@/utils/auth' import type { GroupRequestDO } from '../types' +type PendingRequest = { epoch: number; userId: number; promise: Promise } + +/** clear() 时递增;旧账号 in-flight 的 pullGroupRequests 结果 resolve 后比对一致才写 store,防跨账号红点污染(与 friendStore 同口径) */ +let storeEpoch = 0 +let pendingUnhandledFetch: PendingRequest | null = null + /** * IM 加群申请 Store * @@ -86,34 +95,113 @@ export const useGroupRequestStore = defineStore('imGroupRequestStore', { .catch((e) => console.warn('[IM groupRequestStore] 本地加群申请缓存写入失败', e)) }, + /** 保存单条加群申请 */ + async saveGroupRequestRecord(request: ImGroupRequestRespVO): Promise { + await getDb().put('groupRequests', request) + }, + /** 保存单条加群申请 */ saveGroupRequest(request: ImGroupRequestRespVO): void { - void getDb() - .put('groupRequests', request) - .catch((e) => console.warn('[IM groupRequestStore] 本地加群申请写入失败', e)) + void this.saveGroupRequestRecord(request).catch((e) => + console.warn('[IM groupRequestStore] 本地加群申请写入失败', e) + ) }, /** 拉取我管理的所有群下未处理申请;进 IM 后 / 升级 admin 后 / WS 推送有冲突时调用 */ async fetchUnhandledGroupRequestList() { - const list = await apiGetUnhandledRequestList() - this.unhandledList = list || [] - this.loaded = true - this.saveGroupRequestList() + const requestEpoch = storeEpoch + const requestUserId = getCurrentUserId() + if ( + pendingUnhandledFetch?.epoch === requestEpoch && + pendingUnhandledFetch.userId === requestUserId + ) { + return pendingUnhandledFetch.promise + } + const promise = (async () => { + const list = await apiGetUnhandledRequestList() + if (requestEpoch !== storeEpoch || getCurrentUserId() !== requestUserId) { + return + } + this.unhandledList = list || [] + this.loaded = true + this.saveGroupRequestList() + })().finally(() => { + if ( + pendingUnhandledFetch?.epoch === requestEpoch && + pendingUnhandledFetch.userId === requestUserId + ) { + pendingUnhandledFetch = null + } + }) + pendingUnhandledFetch = { epoch: requestEpoch, userId: requestUserId, promise } + return promise }, /** * WS 收到 1503:拉最新内容并置顶 * * 同一对 group_id, user_id 复用记录时 requestId 不变但 applyContent / inviterUserId 会刷新,所以无条件 fetch + 排到头部 - * 校验 handleResult:HTTP 在途时若已收到 1505 / 1506,returnedRequest 可能已是已处理状态,不能再塞回未处理列表 */ async addGroupRequestById(requestId: number) { + const requestEpoch = storeEpoch + const requestUserId = getCurrentUserId() const request = await apiGetMyGroupRequest(requestId) - if (!request || request.handleResult !== ImGroupRequestHandleResult.UNHANDLED) { + if (!request) { return } - this.unhandledList = [request, ...this.unhandledList.filter((r) => r.id !== requestId)] - this.saveGroupRequest(request) + if (requestEpoch !== storeEpoch || getCurrentUserId() !== requestUserId) { + return + } + this.upsertGroupRequest(request) + }, + + /** + * 本地合并 / 新增单条加群申请(WS 推送 & 增量拉取共用) + * + * 未处理的按 id 去重后置顶;已处理的从未处理列表移除,避免补偿时把已同意 / 拒绝的记录塞回红点 + */ + upsertGroupRequest(request: ImGroupRequestRespVO) { + void this.upsertGroupRequestForPull(request).catch((e) => + console.warn('[IM groupRequestStore] 本地加群申请写入失败', e) + ) + }, + + /** 本地合并 / 新增单条加群申请 */ + async upsertGroupRequestForPull(request: ImGroupRequestRespVO): Promise { + if (request.handleResult !== ImGroupRequestHandleResult.UNHANDLED) { + await this.removeGroupRequestByIdForPull(request.id) + return + } + this.unhandledList = [request, ...this.unhandledList.filter((r) => r.id !== request.id)] + await this.saveGroupRequestRecord(request) + }, + + /** + * 增量拉取加群申请变更并合并;含已处理的按 handleResult 走移除(已处理 → 从红点列表剔除) + * + * 只做重连 / 后续离线补偿,不负责首登:首登红点走 fetchUnhandledGroupRequestList(服务端直接过滤未处理,语义更精准、启动更轻)。 + * 故首次重连时游标为空 = 一次性全量走一遍(已处理记录命中 removeGroupRequestById 为 no-op,红点不受影响),之后增量。 + */ + async pullGroupRequests() { + // 快照 epoch;账号在拉取途中切换(clear() → epoch++)时丢弃旧账号那几页结果,防跨账号红点污染 + const requestEpoch = storeEpoch + const requestUserId = getCurrentUserId() + const isActive = () => requestEpoch === storeEpoch && getCurrentUserId() === requestUserId + await runIncrementalPull( + StorageKeys.settings.groupRequestPullCursor, + apiPullMyGroupRequestList, + async (records) => { + if (!isActive()) { + return false + } + await Promise.all(records.map((vo) => this.upsertGroupRequestForPull(vo))) + return true + }, + isActive + ) + if (isActive()) { + this.loaded = true + } }, /** WS 收到 1505 / 1506 或本端处理完一条:按 requestId 从列表移除 */ @@ -124,6 +212,12 @@ export const useGroupRequestStore = defineStore('imGroupRequestStore', { .catch((e) => console.warn('[IM groupRequestStore] 本地加群申请删除失败', e)) }, + /** 删除单条加群申请 */ + async removeGroupRequestByIdForPull(requestId: number): Promise { + this.unhandledList = this.unhandledList.filter((r) => r.id !== requestId) + await getDb().delete('groupRequests', requestId) + }, + /** 同意申请;本端处理后立即从列表移除,避免被反复点击 */ async agreeGroupRequest(requestId: number) { await apiAgreeGroupRequest(requestId) @@ -140,6 +234,9 @@ export const useGroupRequestStore = defineStore('imGroupRequestStore', { clear() { this.unhandledList = [] this.loaded = false + // 账号切换:递增 epoch 废弃旧账号 in-flight 的 pullGroupRequests 结果,避免写进新账号红点列表 + storeEpoch++ + pendingUnhandledFetch = null } } }) diff --git a/src/views/im/home/store/groupStore.ts b/src/views/im/home/store/groupStore.ts index 6881cff88..5b09ae54a 100644 --- a/src/views/im/home/store/groupStore.ts +++ b/src/views/im/home/store/groupStore.ts @@ -27,6 +27,9 @@ import { getGroupDisplayName } from '../../utils/user' import { type GroupNotificationPayload } from '../../utils/message' import type { Group, GroupDO, GroupMember, GroupMemberDO, Message } from '../types' +/** clear() 时递增;旧账号 in-flight 的成员请求返回后比对一致才写 store */ +let storeEpoch = 0 + /** * fetchGroupMemberList 并发去重表:同 groupId 同时进的请求共用一个 Promise * @@ -47,7 +50,12 @@ const pendingSingleMemberKey = (userId: number, groupId: number, memberUserId: n /** 构建群 IndexedDB 记录 */ function buildGroupDO(group: Group): GroupDO { - const { members: _members, membersLoaded: _membersLoaded, ...record } = group + const { + members: _members, + membersLoaded: _membersLoaded, + membersExpired: _membersExpired, + ...record + } = group return record } @@ -57,6 +65,13 @@ function isSelfInPayloadMembers(payload: GroupNotificationPayload): boolean { return !!selfUserId && (payload.memberUserIds || []).includes(selfUserId) } +/** 刷新我管理的群申请红点 */ +function refreshUnhandledGroupRequests(): void { + useGroupRequestStore() + .fetchUnhandledGroupRequestList() + .catch(() => undefined) +} + /** * IM 群 Store * @@ -68,8 +83,8 @@ function isSelfInPayloadMembers(payload: GroupNotificationPayload): boolean { export const useGroupStore = defineStore('imGroupStore', { state: () => ({ groups: [] as Group[], - // 仅 fetchGroupList 成功后置位;loadGroupList(IDB)不置位,否则后台 SWR 刷新会被缓存命中跳过 - loaded: false + loaded: false, // 仅 fetchGroupList 成功后置位;loadGroupList(IDB)不置位,否则后台 SWR 刷新会被缓存命中跳过 + groupMembersExpired: false // 进入 IM / 重连后置位;IDB 里的成员桶延迟加载到内存时,也要按过期处理 }), getters: { @@ -119,13 +134,18 @@ export const useGroupStore = defineStore('imGroupStore', { }, /** 保存单个群 */ - saveGroup(group: Group | undefined): void { + async saveGroupRecord(group: Group | undefined): Promise { if (!group) { return } - void getDb() - .put('groups', buildGroupDO(group)) - .catch((e) => console.warn('[IM groupStore] 本地群写入失败', e)) + await getDb().put('groups', buildGroupDO(group)) + }, + + /** 保存单个群 */ + saveGroup(group: Group | undefined): void { + void this.saveGroupRecord(group).catch((e) => + console.warn('[IM groupStore] 本地群写入失败', e) + ) }, /** 从 IndexedDB 恢复指定群成员 */ @@ -137,7 +157,11 @@ export const useGroupStore = defineStore('imGroupStore', { return cachedGroup.members } try { - const cached = await getDb().getAllByIndex('groupMembers', 'groupId', groupId) + const cached = await getDb().getAllByIndex( + 'groupMembers', + 'groupId', + groupId + ) if (!cached || cached.length === 0) { return null } @@ -151,12 +175,14 @@ export const useGroupStore = defineStore('imGroupStore', { name: '', members: cached, memberCount: cached.length, - membersLoaded: true + membersLoaded: true, + membersExpired: this.groupMembersExpired }) } else { group.members = cached group.memberCount = cached.length group.membersLoaded = true + group.membersExpired = this.groupMembersExpired } return cached } catch (e) { @@ -193,8 +219,13 @@ export const useGroupStore = defineStore('imGroupStore', { if (this.loaded && !force) { return } + const requestEpoch = storeEpoch + const requestUserId = getCurrentUserId() // 拉取当前登录用户加入的所有群(不带成员;成员按需再走 fetchGroupMemberList) const list = await apiGetMyGroupList() + if (requestEpoch !== storeEpoch || getCurrentUserId() !== requestUserId) { + return + } const fresh = (list || []).map(convertGroup) // 合并而非全量替换:silent / groupRemark / 成员缓存这些字段不在 ImGroupRespVO 里,得从旧 group 保留 const groupMap = new Map(this.groups.map((group) => [group.id, group])) @@ -209,7 +240,8 @@ export const useGroupStore = defineStore('imGroupStore', { memberCount: existing.memberCount ?? group.memberCount, silent: existing.silent ?? group.silent, groupRemark: existing.groupRemark, - membersLoaded: existing.membersLoaded + membersLoaded: existing.membersLoaded, + membersExpired: existing.membersExpired } }) this.loaded = true @@ -224,6 +256,24 @@ export const useGroupStore = defineStore('imGroupStore', { this.saveGroupList() }, + /** 失效全部群成员缓存 */ + markAllGroupMembersExpired() { + this.groupMembersExpired = true + for (const group of this.groups) { + if (group.membersLoaded) { + group.membersExpired = true + } + } + }, + + /** 失效指定群成员缓存 */ + markGroupMembersExpired(groupId: number) { + const group = this.getGroup(groupId) + if (group?.membersLoaded) { + group.membersExpired = true + } + }, + /** 单群刷新:用 /im/group/get 拉一份最新元数据再 upsert,常用于 GROUP_UPDATE 推送后或手动 reload */ async fetchGroupInfo(groupId: number) { try { @@ -241,7 +291,7 @@ export const useGroupStore = defineStore('imGroupStore', { fetchGroupMemberList(groupId: number, force = false): Promise { // in-memory "完整"加载过才命中——单成员补齐写入的 partial members 不在此返回(membersLoaded=false) const cached = this.getGroup(groupId) - if (cached && cached.members && cached.membersLoaded && !force) { + if (cached && cached.members && cached.membersLoaded && !cached.membersExpired && !force) { return Promise.resolve(cached.members) } // 未登录:不发起请求也不登记 in-flight,避免污染单飞表 @@ -249,6 +299,7 @@ export const useGroupStore = defineStore('imGroupStore', { if (!requestUserId) { return Promise.resolve([]) } + const requestEpoch = storeEpoch // 同 (userId, groupId) 已经有正在飞的请求:直接复用,避免重复打接口 const key = pendingMemberKey(requestUserId, groupId) const inflight = pendingMemberFetches.get(key) @@ -258,6 +309,9 @@ export const useGroupStore = defineStore('imGroupStore', { const promise = (async () => { // 拉接口 + 单 pass 转换:同时捕获 me 的原始 VO,给下面回填 user-per-group 字段(silent / groupRemark)用 const list = await apiGetGroupMemberList(groupId) + if (requestEpoch !== storeEpoch || getCurrentUserId() !== requestUserId) { + return [] + } let meRaw: ImGroupMemberRespVO | undefined const members = (list || []).map((member) => { if (member.userId === requestUserId) { @@ -282,12 +336,14 @@ export const useGroupStore = defineStore('imGroupStore', { memberCount: members.length, silent, groupRemark, - membersLoaded: true + membersLoaded: true, + membersExpired: false }) } else { group.members = members group.memberCount = members.length group.membersLoaded = true + group.membersExpired = false // silent / groupRemark 任一变化才同步到 conversation 和 IDB;groupRemark 变化要顺带刷会话名 if (group.silent !== silent || group.groupRemark !== groupRemark) { group.silent = silent @@ -332,6 +388,7 @@ export const useGroupStore = defineStore('imGroupStore', { if (!requestUserId) { return Promise.resolve(null) } + const requestEpoch = storeEpoch // 同 (userId, groupId, memberUserId) 已经有正在飞的请求:直接复用 const key = pendingSingleMemberKey(requestUserId, groupId, memberUserId) const inflight = pendingSingleMemberFetches.get(key) @@ -343,6 +400,9 @@ export const useGroupStore = defineStore('imGroupStore', { if (!data) { return null } + if (requestEpoch !== storeEpoch || getCurrentUserId() !== requestUserId) { + return null + } const member = convertGroupMember(data, groupId) // 把这一条 upsert 进 group.members 仅供 in-memory 渲染兜底;group 还没就位则用 placeholder // 注意:不写 IDB——成员桶语义是"全量",存"1 人桶"会污染下次冷启动的 loadGroupMemberList @@ -373,6 +433,13 @@ export const useGroupStore = defineStore('imGroupStore', { /** 按 id 插入或合并群(命中则浅合并保留旧字段,未命中则追加),同步把展示名 / 头像 / 免打扰推到对应会话 */ upsertGroup(group: Group) { + void this.upsertGroupAndSave(group).catch((e) => + console.warn('[IM groupStore] 本地群写入失败', e) + ) + }, + + /** 按 id 插入或合并群 */ + async upsertGroupAndSave(group: Group): Promise { const index = this.groups.findIndex((g) => g.id === group.id) if (index >= 0) { this.groups[index] = { ...this.groups[index], ...group } @@ -388,7 +455,7 @@ export const useGroupStore = defineStore('imGroupStore', { silent: merged.silent }) // 持久化到 IDB(fire-and-forget) - this.saveGroup(merged) + await this.saveGroupRecord(merged) }, /** 本地移除群缓存和群会话;群解散(GROUP_DEL)、退群、被踢都复用 */ @@ -567,16 +634,27 @@ export const useGroupStore = defineStore('imGroupStore', { this.applyGroupMemberNicknameUpdateNotification(groupId, payload) break case ImMessageType.GROUP_ADMIN_ADD: - this.updateGroupMemberRoleList(groupId, payload.memberUserIds || [], ImGroupMemberRole.ADMIN) + this.updateGroupMemberRoleList( + groupId, + payload.memberUserIds || [], + ImGroupMemberRole.ADMIN + ) + this.markGroupMembersExpired(groupId) // 自己被加为管理员,原本看不到的群下未处理申请现在变可见,重新拉一次 unhandledList if (isSelfInPayloadMembers(payload)) { - useGroupRequestStore() - .fetchUnhandledGroupRequestList() - .catch(() => undefined) + refreshUnhandledGroupRequests() } break case ImMessageType.GROUP_ADMIN_REMOVE: - this.updateGroupMemberRoleList(groupId, payload.memberUserIds || [], ImGroupMemberRole.NORMAL) + this.updateGroupMemberRoleList( + groupId, + payload.memberUserIds || [], + ImGroupMemberRole.NORMAL + ) + this.markGroupMembersExpired(groupId) + if (isSelfInPayloadMembers(payload)) { + refreshUnhandledGroupRequests() + } break case ImMessageType.GROUP_OWNER_TRANSFER: this.applyGroupOwnerTransferNotification(groupId, payload) @@ -649,6 +727,7 @@ export const useGroupStore = defineStore('imGroupStore', { if (isSelfInPayloadMembers(payload) && !this.getGroup(groupId)) { await this.fetchGroupInfo(groupId) } + this.markGroupMembersExpired(groupId) this.fetchGroupMemberList(groupId, true).catch(() => undefined) }, @@ -659,6 +738,7 @@ export const useGroupStore = defineStore('imGroupStore', { if (selfUserId && payload.entrantUserId === selfUserId && !this.getGroup(groupId)) { await this.fetchGroupInfo(groupId) } + this.markGroupMembersExpired(groupId) this.fetchGroupMemberList(groupId, true).catch(() => undefined) }, @@ -670,6 +750,7 @@ export const useGroupStore = defineStore('imGroupStore', { this.removeGroup(groupId) } else if (payload.operatorUserId) { this.removeLocalGroupMemberList(groupId, [payload.operatorUserId]) + this.markGroupMembersExpired(groupId) } }, @@ -684,6 +765,7 @@ export const useGroupStore = defineStore('imGroupStore', { this.removeGroup(groupId) } else if (memberIds.length) { this.removeLocalGroupMemberList(groupId, memberIds) + this.markGroupMembersExpired(groupId) } }, @@ -695,6 +777,7 @@ export const useGroupStore = defineStore('imGroupStore', { payload.operatorUserId, payload.displayUserName ?? '' ) + this.markGroupMembersExpired(groupId) } }, @@ -702,13 +785,14 @@ export const useGroupStore = defineStore('imGroupStore', { applyGroupOwnerTransferNotification(groupId: number, payload: GroupNotificationPayload) { if (payload.operatorUserId && payload.newOwnerUserId) { this.transferGroupOwner(groupId, payload.operatorUserId, payload.newOwnerUserId) + this.markGroupMembersExpired(groupId) } // 自己接管群主:原本看不到的群下未处理申请现在变可见,重新拉一次 unhandledList const selfUserId = getCurrentUserId() if (selfUserId && payload.newOwnerUserId === selfUserId) { - useGroupRequestStore() - .fetchUnhandledGroupRequestList() - .catch(() => undefined) + refreshUnhandledGroupRequests() + } else if (selfUserId && payload.operatorUserId === selfUserId) { + refreshUnhandledGroupRequests() } }, @@ -770,6 +854,7 @@ export const useGroupStore = defineStore('imGroupStore', { if (member && payload.muteEndTime) { member.muteEndTime = payload.muteEndTime this.saveGroupMemberList(groupId) + this.markGroupMembersExpired(groupId) } }, @@ -780,6 +865,7 @@ export const useGroupStore = defineStore('imGroupStore', { if (member) { member.muteEndTime = undefined this.saveGroupMemberList(groupId) + this.markGroupMembersExpired(groupId) } }, @@ -787,6 +873,9 @@ export const useGroupStore = defineStore('imGroupStore', { clear() { this.groups = [] this.loaded = false + this.groupMembersExpired = false + // 账号切换:递增 epoch 废弃旧账号 in-flight 的成员请求 + storeEpoch++ // 单飞表跟 in-memory state 一起重置;旧账号 in-flight 的请求 finally 也会自己 delete key,提前清空只是更干脆 pendingMemberFetches.clear() pendingSingleMemberFetches.clear() diff --git a/src/views/im/home/store/messageStore.ts b/src/views/im/home/store/messageStore.ts index 90597c33e..7f6555f77 100644 --- a/src/views/im/home/store/messageStore.ts +++ b/src/views/im/home/store/messageStore.ts @@ -30,6 +30,7 @@ import { getCurrentUserId } from '@/utils/auth' import { tryGetSenderDisplayName } from '../../utils/user' import { useGroupStore } from './groupStore' import { useConversationStore } from './conversationStore' +import type { ImConversationReadRespVO } from '@/api/im/conversation/read' import type { Conversation, Message, MessageDO } from '../types' const MESSAGE_CACHE_RECENT_CONVERSATION_LIMIT = 5 @@ -136,9 +137,10 @@ function deriveLastSenderDisplayName( if (conversation.type === ImConversationType.GROUP) { const groupStore = useGroupStore() const group = groupStore.getGroup(conversation.targetId) - const fetchPromise = group?.membersLoaded - ? groupStore.fetchGroupMember(conversation.targetId, senderId) - : groupStore.fetchGroupMemberList(conversation.targetId) + const fetchPromise = + group?.membersLoaded && !group.membersExpired + ? groupStore.fetchGroupMember(conversation.targetId, senderId) + : groupStore.fetchGroupMemberList(conversation.targetId) fetchPromise.catch((e) => console.warn( '[IM messageStore] 兜底拉群成员失败', @@ -233,6 +235,24 @@ function isSameMessage(left: Message, right: Message): boolean { return !!left.clientMessageId && left.clientMessageId === right.clientMessageId } +/** 获取对方普通消息最大编号 */ +function getMaxIncomingNormalMessageId( + messages: Array> +): number { + return messages.reduce((maxMessageId, message) => { + if ( + message.id && + !message.selfSend && + isNormalMessage(message.type) && + message.status !== ImMessageStatus.RECALL && + message.id > maxMessageId + ) { + return message.id + } + return maxMessageId + }, 0) +} + export const useMessageStore = defineStore('imMessageStore', { state: () => ({ messagesByConversation: {} as Record, @@ -390,8 +410,8 @@ export const useMessageStore = defineStore('imMessageStore', { /** 保存消息游标 */ async saveMessageCursor(conversationType: number, messageId?: number, tx?: DbTransaction) { - this.updateMessageCursor(conversationType, messageId) await setMessageMaxId(conversationType, messageId, tx) + this.updateMessageCursor(conversationType, messageId) }, /** 应用撤回到内存 */ @@ -501,7 +521,6 @@ export const useMessageStore = defineStore('imMessageStore', { recomputeConversationLast(conversation, messages) syncConversationAtFlags(conversation, message) } - this.updateMessageCursor(conversationInfo.type, message.id) addChanged(conversation, messages[existingIndex], { mergeClientRecord: hasServerClientMessageId }) @@ -535,29 +554,33 @@ export const useMessageStore = defineStore('imMessageStore', { } } messages.splice(insertIndex, 0, message) - this.updateMessageCursor(conversationInfo.type, message.id) addChanged(conversation, message, { mergeClientRecord: hasServerClientMessageId && !!message.id }) } - // 2. 更新内存游标 - this.updateMessageCursor(conversationType, maxMessageId) - // 3. 单事务写入消息、会话摘要和游标 - await getDb() - .transaction(['messages', 'conversations', 'settings'], 'readwrite', async (tx) => { - // 3.1 写入本批变更消息 + // 2. 单事务写入消息、会话摘要和游标 + await getDb().transaction( + ['messages', 'conversations', 'settings'], + 'readwrite', + async (tx) => { + // 2.1 写入本批变更消息 for (const item of persistedMessages.values()) { await this.saveMessageRecord(item.message, item.conversationType, tx, { mergeClientRecord: item.mergeClientRecord }) } - // 3.2 写入本批变更会话 + // 2.2 写入本批变更会话 await conversationStore.saveConversationRecord([...changedConversations.values()], tx) - // 3.3 写入本批游标 + // 2.3 写入本批游标 await setMessageMaxId(conversationType, maxMessageId, tx) - }) - .catch((e) => console.error('[IM messageStore] 批量消息写入失败', e)) + } + ) + // 3. 持久化成功后推进内存游标 + this.updateMessageCursor(conversationType, maxMessageId) + for (const item of persistedMessages.values()) { + this.updateMessageCursor(item.conversationType, item.message.id) + } }, /** 插入消息 */ @@ -565,7 +588,7 @@ export const useMessageStore = defineStore('imMessageStore', { conversationInfo: MessageConversationInfo, messageInfo: Message, options?: { saveMaxId?: boolean } - ) { + ): Promise { const conversationStore = useConversationStore() const hasIncomingClientMessageId = !!messageInfo.clientMessageId const message = ensureClientMessageId(messageInfo) @@ -589,8 +612,7 @@ export const useMessageStore = defineStore('imMessageStore', { recomputeConversationLast(conversation, messages) syncConversationAtFlags(conversation, message) } - this.updateMessageCursor(conversationInfo.type, message.id) - void getDb() + return getDb() .transaction(['messages', 'conversations', 'settings'], 'readwrite', async (tx) => { await this.saveMessageRecord(messages[existingIndex], conversationInfo.type, tx, { mergeClientRecord: hasIncomingClientMessageId @@ -600,8 +622,13 @@ export const useMessageStore = defineStore('imMessageStore', { await setMessageMaxId(conversationInfo.type, message.id, tx) } }) - .catch((e) => console.error('[IM messageStore] 消息写入失败', e)) - return + .catch((e) => { + console.error('[IM messageStore] 消息写入失败', e) + throw e + }) + .then(() => { + this.updateMessageCursor(conversationInfo.type, message.id) + }) } // 4. 新消息更新会话摘要和未读状态 @@ -632,9 +659,8 @@ export const useMessageStore = defineStore('imMessageStore', { } } messages.splice(insertIndex, 0, message) - this.updateMessageCursor(conversationInfo.type, message.id) // 6. 单事务写入消息、会话摘要和游标 - void getDb() + return getDb() .transaction(['messages', 'conversations', 'settings'], 'readwrite', async (tx) => { await this.saveMessageRecord(message, conversationInfo.type, tx, { mergeClientRecord: hasIncomingClientMessageId && !!message.id @@ -644,7 +670,13 @@ export const useMessageStore = defineStore('imMessageStore', { await setMessageMaxId(conversationInfo.type, message.id, tx) } }) - .catch((e) => console.error('[IM messageStore] 消息写入失败', e)) + .catch((e) => { + console.error('[IM messageStore] 消息写入失败', e) + throw e + }) + .then(() => { + this.updateMessageCursor(conversationInfo.type, message.id) + }) }, /** ack 合并 */ @@ -696,7 +728,6 @@ export const useMessageStore = defineStore('imMessageStore', { if (messages[messages.length - 1] === message) { recomputeConversationLast(conversation, messages) } - this.updateMessageCursor(conversationType, message.id) // 3. 单事务写入消息、会话摘要和游标 await getDb() .transaction(['messages', 'conversations', 'settings'], 'readwrite', async (tx) => { @@ -706,7 +737,11 @@ export const useMessageStore = defineStore('imMessageStore', { await conversationStore.saveConversationRecord(conversation, tx) await setMessageMaxId(conversationType, message.id, tx) }) - .catch((e) => console.error('[IM messageStore] ack 写入失败', e)) + .catch((e) => { + console.error('[IM messageStore] ack 写入失败', e) + throw e + }) + this.updateMessageCursor(conversationType, message.id) } finally { // 4. 清理合并标记 message._ackMerging = false @@ -743,7 +778,11 @@ export const useMessageStore = defineStore('imMessageStore', { }, /** 撤回消息 */ - recallMessage(conversationType: number, targetId: number, recallSignalContent: string) { + async recallMessage( + conversationType: number, + targetId: number, + recallSignalContent: string + ): Promise { const conversationStore = useConversationStore() const changed = this.applyRecallMessageInMemory( conversationType, @@ -753,10 +792,15 @@ export const useMessageStore = defineStore('imMessageStore', { if (!changed) { return } - this.saveMessageRecord(changed.message, conversationType).catch((e) => - console.error('[IM messageStore] 撤回消息写入失败', e) - ) - conversationStore.saveConversation(changed.conversation) + await getDb() + .transaction(['messages', 'conversations'], 'readwrite', async (tx) => { + await this.saveMessageRecord(changed.message, conversationType, tx) + await conversationStore.saveConversationRecord(changed.conversation, tx) + }) + .catch((e) => { + console.error('[IM messageStore] 撤回消息写入失败', e) + throw e + }) }, /** 应用已读回执 */ @@ -809,6 +853,127 @@ export const useMessageStore = defineStore('imMessageStore', { .catch((e) => console.warn('[IM messageStore] 回执写入失败', e)) }, + /** 应用会话读位置补偿 */ + async applyConversationReadList( + records: ImConversationReadRespVO[], + isActive?: () => boolean + ): Promise { + if (records.length === 0) { + return + } + const conversationStore = useConversationStore() + const changedConversations = new Map() + const changedMessages = new Map() + const db = getDb() + + // 1. 按读位置更新会话未读和频道已读态 + for (const record of records) { + if (isActive && !isActive()) { + return + } + if (!record.conversationType || !record.targetId || !record.messageId) { + continue + } + const clientConversationId = getClientConversationId( + record.conversationType, + record.targetId + ) + let storedMessages: MessageDO[] | undefined + const getStoredMessages = async () => { + if (!storedMessages) { + storedMessages = await db.getAllByIndex( + 'messages', + 'clientConversationId', + clientConversationId + ) + } + return storedMessages + } + const conversation = conversationStore.getConversation( + record.conversationType, + record.targetId + ) + if ( + conversation && + (conversation.unreadCount > 0 || conversation.atMe || conversation.atAll) + ) { + const memoryMessages = this.messagesByConversation[clientConversationId] + let readCovered = + !!conversation.lastMessageId && conversation.lastMessageId <= record.messageId + const latestMessageLoaded = + !!conversation.lastMessageId && + memoryMessages?.some((message) => message.id === conversation.lastMessageId) + if (!readCovered && latestMessageLoaded && memoryMessages) { + const maxIncomingMessageId = getMaxIncomingNormalMessageId(memoryMessages) + readCovered = maxIncomingMessageId > 0 && maxIncomingMessageId <= record.messageId + } + if (!readCovered && !latestMessageLoaded) { + const storedMessages = await getStoredMessages() + const latestMessageStored = + !!conversation.lastMessageId && + storedMessages.some((message) => message.id === conversation.lastMessageId) + if (latestMessageStored) { + const storedMaxIncomingMessageId = getMaxIncomingNormalMessageId(storedMessages) + readCovered = + storedMaxIncomingMessageId > 0 && storedMaxIncomingMessageId <= record.messageId + } + } + if (readCovered) { + conversation.unreadCount = 0 + conversation.atMe = false + conversation.atAll = false + changedConversations.set(clientConversationId, conversation) + } + } + if (record.conversationType !== ImConversationType.CHANNEL) { + continue + } + const memoryMessages = this.messagesByConversation[clientConversationId] || [] + for (const message of memoryMessages) { + if ( + message.id && + message.id <= record.messageId && + message.receiptStatus !== ImMessageReceiptStatus.DONE + ) { + message.receiptStatus = ImMessageReceiptStatus.DONE + } + } + for (const message of await getStoredMessages()) { + if ( + message.id && + message.id <= record.messageId && + message.receiptStatus !== ImMessageReceiptStatus.DONE + ) { + message.receiptStatus = ImMessageReceiptStatus.DONE + changedMessages.set(message.messageKey, message) + } + } + } + + // 2. 持久化本轮变更 + if (changedConversations.size === 0 && changedMessages.size === 0) { + return + } + if (isActive && !isActive()) { + return + } + const stores: Array<'conversations' | 'messages'> = [] + if (changedConversations.size > 0) { + stores.push('conversations') + } + if (changedMessages.size > 0) { + stores.push('messages') + } + await db.transaction(stores, 'readwrite', async (tx) => { + if (changedConversations.size > 0) { + await conversationStore.saveConversationRecord([...changedConversations.values()], tx) + } + for (const message of changedMessages.values()) { + await db.put('messages', message, tx) + } + }) + }, + /** 前置历史消息 */ prependMessageList(conversationType: number, targetId: number, earlierMessages: Message[]) { if (earlierMessages.length === 0) { diff --git a/src/views/im/home/store/websocketStore.ts b/src/views/im/home/store/websocketStore.ts index 513bf1daa..ea7cb98ee 100644 --- a/src/views/im/home/store/websocketStore.ts +++ b/src/views/im/home/store/websocketStore.ts @@ -69,6 +69,11 @@ const isFriendDeleteWithClear = (frame: ImPrivateMessageDTO): boolean => { const RTC_LIVEKIT_PROTOCOLS = new Set(['ws:', 'wss:', 'http:', 'https:']) const RTC_MEDIA_TYPES = new Set(Object.values(ImRtcCallMediaType)) +/** 忽略普通实时帧持久化失败 */ +function ignoreRealtimePersistError(promise: Promise): void { + void promise.catch(() => undefined) +} + /** 校验 LiveKit 连接地址 */ function isValidLiveKitUrl(url?: string): boolean { if (!url) { @@ -302,27 +307,28 @@ export const useImWebSocketStore = defineStore('imWebSocketStore', { this.handleChannelRead(websocketMessage) return } - this.handleChannelMessage(websocketMessage) + ignoreRealtimePersistError(this.handleChannelMessage(websocketMessage)) }, /** 频道 READ:自己其它终端在某频道里标为已读,本端同步清零该频道未读 */ handleChannelRead(websocketMessage: ImChannelMessageRespVO) { - const conversationStore = useConversationStore() - const conversation = conversationStore.getConversation( - ImConversationType.CHANNEL, - websocketMessage.channelId - ) - if (conversation) { - conversation.unreadCount = 0 - conversationStore.saveConversation(conversation) - } + void useMessageStore() + .applyConversationReadList([ + { + id: websocketMessage.id, + conversationType: ImConversationType.CHANNEL, + targetId: websocketMessage.channelId, + messageId: websocketMessage.id + } + ]) + .catch((e) => console.warn('[IM WS] 频道已读同步失败', e)) }, /** * 频道消息实时入会话;频道消息单向 + 无状态机,直接 insertMessage 即可 * pull 与 WS 拿到同一条 id 时,messageStore.insertMessage 内部按 id 去重,不会重复 */ - handleChannelMessage(websocketMessage: ImChannelMessageRespVO) { + handleChannelMessage(websocketMessage: ImChannelMessageRespVO): Promise { const conversationStore = useConversationStore() const messageStore = useMessageStore() // 离线加载期间先缓冲,等 pull 完成后再统一回放,避免重复或顺序错乱 @@ -331,7 +337,7 @@ export const useImWebSocketStore = defineStore('imWebSocketStore', { conversationType: ImConversationType.CHANNEL, payload: websocketMessage }) - return + return Promise.resolve() } const sendTimeMs = typeof websocketMessage.sendTime === 'number' @@ -345,7 +351,7 @@ export const useImWebSocketStore = defineStore('imWebSocketStore', { conversationStore.activeConversation?.type === ImConversationType.CHANNEL && conversationStore.activeConversation?.targetId === websocketMessage.channelId // 频道单向订阅,receiptStatus 表达「我是否已读这条」:会话打开即已读 DONE,否则 PENDING(与 pull 口径一致) - messageStore.insertMessage(buildChannelConversationStub(websocketMessage.channelId), { + const persistPromise = messageStore.insertMessage(buildChannelConversationStub(websocketMessage.channelId), { id: websocketMessage.id, clientMessageId: '', type: websocketMessage.type, @@ -368,6 +374,7 @@ export const useImWebSocketStore = defineStore('imWebSocketStore', { // 非当前会话且未免打扰:响一下提示音 playAudioTip() } + return persistPromise }, /** content 既可能已是对象也可能是 JSON 字符串(后端用 Map 序列化下发) */ @@ -410,7 +417,7 @@ export const useImWebSocketStore = defineStore('imWebSocketStore', { case ImMessageType.RTC_CALL_END: // 入库 + 关闭通话窗 + 渲染聊天 tip(私聊场景) this.handleRtcCallEnd(websocketMessage) - this.handlePrivateMessage(websocketMessage) + ignoreRealtimePersistError(this.handlePrivateMessage(websocketMessage)) break default: if (isFriendNotification(websocketMessage.type)) { @@ -422,14 +429,14 @@ export const useImWebSocketStore = defineStore('imWebSocketStore', { isFriendChatTip(websocketMessage.type) && !isFriendDeleteWithClear(websocketMessage) ) { - this.handlePrivateMessage(websocketMessage) + ignoreRealtimePersistError(this.handlePrivateMessage(websocketMessage)) } } else if (isGroupRequestNotification(websocketMessage.type)) { // 加群申请通知(1503 / 1505 / 1506)走私聊通道,与好友通知同段位但分开 dispatcher this.handleGroupRequestNotification(websocketMessage) } else { // TEXT / IMAGE / FILE / VOICE / VIDEO 等普通消息 - this.handlePrivateMessage(websocketMessage) + ignoreRealtimePersistError(this.handlePrivateMessage(websocketMessage)) } } } catch (e) { @@ -457,16 +464,16 @@ export const useImWebSocketStore = defineStore('imWebSocketStore', { break case ImMessageType.RTC_CALL_START: // 入库 + 渲染聊天 tip;胶囊条状态走 1602/1603,本帧不动 rtcStore,避免与首次填充竞争 - this.handleGroupMessage(websocketMessage) + ignoreRealtimePersistError(this.handleGroupMessage(websocketMessage)) break case ImMessageType.RTC_CALL_END: // 入库 + 移除胶囊条 + 关闭通话窗(如果当前在该群通话内) this.handleRtcCallEnd(websocketMessage) - this.handleGroupMessage(websocketMessage) + ignoreRealtimePersistError(this.handleGroupMessage(websocketMessage)) break default: // TEXT / IMAGE / FILE / VOICE / VIDEO + GROUP_* 群广播事件 - this.handleGroupMessage(websocketMessage) + ignoreRealtimePersistError(this.handleGroupMessage(websocketMessage)) } } catch (e) { // 单条帧的处理异常不应阻断后续帧;打印完整 websocketMessage 便于排查 @@ -484,7 +491,7 @@ export const useImWebSocketStore = defineStore('imWebSocketStore', { * 4. 构造前端 Message,插入到对应私聊会话 * 5. 当前会话激活时自动上报已读;否则非免打扰响提示音 */ - handlePrivateMessage(websocketMessage: ImPrivateMessageDTO) { + handlePrivateMessage(websocketMessage: ImPrivateMessageDTO): Promise { const conversationStore = useConversationStore() const friendStore = useFriendStore() const currentUserId = getCurrentUserId() @@ -497,7 +504,7 @@ export const useImWebSocketStore = defineStore('imWebSocketStore', { websocketMessage.receiverId !== currentUserId ) { console.warn('[IM WS] 丢弃不属于当前用户的私聊帧', websocketMessage) - return + return Promise.resolve() } // 1. 离线加载期间先缓冲,等 pull 完成后再统一回放,避免重复或顺序错乱 @@ -506,7 +513,7 @@ export const useImWebSocketStore = defineStore('imWebSocketStore', { conversationType: ImConversationType.PRIVATE, payload: websocketMessage }) - return + return Promise.resolve() } // 2. selfSend / peerId:自己发的消息属于「发给 receiverId 的会话」,别人发的属于「发送者的会话」 @@ -523,17 +530,16 @@ export const useImWebSocketStore = defineStore('imWebSocketStore', { // 3. 后端撤回:下发一条 RECALL 消息,content 为 `{"messageId": xxx}`(对齐 ImMessageTypeEnum.RECALL → RecallMessage) // 这里拦截下来改走 recallMessage(把原消息更新为 RECALL 态),不让它作为新消息进列表 if (websocketMessage.type === ImMessageType.RECALL) { - useMessageStore().recallMessage( + return useMessageStore().recallMessage( ImConversationType.PRIVATE, peerId, websocketMessage.content ) - return } // 4. 后端 DTO → 前端 Message:发送人名渲染时实时算,不写入消息字段 const message = convertPrivateMessage(websocketMessage, currentUserId) - useMessageStore().insertMessage( + const persistPromise = useMessageStore().insertMessage( { type: ImConversationType.PRIVATE, targetId: peerId, @@ -564,6 +570,7 @@ export const useImWebSocketStore = defineStore('imWebSocketStore', { playAudioTip() } } + return persistPromise }, /** 私聊 READ 事件:自己的其它终端在对方会话里标为已读,本端同步清零未读;私聊已读关闭时兜底忽略 */ @@ -571,11 +578,16 @@ export const useImWebSocketStore = defineStore('imWebSocketStore', { if (!MESSAGE_PRIVATE_READ_ENABLED) { return } - const conversationStore = useConversationStore() - conversationStore.markConversationRead( - ImConversationType.PRIVATE, - websocketMessage.receiverId - ) + void useMessageStore() + .applyConversationReadList([ + { + id: websocketMessage.id, + conversationType: ImConversationType.PRIVATE, + targetId: websocketMessage.receiverId, + messageId: websocketMessage.id + } + ]) + .catch((e) => console.warn('[IM WS] 私聊已读同步失败', e)) }, /** @@ -607,7 +619,7 @@ export const useImWebSocketStore = defineStore('imWebSocketStore', { * 4. 构造 Message + at 字段,插入到对应群聊会话(发送人名渲染时实时算) * 5. 当前会话激活时自动上报已读(带 lastMessageId);否则非免打扰响提示音 */ - handleGroupMessage(websocketMessage: ImGroupMessageDTO) { + handleGroupMessage(websocketMessage: ImGroupMessageDTO): Promise { const conversationStore = useConversationStore() const groupStore = useGroupStore() const currentUserId = getCurrentUserId() @@ -624,7 +636,7 @@ export const useImWebSocketStore = defineStore('imWebSocketStore', { !receiverUserIds.includes(currentUserId) ) { console.warn('[IM WS] 丢弃不属于当前用户的定向群消息', websocketMessage) - return + return Promise.resolve() } // 1. 离线加载期缓冲(与私聊对称) @@ -633,7 +645,7 @@ export const useImWebSocketStore = defineStore('imWebSocketStore', { conversationType: ImConversationType.GROUP, payload: websocketMessage }) - return + return Promise.resolve() } // 2. 未知群时自动拉群详情 + 成员(被拉入群但还没收到 GROUP_CREATE 时的兜底) @@ -645,17 +657,16 @@ export const useImWebSocketStore = defineStore('imWebSocketStore', { // 3. 后端撤回:下发一条 RECALL 消息,content 为 `{"messageId": xxx}` // 这里拦截下来改走 recallMessage(把原消息更新为 RECALL 态) if (websocketMessage.type === ImMessageType.RECALL) { - useMessageStore().recallMessage( + return useMessageStore().recallMessage( ImConversationType.GROUP, websocketMessage.groupId, websocketMessage.content ) - return } // 4. 后端 DTO → 前端 Message:发送人名渲染时实时算,不写入消息字段 const message = convertGroupMessage(websocketMessage, currentUserId) - useMessageStore().insertMessage( + const persistPromise = useMessageStore().insertMessage( { type: ImConversationType.GROUP, targetId: websocketMessage.groupId, @@ -691,6 +702,7 @@ export const useImWebSocketStore = defineStore('imWebSocketStore', { playAudioTip() } } + return persistPromise }, // ==================== 群聊已读 / 回执 ==================== @@ -700,8 +712,17 @@ export const useImWebSocketStore = defineStore('imWebSocketStore', { if (!MESSAGE_GROUP_READ_ENABLED) { return } - const conversationStore = useConversationStore() - conversationStore.markConversationRead(ImConversationType.GROUP, websocketMessage.groupId) + const readMessageId = websocketMessage.readId || websocketMessage.id + void useMessageStore() + .applyConversationReadList([ + { + id: readMessageId, + conversationType: ImConversationType.GROUP, + targetId: websocketMessage.groupId, + messageId: readMessageId + } + ]) + .catch((e) => console.warn('[IM WS] 群聊已读同步失败', e)) }, /** 群聊 RECEIPT:更新某条群消息的 readCount / receiptStatus;群已读关闭时兜底忽略 */ diff --git a/src/views/im/home/types/index.ts b/src/views/im/home/types/index.ts index 8c990da9d..6ed7c3f58 100644 --- a/src/views/im/home/types/index.ts +++ b/src/views/im/home/types/index.ts @@ -149,10 +149,11 @@ export interface Group { groupRemark?: string // 群备注。从当前用户的 GroupMember 回填(当前用户对该群的自定义名) members?: GroupMember[] // 群成员缓存(按需懒加载) membersLoaded?: boolean // members 是否"完整加载"——只有整群 loadGroupMemberList / fetchGroupMemberList 命中时为 true;fetchGroupMember 单成员补齐不置位,避免 fetchGroupMemberList(force=false) 命中缓存时误判整群已加载 + membersExpired?: boolean // 群成员缓存是否已过期;重连 / 重新进入 IM 后只标记不删除,下次进入群会话再刷新 memberCount?: number // 成员总数 } -export type GroupDO = Omit +export type GroupDO = Omit // 群成员实体(前端内部结构) export interface GroupMember { diff --git a/src/views/im/utils/db.ts b/src/views/im/utils/db.ts index a29195291..586619919 100644 --- a/src/views/im/utils/db.ts +++ b/src/views/im/utils/db.ts @@ -35,7 +35,17 @@ export const StorageKeys = { /** 频道消息拉取游标 */ channelMessageMaxId: 'channelMessageMaxId', /** 最近转发会话 key 列表 */ - recentForwardConversationKeys: 'recentForwardConversationKeys' + recentForwardConversationKeys: 'recentForwardConversationKeys', + // 状态事件补偿增量拉取游标:与上面消息 maxId 游标共用同一 settings keyspace,统一登记在此避免撞 key; + // 走 update_time + id 复合游标(非单条 maxId),故用 PullCursor 后缀区分语义 + /** 好友关系增量拉取游标 */ + friendPullCursor: 'friendPullCursor', + /** 好友申请增量拉取游标 */ + friendRequestPullCursor: 'friendRequestPullCursor', + /** 加群申请增量拉取游标 */ + groupRequestPullCursor: 'groupRequestPullCursor', + /** 会话读位置增量拉取游标 */ + conversationReadPullCursor: 'conversationReadPullCursor' } } as const @@ -208,9 +218,7 @@ class DbClient { if (tx) { return requestToPromise(tx.objectStore(storeName).getAll()) } - return this.transaction([storeName], 'readonly', (tx) => - this.getAll(storeName, tx) - ) + return this.transaction([storeName], 'readonly', (tx) => this.getAll(storeName, tx)) } /** 按唯一索引获取单条记录 */ @@ -258,9 +266,7 @@ class DbClient { await requestToPromise(tx.objectStore(storeName).delete(key)) return } - await this.transaction([storeName], 'readwrite', (tx) => - this.delete(storeName, key, tx) - ) + await this.transaction([storeName], 'readwrite', (tx) => this.delete(storeName, key, tx)) } /** 清空 store 记录 */ diff --git a/src/views/im/utils/pull.ts b/src/views/im/utils/pull.ts new file mode 100644 index 000000000..f64d7432c --- /dev/null +++ b/src/views/im/utils/pull.ts @@ -0,0 +1,152 @@ +import { getDb } from './db' + +/** + * IM 状态事件补偿(增量拉取)通用编排 + * + * 各业务模块(好友、群、申请、读位置)共用一套 update_time + id 正向游标: + * 从持久化游标出发循环拉取变更并合并进本地 store,直到某页不满(没有更多), + * 用于 WebSocket 漏推后的兜底补偿(进入 IM、断线重连时触发) + */ + +/** 增量拉取游标:上次拉到的位置 */ +export interface PullCursor { + lastUpdateTime?: number + lastId?: number +} + +/** 可作为游标的拉取记录:服务端按 update_time + id 返回,客户端取最后一条推进游标 */ +interface PullRecord { + id: number + updateTime?: number +} + +/** 单次拉取条数(与后端 limit 上限对齐) */ +const PULL_PAGE_SIZE = 100 +/** 单轮最多翻页数,兜底防御异常游标导致的死循环 */ +const PULL_MAX_PAGES = 100 +/** 状态事件拉取回扫窗口,覆盖同秒内旧行更新和客户端 / 服务端时钟精度差 */ +const PULL_OVERLAP_MS = 5000 +/** 消息类 minId 拉取的单轮翻页上限,兜底防御异常游标死翻;消息量可能远大于状态事件,放宽到 1000 */ +const MIN_ID_PULL_MAX_PAGES = 1000 + +/** 读取某模块的拉取游标;无则返回空游标(首次拉全量) */ +export async function getPullCursor(key: string): Promise { + return (await getDb().getSetting(key)) ?? {} +} + +/** + * 通用增量拉取:从持久化游标出发,循环拉取并应用变更,直到某页不满 + * + * @param cursorKey 游标在 settings 里的 key(每个模块一份) + * @param fetchPage 按游标拉一页(调对应 pull 接口,返回 VO 列表) + * @param apply 把一页变更合并进本地 store;返回 false 表示本页未完全落地(如账号已切、依赖资源补拉失败), + * 此时不推进游标并终止本轮,避免游标越过未落地的记录、导致后续增量永久漏拉。可返回 Promise + */ +export async function runIncrementalPull( + cursorKey: string, + fetchPage: (params: { lastUpdateTime?: number; lastId?: number; limit: number }) => Promise, + apply: (records: T[]) => boolean | Promise, + isActive?: () => boolean +): Promise { + const storedCursor = await getPullCursor(cursorKey) + const highWater = { ...storedCursor } + let cursor = + storedCursor.lastUpdateTime != null + ? { lastUpdateTime: Math.max(0, storedCursor.lastUpdateTime - PULL_OVERLAP_MS), lastId: 0 } + : {} + for (let page = 0; page < PULL_MAX_PAGES; page++) { + if (isActive && !isActive()) { + return + } + const list = await fetchPage({ + lastUpdateTime: cursor.lastUpdateTime, + lastId: cursor.lastId, + limit: PULL_PAGE_SIZE + }) + if (isActive && !isActive()) { + return + } + if (list.length) { + // apply 未完全落地(返回 false)时直接终止:游标只能跟着已落地的数据走,否则会跳过本页记录 + if ((await apply(list)) === false) { + return + } + if (isActive && !isActive()) { + return + } + // 推进游标到本页最后一条并持久化:下次从这里接着拉 + const last = list[list.length - 1] + if (last.updateTime == null) { + return + } + cursor = { lastUpdateTime: last.updateTime, lastId: last.id } + if ( + highWater.lastUpdateTime == null || + cursor.lastUpdateTime > highWater.lastUpdateTime || + (cursor.lastUpdateTime === highWater.lastUpdateTime && + cursor.lastId > (highWater.lastId ?? 0)) + ) { + highWater.lastUpdateTime = cursor.lastUpdateTime + highWater.lastId = cursor.lastId + await getDb().setSetting(cursorKey, highWater) + } + } + // 不满一页 = 没有更多变更 + if (list.length < PULL_PAGE_SIZE) { + return + } + } + console.warn(`[IM pull] ${cursorKey} 达到单轮翻页上限,提前结束本轮补偿`) +} + +/** + * 消息类增量拉取通用编排:按单调 minId 游标循环翻页,直到空页或游标不再前进 + * + * 与 runIncrementalPull 的区别:消息游标是单调消息 id(非 update_time + id 复合游标),且 messageMaxId 的持久化 + * 跟随消息入库在 applyPage 内完成(messageStore),故本函数只负责「循环翻页机制」,不碰 settings,也不掺消息业务。 + * + * @param initialMinId 起始游标:上次入库的最大消息 id + * @param pageSize 单页条数 + * @param fetchPage 按 minId 拉一页 + * @param applyPage 处理本页(建会话 / 分发 / 入库 + 推进 messageMaxId);返回 false 表示本页未落地,停止且不推进游标 + * @param isActive 每次 await 前后自检:取消 / 切账号时返回 false,丢弃本批不入库、不再翻页 + * @param maxPages 单轮翻页上限 + */ +export async function runMinIdPull(options: { + initialMinId: number + pageSize: number + fetchPage: (params: { minId: number; size: number }) => Promise + applyPage: (records: T[], nextMinId?: number) => Promise + isActive?: () => boolean + maxPages?: number +}): Promise { + const { initialMinId, pageSize, fetchPage, applyPage, isActive } = options + const maxPages = options.maxPages ?? MIN_ID_PULL_MAX_PAGES + let minId = initialMinId || 0 + for (let page = 0; page < maxPages; page++) { + if (isActive && !isActive()) { + return + } + const list = await fetchPage({ minId, size: pageSize }) + // 拉取期间取消 / 切账号:丢弃本批不入库,也不再翻页 + if (isActive && !isActive()) { + return + } + if (!list || list.length === 0) { + return + } + // 本批最大消息 id 作为下次游标;无有效 id 则本批 apply 后停(无法继续翻页) + const validIds = list.map((record) => record.id).filter((id): id is number => id != null) + const nextMinId = validIds.length > 0 ? Math.max(...validIds) : undefined + // applyPage 返回 false:本页未落地(如入库失败),不推进游标并终止,避免漏消息 + if ((await applyPage(list, nextMinId)) === false) { + return + } + // 无有效 id,或游标没前进(后端契约是 id > minId,理论不会出现):停,防御死翻 + if (nextMinId == null || nextMinId <= minId) { + return + } + minId = nextMinId + } + console.warn('[IM pull] runMinIdPull 达到单轮翻页上限,提前结束本轮') +}