feat(im): 增强消息拉取与状态补偿可靠性

- 新增会话读位置持久化接口与前端同步逻辑
- 增加好友、好友申请、加群申请的增量拉取补偿
- 统一前端 pull 编排,增加回扫窗口、落库等待和账号切换守卫
- 调整群成员为按群懒加载缓存,并移除全局成员增量链路
- 修复消息落库、读位置补偿、READ 事件乱序下的未读状态一致性
- 完善群申请红点快照刷新和管理员角色变化补偿
- 更新消息存储设计与修复记录文档
pull/884/MERGE
YunaiV 2026-06-15 08:26:32 +08:00
parent cf85fd4c86
commit 2685bc357f
18 changed files with 1051 additions and 292 deletions

View File

@ -0,0 +1,19 @@
import request from '@/config/axios'
// IM 会话读位置 Response VO
export interface ImConversationReadRespVO {
id: number // 读位置编号(增量拉取游标用)
conversationType: number // 会话类型,参见 ImConversationType
targetId: number // 会话目标编号
messageId: number // 最大已读消息编号
updateTime?: number // 最近更新时间(毫秒时间戳,增量拉取游标用)
}
// 增量拉取当前用户的会话读位置(重连 / 离线补偿)
export const pullMyConversationReadList = (params: {
lastUpdateTime?: number
lastId?: number
limit: number
}) => {
return request.get<ImConversationReadRespVO[]>({ url: '/im/conversation-read/pull', params })
}

View File

