admin-vue3/src/views/im/home/composables/useMessagePuller.ts

461 lines
19 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import { watch } from 'vue'
import { useConversationStore } from '../store/conversationStore'
import { useMessageStore, type PulledMessage } from '../store/messageStore'
import { useImWebSocketStore } from '../store/websocketStore'
import { useFriendStore } from '../store/friendStore'
import { getFriendDisplayName } from '../../utils/user'
import { useGroupStore } from '../store/groupStore'
import { useGroupRequestStore } from '../store/groupRequestStore'
import {
pullPrivateMessages as apiPullPrivateMessages,
getPrivateMaxReadMessageId as apiGetPrivateMaxReadMessageId,
type ImPrivateMessageRespVO
} from '@/api/im/message/private'
import {
pullGroupMessages as apiPullGroupMessages,
type ImGroupMessageRespVO
} from '@/api/im/message/group'
import {
pullChannelMessages as apiPullChannelMessages,
type ImChannelMessageRespVO
} from '@/api/im/message/channel'
import {
ImConversationType,
ImMessageStatus,
ImContentType,
isFriendChatTip,
isFriendNotification
} from '../../utils/constants'
import {
MESSAGE_PRIVATE_PULL_SIZE,
MESSAGE_GROUP_PULL_SIZE,
MESSAGE_PRIVATE_READ_ENABLED
} from '../../utils/config'
import { buildChannelConversationStub } from '../../utils/channel'
import { generateClientMessageId, getPrivateMessagePeerId } from '../../utils/message'
import { runMinIdPull } from '../../utils/pull'
import { getCurrentUserId } from '@/utils/auth'
import type { Message } from '../types'
/** 三类消息 pull 接口返回的原始 VO 联合类型runMinIdPull 只需 id 推进游标,具体分发在 applyPage 内按类型 cast */
type PulledRawMessage = ImPrivateMessageRespVO | ImGroupMessageRespVO | ImChannelMessageRespVO
/**
* 消息增量拉取:登录后分页拉取离线期间的新消息
*
* 设计要点:
* 1. 同时拉取私聊 + 群聊,使用各自的 `minId` 游标privateMessageMaxId / groupMessageMaxId
* 2. 后端一次最多返回 size 条;前端按 minId 持续翻页,直到接口返回空列表为止
* 3. 拉取期间 conversationStore.loading=true
* - conversationStore 跳过批量持久化,避免频繁写入卡顿
* - websocketStore 把新来的 WS 普通消息丢进缓冲区,等循环结束后统一回放
* 4. WebSocket 重连后会再触发一次拉取,补齐断网期间错过的消息
*/
export const useMessagePuller = () => {
const conversationStore = useConversationStore()
const messageStore = useMessageStore()
const wsStore = useImWebSocketStore()
const friendStore = useFriendStore()
const groupStore = useGroupStore()
const groupRequestStore = useGroupRequestStore()
const currentUserId = getCurrentUserId()
/** 判断请求是否被主动取消 */
const isAbortError = (e: unknown): boolean => {
const error = e as { name?: string; code?: string; message?: string }
return (
error?.name === 'CanceledError' ||
error?.code === 'ERR_CANCELED' ||
error?.message === 'canceled'
)
}
/** 私聊会话归属:自己发的算"发给 receiverId 的会话",否则算"发送方的会话"curry currentUserId 进闭包减少 3 处调用方的样板 */
const getPrivatePeerId = (message: ImPrivateMessageRespVO) =>
getPrivateMessagePeerId(message, currentUserId)
/** 服务端私聊消息 -> 本地 MessagetargetId 是会话主键(对端 userId */
const convertPrivateMessage = (message: ImPrivateMessageRespVO): Message => {
return {
id: message.id,
clientMessageId: message.clientMessageId || generateClientMessageId(),
type: message.type,
content: message.content,
status: message.status,
receiptStatus: message.receiptStatus,
sendTime: new Date(message.sendTime).getTime(),
senderId: message.senderId,
targetId: getPrivatePeerId(message),
selfSend: message.senderId === currentUserId
}
}
/** 服务端群聊消息 -> 本地 Message */
const convertGroupMessage = (message: ImGroupMessageRespVO): Message => {
return {
id: message.id,
clientMessageId: message.clientMessageId || generateClientMessageId(),
type: message.type,
content: message.content,
status: message.status,
sendTime: new Date(message.sendTime).getTime(),
senderId: message.senderId,
targetId: message.groupId,
selfSend: message.senderId === currentUserId,
atUserIds: message.atUserIds || [],
receiverUserIds: message.receiverUserIds || [],
receiptStatus: message.receiptStatus,
readCount: message.readCount
}
}
/** 服务端频道消息 -> 本地 Message */
const convertChannelMessage = (message: ImChannelMessageRespVO): Message => {
return {
id: message.id,
clientMessageId: message.clientMessageId || generateClientMessageId(),
type: message.type,
content: message.content,
status: ImMessageStatus.NORMAL, // 频道无撤回,恒为正常
receiptStatus: message.receiptStatus, // 频道已读态DONE 已读 / PENDING 未读
sendTime: new Date(message.sendTime).getTime(),
senderId: 0, // 系统下发,无发送人
targetId: message.channelId, // 会话归属到频道编号
selfSend: false,
materialId: message.materialId // 详情页拉富文本用
}
}
/** 频道:会话归属到 channelIdname / avatar 暂用占位,将来接入 channelStore 后再填真值 */
const convertChannelConversation = (message: ImChannelMessageRespVO) =>
buildChannelConversationStub(message.channelId)
/** 私聊:会话归属到对端 userId */
const convertPrivateConversation = (message: ImPrivateMessageRespVO) => {
const targetId = getPrivatePeerId(message)
const friend = friendStore.getFriend(targetId)
return {
type: ImConversationType.PRIVATE,
targetId,
name: friend ? getFriendDisplayName(friend) : String(targetId), // 会话列表 / 顶部标题展示:好友备注 > 真实昵称
avatar: friend?.avatar || '',
silent: friend?.silent
}
}
/** 群聊:会话归属到 groupId */
const convertGroupConversation = (message: ImGroupMessageRespVO) => {
const group = groupStore.getGroup(message.groupId)
return {
type: ImConversationType.GROUP,
targetId: message.groupId,
name: group?.name || String(message.groupId),
avatar: group?.avatar || '',
silent: group?.silent
}
}
/**
* 分类型拉取离线消息:翻页 / minId 游标推进 / 空页停由 runMinIdPull 负责,这里只做接口分支 + 逐条业务分发
* (撤回 / 好友通知 / 普通消息)+ 入库。
*
* 取消语义两层守卫,经 isActive 传入 runMinIdPull任一不等即丢弃本批不入库、停止翻页避免旧 session 响应落到新 store
* 1. startEpochcancelPull() 递增 pullEpoch离开 IM / 切账号时跳出
* 2. startUserId每批 await 后比对当前登录 userId防御 logout / 多 tab 下用户已切但 cancelPull 未触发
*/
const pullByType = async (
conversationType: number,
startMinId: number,
startEpoch: number,
startUserId: number,
signal: AbortSignal
) => {
// 私聊 / 群聊 / 频道各自一套接口;按 conversationType 分支调度。翻页机制minId 游标 / 空页判断 / 防死翻)交给 runMinIdPull
const isPrivate = conversationType === ImConversationType.PRIVATE
const isChannel = conversationType === ImConversationType.CHANNEL
const size = isPrivate ? MESSAGE_PRIVATE_PULL_SIZE : MESSAGE_GROUP_PULL_SIZE
const isStillValid = () =>
!signal.aborted && pullEpoch === startEpoch && getCurrentUserId() === startUserId
await runMinIdPull<PulledRawMessage>({
initialMinId: startMinId,
pageSize: size,
isActive: isStillValid,
fetchPage: ({ minId, size }) => {
if (isPrivate) {
return apiPullPrivateMessages({ minId, size }, signal)
}
if (isChannel) {
return apiPullChannelMessages({ minId, size }, signal)
}
return apiPullGroupMessages({ minId, size }, signal)
},
applyPage: async (list, nextMinId) => {
const pulledMessages: PulledMessage[] = []
// 逐条 dispatch原消息走批量 insertRECALL 信号走批量 recall 把同批内已 insert 的原消息更新为撤回提示。
// 后端按 id 升序返回,且信号 id 一定 > 原消息 id先更新 status 再插信号所以原消息一定先到、recallMessage 找得到
for (const raw of list) {
if (isChannel) {
const message = raw as ImChannelMessageRespVO
pulledMessages.push({
kind: 'insert',
conversationInfo: convertChannelConversation(message),
message: convertChannelMessage(message)
})
continue
}
if (isPrivate) {
const message = raw as ImPrivateMessageRespVO
// 特殊:撤回消息的处理
if (message.type === ImContentType.RECALL) {
pulledMessages.push({
kind: 'recall',
conversationType: ImConversationType.PRIVATE,
targetId: getPrivatePeerId(message),
recallSignalContent: message.content
})
continue
}
// 特殊:历史好友事件只还原聊天气泡;好友主数据由好友增量补偿同步
if (isFriendNotification(message.type)) {
// 仅 FRIEND_ADD / FRIEND_DELETE 才作为会话气泡入消息列表
if (!isFriendChatTip(message.type)) {
continue
}
}
// 其它消息正常入会话消息列表
pulledMessages.push({
kind: 'insert',
conversationInfo: convertPrivateConversation(message),
message: convertPrivateMessage(message)
})
} else {
const message = raw as ImGroupMessageRespVO
// 特殊:撤回消息的处理
if (message.type === ImContentType.RECALL) {
pulledMessages.push({
kind: 'recall',
conversationType: ImConversationType.GROUP,
targetId: message.groupId,
recallSignalContent: message.content
})
continue
}
pulledMessages.push({
kind: 'insert',
conversationInfo: convertGroupConversation(message),
message: convertGroupMessage(message)
})
}
}
// 入库 + 推进 messageMaxIdnextMinId 为空(本批无有效 id时不推进游标与旧逻辑一致
await messageStore.applyPulledMessageList(pulledMessages, conversationType, nextMinId)
}
})
}
/** 同一时刻只允许一次 pullIndex.vue 的手动调用与重连 watch 触发可能并发,共用同一个 promise 即可去重 */
let pullPromise: Promise<void> | null = null
let pullAbortController: AbortController | null = null
/**
* 首次 pull 是否已完成。仅在置 true 后isConnected watch 才会触发 pull。
* 防止 socket onopen 比 friendStore/groupStore 预拉先到达时watcher 抢跑造成消息插入早于会话元数据可见
*/
let initialPulled = false
/**
* pull 轮次计数;切账号 / 离开 IM 时 cancelPull() 递增,旧 pullByType 循环按 epoch 自检后跳出
* 避免旧 session 的接口响应在新 session 落地,造成跨账号消息泄漏
*
* 注意普通断连WS 短断)不取消 pull——网络抖动 / 服务端重启都属于本账号正常生命周期,
* 取消会导致首拉被中断后 initialPulled 永远停在 false后续重连 watcher 不再补拉
*/
let pullEpoch = 0
/** 显式取消:仅由 Index.vue onUnmounted离开 IM / 切账号 / 路由跳出)调用 */
const cancelPull = () => {
pullEpoch++
pullAbortController?.abort()
pullAbortController = null
// 旧 promise 仍在 finally 阶段跑,但 epoch 守卫已阻断后续副作用;这里立刻让 pullPromise = null 让新一轮可重入
pullPromise = null
// 同步丢弃 WS 缓冲帧;旧 pull 已不会 flushBuffer若不清下次进 IM 第一次 pullOnce 会把旧 session 的帧回放进新 store
wsStore.discardBuffer()
}
/**
* 状态事件补偿:好友 / 好友申请走增量;群列表和群申请红点走快照刷新
*
* 首登主数据由 index.vue 驱动,重连时各 store 已就位,多路 allSettled 并发互不影响,单路失败仅记日志。
* 群成员不做全局增量同步,重连只标记本地群成员 cache 过期,进入群会话或成员列表时再按 groupId 刷新。
*/
const pullStateEvents = async (): Promise<void> => {
groupStore.markAllGroupMembersExpired()
const results = await Promise.allSettled([
friendStore.pullFriends(),
friendStore.pullFriendRequests(),
conversationStore.pullConversationReads(),
groupStore.fetchGroupList(true),
groupRequestStore.pullGroupRequests(),
groupRequestStore.fetchUnhandledGroupRequestList()
])
for (const result of results) {
if (result.status === 'rejected') {
console.warn('[IM] 状态事件增量补偿失败', result.reason)
}
}
}
/** 执行一次全量增量拉取(重入安全:进行中再次调用复用同一个 promise */
const pullOnce = (): Promise<void> => {
if (!currentUserId) {
return Promise.resolve()
}
if (pullPromise) {
return pullPromise
}
const startEpoch = pullEpoch
// 启动时的用户快照pullByType 每批 await 后比对当前登录用户,账号变了立刻丢弃
const startUserId = currentUserId
const abortController = new AbortController()
pullAbortController = abortController
// 本轮 pull 仍属于当前 sessionepoch 未漂 + 用户未切;任何动新 store 状态的副作用都要先过这道关
const isCurrentPull = () =>
!abortController.signal.aborted &&
pullEpoch === startEpoch &&
getCurrentUserId() === startUserId
pullPromise = (async () => {
try {
// 旧 puller 在 cancelPull 未触发的异常路径上再进来时,先于任何副作用退出,避免污染新 session 的 loading
if (!isCurrentPull()) {
return
}
conversationStore.loading = true
let messagePullSucceeded = false
try {
// 并发拉取私聊 + 群聊 + 频道消息,降低初始加载耗时
await Promise.all([
pullByType(
ImConversationType.PRIVATE,
messageStore.privateMessageMaxId,
startEpoch,
startUserId,
abortController.signal
),
pullByType(
ImConversationType.GROUP,
messageStore.groupMessageMaxId,
startEpoch,
startUserId,
abortController.signal
),
pullByType(
ImConversationType.CHANNEL,
messageStore.channelMessageMaxId,
startEpoch,
startUserId,
abortController.signal
)
])
messagePullSucceeded = true
} catch (e) {
if (isAbortError(e)) {
return
}
console.error('[IM] 拉取离线消息失败:', e)
} finally {
// 仍属本轮才复位 loading旧轮被 cancel / 切账号时由新一轮自管,避免覆盖新 session 的 true
if (isCurrentPull()) {
conversationStore.loading = false
}
}
// 取消 / 切账号后跳过 flushBuffer / 排序 / 已读位置补齐
if (!isCurrentPull()) {
return
}
if (!messagePullSucceeded) {
return
}
// 回放 WebSocket 在 loading 期间收到的缓冲消息
const buffered = wsStore.flushBuffer()
const replayPersistPromises: Promise<void>[] = []
for (const item of buffered) {
if (item.conversationType === ImConversationType.PRIVATE) {
replayPersistPromises.push(wsStore.handlePrivateMessage(item.payload))
} else if (item.conversationType === ImConversationType.CHANNEL) {
replayPersistPromises.push(wsStore.handleChannelMessage(item.payload))
} else {
replayPersistPromises.push(wsStore.handleGroupMessage(item.payload))
}
}
await Promise.all(replayPersistPromises)
// pull + replay 都完成后再排序,避免回放消息打乱顺序
conversationStore.sortConversationList()
// 重连 / 冷启动后补齐当前激活私聊会话的「对方已读位置」
// 离线期间错过的 RECEIPT 推送会被这里补回;其他私聊会话等用户点开时由 Index.vue 的 watch 触发
// 私聊已读关闭时跳过,避免打到已禁用接口触发错误日志
const active = conversationStore.activeConversation
if (MESSAGE_PRIVATE_READ_ENABLED && active && active.type === ImConversationType.PRIVATE) {
try {
const maxReadId = await apiGetPrivateMaxReadMessageId(
active.targetId,
abortController.signal
)
if (!isCurrentPull()) {
return
}
if (maxReadId) {
messageStore.applyMessageReadReceipt({
conversationType: ImConversationType.PRIVATE,
targetId: active.targetId,
privateReadMaxId: maxReadId
})
}
} catch (e) {
if (isAbortError(e)) {
return
}
console.warn('[IM] 拉取对方已读位置失败', e)
}
}
} finally {
// 仍属本轮正常完成首拉epoch 等但 userId 切了:清 pullPromise 防卡死、不标首拉epoch 漂cancelPull 已清no-op
if (isCurrentPull()) {
pullPromise = null
initialPulled = true
if (pullAbortController === abortController) {
pullAbortController = null
}
} else if (pullEpoch === startEpoch) {
pullPromise = null
if (pullAbortController === abortController) {
pullAbortController = null
}
}
}
})()
return pullPromise
}
/**
* 断网期间 WS 收不到推送:重连后既要按 minId 补齐消息,也要按 update_time + id 补齐好友 / 群 / 群申请状态。
* 首次连接由 Index.vue 显式驱动pullOnce 拉消息 + 各 store 首拉),这里仅覆盖之后的重连。
* 重连时 store 已就位pullStateEvents 与 pullOnce 并发即可,无需「先就位再拉消息」的首登顺序约束。
*/
watch(
() => wsStore.isConnected,
(isConnected) => {
if (isConnected && initialPulled) {
void pullOnce()
void pullStateEvents()
}
}
)
return { pullOnce, cancelPull, convertPrivateMessage, convertGroupMessage }
}