@ -13,6 +13,7 @@ export interface ImFriendRespVO {
status?: number // 好友状态0=正常1=已删除)
addTime?: string // 添加好友时间
deleteTime?: string // 删除好友时间
updateTime?: number // 最近更新时间(毫秒时间戳,增量拉取游标用)
// 聚合字段(自 AdminUser
nickname?: string // 好友昵称
nicknamePinyin?: string // 昵称的拼音(小写无空格,前端按首字母分桶 / 拼音搜索)
@ -32,6 +33,11 @@ export const getMyFriendList = () => {
return request.get<ImFriendRespVO[]>({ url: '/im/friend/list' })
}
// 增量拉取当前用户的好友关系(重连 / 离线补偿)
export const pullMyFriendList = (params: { lastUpdateTime?: number; lastId?: number; limit: number }) => {
return request.get<ImFriendRespVO[]>({ url: '/im/friend/pull', params })
}
// 获得好友详情
export const getFriend = (friendUserId: number | string) => {
return request.get<ImFriendRespVO>({ url: '/im/friend/get', params: { friendUserId } })

View File

@ -11,6 +11,7 @@ export interface ImFriendRequestRespVO {
addSource?: number // 添加来源;参见 ImFriendAddSourceEnum
handleTime?: string // 处理时间
createTime: string // 申请创建时间
updateTime?: number // 最近更新时间(毫秒时间戳,增量拉取游标用)
// 聚合字段(自 AdminUser
fromNickname?: string // 发起方昵称
fromAvatar?: string // 发起方头像
@ -56,6 +57,15 @@ export const getMyFriendRequestList = (limit: number, maxId?: number) => {
})
}
// 增量拉取「我相关」的好友申请变更(重连 / 离线补偿)
export const pullMyFriendRequestList = (params: {
lastUpdateTime?: number
lastId?: number
limit: number
}) => {
return request.get<ImFriendRequestRespVO[]>({ url: '/im/friend-request/pull', params })
}
// 按 id 单查「我相关」的申请记录带越权过滤WebSocket 通知到达后用)
export const getMyFriendRequest = (id: number) => {
return request.get<ImFriendRequestRespVO | null>({

View File

@ -13,6 +13,7 @@ export interface ImGroupRequestRespVO {
addSource?: number // 加入来源;参见 ImGroupAddSourceEnum
handleTime?: string // 处理时间
createTime: string // 申请创建时间
updateTime?: number // 最近更新时间(毫秒时间戳,增量拉取游标用)
// 聚合字段
userNickname?: string // 申请人 / 被邀请人昵称
userAvatar?: string // 申请人 / 被邀请人头像
@ -69,3 +70,8 @@ export const getMyGroupRequest = (id: number) => {
params: { id }
})
}
// 增量拉取我管理的所有群下加群申请变更(重连 / 离线补偿)
export const pullMyGroupRequestList = (params: { lastUpdateTime?: number; lastId?: number; limit: number }) => {
return request.get<ImGroupRequestRespVO[]>({ url: '/im/group-request/pull', params })
}

View File

@ -24,7 +24,10 @@
</div>
<!-- 成员宫格 member 渲染陌生群拉不到成员 -->
<div v-if="isMember && members.length" class="flex flex-wrap gap-2 justify-center w-full pt-2">
<div
v-if="isMember && members.length"
class="flex flex-wrap gap-2 justify-center w-full pt-2"
>
<GroupMemberGrid
v-for="member in members"
:key="member.userId"
@ -95,9 +98,7 @@ const isMember = computed(() => {
}
if (cached.membersLoaded && cached.members) {
const myId = getCurrentUserId()
return cached.members.some(
(m) => m.userId === myId && m.status === CommonStatusEnum.ENABLE
)
return cached.members.some((m) => m.userId === myId && m.status === CommonStatusEnum.ENABLE)
}
return true
})
@ -125,13 +126,11 @@ watch(
if (!id || !member) {
return
}
const list = await groupStore.fetchGroupMemberList(id)
const list = await groupStore.fetchGroupMemberList(id, true)
if (props.group?.id !== id) {
return
}
members.value = list.map((m) =>
convertGroupMemberLite(m, friendStore.getFriend(m.userId))
)
members.value = list.map((m) => convertGroupMemberLite(m, friendStore.getFriend(m.userId)))
},
{ immediate: true }
)

View File

@ -198,7 +198,7 @@ export const useMediaUploader = () => {
uploadProgress: 0,
_localFile: opts.file
}
messageStore.insertMessage(
void messageStore.insertMessage(
{
type: conversation.type,
targetId: conversation.targetId,
@ -206,7 +206,7 @@ export const useMediaUploader = () => {
avatar: conversation.avatar || ''
},
placeholder
)
).catch(() => undefined)
return { clientMessageId, blobUrl }
}

View File

@ -5,6 +5,7 @@ import { useImWebSocketStore } from '../store/websocketStore'
import { useFriendStore } from '../store/friendStore'
import { getFriendDisplayName } from '../../utils/user'
import { useGroupStore } from '../store/groupStore'
import { useGroupRequestStore } from '../store/groupRequestStore'
import {
pullPrivateMessages as apiPullPrivateMessages,
getPrivateMaxReadMessageId as apiGetPrivateMaxReadMessageId,
@ -18,6 +19,7 @@ import {
pullChannelMessages as apiPullChannelMessages,
type ImChannelMessageRespVO
} from '@/api/im/message/channel'
import { pullMyConversationReadList as apiPullMyConversationReadList } from '@/api/im/conversation/read'
import {
ImConversationType,
ImMessageStatus,
@ -32,9 +34,14 @@ import {
} from '../../utils/config'
import { buildChannelConversationStub } from '../../utils/channel'
import { generateClientMessageId, getPrivateMessagePeerId } from '../../utils/message'
import { runIncrementalPull, runMinIdPull } from '../../utils/pull'
import { StorageKeys } from '../../utils/db'
import { getCurrentUserId } from '@/utils/auth'
import type { Message } from '../types'
/** 三类消息 pull 接口返回的原始 VO 联合类型runMinIdPull 只需 id 推进游标,具体分发在 applyPage 内按类型 cast */
type PulledRawMessage = ImPrivateMessageRespVO | ImGroupMessageRespVO | ImChannelMessageRespVO
/**
* 线
*
@ -52,6 +59,7 @@ export const useMessagePuller = () => {
const wsStore = useImWebSocketStore()
const friendStore = useFriendStore()
const groupStore = useGroupStore()
const groupRequestStore = useGroupRequestStore()
const currentUserId = getCurrentUserId()
/** 判断请求是否被主动取消 */
@ -150,12 +158,12 @@ export const useMessagePuller = () => {
}
/**
* id minId
* 线 / minId / runMinIdPull +
* / / +
*
*
* 1. startEpochcancelPull() pullEpoch IM /
* 2. startUserId await userId logout / tab cancelPull
* session store
* isActive runMinIdPull session store
* 1. startEpochcancelPull() pullEpoch IM /
* 2. startUserId await userId logout / tab cancelPull
*/
const pullByType = async (
conversationType: number,
@ -164,108 +172,90 @@ export const useMessagePuller = () => {
startUserId: number,
signal: AbortSignal
) => {
// 私聊 / 群聊 / 频道各自一套接口;按 conversationType 在循环内分支调度
let minId = startMinId || 0
// 私聊 / 群聊 / 频道各自一套接口;按 conversationType 分支调度。翻页机制minId 游标 / 空页判断 / 防死翻)交给 runMinIdPull
const isPrivate = conversationType === ImConversationType.PRIVATE
const isChannel = conversationType === ImConversationType.CHANNEL
const size = isPrivate ? MESSAGE_PRIVATE_PULL_SIZE : MESSAGE_GROUP_PULL_SIZE
const isStillValid = () =>
!signal.aborted && pullEpoch === startEpoch && getCurrentUserId() === startUserId
while (true) {
if (!isStillValid()) {
return
}
let list: any[] | undefined
if (isPrivate) {
list = await apiPullPrivateMessages({ minId, size }, signal)
} else if (isChannel) {
list = await apiPullChannelMessages({ minId, size }, signal)
} else {
list = await apiPullGroupMessages({ minId, size }, signal)
}
// 接口返回期间发生 cancel / 切账号:丢弃本批不入库,也不再翻页
if (!isStillValid()) {
return
}
if (!list || list.length === 0) {
break
}
const pulledMessages: PulledMessage[] = []
// 逐条 dispatch原消息走批量 insertRECALL 信号走批量 recall 把同批内已 insert 的原消息更新为撤回提示。
// 后端按 id 升序返回,且信号 id 一定 > 原消息 id先更新 status 再插信号所以原消息一定先到、recallMessage 找得到
for (const raw of list) {
if (isChannel) {
const message = raw as ImChannelMessageRespVO
pulledMessages.push({
kind: 'insert',
conversationInfo: convertChannelConversation(message),
message: convertChannelMessage(message)
})
continue
}
await runMinIdPull<PulledRawMessage>({
initialMinId: startMinId,
pageSize: size,
isActive: isStillValid,
fetchPage: ({ minId, size }) => {
if (isPrivate) {
const message = raw as ImPrivateMessageRespVO
// 特殊:撤回消息的处理
if (message.type === ImMessageType.RECALL) {
return apiPullPrivateMessages({ minId, size }, signal)
}
if (isChannel) {
return apiPullChannelMessages({ minId, size }, signal)
}
return apiPullGroupMessages({ minId, size }, signal)
},
applyPage: async (list, nextMinId) => {
const pulledMessages: PulledMessage[] = []
// 逐条 dispatch原消息走批量 insertRECALL 信号走批量 recall 把同批内已 insert 的原消息更新为撤回提示。
// 后端按 id 升序返回,且信号 id 一定 > 原消息 id先更新 status 再插信号所以原消息一定先到、recallMessage 找得到
for (const raw of list) {
if (isChannel) {
const message = raw as ImChannelMessageRespVO
pulledMessages.push({
kind: 'recall',
conversationType: ImConversationType.PRIVATE,
targetId: getPrivatePeerId(message),
recallSignalContent: message.content
kind: 'insert',
conversationInfo: convertChannelConversation(message),
message: convertChannelMessage(message)
})
continue
}
// 特殊:离线 pull 期间入库的 FRIEND_* 帧(目前仅 FRIEND_ADD persistent=true也要走好友数据分发
// 否则断线期间的好友列表更新会丢失;与 WebSocket 路径 dispatchPrivateFrame 保持对称
if (isFriendNotification(message.type)) {
wsStore.handleFriendNotification(message)
// 仅 FRIEND_ADD / FRIEND_DELETE 才作为会话气泡入消息列表
if (!isFriendChatTip(message.type)) {
if (isPrivate) {
const message = raw as ImPrivateMessageRespVO
// 特殊:撤回消息的处理
if (message.type === ImMessageType.RECALL) {
pulledMessages.push({
kind: 'recall',
conversationType: ImConversationType.PRIVATE,
targetId: getPrivatePeerId(message),
recallSignalContent: message.content
})
continue
}
}
// 其它消息正常入会话消息列表
pulledMessages.push({
kind: 'insert',
conversationInfo: convertPrivateConversation(message),
message: convertPrivateMessage(message)
})
} else {
const message = raw as ImGroupMessageRespVO
// 特殊:撤回消息的处理
if (message.type === ImMessageType.RECALL) {
// 特殊:离线 pull 期间入库的 FRIEND_* 帧(目前仅 FRIEND_ADD persistent=true也要走好友数据分发
// 否则断线期间的好友列表更新会丢失;与 WebSocket 路径 dispatchPrivateFrame 保持对称
if (isFriendNotification(message.type)) {
wsStore.handleFriendNotification(message)
// 仅 FRIEND_ADD / FRIEND_DELETE 才作为会话气泡入消息列表
if (!isFriendChatTip(message.type)) {
continue
}
}
// 其它消息正常入会话消息列表
pulledMessages.push({
kind: 'recall',
conversationType: ImConversationType.GROUP,
targetId: message.groupId,
recallSignalContent: message.content
kind: 'insert',
conversationInfo: convertPrivateConversation(message),
message: convertPrivateMessage(message)
})
} else {
const message = raw as ImGroupMessageRespVO
// 特殊:撤回消息的处理
if (message.type === ImMessageType.RECALL) {
pulledMessages.push({
kind: 'recall',
conversationType: ImConversationType.GROUP,
targetId: message.groupId,
recallSignalContent: message.content
})
continue
}
// 其它消息正常入会话消息列表
pulledMessages.push({
kind: 'insert',
conversationInfo: convertGroupConversation(message),
message: convertGroupMessage(message)
})
continue
}
// 其它消息正常入会话消息列表
pulledMessages.push({
kind: 'insert',
conversationInfo: convertGroupConversation(message),
message: convertGroupMessage(message)
})
}
// 入库 + 推进 messageMaxIdnextMinId 为空(本批无有效 id时不推进游标与旧逻辑一致
await messageStore.applyPulledMessageList(pulledMessages, conversationType, nextMinId)
}
// 游标推进到本批最大消息编号
const validIds = list.map((message) => message.id).filter((id): id is number => id != null)
if (validIds.length === 0) {
await messageStore.applyPulledMessageList(pulledMessages, conversationType)
break
}
const nextMinId = Math.max(...validIds)
await messageStore.applyPulledMessageList(pulledMessages, conversationType, nextMinId)
// 游标没前进就停:当前后端契约是 id > minId理论不会出现防御后端契约变更或边界数据死翻
if (nextMinId <= minId) {
break
}
minId = nextMinId
}
})
}
/** 同一时刻只允许一次 pullIndex.vue 的手动调用与重连 watch 触发可能并发,共用同一个 promise 即可去重 */
@ -298,6 +288,47 @@ export const useMessagePuller = () => {
wsStore.discardBuffer()
}
/** 增量拉取我的会话读位置并合并到本地展示态 */
const pullConversationReads = async (isActive: () => boolean): Promise<void> => {
await runIncrementalPull(
StorageKeys.settings.conversationReadPullCursor,
apiPullMyConversationReadList,
async (records) => {
if (!isActive()) {
return false
}
await messageStore.applyConversationReadList(records, isActive)
if (!isActive()) {
return false
}
return true
},
isActive
)
}
/**
* /
*
* index.vue store allSettled
* cache groupId
*/
const pullStateEvents = async (): Promise<void> => {
groupStore.markAllGroupMembersExpired()
const results = await Promise.allSettled([
friendStore.pullFriends(),
friendStore.pullFriendRequests(),
groupStore.fetchGroupList(true),
groupRequestStore.pullGroupRequests(),
groupRequestStore.fetchUnhandledGroupRequestList()
])
for (const result of results) {
if (result.status === 'rejected') {
console.warn('[IM] 状态事件增量补偿失败', result.reason)
}
}
}
/** 执行一次全量增量拉取(重入安全:进行中再次调用复用同一个 promise */
const pullOnce = (): Promise<void> => {
if (!currentUserId) {
@ -323,8 +354,9 @@ export const useMessagePuller = () => {
return
}
conversationStore.loading = true
let messagePullSucceeded = false
try {
// 并发拉取私聊 + 群聊 + 频道,降低初始加载耗时
// 并发拉取私聊 + 群聊 + 频道消息,降低初始加载耗时
await Promise.all([
pullByType(
ImConversationType.PRIVATE,
@ -348,6 +380,7 @@ export const useMessagePuller = () => {
abortController.signal
)
])
messagePullSucceeded = true
} catch (e) {
if (isAbortError(e)) {
return
@ -364,22 +397,33 @@ export const useMessagePuller = () => {
if (!isCurrentPull()) {
return
}
if (!messagePullSucceeded) {
return
}
// 回放 WebSocket 在 loading 期间收到的缓冲消息(此刻走正常 insertMessage 路径)
// 回放 WebSocket 在 loading 期间收到的缓冲消息
const buffered = wsStore.flushBuffer()
const replayPersistPromises: Promise<void>[] = []
for (const item of buffered) {
if (item.conversationType === ImConversationType.PRIVATE) {
wsStore.handlePrivateMessage(item.payload)
replayPersistPromises.push(wsStore.handlePrivateMessage(item.payload))
} else if (item.conversationType === ImConversationType.CHANNEL) {
wsStore.handleChannelMessage(item.payload)
replayPersistPromises.push(wsStore.handleChannelMessage(item.payload))
} else {
wsStore.handleGroupMessage(item.payload)
replayPersistPromises.push(wsStore.handleGroupMessage(item.payload))
}
}
await Promise.all(replayPersistPromises)
// pull + replay 都完成后再排序,避免回放消息打乱顺序
conversationStore.sortConversationList()
// 消息和缓冲帧落库后再补读位置,避免读位置游标先推进导致新消息展示态漏更新
await pullConversationReads(isCurrentPull)
if (!isCurrentPull()) {
return
}
// 重连 / 冷启动后补齐当前激活私聊会话的「对方已读位置」
// 离线期间错过的 RECEIPT 推送会被这里补回;其他私聊会话等用户点开时由 Index.vue 的 watch 触发
// 私聊已读关闭时跳过,避免打到已禁用接口触发错误日志
@ -427,14 +471,16 @@ export const useMessagePuller = () => {
}
/**
* WS minId
* Index.vue pullOnce
* WS minId update_time + id / /
* Index.vue pullOnce + store
* store pullStateEvents pullOnce
*/
watch(
() => wsStore.isConnected,
(isConnected) => {
if (isConnected && initialPulled) {
void pullOnce()
void pullStateEvents()
}
}
)

View File

@ -131,7 +131,7 @@ export const useMessageSender = () => {
name: conversation.name || String(realTarget),
avatar: conversation.avatar || ''
}
messageStore.insertMessage(conversationInfo, message)
void messageStore.insertMessage(conversationInfo, message).catch(() => undefined)
}
// 3. 发送请求:按会话类型分发到不同接口;成功后 ackMessage 更新为 NORMAL失败更新为 FAILED
@ -143,13 +143,15 @@ export const useMessageSender = () => {
type,
content
})
void messageStore.ackMessage(conversation.type, realTarget, clientMessageId, {
id: data.id,
sendTime: new Date(data.sendTime).getTime(),
status: data.status,
receiptStatus: data.receiptStatus,
content: data.content
})
void messageStore
.ackMessage(conversation.type, realTarget, clientMessageId, {
id: data.id,
sendTime: new Date(data.sendTime).getTime(),
status: data.status,
receiptStatus: data.receiptStatus,
content: data.content
})
.catch(() => undefined)
} else if (conversation.type === ImConversationType.GROUP) {
const data = await apiSendGroupMessage({
clientMessageId,
@ -159,21 +161,25 @@ export const useMessageSender = () => {
atUserIds: options?.atUserIds,
receipt: options?.receipt
})
void messageStore.ackMessage(conversation.type, realTarget, clientMessageId, {
id: data.id,
sendTime: new Date(data.sendTime).getTime(),
status: data.status,
receiptStatus: data.receiptStatus,
readCount: data.readCount,
content: data.content
})
void messageStore
.ackMessage(conversation.type, realTarget, clientMessageId, {
id: data.id,
sendTime: new Date(data.sendTime).getTime(),
status: data.status,
receiptStatus: data.receiptStatus,
readCount: data.readCount,
content: data.content
})
.catch(() => undefined)
}
return true
} catch (e) {
console.error('[IM] 消息发送失败', { type, realTarget, clientMessageId }, e)
void messageStore.ackMessage(conversation.type, realTarget, clientMessageId, {
status: ImMessageStatus.FAILED
})
void messageStore
.ackMessage(conversation.type, realTarget, clientMessageId, {
status: ImMessageStatus.FAILED
})
.catch(() => undefined)
return false
}
}

View File

@ -87,34 +87,44 @@ onMounted(async () => {
channelStore.loadChannelList(),
groupRequestStore.loadGroupRequestList()
])
// 1.4
groupStore.markAllGroupMembersExpired()
// 1.4 unhandled-list
// pullGroupRequests / useMessagePuller.pullStateEvents
void groupRequestStore
.fetchUnhandledGroupRequestList()
.catch((e) => console.warn('[IM] 拉取未处理加群申请失败', e))
// 2.1 IDB pullOnce
// 2. pull线 / 退
// 2.1 IDB pullOnce
// 2.2 / await + onMounted
// pullOnce senderId IDB fetch Promise.all RTT
// pullOnce senderId IDB Promise.all RTT
const requiredFetches: Promise<unknown>[] = []
if (hasCachedFriends) {
void friendStore.fetchFriendList().catch((e) => console.warn('[IM] 后台好友失败', e))
void friendStore.pullFriends().catch((e) => console.warn('[IM] 后台增量拉好友失败', e))
} else {
requiredFetches.push(friendStore.fetchFriendList())
requiredFetches.push(friendStore.pullFriends())
}
if (hasCachedGroups) {
void groupStore.fetchGroupList().catch((e) => console.warn('[IM] 后台刷群列表失败', e))
void groupStore.fetchGroupList(true).catch((e) => console.warn('[IM] 后台刷群列表失败', e))
} else {
requiredFetches.push(groupStore.fetchGroupList())
requiredFetches.push(groupStore.fetchGroupList(true))
}
// 2.3 pull list
if (hasCachedChannels) {
void channelStore.fetchChannelList().catch((e) => console.warn('[IM] 后台刷频道列表失败', e))
} else {
requiredFetches.push(channelStore.fetchChannelList())
}
// 2.4
if (requiredFetches.length > 0) {
await Promise.all(requiredFetches)
}
// 2.5 线
void friendStore
.pullFriendRequests()
.catch((e) => console.warn('[IM] 后台增量拉好友申请失败', e))
// 3. WebSocket + 线pullOnce finally loading
webSocketStore.connect()
await pullOnce()

View File

@ -449,10 +449,12 @@ async function ensureGroupData(groupId: number) {
console.warn('[IM MessagePanel] loadGroupMemberList 失败', { groupId }, error)
return null
})
// in-memory
groupStore.fetchGroupMemberList(groupId, true).catch((error) => {
console.warn('[IM MessagePanel] fetchGroupMemberList 失败', { groupId }, error)
})
const group = groupStore.getGroup(groupId)
if (!group?.membersLoaded || group.membersExpired) {
groupStore.fetchGroupMemberList(groupId, true).catch((error) => {
console.warn('[IM MessagePanel] fetchGroupMemberList 失败', { groupId }, error)
})
}
}
/** 群信息抽屉里点"刷新":强拉一次最新群元数据 + 群成员 */

View File

@ -4,6 +4,7 @@ import { store } from '@/store'
import { CommonStatusEnum } from '@/utils/constants'
import {
getMyFriendList as apiGetMyFriendList,
pullMyFriendList as apiPullMyFriendList,
getFriend as apiGetFriend,
deleteFriend as apiDeleteFriend,
updateFriend as apiUpdateFriend,
@ -16,6 +17,7 @@ import {
agreeFriendRequest as apiAgreeFriendRequest,
refuseFriendRequest as apiRefuseFriendRequest,
getMyFriendRequestList as apiGetMyFriendRequestList,
pullMyFriendRequestList as apiPullMyFriendRequestList,
getMyFriendRequest as apiGetMyFriendRequest,
type ImFriendRequestApplyReqVO,
type ImFriendRequestRespVO
@ -23,17 +25,20 @@ import {
import { useConversationStore } from './conversationStore'
import { ImConversationType, ImFriendRequestHandleResult } from '../../utils/constants'
import { FRIEND_REQUEST_PAGE_SIZE } from '../../utils/config'
import { getDb } from '../../utils/db'
import { getDb, StorageKeys } from '../../utils/db'
import { runIncrementalPull } from '../../utils/pull'
import { getCurrentUserId } from '@/utils/auth'
import { getFriendDisplayName } from '../../utils/user'
import type { Friend, FriendDO, FriendLite, FriendRequest, FriendRequestDO } from '../types'
type PendingRequest = { epoch: number; userId: number; promise: Promise<void> }
/** 当前正在进行的好友列表拉取;多 dispatcher 同时触发时复用同一 Promise避免雪崩重拉 */
let pendingFetchFriends: Promise<void> | null = null
let pendingFetchFriends: PendingRequest | null = null
/** 当前正在进行的好友申请列表拉取;多端连续多条申请到达时复用同一 Promise避免雪崩重拉 */
let pendingFetchRequests: Promise<void> | null = null
let pendingFetchRequests: PendingRequest | null = null
/** 当前正在进行的「加载更多申请」请求 */
let pendingLoadMoreRequests: Promise<void> | null = null
let pendingLoadMoreRequests: PendingRequest | null = null
/** clear() 时递增;旧账号那次还没返回的请求 resolve 后比对一致才写 store防跨账号数据泄漏 */
let storeEpoch = 0
@ -176,13 +181,18 @@ export const useFriendStore = defineStore('imFriendStore', {
},
/** 保存单个好友 */
saveFriend(friend: Friend | undefined): void {
async saveFriendRecord(friend: Friend | undefined): Promise<void> {
if (!friend?.id) {
return
}
void getDb()
.put('friends', friend)
.catch((e) => console.warn('[IM friendStore] 本地好友写入失败', e))
await getDb().put('friends', friend)
},
/** 保存单个好友 */
saveFriend(friend: Friend | undefined): void {
void this.saveFriendRecord(friend).catch((e) =>
console.warn('[IM friendStore] 本地好友写入失败', e)
)
},
/** 保存好友申请列表 */
@ -199,13 +209,18 @@ export const useFriendStore = defineStore('imFriendStore', {
},
/** 保存单条好友申请 */
saveFriendRequest(request: FriendRequest | undefined): void {
async saveFriendRequestRecord(request: FriendRequest | undefined): Promise<void> {
if (!request) {
return
}
void getDb()
.put('friendRequests', request)
.catch((e) => console.warn('[IM friendStore] 本地好友申请写入失败', e))
await getDb().put('friendRequests', request)
},
/** 保存单条好友申请 */
saveFriendRequest(request: FriendRequest | undefined): void {
void this.saveFriendRequestRecord(request).catch((e) =>
console.warn('[IM friendStore] 本地好友申请写入失败', e)
)
},
// ==================== 远端拉取 ====================
@ -215,14 +230,18 @@ export const useFriendStore = defineStore('imFriendStore', {
if (this.loaded && !force) {
return
}
if (pendingFetchFriends) {
return pendingFetchFriends
}
// 快照 epochclear() 之后到 .then 之间触发的 epoch++ 表示账号已切,旧结果不能写入新 store
const requestEpoch = storeEpoch
pendingFetchFriends = apiGetMyFriendList()
const requestUserId = getCurrentUserId()
if (
pendingFetchFriends?.epoch === requestEpoch &&
pendingFetchFriends.userId === requestUserId
) {
return pendingFetchFriends.promise
}
const promise = apiGetMyFriendList()
.then((list) => {
if (requestEpoch !== storeEpoch) {
if (requestEpoch !== storeEpoch || getCurrentUserId() !== requestUserId) {
return
}
this.friends = (list || []).map(convertFriend)
@ -241,23 +260,56 @@ export const useFriendStore = defineStore('imFriendStore', {
this.saveFriendList()
})
.finally(() => {
if (requestEpoch === storeEpoch) {
if (
pendingFetchFriends?.epoch === requestEpoch &&
pendingFetchFriends.userId === requestUserId
) {
pendingFetchFriends = null
}
})
return pendingFetchFriends
pendingFetchFriends = { epoch: requestEpoch, userId: requestUserId, promise }
return promise
},
/**
* IM = + / 线
*
* status upsert
*/
async pullFriends() {
// 快照 epoch账号在拉取途中切换clear() → epoch++)时丢弃旧账号那几页结果,防跨账号数据泄漏
const requestEpoch = storeEpoch
const requestUserId = getCurrentUserId()
const isActive = () => requestEpoch === storeEpoch && getCurrentUserId() === requestUserId
await runIncrementalPull(
StorageKeys.settings.friendPullCursor,
apiPullMyFriendList,
async (records) => {
if (!isActive()) {
return false
}
await Promise.all(records.map((vo) => this.upsertFriendForPull(convertFriend(vo))))
return true
},
isActive
)
// 置 loaded供通讯录页 fetchFriendList(force=false) 复用缓存而非重复全量拉
if (isActive()) {
this.loaded = true
}
},
/** 按 friendUserId 获取详情并合并到本地(保证 nickname / avatar 最新) */
async fetchFriendInfo(friendUserId: number) {
const requestEpoch = storeEpoch
const requestUserId = getCurrentUserId()
try {
const data = await apiGetFriend(friendUserId)
if (!data) {
return
}
// clear() 已切账号:旧请求的好友详情不能再 upsert 进新账号的 friends
if (requestEpoch !== storeEpoch) {
if (requestEpoch !== storeEpoch || getCurrentUserId() !== requestUserId) {
return
}
this.upsertFriend(convertFriend(data))
@ -307,12 +359,19 @@ export const useFriendStore = defineStore('imFriendStore', {
/** 拉取「我相关」的好友申请列表首页(页面打开 / 收到 FRIEND_REQUEST_RECEIVED 时刷新pending 期间复用同一 Promise */
async fetchFriendRequestList() {
if (pendingFetchRequests) {
return pendingFetchRequests
const currentUserId = getCurrentUserId()
if (
pendingFetchRequests.epoch === storeEpoch &&
pendingFetchRequests.userId === currentUserId
) {
return pendingFetchRequests.promise
}
}
const requestEpoch = storeEpoch
pendingFetchRequests = apiGetMyFriendRequestList(FRIEND_REQUEST_PAGE_SIZE)
const requestUserId = getCurrentUserId()
const promise = apiGetMyFriendRequestList(FRIEND_REQUEST_PAGE_SIZE)
.then((list) => {
if (requestEpoch !== storeEpoch) {
if (requestEpoch !== storeEpoch || getCurrentUserId() !== requestUserId) {
return
}
const items = (list || []).map(convertFriendRequest)
@ -322,26 +381,39 @@ export const useFriendStore = defineStore('imFriendStore', {
this.saveFriendRequestList()
})
.finally(() => {
if (requestEpoch === storeEpoch) {
if (
pendingFetchRequests?.epoch === requestEpoch &&
pendingFetchRequests.userId === requestUserId
) {
pendingFetchRequests = null
}
})
return pendingFetchRequests
pendingFetchRequests = { epoch: requestEpoch, userId: requestUserId, promise }
return promise
},
/** 加载更多申请(按本地最旧 requestId 游标分页);无更多 / pending 中直接返回 */
async loadMoreFriendRequestList() {
if (!this.hasMoreFriendRequests || pendingLoadMoreRequests || pendingFetchRequests) {
const requestUserId = getCurrentUserId()
const hasSameFetchPending =
pendingFetchRequests?.epoch === storeEpoch && pendingFetchRequests.userId === requestUserId
if (!this.hasMoreFriendRequests || hasSameFetchPending) {
return
}
if (
pendingLoadMoreRequests?.epoch === storeEpoch &&
pendingLoadMoreRequests.userId === requestUserId
) {
return pendingLoadMoreRequests.promise
}
const oldest = this.friendRequests[this.friendRequests.length - 1]
if (!oldest) {
return this.fetchFriendRequestList()
}
const requestEpoch = storeEpoch
pendingLoadMoreRequests = apiGetMyFriendRequestList(FRIEND_REQUEST_PAGE_SIZE, oldest.id)
const promise = apiGetMyFriendRequestList(FRIEND_REQUEST_PAGE_SIZE, oldest.id)
.then((list) => {
if (requestEpoch !== storeEpoch) {
if (requestEpoch !== storeEpoch || getCurrentUserId() !== requestUserId) {
return
}
const items = (list || []).map(convertFriendRequest)
@ -350,11 +422,15 @@ export const useFriendStore = defineStore('imFriendStore', {
this.saveFriendRequestList()
})
.finally(() => {
if (requestEpoch === storeEpoch) {
if (
pendingLoadMoreRequests?.epoch === requestEpoch &&
pendingLoadMoreRequests.userId === requestUserId
) {
pendingLoadMoreRequests = null
}
})
return pendingLoadMoreRequests
pendingLoadMoreRequests = { epoch: requestEpoch, userId: requestUserId, promise }
return promise
},
/** 按 id 查申请记录;列表是按 id 倒序的小列表O(n) find 即可,不再维护 Map 索引 */
@ -365,34 +441,67 @@ export const useFriendStore = defineStore('imFriendStore', {
/** 按 id 从后端单查并 upsert 到本地dispatcher 兜底用,避免全量重拉);后端带越权过滤 */
async fetchFriendRequest(requestId: number) {
const requestEpoch = storeEpoch
const requestUserId = getCurrentUserId()
const data = await apiGetMyFriendRequest(requestId)
if (!data) {
return
}
// clear() 已切账号:旧请求的申请记录不能再写进新账号的 friendRequests
if (requestEpoch !== storeEpoch) {
if (requestEpoch !== storeEpoch || getCurrentUserId() !== requestUserId) {
return
}
const next = convertFriendRequest(data)
const existing = this.getFriendRequest(requestId)
this.upsertFriendRequest(convertFriendRequest(data))
},
/** 合并单条好友申请:已有则按 id 覆盖;新记录按 id 倒序插入(比本地最旧还老则跳过,留给 loadMore 带回) */
upsertFriendRequest(next: FriendRequest) {
void this.upsertFriendRequestForPull(next).catch((e) =>
console.warn('[IM friendStore] 本地好友申请写入失败', e)
)
},
/** 合并单条好友申请 */
async upsertFriendRequestForPull(next: FriendRequest): Promise<void> {
const existing = this.getFriendRequest(next.id)
if (existing) {
Object.assign(existing, next)
this.saveFriendRequest(existing)
await this.saveFriendRequestRecord(existing)
return
}
// 比本地最旧 id 还老:不入列表,让 loadMore 自然带回,避免破坏 id 倒序 / 后续 loadMore 重复 push
const oldest = this.friendRequests[this.friendRequests.length - 1]
if (oldest && requestId < oldest.id) {
if (oldest && next.id < oldest.id) {
return
}
// 按 id 倒序找首个比自己小的位置插入;找不到则追加末尾
const insertIndex = this.friendRequests.findIndex((request) => request.id < requestId)
const insertIndex = this.friendRequests.findIndex((request) => request.id < next.id)
if (insertIndex < 0) {
this.friendRequests.push(next)
} else {
this.friendRequests.splice(insertIndex, 0, next)
}
this.saveFriendRequest(next)
await this.saveFriendRequestRecord(next)
},
/** 增量拉取好友申请变更并合并(重连 / 离线补偿);按 update_time + id 游标,已处理的按 handleResult 覆盖 */
async pullFriendRequests() {
const requestEpoch = storeEpoch
const requestUserId = getCurrentUserId()
const isActive = () => requestEpoch === storeEpoch && getCurrentUserId() === requestUserId
await runIncrementalPull(
StorageKeys.settings.friendRequestPullCursor,
apiPullMyFriendRequestList,
async (records) => {
if (!isActive()) {
return false
}
await Promise.all(
records.map((vo) => this.upsertFriendRequestForPull(convertFriendRequest(vo)))
)
return true
},
isActive
)
},
// ==================== 好友关系操作 ====================
@ -400,8 +509,9 @@ export const useFriendStore = defineStore('imFriendStore', {
/** 删除好友(单向软删,本端置 DISABLEclear=true 时级联清理本地相关数据(如私聊会话),并透传后端给多端同步 */
async deleteFriend(friendUserId: number, clear: boolean = true) {
const requestEpoch = storeEpoch
const requestUserId = getCurrentUserId()
await apiDeleteFriend(friendUserId, clear)
if (requestEpoch !== storeEpoch) {
if (requestEpoch !== storeEpoch || getCurrentUserId() !== requestUserId) {
return
}
this.removeFriend(friendUserId, clear)
@ -410,8 +520,9 @@ export const useFriendStore = defineStore('imFriendStore', {
/** 切换免打扰:同步会话的 silent 字段,避免会话列表 silent 图标等 1210 推到才更新 */
async setFriendSilent(friendUserId: number, silent: boolean) {
const requestEpoch = storeEpoch
const requestUserId = getCurrentUserId()
await apiUpdateFriend({ friendUserId, silent })
if (requestEpoch !== storeEpoch) {
if (requestEpoch !== storeEpoch || getCurrentUserId() !== requestUserId) {
return
}
const friend = this.getFriend(friendUserId)
@ -426,8 +537,9 @@ export const useFriendStore = defineStore('imFriendStore', {
/** 切换联系人置顶 */
async setFriendPinned(friendUserId: number, pinned: boolean) {
const requestEpoch = storeEpoch
const requestUserId = getCurrentUserId()
await apiUpdateFriend({ friendUserId, pinned })
if (requestEpoch !== storeEpoch) {
if (requestEpoch !== storeEpoch || getCurrentUserId() !== requestUserId) {
return
}
const friend = this.getFriend(friendUserId)
@ -440,8 +552,9 @@ export const useFriendStore = defineStore('imFriendStore', {
/** 拉黑好友:本端乐观更新 + 调接口;后端 FRIEND_BLOCK 推到时由 dispatcher 兜底同步多端 */
async blockFriend(friendUserId: number) {
const requestEpoch = storeEpoch
const requestUserId = getCurrentUserId()
await apiBlockFriend(friendUserId)
if (requestEpoch !== storeEpoch) {
if (requestEpoch !== storeEpoch || getCurrentUserId() !== requestUserId) {
return
}
const friend = this.getFriend(friendUserId)
@ -454,8 +567,9 @@ export const useFriendStore = defineStore('imFriendStore', {
/** 移出黑名单:本端乐观更新 + 调接口;后端 FRIEND_UNBLOCK 推到时由 dispatcher 兜底同步多端 */
async unblockFriend(friendUserId: number) {
const requestEpoch = storeEpoch
const requestUserId = getCurrentUserId()
await apiUnblockFriend(friendUserId)
if (requestEpoch !== storeEpoch) {
if (requestEpoch !== storeEpoch || getCurrentUserId() !== requestUserId) {
return
}
const friend = this.getFriend(friendUserId)
@ -468,10 +582,11 @@ export const useFriendStore = defineStore('imFriendStore', {
/** 修改好友展示备注(仅自己可见) */
async setFriendDisplayName(friendUserId: number, displayName: string) {
const requestEpoch = storeEpoch
const requestUserId = getCurrentUserId()
const value = displayName.trim()
// 后端 displayName 语义null/undefined = 不改,"" = 清空,所以这里直接传 value可能是空串
await apiUpdateFriend({ friendUserId, displayName: value })
if (requestEpoch !== storeEpoch) {
if (requestEpoch !== storeEpoch || getCurrentUserId() !== requestUserId) {
return
}
const friend = this.getFriend(friendUserId)
@ -487,7 +602,16 @@ export const useFriendStore = defineStore('imFriendStore', {
/** 本地合并 / 新增某个好友WebSocket 事件 & 手动刷新都用) */
upsertFriend(friend: Friend) {
const index = this.friends.findIndex((existing) => existing.friendUserId === friend.friendUserId)
void this.upsertFriendForPull(friend).catch((e) =>
console.warn('[IM friendStore] 本地好友写入失败', e)
)
},
/** 本地合并 / 新增某个好友 */
async upsertFriendForPull(friend: Friend): Promise<void> {
const index = this.friends.findIndex(
(existing) => existing.friendUserId === friend.friendUserId
)
if (index >= 0) {
this.friends[index] = {
...this.friends[index],
@ -507,7 +631,7 @@ export const useFriendStore = defineStore('imFriendStore', {
avatar: friend.avatar,
silent: friend.silent
})
this.saveFriend(merged)
await this.saveFriendRecord(merged)
},
/** 本地标记删除WebSocket FRIEND_DELETE 事件触发clear=true 时级联清相关数据如私聊会话) */

View File

@ -5,13 +5,22 @@ import {
agreeGroupRequest as apiAgreeGroupRequest,
getMyGroupRequest as apiGetMyGroupRequest,
getUnhandledRequestList as apiGetUnhandledRequestList,
pullMyGroupRequestList as apiPullMyGroupRequestList,
refuseGroupRequest as apiRefuseGroupRequest,
type ImGroupRequestRespVO
} from '@/api/im/group/request'
import { ImGroupRequestHandleResult } from '@/views/im/utils/constants'
import { getDb } from '../../utils/db'
import { getDb, StorageKeys } from '../../utils/db'
import { runIncrementalPull } from '../../utils/pull'
import { getCurrentUserId } from '@/utils/auth'
import type { GroupRequestDO } from '../types'
type PendingRequest = { epoch: number; userId: number; promise: Promise<void> }
/** clear() 时递增;旧账号 in-flight 的 pullGroupRequests 结果 resolve 后比对一致才写 store防跨账号红点污染与 friendStore 同口径) */
let storeEpoch = 0
let pendingUnhandledFetch: PendingRequest | null = null
/**
* IM Store
*
@ -86,34 +95,113 @@ export const useGroupRequestStore = defineStore('imGroupRequestStore', {
.catch((e) => console.warn('[IM groupRequestStore] 本地加群申请缓存写入失败', e))
},
/** 保存单条加群申请 */
async saveGroupRequestRecord(request: ImGroupRequestRespVO): Promise<void> {
await getDb().put('groupRequests', request)
},
/** 保存单条加群申请 */
saveGroupRequest(request: ImGroupRequestRespVO): void {
void getDb()
.put('groupRequests', request)
.catch((e) => console.warn('[IM groupRequestStore] 本地加群申请写入失败', e))
void this.saveGroupRequestRecord(request).catch((e) =>
console.warn('[IM groupRequestStore] 本地加群申请写入失败', e)
)
},
/** 拉取我管理的所有群下未处理申请;进 IM 后 / 升级 admin 后 / WS 推送有冲突时调用 */
async fetchUnhandledGroupRequestList() {
const list = await apiGetUnhandledRequestList()
this.unhandledList = list || []
this.loaded = true
this.saveGroupRequestList()
const requestEpoch = storeEpoch
const requestUserId = getCurrentUserId()
if (
pendingUnhandledFetch?.epoch === requestEpoch &&
pendingUnhandledFetch.userId === requestUserId
) {
return pendingUnhandledFetch.promise
}
const promise = (async () => {
const list = await apiGetUnhandledRequestList()
if (requestEpoch !== storeEpoch || getCurrentUserId() !== requestUserId) {
return
}
this.unhandledList = list || []
this.loaded = true
this.saveGroupRequestList()
})().finally(() => {
if (
pendingUnhandledFetch?.epoch === requestEpoch &&
pendingUnhandledFetch.userId === requestUserId
) {
pendingUnhandledFetch = null
}
})
pendingUnhandledFetch = { epoch: requestEpoch, userId: requestUserId, promise }
return promise
},
/**
* WS 1503
*
* group_id, user_id requestId applyContent / inviterUserId fetch +
* handleResultHTTP 1505 / 1506returnedRequest
*/
async addGroupRequestById(requestId: number) {
const requestEpoch = storeEpoch
const requestUserId = getCurrentUserId()
const request = await apiGetMyGroupRequest(requestId)
if (!request || request.handleResult !== ImGroupRequestHandleResult.UNHANDLED) {
if (!request) {
return
}
this.unhandledList = [request, ...this.unhandledList.filter((r) => r.id !== requestId)]
this.saveGroupRequest(request)
if (requestEpoch !== storeEpoch || getCurrentUserId() !== requestUserId) {
return
}
this.upsertGroupRequest(request)
},
/**
* / WS &
*
* id /
*/
upsertGroupRequest(request: ImGroupRequestRespVO) {
void this.upsertGroupRequestForPull(request).catch((e) =>
console.warn('[IM groupRequestStore] 本地加群申请写入失败', e)
)
},
/** 本地合并 / 新增单条加群申请 */
async upsertGroupRequestForPull(request: ImGroupRequestRespVO): Promise<void> {
if (request.handleResult !== ImGroupRequestHandleResult.UNHANDLED) {
await this.removeGroupRequestByIdForPull(request.id)
return
}
this.unhandledList = [request, ...this.unhandledList.filter((r) => r.id !== request.id)]
await this.saveGroupRequestRecord(request)
},
/**
* handleResult
*
* / 线 fetchUnhandledGroupRequestList
* = removeGroupRequestById no-op
*/
async pullGroupRequests() {
// 快照 epoch账号在拉取途中切换clear() → epoch++)时丢弃旧账号那几页结果,防跨账号红点污染
const requestEpoch = storeEpoch
const requestUserId = getCurrentUserId()
const isActive = () => requestEpoch === storeEpoch && getCurrentUserId() === requestUserId
await runIncrementalPull(
StorageKeys.settings.groupRequestPullCursor,
apiPullMyGroupRequestList,
async (records) => {
if (!isActive()) {
return false
}
await Promise.all(records.map((vo) => this.upsertGroupRequestForPull(vo)))
return true
},
isActive
)
if (isActive()) {
this.loaded = true
}
},
/** WS 收到 1505 / 1506 或本端处理完一条:按 requestId 从列表移除 */
@ -124,6 +212,12 @@ export const useGroupRequestStore = defineStore('imGroupRequestStore', {
.catch((e) => console.warn('[IM groupRequestStore] 本地加群申请删除失败', e))
},
/** 删除单条加群申请 */
async removeGroupRequestByIdForPull(requestId: number): Promise<void> {
this.unhandledList = this.unhandledList.filter((r) => r.id !== requestId)
await getDb().delete('groupRequests', requestId)
},
/** 同意申请;本端处理后立即从列表移除,避免被反复点击 */
async agreeGroupRequest(requestId: number) {
await apiAgreeGroupRequest(requestId)
@ -140,6 +234,9 @@ export const useGroupRequestStore = defineStore('imGroupRequestStore', {
clear() {
this.unhandledList = []
this.loaded = false
// 账号切换:递增 epoch 废弃旧账号 in-flight 的 pullGroupRequests 结果,避免写进新账号红点列表
storeEpoch++
pendingUnhandledFetch = null
}
}
})

View File

@ -27,6 +27,9 @@ import { getGroupDisplayName } from '../../utils/user'
import { type GroupNotificationPayload } from '../../utils/message'
import type { Group, GroupDO, GroupMember, GroupMemberDO, Message } from '../types'
/** clear() 时递增;旧账号 in-flight 的成员请求返回后比对一致才写 store */
let storeEpoch = 0
/**
* fetchGroupMemberList groupId Promise
*
@ -47,7 +50,12 @@ const pendingSingleMemberKey = (userId: number, groupId: number, memberUserId: n
/** 构建群 IndexedDB 记录 */
function buildGroupDO(group: Group): GroupDO {
const { members: _members, membersLoaded: _membersLoaded, ...record } = group
const {
members: _members,
membersLoaded: _membersLoaded,
membersExpired: _membersExpired,
...record
} = group
return record
}
@ -57,6 +65,13 @@ function isSelfInPayloadMembers(payload: GroupNotificationPayload): boolean {
return !!selfUserId && (payload.memberUserIds || []).includes(selfUserId)
}
/** 刷新我管理的群申请红点 */
function refreshUnhandledGroupRequests(): void {
useGroupRequestStore()
.fetchUnhandledGroupRequestList()
.catch(() => undefined)
}
/**
* IM Store
*
@ -68,8 +83,8 @@ function isSelfInPayloadMembers(payload: GroupNotificationPayload): boolean {
export const useGroupStore = defineStore('imGroupStore', {
state: () => ({
groups: [] as Group[],
// 仅 fetchGroupList 成功后置位loadGroupListIDB不置位否则后台 SWR 刷新会被缓存命中跳过
loaded: false
loaded: false, // 仅 fetchGroupList 成功后置位loadGroupListIDB不置位否则后台 SWR 刷新会被缓存命中跳过
groupMembersExpired: false // 进入 IM / 重连后置位IDB 里的成员桶延迟加载到内存时,也要按过期处理
}),
getters: {
@ -119,13 +134,18 @@ export const useGroupStore = defineStore('imGroupStore', {
},
/** 保存单个群 */
saveGroup(group: Group | undefined): void {
async saveGroupRecord(group: Group | undefined): Promise<void> {
if (!group) {
return
}
void getDb()
.put('groups', buildGroupDO(group))
.catch((e) => console.warn('[IM groupStore] 本地群写入失败', e))
await getDb().put('groups', buildGroupDO(group))
},
/** 保存单个群 */
saveGroup(group: Group | undefined): void {
void this.saveGroupRecord(group).catch((e) =>
console.warn('[IM groupStore] 本地群写入失败', e)
)
},
/** 从 IndexedDB 恢复指定群成员 */
@ -137,7 +157,11 @@ export const useGroupStore = defineStore('imGroupStore', {
return cachedGroup.members
}
try {
const cached = await getDb().getAllByIndex<GroupMemberDO>('groupMembers', 'groupId', groupId)
const cached = await getDb().getAllByIndex<GroupMemberDO>(
'groupMembers',
'groupId',
groupId
)
if (!cached || cached.length === 0) {
return null
}
@ -151,12 +175,14 @@ export const useGroupStore = defineStore('imGroupStore', {
name: '',
members: cached,
memberCount: cached.length,
membersLoaded: true
membersLoaded: true,
membersExpired: this.groupMembersExpired
})
} else {
group.members = cached
group.memberCount = cached.length
group.membersLoaded = true
group.membersExpired = this.groupMembersExpired
}
return cached
} catch (e) {
@ -193,8 +219,13 @@ export const useGroupStore = defineStore('imGroupStore', {
if (this.loaded && !force) {
return
}
const requestEpoch = storeEpoch
const requestUserId = getCurrentUserId()
// 拉取当前登录用户加入的所有群(不带成员;成员按需再走 fetchGroupMemberList
const list = await apiGetMyGroupList()
if (requestEpoch !== storeEpoch || getCurrentUserId() !== requestUserId) {
return
}
const fresh = (list || []).map(convertGroup)
// 合并而非全量替换silent / groupRemark / 成员缓存这些字段不在 ImGroupRespVO 里,得从旧 group 保留
const groupMap = new Map(this.groups.map((group) => [group.id, group]))
@ -209,7 +240,8 @@ export const useGroupStore = defineStore('imGroupStore', {
memberCount: existing.memberCount ?? group.memberCount,
silent: existing.silent ?? group.silent,
groupRemark: existing.groupRemark,
membersLoaded: existing.membersLoaded
membersLoaded: existing.membersLoaded,
membersExpired: existing.membersExpired
}
})
this.loaded = true
@ -224,6 +256,24 @@ export const useGroupStore = defineStore('imGroupStore', {
this.saveGroupList()
},
/** 失效全部群成员缓存 */
markAllGroupMembersExpired() {
this.groupMembersExpired = true
for (const group of this.groups) {
if (group.membersLoaded) {
group.membersExpired = true
}
}
},
/** 失效指定群成员缓存 */
markGroupMembersExpired(groupId: number) {
const group = this.getGroup(groupId)
if (group?.membersLoaded) {
group.membersExpired = true
}
},
/** 单群刷新:用 /im/group/get 拉一份最新元数据再 upsert常用于 GROUP_UPDATE 推送后或手动 reload */
async fetchGroupInfo(groupId: number) {
try {
@ -241,7 +291,7 @@ export const useGroupStore = defineStore('imGroupStore', {
fetchGroupMemberList(groupId: number, force = false): Promise<GroupMember[]> {
// in-memory "完整"加载过才命中——单成员补齐写入的 partial members 不在此返回membersLoaded=false
const cached = this.getGroup(groupId)
if (cached && cached.members && cached.membersLoaded && !force) {
if (cached && cached.members && cached.membersLoaded && !cached.membersExpired && !force) {
return Promise.resolve(cached.members)
}
// 未登录:不发起请求也不登记 in-flight避免污染单飞表
@ -249,6 +299,7 @@ export const useGroupStore = defineStore('imGroupStore', {
if (!requestUserId) {
return Promise.resolve([])
}
const requestEpoch = storeEpoch
// 同 (userId, groupId) 已经有正在飞的请求:直接复用,避免重复打接口
const key = pendingMemberKey(requestUserId, groupId)
const inflight = pendingMemberFetches.get(key)
@ -258,6 +309,9 @@ export const useGroupStore = defineStore('imGroupStore', {
const promise = (async () => {
// 拉接口 + 单 pass 转换:同时捕获 me 的原始 VO给下面回填 user-per-group 字段silent / groupRemark
const list = await apiGetGroupMemberList(groupId)
if (requestEpoch !== storeEpoch || getCurrentUserId() !== requestUserId) {
return []
}
let meRaw: ImGroupMemberRespVO | undefined
const members = (list || []).map((member) => {
if (member.userId === requestUserId) {
@ -282,12 +336,14 @@ export const useGroupStore = defineStore('imGroupStore', {
memberCount: members.length,
silent,
groupRemark,
membersLoaded: true
membersLoaded: true,
membersExpired: false
})
} else {
group.members = members
group.memberCount = members.length
group.membersLoaded = true
group.membersExpired = false
// silent / groupRemark 任一变化才同步到 conversation 和 IDBgroupRemark 变化要顺带刷会话名
if (group.silent !== silent || group.groupRemark !== groupRemark) {
group.silent = silent
@ -332,6 +388,7 @@ export const useGroupStore = defineStore('imGroupStore', {
if (!requestUserId) {
return Promise.resolve(null)
}
const requestEpoch = storeEpoch
// 同 (userId, groupId, memberUserId) 已经有正在飞的请求:直接复用
const key = pendingSingleMemberKey(requestUserId, groupId, memberUserId)
const inflight = pendingSingleMemberFetches.get(key)
@ -343,6 +400,9 @@ export const useGroupStore = defineStore('imGroupStore', {
if (!data) {
return null
}
if (requestEpoch !== storeEpoch || getCurrentUserId() !== requestUserId) {
return null
}
const member = convertGroupMember(data, groupId)
// 把这一条 upsert 进 group.members 仅供 in-memory 渲染兜底group 还没就位则用 placeholder
// 注意:不写 IDB——成员桶语义是"全量",存"1 人桶"会污染下次冷启动的 loadGroupMemberList
@ -373,6 +433,13 @@ export const useGroupStore = defineStore('imGroupStore', {
/** 按 id 插入或合并群(命中则浅合并保留旧字段,未命中则追加),同步把展示名 / 头像 / 免打扰推到对应会话 */
upsertGroup(group: Group) {
void this.upsertGroupAndSave(group).catch((e) =>
console.warn('[IM groupStore] 本地群写入失败', e)
)
},
/** 按 id 插入或合并群 */
async upsertGroupAndSave(group: Group): Promise<void> {
const index = this.groups.findIndex((g) => g.id === group.id)
if (index >= 0) {
this.groups[index] = { ...this.groups[index], ...group }
@ -388,7 +455,7 @@ export const useGroupStore = defineStore('imGroupStore', {
silent: merged.silent
})
// 持久化到 IDBfire-and-forget
this.saveGroup(merged)
await this.saveGroupRecord(merged)
},
/** 本地移除群缓存和群会话群解散GROUP_DEL、退群、被踢都复用 */
@ -567,16 +634,27 @@ export const useGroupStore = defineStore('imGroupStore', {
this.applyGroupMemberNicknameUpdateNotification(groupId, payload)
break
case ImMessageType.GROUP_ADMIN_ADD:
this.updateGroupMemberRoleList(groupId, payload.memberUserIds || [], ImGroupMemberRole.ADMIN)
this.updateGroupMemberRoleList(
groupId,
payload.memberUserIds || [],
ImGroupMemberRole.ADMIN
)
this.markGroupMembersExpired(groupId)
// 自己被加为管理员,原本看不到的群下未处理申请现在变可见,重新拉一次 unhandledList
if (isSelfInPayloadMembers(payload)) {
useGroupRequestStore()
.fetchUnhandledGroupRequestList()
.catch(() => undefined)
refreshUnhandledGroupRequests()
}
break
case ImMessageType.GROUP_ADMIN_REMOVE:
this.updateGroupMemberRoleList(groupId, payload.memberUserIds || [], ImGroupMemberRole.NORMAL)
this.updateGroupMemberRoleList(
groupId,
payload.memberUserIds || [],
ImGroupMemberRole.NORMAL
)
this.markGroupMembersExpired(groupId)
if (isSelfInPayloadMembers(payload)) {
refreshUnhandledGroupRequests()
}
break
case ImMessageType.GROUP_OWNER_TRANSFER:
this.applyGroupOwnerTransferNotification(groupId, payload)
@ -649,6 +727,7 @@ export const useGroupStore = defineStore('imGroupStore', {
if (isSelfInPayloadMembers(payload) && !this.getGroup(groupId)) {
await this.fetchGroupInfo(groupId)
}
this.markGroupMembersExpired(groupId)
this.fetchGroupMemberList(groupId, true).catch(() => undefined)
},
@ -659,6 +738,7 @@ export const useGroupStore = defineStore('imGroupStore', {
if (selfUserId && payload.entrantUserId === selfUserId && !this.getGroup(groupId)) {
await this.fetchGroupInfo(groupId)
}
this.markGroupMembersExpired(groupId)
this.fetchGroupMemberList(groupId, true).catch(() => undefined)
},
@ -670,6 +750,7 @@ export const useGroupStore = defineStore('imGroupStore', {
this.removeGroup(groupId)
} else if (payload.operatorUserId) {
this.removeLocalGroupMemberList(groupId, [payload.operatorUserId])
this.markGroupMembersExpired(groupId)
}
},
@ -684,6 +765,7 @@ export const useGroupStore = defineStore('imGroupStore', {
this.removeGroup(groupId)
} else if (memberIds.length) {
this.removeLocalGroupMemberList(groupId, memberIds)
this.markGroupMembersExpired(groupId)
}
},
@ -695,6 +777,7 @@ export const useGroupStore = defineStore('imGroupStore', {
payload.operatorUserId,
payload.displayUserName ?? ''
)
this.markGroupMembersExpired(groupId)
}
},
@ -702,13 +785,14 @@ export const useGroupStore = defineStore('imGroupStore', {
applyGroupOwnerTransferNotification(groupId: number, payload: GroupNotificationPayload) {
if (payload.operatorUserId && payload.newOwnerUserId) {
this.transferGroupOwner(groupId, payload.operatorUserId, payload.newOwnerUserId)
this.markGroupMembersExpired(groupId)
}
// 自己接管群主:原本看不到的群下未处理申请现在变可见,重新拉一次 unhandledList
const selfUserId = getCurrentUserId()
if (selfUserId && payload.newOwnerUserId === selfUserId) {
useGroupRequestStore()
.fetchUnhandledGroupRequestList()
.catch(() => undefined)
refreshUnhandledGroupRequests()
} else if (selfUserId && payload.operatorUserId === selfUserId) {
refreshUnhandledGroupRequests()
}
},
@ -770,6 +854,7 @@ export const useGroupStore = defineStore('imGroupStore', {
if (member && payload.muteEndTime) {
member.muteEndTime = payload.muteEndTime
this.saveGroupMemberList(groupId)
this.markGroupMembersExpired(groupId)
}
},
@ -780,6 +865,7 @@ export const useGroupStore = defineStore('imGroupStore', {
if (member) {
member.muteEndTime = undefined
this.saveGroupMemberList(groupId)
this.markGroupMembersExpired(groupId)
}
},
@ -787,6 +873,9 @@ export const useGroupStore = defineStore('imGroupStore', {
clear() {
this.groups = []
this.loaded = false
this.groupMembersExpired = false
// 账号切换:递增 epoch 废弃旧账号 in-flight 的成员请求
storeEpoch++
// 单飞表跟 in-memory state 一起重置;旧账号 in-flight 的请求 finally 也会自己 delete key提前清空只是更干脆
pendingMemberFetches.clear()
pendingSingleMemberFetches.clear()

View File

@ -30,6 +30,7 @@ import { getCurrentUserId } from '@/utils/auth'
import { tryGetSenderDisplayName } from '../../utils/user'
import { useGroupStore } from './groupStore'
import { useConversationStore } from './conversationStore'
import type { ImConversationReadRespVO } from '@/api/im/conversation/read'
import type { Conversation, Message, MessageDO } from '../types'
const MESSAGE_CACHE_RECENT_CONVERSATION_LIMIT = 5
@ -136,9 +137,10 @@ function deriveLastSenderDisplayName(
if (conversation.type === ImConversationType.GROUP) {
const groupStore = useGroupStore()
const group = groupStore.getGroup(conversation.targetId)
const fetchPromise = group?.membersLoaded
? groupStore.fetchGroupMember(conversation.targetId, senderId)
: groupStore.fetchGroupMemberList(conversation.targetId)
const fetchPromise =
group?.membersLoaded && !group.membersExpired
? groupStore.fetchGroupMember(conversation.targetId, senderId)
: groupStore.fetchGroupMemberList(conversation.targetId)
fetchPromise.catch((e) =>
console.warn(
'[IM messageStore] 兜底拉群成员失败',
@ -233,6 +235,24 @@ function isSameMessage(left: Message, right: Message): boolean {
return !!left.clientMessageId && left.clientMessageId === right.clientMessageId
}
/** 获取对方普通消息最大编号 */
function getMaxIncomingNormalMessageId(
messages: Array<Pick<Message, 'id' | 'selfSend' | 'type' | 'status'>>
): number {
return messages.reduce((maxMessageId, message) => {
if (
message.id &&
!message.selfSend &&
isNormalMessage(message.type) &&
message.status !== ImMessageStatus.RECALL &&
message.id > maxMessageId
) {
return message.id
}
return maxMessageId
}, 0)
}
export const useMessageStore = defineStore('imMessageStore', {
state: () => ({
messagesByConversation: {} as Record<string, Message[]>,
@ -390,8 +410,8 @@ export const useMessageStore = defineStore('imMessageStore', {
/** 保存消息游标 */
async saveMessageCursor(conversationType: number, messageId?: number, tx?: DbTransaction) {
this.updateMessageCursor(conversationType, messageId)
await setMessageMaxId(conversationType, messageId, tx)
this.updateMessageCursor(conversationType, messageId)
},
/** 应用撤回到内存 */
@ -501,7 +521,6 @@ export const useMessageStore = defineStore('imMessageStore', {
recomputeConversationLast(conversation, messages)
syncConversationAtFlags(conversation, message)
}
this.updateMessageCursor(conversationInfo.type, message.id)
addChanged(conversation, messages[existingIndex], {
mergeClientRecord: hasServerClientMessageId
})
@ -535,29 +554,33 @@ export const useMessageStore = defineStore('imMessageStore', {
}
}
messages.splice(insertIndex, 0, message)
this.updateMessageCursor(conversationInfo.type, message.id)
addChanged(conversation, message, {
mergeClientRecord: hasServerClientMessageId && !!message.id
})
}
// 2. 更新内存游标
this.updateMessageCursor(conversationType, maxMessageId)
// 3. 单事务写入消息、会话摘要和游标
await getDb()
.transaction(['messages', 'conversations', 'settings'], 'readwrite', async (tx) => {
// 3.1 写入本批变更消息
// 2. 单事务写入消息、会话摘要和游标
await getDb().transaction(
['messages', 'conversations', 'settings'],
'readwrite',
async (tx) => {
// 2.1 写入本批变更消息
for (const item of persistedMessages.values()) {
await this.saveMessageRecord(item.message, item.conversationType, tx, {
mergeClientRecord: item.mergeClientRecord
})
}
// 3.2 写入本批变更会话
// 2.2 写入本批变更会话
await conversationStore.saveConversationRecord([...changedConversations.values()], tx)
// 3.3 写入本批游标
// 2.3 写入本批游标
await setMessageMaxId(conversationType, maxMessageId, tx)
})
.catch((e) => console.error('[IM messageStore] 批量消息写入失败', e))
}
)
// 3. 持久化成功后推进内存游标
this.updateMessageCursor(conversationType, maxMessageId)
for (const item of persistedMessages.values()) {
this.updateMessageCursor(item.conversationType, item.message.id)
}
},
/** 插入消息 */
@ -565,7 +588,7 @@ export const useMessageStore = defineStore('imMessageStore', {
conversationInfo: MessageConversationInfo,
messageInfo: Message,
options?: { saveMaxId?: boolean }
) {
): Promise<void> {
const conversationStore = useConversationStore()
const hasIncomingClientMessageId = !!messageInfo.clientMessageId
const message = ensureClientMessageId(messageInfo)
@ -589,8 +612,7 @@ export const useMessageStore = defineStore('imMessageStore', {
recomputeConversationLast(conversation, messages)
syncConversationAtFlags(conversation, message)
}
this.updateMessageCursor(conversationInfo.type, message.id)
void getDb()
return getDb()
.transaction(['messages', 'conversations', 'settings'], 'readwrite', async (tx) => {
await this.saveMessageRecord(messages[existingIndex], conversationInfo.type, tx, {
mergeClientRecord: hasIncomingClientMessageId
@ -600,8 +622,13 @@ export const useMessageStore = defineStore('imMessageStore', {
await setMessageMaxId(conversationInfo.type, message.id, tx)
}
})
.catch((e) => console.error('[IM messageStore] 消息写入失败', e))
return
.catch((e) => {
console.error('[IM messageStore] 消息写入失败', e)
throw e
})
.then(() => {
this.updateMessageCursor(conversationInfo.type, message.id)
})
}
// 4. 新消息更新会话摘要和未读状态
@ -632,9 +659,8 @@ export const useMessageStore = defineStore('imMessageStore', {
}
}
messages.splice(insertIndex, 0, message)
this.updateMessageCursor(conversationInfo.type, message.id)
// 6. 单事务写入消息、会话摘要和游标
void getDb()
return getDb()
.transaction(['messages', 'conversations', 'settings'], 'readwrite', async (tx) => {
await this.saveMessageRecord(message, conversationInfo.type, tx, {
mergeClientRecord: hasIncomingClientMessageId && !!message.id
@ -644,7 +670,13 @@ export const useMessageStore = defineStore('imMessageStore', {
await setMessageMaxId(conversationInfo.type, message.id, tx)
}
})
.catch((e) => console.error('[IM messageStore] 消息写入失败', e))
.catch((e) => {
console.error('[IM messageStore] 消息写入失败', e)
throw e
})
.then(() => {
this.updateMessageCursor(conversationInfo.type, message.id)
})
},
/** ack 合并 */
@ -696,7 +728,6 @@ export const useMessageStore = defineStore('imMessageStore', {
if (messages[messages.length - 1] === message) {
recomputeConversationLast(conversation, messages)
}
this.updateMessageCursor(conversationType, message.id)
// 3. 单事务写入消息、会话摘要和游标
await getDb()
.transaction(['messages', 'conversations', 'settings'], 'readwrite', async (tx) => {
@ -706,7 +737,11 @@ export const useMessageStore = defineStore('imMessageStore', {
await conversationStore.saveConversationRecord(conversation, tx)
await setMessageMaxId(conversationType, message.id, tx)
})
.catch((e) => console.error('[IM messageStore] ack 写入失败', e))
.catch((e) => {
console.error('[IM messageStore] ack 写入失败', e)
throw e
})
this.updateMessageCursor(conversationType, message.id)
} finally {
// 4. 清理合并标记
message._ackMerging = false
@ -743,7 +778,11 @@ export const useMessageStore = defineStore('imMessageStore', {
},
/** 撤回消息 */
recallMessage(conversationType: number, targetId: number, recallSignalContent: string) {
async recallMessage(
conversationType: number,
targetId: number,
recallSignalContent: string
): Promise<void> {
const conversationStore = useConversationStore()
const changed = this.applyRecallMessageInMemory(
conversationType,
@ -753,10 +792,15 @@ export const useMessageStore = defineStore('imMessageStore', {
if (!changed) {
return
}
this.saveMessageRecord(changed.message, conversationType).catch((e) =>
console.error('[IM messageStore] 撤回消息写入失败', e)
)
conversationStore.saveConversation(changed.conversation)
await getDb()
.transaction(['messages', 'conversations'], 'readwrite', async (tx) => {
await this.saveMessageRecord(changed.message, conversationType, tx)
await conversationStore.saveConversationRecord(changed.conversation, tx)
})
.catch((e) => {
console.error('[IM messageStore] 撤回消息写入失败', e)
throw e
})
},
/** 应用已读回执 */
@ -809,6 +853,127 @@ export const useMessageStore = defineStore('imMessageStore', {
.catch((e) => console.warn('[IM messageStore] 回执写入失败', e))
},
/** 应用会话读位置补偿 */
async applyConversationReadList(
records: ImConversationReadRespVO[],
isActive?: () => boolean
): Promise<void> {
if (records.length === 0) {
return
}
const conversationStore = useConversationStore()
const changedConversations = new Map<string, Conversation>()
const changedMessages = new Map<string, MessageDO>()
const db = getDb()
// 1. 按读位置更新会话未读和频道已读态
for (const record of records) {
if (isActive && !isActive()) {
return
}
if (!record.conversationType || !record.targetId || !record.messageId) {
continue
}
const clientConversationId = getClientConversationId(
record.conversationType,
record.targetId
)
let storedMessages: MessageDO[] | undefined
const getStoredMessages = async () => {
if (!storedMessages) {
storedMessages = await db.getAllByIndex<MessageDO>(
'messages',
'clientConversationId',
clientConversationId
)
}
return storedMessages
}
const conversation = conversationStore.getConversation(
record.conversationType,
record.targetId
)
if (
conversation &&
(conversation.unreadCount > 0 || conversation.atMe || conversation.atAll)
) {
const memoryMessages = this.messagesByConversation[clientConversationId]
let readCovered =
!!conversation.lastMessageId && conversation.lastMessageId <= record.messageId
const latestMessageLoaded =
!!conversation.lastMessageId &&
memoryMessages?.some((message) => message.id === conversation.lastMessageId)
if (!readCovered && latestMessageLoaded && memoryMessages) {
const maxIncomingMessageId = getMaxIncomingNormalMessageId(memoryMessages)
readCovered = maxIncomingMessageId > 0 && maxIncomingMessageId <= record.messageId
}
if (!readCovered && !latestMessageLoaded) {
const storedMessages = await getStoredMessages()
const latestMessageStored =
!!conversation.lastMessageId &&
storedMessages.some((message) => message.id === conversation.lastMessageId)
if (latestMessageStored) {
const storedMaxIncomingMessageId = getMaxIncomingNormalMessageId(storedMessages)
readCovered =
storedMaxIncomingMessageId > 0 && storedMaxIncomingMessageId <= record.messageId
}
}
if (readCovered) {
conversation.unreadCount = 0
conversation.atMe = false
conversation.atAll = false
changedConversations.set(clientConversationId, conversation)
}
}
if (record.conversationType !== ImConversationType.CHANNEL) {
continue
}
const memoryMessages = this.messagesByConversation[clientConversationId] || []
for (const message of memoryMessages) {
if (
message.id &&
message.id <= record.messageId &&
message.receiptStatus !== ImMessageReceiptStatus.DONE
) {
message.receiptStatus = ImMessageReceiptStatus.DONE
}
}
for (const message of await getStoredMessages()) {
if (
message.id &&
message.id <= record.messageId &&
message.receiptStatus !== ImMessageReceiptStatus.DONE
) {
message.receiptStatus = ImMessageReceiptStatus.DONE
changedMessages.set(message.messageKey, message)
}
}
}
// 2. 持久化本轮变更
if (changedConversations.size === 0 && changedMessages.size === 0) {
return
}
if (isActive && !isActive()) {
return
}
const stores: Array<'conversations' | 'messages'> = []
if (changedConversations.size > 0) {
stores.push('conversations')
}
if (changedMessages.size > 0) {
stores.push('messages')
}
await db.transaction(stores, 'readwrite', async (tx) => {
if (changedConversations.size > 0) {
await conversationStore.saveConversationRecord([...changedConversations.values()], tx)
}
for (const message of changedMessages.values()) {
await db.put('messages', message, tx)
}
})
},
/** 前置历史消息 */
prependMessageList(conversationType: number, targetId: number, earlierMessages: Message[]) {
if (earlierMessages.length === 0) {

View File

@ -69,6 +69,11 @@ const isFriendDeleteWithClear = (frame: ImPrivateMessageDTO): boolean => {
const RTC_LIVEKIT_PROTOCOLS = new Set(['ws:', 'wss:', 'http:', 'https:'])
const RTC_MEDIA_TYPES = new Set<number>(Object.values(ImRtcCallMediaType))
/** 忽略普通实时帧持久化失败 */
function ignoreRealtimePersistError(promise: Promise<void>): void {
void promise.catch(() => undefined)
}
/** 校验 LiveKit 连接地址 */
function isValidLiveKitUrl(url?: string): boolean {
if (!url) {
@ -302,27 +307,28 @@ export const useImWebSocketStore = defineStore('imWebSocketStore', {
this.handleChannelRead(websocketMessage)
return
}
this.handleChannelMessage(websocketMessage)
ignoreRealtimePersistError(this.handleChannelMessage(websocketMessage))
},
/** 频道 READ自己其它终端在某频道里标为已读本端同步清零该频道未读 */
handleChannelRead(websocketMessage: ImChannelMessageRespVO) {
const conversationStore = useConversationStore()
const conversation = conversationStore.getConversation(
ImConversationType.CHANNEL,
websocketMessage.channelId
)
if (conversation) {
conversation.unreadCount = 0
conversationStore.saveConversation(conversation)
}
void useMessageStore()
.applyConversationReadList([
{
id: websocketMessage.id,
conversationType: ImConversationType.CHANNEL,
targetId: websocketMessage.channelId,
messageId: websocketMessage.id
}
])
.catch((e) => console.warn('[IM WS] 频道已读同步失败', e))
},
/**
* + insertMessage
* pull WS id messageStore.insertMessage id
*/
handleChannelMessage(websocketMessage: ImChannelMessageRespVO) {
handleChannelMessage(websocketMessage: ImChannelMessageRespVO): Promise<void> {
const conversationStore = useConversationStore()
const messageStore = useMessageStore()
// 离线加载期间先缓冲,等 pull 完成后再统一回放,避免重复或顺序错乱
@ -331,7 +337,7 @@ export const useImWebSocketStore = defineStore('imWebSocketStore', {
conversationType: ImConversationType.CHANNEL,
payload: websocketMessage
})
return
return Promise.resolve()
}
const sendTimeMs =
typeof websocketMessage.sendTime === 'number'
@ -345,7 +351,7 @@ export const useImWebSocketStore = defineStore('imWebSocketStore', {
conversationStore.activeConversation?.type === ImConversationType.CHANNEL &&
conversationStore.activeConversation?.targetId === websocketMessage.channelId
// 频道单向订阅receiptStatus 表达「我是否已读这条」:会话打开即已读 DONE否则 PENDING与 pull 口径一致)
messageStore.insertMessage(buildChannelConversationStub(websocketMessage.channelId), {
const persistPromise = messageStore.insertMessage(buildChannelConversationStub(websocketMessage.channelId), {
id: websocketMessage.id,
clientMessageId: '',
type: websocketMessage.type,
@ -368,6 +374,7 @@ export const useImWebSocketStore = defineStore('imWebSocketStore', {
// 非当前会话且未免打扰:响一下提示音
playAudioTip()
}
return persistPromise
},
/** content 既可能已是对象也可能是 JSON 字符串(后端用 Map 序列化下发) */
@ -410,7 +417,7 @@ export const useImWebSocketStore = defineStore('imWebSocketStore', {
case ImMessageType.RTC_CALL_END:
// 入库 + 关闭通话窗 + 渲染聊天 tip私聊场景
this.handleRtcCallEnd(websocketMessage)
this.handlePrivateMessage(websocketMessage)
ignoreRealtimePersistError(this.handlePrivateMessage(websocketMessage))
break
default:
if (isFriendNotification(websocketMessage.type)) {
@ -422,14 +429,14 @@ export const useImWebSocketStore = defineStore('imWebSocketStore', {
isFriendChatTip(websocketMessage.type) &&
!isFriendDeleteWithClear(websocketMessage)
) {
this.handlePrivateMessage(websocketMessage)
ignoreRealtimePersistError(this.handlePrivateMessage(websocketMessage))
}
} else if (isGroupRequestNotification(websocketMessage.type)) {
// 加群申请通知1503 / 1505 / 1506走私聊通道与好友通知同段位但分开 dispatcher
this.handleGroupRequestNotification(websocketMessage)
} else {
// TEXT / IMAGE / FILE / VOICE / VIDEO 等普通消息
this.handlePrivateMessage(websocketMessage)
ignoreRealtimePersistError(this.handlePrivateMessage(websocketMessage))
}
}
} catch (e) {
@ -457,16 +464,16 @@ export const useImWebSocketStore = defineStore('imWebSocketStore', {
break
case ImMessageType.RTC_CALL_START:
// 入库 + 渲染聊天 tip胶囊条状态走 1602/1603本帧不动 rtcStore避免与首次填充竞争
this.handleGroupMessage(websocketMessage)
ignoreRealtimePersistError(this.handleGroupMessage(websocketMessage))
break
case ImMessageType.RTC_CALL_END:
// 入库 + 移除胶囊条 + 关闭通话窗(如果当前在该群通话内)
this.handleRtcCallEnd(websocketMessage)
this.handleGroupMessage(websocketMessage)
ignoreRealtimePersistError(this.handleGroupMessage(websocketMessage))
break
default:
// TEXT / IMAGE / FILE / VOICE / VIDEO + GROUP_* 群广播事件
this.handleGroupMessage(websocketMessage)
ignoreRealtimePersistError(this.handleGroupMessage(websocketMessage))
}
} catch (e) {
// 单条帧的处理异常不应阻断后续帧;打印完整 websocketMessage 便于排查
@ -484,7 +491,7 @@ export const useImWebSocketStore = defineStore('imWebSocketStore', {
* 4. Message
* 5.
*/
handlePrivateMessage(websocketMessage: ImPrivateMessageDTO) {
handlePrivateMessage(websocketMessage: ImPrivateMessageDTO): Promise<void> {
const conversationStore = useConversationStore()
const friendStore = useFriendStore()
const currentUserId = getCurrentUserId()
@ -497,7 +504,7 @@ export const useImWebSocketStore = defineStore('imWebSocketStore', {
websocketMessage.receiverId !== currentUserId
) {
console.warn('[IM WS] 丢弃不属于当前用户的私聊帧', websocketMessage)
return
return Promise.resolve()
}
// 1. 离线加载期间先缓冲,等 pull 完成后再统一回放,避免重复或顺序错乱
@ -506,7 +513,7 @@ export const useImWebSocketStore = defineStore('imWebSocketStore', {
conversationType: ImConversationType.PRIVATE,
payload: websocketMessage
})
return
return Promise.resolve()
}
// 2. selfSend / peerId自己发的消息属于「发给 receiverId 的会话」,别人发的属于「发送者的会话」
@ -523,17 +530,16 @@ export const useImWebSocketStore = defineStore('imWebSocketStore', {
// 3. 后端撤回:下发一条 RECALL 消息content 为 `{"messageId": xxx}`(对齐 ImMessageTypeEnum.RECALL → RecallMessage
// 这里拦截下来改走 recallMessage把原消息更新为 RECALL 态),不让它作为新消息进列表
if (websocketMessage.type === ImMessageType.RECALL) {
useMessageStore().recallMessage(
return useMessageStore().recallMessage(
ImConversationType.PRIVATE,
peerId,
websocketMessage.content
)
return
}
// 4. 后端 DTO → 前端 Message发送人名渲染时实时算不写入消息字段
const message = convertPrivateMessage(websocketMessage, currentUserId)
useMessageStore().insertMessage(
const persistPromise = useMessageStore().insertMessage(
{
type: ImConversationType.PRIVATE,
targetId: peerId,
@ -564,6 +570,7 @@ export const useImWebSocketStore = defineStore('imWebSocketStore', {
playAudioTip()
}
}
return persistPromise
},
/** 私聊 READ 事件:自己的其它终端在对方会话里标为已读,本端同步清零未读;私聊已读关闭时兜底忽略 */
@ -571,11 +578,16 @@ export const useImWebSocketStore = defineStore('imWebSocketStore', {
if (!MESSAGE_PRIVATE_READ_ENABLED) {
return
}
const conversationStore = useConversationStore()
conversationStore.markConversationRead(
ImConversationType.PRIVATE,
websocketMessage.receiverId
)
void useMessageStore()
.applyConversationReadList([
{
id: websocketMessage.id,
conversationType: ImConversationType.PRIVATE,
targetId: websocketMessage.receiverId,
messageId: websocketMessage.id
}
])
.catch((e) => console.warn('[IM WS] 私聊已读同步失败', e))
},
/**
@ -607,7 +619,7 @@ export const useImWebSocketStore = defineStore('imWebSocketStore', {
* 4. Message + at
* 5. lastMessageId
*/
handleGroupMessage(websocketMessage: ImGroupMessageDTO) {
handleGroupMessage(websocketMessage: ImGroupMessageDTO): Promise<void> {
const conversationStore = useConversationStore()
const groupStore = useGroupStore()
const currentUserId = getCurrentUserId()
@ -624,7 +636,7 @@ export const useImWebSocketStore = defineStore('imWebSocketStore', {
!receiverUserIds.includes(currentUserId)
) {
console.warn('[IM WS] 丢弃不属于当前用户的定向群消息', websocketMessage)
return
return Promise.resolve()
}
// 1. 离线加载期缓冲(与私聊对称)
@ -633,7 +645,7 @@ export const useImWebSocketStore = defineStore('imWebSocketStore', {
conversationType: ImConversationType.GROUP,
payload: websocketMessage
})
return
return Promise.resolve()
}
// 2. 未知群时自动拉群详情 + 成员(被拉入群但还没收到 GROUP_CREATE 时的兜底)
@ -645,17 +657,16 @@ export const useImWebSocketStore = defineStore('imWebSocketStore', {
// 3. 后端撤回:下发一条 RECALL 消息content 为 `{"messageId": xxx}`
// 这里拦截下来改走 recallMessage把原消息更新为 RECALL 态)
if (websocketMessage.type === ImMessageType.RECALL) {
useMessageStore().recallMessage(
return useMessageStore().recallMessage(
ImConversationType.GROUP,
websocketMessage.groupId,
websocketMessage.content
)
return
}
// 4. 后端 DTO → 前端 Message发送人名渲染时实时算不写入消息字段
const message = convertGroupMessage(websocketMessage, currentUserId)
useMessageStore().insertMessage(
const persistPromise = useMessageStore().insertMessage(
{
type: ImConversationType.GROUP,
targetId: websocketMessage.groupId,
@ -691,6 +702,7 @@ export const useImWebSocketStore = defineStore('imWebSocketStore', {
playAudioTip()
}
}
return persistPromise
},
// ==================== 群聊已读 / 回执 ====================
@ -700,8 +712,17 @@ export const useImWebSocketStore = defineStore('imWebSocketStore', {
if (!MESSAGE_GROUP_READ_ENABLED) {
return
}
const conversationStore = useConversationStore()
conversationStore.markConversationRead(ImConversationType.GROUP, websocketMessage.groupId)
const readMessageId = websocketMessage.readId || websocketMessage.id
void useMessageStore()
.applyConversationReadList([
{
id: readMessageId,
conversationType: ImConversationType.GROUP,
targetId: websocketMessage.groupId,
messageId: readMessageId
}
])
.catch((e) => console.warn('[IM WS] 群聊已读同步失败', e))
},
/** 群聊 RECEIPT更新某条群消息的 readCount / receiptStatus群已读关闭时兜底忽略 */

View File

@ -149,10 +149,11 @@ export interface Group {
groupRemark?: string // 群备注。从当前用户的 GroupMember 回填(当前用户对该群的自定义名)
members?: GroupMember[] // 群成员缓存(按需懒加载)
membersLoaded?: boolean // members 是否"完整加载"——只有整群 loadGroupMemberList / fetchGroupMemberList 命中时为 truefetchGroupMember 单成员补齐不置位,避免 fetchGroupMemberList(force=false) 命中缓存时误判整群已加载
membersExpired?: boolean // 群成员缓存是否已过期;重连 / 重新进入 IM 后只标记不删除,下次进入群会话再刷新
memberCount?: number // 成员总数
}
export type GroupDO = Omit<Group, 'members' | 'membersLoaded'>
export type GroupDO = Omit<Group, 'members' | 'membersLoaded' | 'membersExpired'>
// 群成员实体(前端内部结构)
export interface GroupMember {

View File

@ -35,7 +35,17 @@ export const StorageKeys = {
/** 频道消息拉取游标 */
channelMessageMaxId: 'channelMessageMaxId',
/** 最近转发会话 key 列表 */
recentForwardConversationKeys: 'recentForwardConversationKeys'
recentForwardConversationKeys: 'recentForwardConversationKeys',
// 状态事件补偿增量拉取游标:与上面消息 maxId 游标共用同一 settings keyspace统一登记在此避免撞 key
// 走 update_time + id 复合游标(非单条 maxId故用 PullCursor 后缀区分语义
/** 好友关系增量拉取游标 */
friendPullCursor: 'friendPullCursor',
/** 好友申请增量拉取游标 */
friendRequestPullCursor: 'friendRequestPullCursor',
/** 加群申请增量拉取游标 */
groupRequestPullCursor: 'groupRequestPullCursor',
/** 会话读位置增量拉取游标 */
conversationReadPullCursor: 'conversationReadPullCursor'
}
} as const
@ -208,9 +218,7 @@ class DbClient {
if (tx) {
return requestToPromise<T[]>(tx.objectStore(storeName).getAll())
}
return this.transaction<T[]>([storeName], 'readonly', (tx) =>
this.getAll<T>(storeName, tx)
)
return this.transaction<T[]>([storeName], 'readonly', (tx) => this.getAll<T>(storeName, tx))
}
/** 按唯一索引获取单条记录 */
@ -258,9 +266,7 @@ class DbClient {
await requestToPromise(tx.objectStore(storeName).delete(key))
return
}
await this.transaction([storeName], 'readwrite', (tx) =>
this.delete(storeName, key, tx)
)
await this.transaction([storeName], 'readwrite', (tx) => this.delete(storeName, key, tx))
}
/** 清空 store 记录 */

152
src/views/im/utils/pull.ts Normal file
View File

@ -0,0 +1,152 @@
import { getDb } from './db'
/**
* IM
*
* update_time + id
* store
* WebSocket IM线
*/
/** 增量拉取游标:上次拉到的位置 */
export interface PullCursor {
lastUpdateTime?: number
lastId?: number
}
/** 可作为游标的拉取记录:服务端按 update_time + id 返回,客户端取最后一条推进游标 */
interface PullRecord {
id: number
updateTime?: number
}
/** 单次拉取条数(与后端 limit 上限对齐) */
const PULL_PAGE_SIZE = 100
/** 单轮最多翻页数,兜底防御异常游标导致的死循环 */
const PULL_MAX_PAGES = 100
/** 状态事件拉取回扫窗口,覆盖同秒内旧行更新和客户端 / 服务端时钟精度差 */
const PULL_OVERLAP_MS = 5000
/** 消息类 minId 拉取的单轮翻页上限,兜底防御异常游标死翻;消息量可能远大于状态事件,放宽到 1000 */
const MIN_ID_PULL_MAX_PAGES = 1000
/** 读取某模块的拉取游标;无则返回空游标(首次拉全量) */
export async function getPullCursor(key: string): Promise<PullCursor> {
return (await getDb().getSetting<PullCursor>(key)) ?? {}
}
/**
*
*
* @param cursorKey settings key
* @param fetchPage pull VO
* @param apply store false
* Promise
*/
export async function runIncrementalPull<T extends PullRecord>(
cursorKey: string,
fetchPage: (params: { lastUpdateTime?: number; lastId?: number; limit: number }) => Promise<T[]>,
apply: (records: T[]) => boolean | Promise<boolean>,
isActive?: () => boolean
): Promise<void> {
const storedCursor = await getPullCursor(cursorKey)
const highWater = { ...storedCursor }
let cursor =
storedCursor.lastUpdateTime != null
? { lastUpdateTime: Math.max(0, storedCursor.lastUpdateTime - PULL_OVERLAP_MS), lastId: 0 }
: {}
for (let page = 0; page < PULL_MAX_PAGES; page++) {
if (isActive && !isActive()) {
return
}
const list = await fetchPage({
lastUpdateTime: cursor.lastUpdateTime,
lastId: cursor.lastId,
limit: PULL_PAGE_SIZE
})
if (isActive && !isActive()) {
return
}
if (list.length) {
// apply 未完全落地(返回 false时直接终止游标只能跟着已落地的数据走否则会跳过本页记录
if ((await apply(list)) === false) {
return
}
if (isActive && !isActive()) {
return
}
// 推进游标到本页最后一条并持久化:下次从这里接着拉
const last = list[list.length - 1]
if (last.updateTime == null) {
return
}
cursor = { lastUpdateTime: last.updateTime, lastId: last.id }
if (
highWater.lastUpdateTime == null ||
cursor.lastUpdateTime > highWater.lastUpdateTime ||
(cursor.lastUpdateTime === highWater.lastUpdateTime &&
cursor.lastId > (highWater.lastId ?? 0))
) {
highWater.lastUpdateTime = cursor.lastUpdateTime
highWater.lastId = cursor.lastId
await getDb().setSetting(cursorKey, highWater)
}
}
// 不满一页 = 没有更多变更
if (list.length < PULL_PAGE_SIZE) {
return
}
}
console.warn(`[IM pull] ${cursorKey} 达到单轮翻页上限,提前结束本轮补偿`)
}
/**
* minId
*
* runIncrementalPull id update_time + id messageMaxId
* applyPage messageStore settings
*
* @param initialMinId id
* @param pageSize
* @param fetchPage minId
* @param applyPage / / + messageMaxId false
* @param isActive await / false
* @param maxPages
*/
export async function runMinIdPull<T extends { id?: number }>(options: {
initialMinId: number
pageSize: number
fetchPage: (params: { minId: number; size: number }) => Promise<T[]>
applyPage: (records: T[], nextMinId?: number) => Promise<boolean | void>
isActive?: () => boolean
maxPages?: number
}): Promise<void> {
const { initialMinId, pageSize, fetchPage, applyPage, isActive } = options
const maxPages = options.maxPages ?? MIN_ID_PULL_MAX_PAGES
let minId = initialMinId || 0
for (let page = 0; page < maxPages; page++) {
if (isActive && !isActive()) {
return
}
const list = await fetchPage({ minId, size: pageSize })
// 拉取期间取消 / 切账号:丢弃本批不入库,也不再翻页
if (isActive && !isActive()) {
return
}
if (!list || list.length === 0) {
return
}
// 本批最大消息 id 作为下次游标;无有效 id 则本批 apply 后停(无法继续翻页)
const validIds = list.map((record) => record.id).filter((id): id is number => id != null)
const nextMinId = validIds.length > 0 ? Math.max(...validIds) : undefined
// applyPage 返回 false本页未落地如入库失败不推进游标并终止避免漏消息
if ((await applyPage(list, nextMinId)) === false) {
return
}
// 无有效 id或游标没前进后端契约是 id > minId理论不会出现防御死翻
if (nextMinId == null || nextMinId <= minId) {
return
}
minId = nextMinId
}
console.warn('[IM pull] runMinIdPull 达到单轮翻页上限,提前结束本轮')
}