websocket:重新封装 websocket 组件,支持 token 认证,并增加 WebSocketMessageListener 方便处理消息
							parent
							
								
									522ab17902
								
							
						
					
					
						commit
						2d9aa7a94a
					
				|  | @ -203,6 +203,12 @@ | ||||||
|                 <version>${revision}</version> |                 <version>${revision}</version> | ||||||
|             </dependency> |             </dependency> | ||||||
| 
 | 
 | ||||||
|  |             <dependency> | ||||||
|  |                 <groupId>cn.iocoder.cloud</groupId> | ||||||
|  |                 <artifactId>yudao-spring-boot-starter-websocket</artifactId> | ||||||
|  |                 <version>${revision}</version> | ||||||
|  |             </dependency> | ||||||
|  | 
 | ||||||
|             <dependency> |             <dependency> | ||||||
|                 <groupId>org.springdoc</groupId> <!-- 接口文档 UI:默认 --> |                 <groupId>org.springdoc</groupId> <!-- 接口文档 UI:默认 --> | ||||||
|                 <artifactId>springdoc-openapi-ui</artifactId> |                 <artifactId>springdoc-openapi-ui</artifactId> | ||||||
|  |  | ||||||
|  | @ -43,6 +43,7 @@ | ||||||
| 
 | 
 | ||||||
|         <module>yudao-spring-boot-starter-flowable</module> |         <module>yudao-spring-boot-starter-flowable</module> | ||||||
|         <module>yudao-spring-boot-starter-captcha</module> |         <module>yudao-spring-boot-starter-captcha</module> | ||||||
|  |         <module>yudao-spring-boot-starter-websocket</module> | ||||||
|         <module>yudao-spring-boot-starter-desensitize</module> |         <module>yudao-spring-boot-starter-desensitize</module> | ||||||
|     </modules> |     </modules> | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -280,6 +280,15 @@ public class CollectionUtils { | ||||||
|         return from.stream().flatMap(func).filter(Objects::nonNull).collect(Collectors.toList()); |         return from.stream().flatMap(func).filter(Objects::nonNull).collect(Collectors.toList()); | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  |     public static <T, U, R> List<R> convertListByFlatMap(Collection<T> from, | ||||||
|  |                                                          Function<? super T, ? extends U> mapper, | ||||||
|  |                                                          Function<U, ? extends Stream<? extends R>> func) { | ||||||
|  |         if (CollUtil.isEmpty(from)) { | ||||||
|  |             return new ArrayList<>(); | ||||||
|  |         } | ||||||
|  |         return from.stream().map(mapper).flatMap(func).filter(Objects::nonNull).collect(Collectors.toList()); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|     public static <T, U> Set<U> convertSetByFlatMap(Collection<T> from, |     public static <T, U> Set<U> convertSetByFlatMap(Collection<T> from, | ||||||
|                                                     Function<T, ? extends Stream<? extends U>> func) { |                                                     Function<T, ? extends Stream<? extends U>> func) { | ||||||
|         if (CollUtil.isEmpty(from)) { |         if (CollUtil.isEmpty(from)) { | ||||||
|  | @ -288,4 +297,13 @@ public class CollectionUtils { | ||||||
|         return from.stream().flatMap(func).filter(Objects::nonNull).collect(Collectors.toSet()); |         return from.stream().flatMap(func).filter(Objects::nonNull).collect(Collectors.toSet()); | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  |     public static <T, U, R> Set<R> convertSetByFlatMap(Collection<T> from, | ||||||
|  |                                                        Function<? super T, ? extends U> mapper, | ||||||
|  |                                                        Function<U, ? extends Stream<? extends R>> func) { | ||||||
|  |         if (CollUtil.isEmpty(from)) { | ||||||
|  |             return new HashSet<>(); | ||||||
|  |         } | ||||||
|  |         return from.stream().map(mapper).flatMap(func).filter(Objects::nonNull).collect(Collectors.toSet()); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
| } | } | ||||||
|  |  | ||||||
|  | @ -5,7 +5,6 @@ import cn.hutool.core.util.StrUtil; | ||||||
| import cn.hutool.system.SystemUtil; | import cn.hutool.system.SystemUtil; | ||||||
| import cn.iocoder.yudao.framework.common.enums.DocumentEnum; | import cn.iocoder.yudao.framework.common.enums.DocumentEnum; | ||||||
| import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate; | import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate; | ||||||
| import cn.iocoder.yudao.framework.mq.redis.core.interceptor.RedisMessageInterceptor; |  | ||||||
| import cn.iocoder.yudao.framework.mq.redis.core.job.RedisPendingMessageResendJob; | import cn.iocoder.yudao.framework.mq.redis.core.job.RedisPendingMessageResendJob; | ||||||
| import cn.iocoder.yudao.framework.mq.redis.core.pubsub.AbstractRedisChannelMessageListener; | import cn.iocoder.yudao.framework.mq.redis.core.pubsub.AbstractRedisChannelMessageListener; | ||||||
| import cn.iocoder.yudao.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener; | import cn.iocoder.yudao.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener; | ||||||
|  | @ -23,7 +22,6 @@ import org.springframework.data.redis.connection.stream.ReadOffset; | ||||||
| import org.springframework.data.redis.connection.stream.StreamOffset; | import org.springframework.data.redis.connection.stream.StreamOffset; | ||||||
| import org.springframework.data.redis.core.RedisCallback; | import org.springframework.data.redis.core.RedisCallback; | ||||||
| import org.springframework.data.redis.core.RedisTemplate; | import org.springframework.data.redis.core.RedisTemplate; | ||||||
| import org.springframework.data.redis.core.StringRedisTemplate; |  | ||||||
| import org.springframework.data.redis.listener.ChannelTopic; | import org.springframework.data.redis.listener.ChannelTopic; | ||||||
| import org.springframework.data.redis.listener.RedisMessageListenerContainer; | import org.springframework.data.redis.listener.RedisMessageListenerContainer; | ||||||
| import org.springframework.data.redis.stream.StreamMessageListenerContainer; | import org.springframework.data.redis.stream.StreamMessageListenerContainer; | ||||||
|  | @ -33,30 +31,19 @@ import java.util.List; | ||||||
| import java.util.Properties; | import java.util.Properties; | ||||||
| 
 | 
 | ||||||
| /** | /** | ||||||
|  * 消息队列配置类 |  * Redis 消息队列 Consumer 配置类 | ||||||
|  * |  * | ||||||
|  * @author 芋道源码 |  * @author 芋道源码 | ||||||
|  */ |  */ | ||||||
| @Slf4j | @Slf4j | ||||||
| @EnableScheduling // 启用定时任务,用于 RedisPendingMessageResendJob 重发消息
 | @EnableScheduling // 启用定时任务,用于 RedisPendingMessageResendJob 重发消息
 | ||||||
| @AutoConfiguration(after = YudaoRedisAutoConfiguration.class) | @AutoConfiguration(after = YudaoRedisAutoConfiguration.class) | ||||||
| public class YudaoRedisMQAutoConfiguration { | public class YudaoRedisMQConsumerAutoConfiguration { | ||||||
| 
 |  | ||||||
|     @Bean |  | ||||||
|     public RedisMQTemplate redisMQTemplate(StringRedisTemplate redisTemplate, |  | ||||||
|                                            List<RedisMessageInterceptor> interceptors) { |  | ||||||
|         RedisMQTemplate redisMQTemplate = new RedisMQTemplate(redisTemplate); |  | ||||||
|         // 添加拦截器
 |  | ||||||
|         interceptors.forEach(redisMQTemplate::addInterceptor); |  | ||||||
|         return redisMQTemplate; |  | ||||||
|     } |  | ||||||
| 
 |  | ||||||
|     // ========== 消费者相关 ==========
 |  | ||||||
| 
 | 
 | ||||||
|     /** |     /** | ||||||
|      * 创建 Redis Pub/Sub 广播消费的容器 |      * 创建 Redis Pub/Sub 广播消费的容器 | ||||||
|      */ |      */ | ||||||
|     @Bean(initMethod = "start", destroyMethod = "stop") |     @Bean | ||||||
|     @ConditionalOnBean(AbstractRedisChannelMessageListener.class) // 只有 AbstractChannelMessageListener 存在的时候,才需要注册 Redis pubsub 监听
 |     @ConditionalOnBean(AbstractRedisChannelMessageListener.class) // 只有 AbstractChannelMessageListener 存在的时候,才需要注册 Redis pubsub 监听
 | ||||||
|     public RedisMessageListenerContainer redisMessageListenerContainer( |     public RedisMessageListenerContainer redisMessageListenerContainer( | ||||||
|             RedisMQTemplate redisMQTemplate, List<AbstractRedisChannelMessageListener<?>> listeners) { |             RedisMQTemplate redisMQTemplate, List<AbstractRedisChannelMessageListener<?>> listeners) { | ||||||
|  | @ -0,0 +1,31 @@ | ||||||
|  | package cn.iocoder.yudao.framework.mq.redis.config; | ||||||
|  | 
 | ||||||
|  | import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate; | ||||||
|  | import cn.iocoder.yudao.framework.mq.redis.core.interceptor.RedisMessageInterceptor; | ||||||
|  | import cn.iocoder.yudao.framework.redis.config.YudaoRedisAutoConfiguration; | ||||||
|  | import lombok.extern.slf4j.Slf4j; | ||||||
|  | import org.springframework.boot.autoconfigure.AutoConfiguration; | ||||||
|  | import org.springframework.context.annotation.Bean; | ||||||
|  | import org.springframework.data.redis.core.StringRedisTemplate; | ||||||
|  | 
 | ||||||
|  | import java.util.List; | ||||||
|  | 
 | ||||||
|  | /** | ||||||
|  |  * Redis 消息队列 Producer 配置类 | ||||||
|  |  * | ||||||
|  |  * @author 芋道源码 | ||||||
|  |  */ | ||||||
|  | @Slf4j | ||||||
|  | @AutoConfiguration(after = YudaoRedisAutoConfiguration.class) | ||||||
|  | public class YudaoRedisMQProducerAutoConfiguration { | ||||||
|  | 
 | ||||||
|  |     @Bean | ||||||
|  |     public RedisMQTemplate redisMQTemplate(StringRedisTemplate redisTemplate, | ||||||
|  |                                            List<RedisMessageInterceptor> interceptors) { | ||||||
|  |         RedisMQTemplate redisMQTemplate = new RedisMQTemplate(redisTemplate); | ||||||
|  |         // 添加拦截器
 | ||||||
|  |         interceptors.forEach(redisMQTemplate::addInterceptor); | ||||||
|  |         return redisMQTemplate; | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  | } | ||||||
|  | @ -1,2 +1,3 @@ | ||||||
| cn.iocoder.yudao.framework.mq.redis.config.YudaoRedisMQAutoConfiguration | cn.iocoder.yudao.framework.mq.redis.config.YudaoRedisMQProducerAutoConfiguration | ||||||
|  | cn.iocoder.yudao.framework.mq.redis.config.YudaoRedisMQConsumerAutoConfiguration | ||||||
| cn.iocoder.yudao.framework.mq.rabbitmq.config.YudaoRabbitMQAutoConfiguration | cn.iocoder.yudao.framework.mq.rabbitmq.config.YudaoRabbitMQAutoConfiguration | ||||||
|  |  | ||||||
|  | @ -12,6 +12,7 @@ import com.baomidou.mybatisplus.core.metadata.IPage; | ||||||
| import com.baomidou.mybatisplus.core.toolkit.support.SFunction; | import com.baomidou.mybatisplus.core.toolkit.support.SFunction; | ||||||
| import com.baomidou.mybatisplus.extension.toolkit.Db; | import com.baomidou.mybatisplus.extension.toolkit.Db; | ||||||
| import com.github.yulichang.base.MPJBaseMapper; | import com.github.yulichang.base.MPJBaseMapper; | ||||||
|  | import com.github.yulichang.interfaces.MPJBaseJoin; | ||||||
| import org.apache.ibatis.annotations.Param; | import org.apache.ibatis.annotations.Param; | ||||||
| 
 | 
 | ||||||
| import java.util.Collection; | import java.util.Collection; | ||||||
|  | @ -39,6 +40,13 @@ public interface BaseMapperX<T> extends MPJBaseMapper<T> { | ||||||
|         return new PageResult<>(mpPage.getRecords(), mpPage.getTotal()); |         return new PageResult<>(mpPage.getRecords(), mpPage.getTotal()); | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  |     default <DTO> PageResult<DTO> selectJoinPage(PageParam pageParam, Class<DTO> resultTypeClass, MPJBaseJoin<T> joinQueryWrapper) { | ||||||
|  |         IPage<DTO> mpPage = MyBatisUtils.buildPage(pageParam); | ||||||
|  |         selectJoinPage(mpPage, resultTypeClass, joinQueryWrapper); | ||||||
|  |         // 转换返回
 | ||||||
|  |         return new PageResult<>(mpPage.getRecords(), mpPage.getTotal()); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|     default T selectOne(String field, Object value) { |     default T selectOne(String field, Object value) { | ||||||
|         return selectOne(new QueryWrapper<T>().eq(field, value)); |         return selectOne(new QueryWrapper<T>().eq(field, value)); | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  | @ -19,6 +19,13 @@ public class SecurityProperties { | ||||||
|      */ |      */ | ||||||
|     @NotEmpty(message = "Token Header 不能为空") |     @NotEmpty(message = "Token Header 不能为空") | ||||||
|     private String tokenHeader = "Authorization"; |     private String tokenHeader = "Authorization"; | ||||||
|  |     /** | ||||||
|  |      * HTTP 请求时,访问令牌的请求参数 | ||||||
|  |      * | ||||||
|  |      * 初始目的:解决 WebSocket 无法通过 header 传参,只能通过 token 参数拼接 | ||||||
|  |      */ | ||||||
|  |     @NotEmpty(message = "Token Parameter 不能为空") | ||||||
|  |     private String tokenParameter = "token"; | ||||||
| 
 | 
 | ||||||
|     /** |     /** | ||||||
|      * mock 模式的开关 |      * mock 模式的开关 | ||||||
|  | @ -41,5 +48,4 @@ public class SecurityProperties { | ||||||
|      * PasswordEncoder 加密复杂度,越高开销越大 |      * PasswordEncoder 加密复杂度,越高开销越大 | ||||||
|      */ |      */ | ||||||
|     private Integer passwordEncoderLength = 4; |     private Integer passwordEncoderLength = 4; | ||||||
| 
 |  | ||||||
| } | } | ||||||
|  |  | ||||||
|  | @ -48,7 +48,8 @@ public class TokenAuthenticationFilter extends OncePerRequestFilter { | ||||||
|         // 情况二,基于 Token 获得用户
 |         // 情况二,基于 Token 获得用户
 | ||||||
|         // 注意,这里主要满足直接使用 Nginx 直接转发到 Spring Cloud 服务的场景。
 |         // 注意,这里主要满足直接使用 Nginx 直接转发到 Spring Cloud 服务的场景。
 | ||||||
|         if (loginUser == null) { |         if (loginUser == null) { | ||||||
|             String token = SecurityFrameworkUtils.obtainAuthorization(request, securityProperties.getTokenHeader()); |             String token = SecurityFrameworkUtils.obtainAuthorization(request, | ||||||
|  |                     securityProperties.getTokenHeader(), securityProperties.getTokenParameter()); | ||||||
|             if (StrUtil.isNotEmpty(token)) { |             if (StrUtil.isNotEmpty(token)) { | ||||||
|                 Integer userType = WebFrameworkUtils.getLoginUserType(request); |                 Integer userType = WebFrameworkUtils.getLoginUserType(request); | ||||||
|                 try { |                 try { | ||||||
|  | @ -82,7 +83,10 @@ public class TokenAuthenticationFilter extends OncePerRequestFilter { | ||||||
|                 return null; |                 return null; | ||||||
|             } |             } | ||||||
|             // 用户类型不匹配,无权限
 |             // 用户类型不匹配,无权限
 | ||||||
|             if (ObjectUtil.notEqual(accessToken.getUserType(), userType)) { |             // 注意:只有 /admin-api/* 和 /app-api/* 有 userType,才需要比对用户类型
 | ||||||
|  |             // 类似 WebSocket 的 /ws/* 连接地址,是不需要比对用户类型的
 | ||||||
|  |             if (userType != null | ||||||
|  |                     && ObjectUtil.notEqual(accessToken.getUserType(), userType)) { | ||||||
|                 throw new AccessDeniedException("错误的用户类型"); |                 throw new AccessDeniedException("错误的用户类型"); | ||||||
|             } |             } | ||||||
|             // 构建登录用户
 |             // 构建登录用户
 | ||||||
|  |  | ||||||
|  | @ -1,5 +1,6 @@ | ||||||
| package cn.iocoder.yudao.framework.security.core.util; | package cn.iocoder.yudao.framework.security.core.util; | ||||||
| 
 | 
 | ||||||
|  | import cn.hutool.core.util.StrUtil; | ||||||
| import cn.iocoder.yudao.framework.security.core.LoginUser; | import cn.iocoder.yudao.framework.security.core.LoginUser; | ||||||
| import cn.iocoder.yudao.framework.web.core.util.WebFrameworkUtils; | import cn.iocoder.yudao.framework.web.core.util.WebFrameworkUtils; | ||||||
| import org.springframework.lang.Nullable; | import org.springframework.lang.Nullable; | ||||||
|  | @ -20,6 +21,9 @@ import java.util.Collections; | ||||||
|  */ |  */ | ||||||
| public class SecurityFrameworkUtils { | public class SecurityFrameworkUtils { | ||||||
| 
 | 
 | ||||||
|  |     /** | ||||||
|  |      * HEADER 认证头 value 的前缀 | ||||||
|  |      */ | ||||||
|     public static final String AUTHORIZATION_BEARER = "Bearer"; |     public static final String AUTHORIZATION_BEARER = "Bearer"; | ||||||
| 
 | 
 | ||||||
|     public static final String LOGIN_USER_HEADER = "login-user"; |     public static final String LOGIN_USER_HEADER = "login-user"; | ||||||
|  | @ -30,19 +34,23 @@ public class SecurityFrameworkUtils { | ||||||
|      * 从请求中,获得认证 Token |      * 从请求中,获得认证 Token | ||||||
|      * |      * | ||||||
|      * @param request 请求 |      * @param request 请求 | ||||||
|      * @param header 认证 Token 对应的 Header 名字 |      * @param headerName 认证 Token 对应的 Header 名字 | ||||||
|  |      * @param parameterName 认证 Token 对应的 Parameter 名字 | ||||||
|      * @return 认证 Token |      * @return 认证 Token | ||||||
|      */ |      */ | ||||||
|     public static String obtainAuthorization(HttpServletRequest request, String header) { |     public static String obtainAuthorization(HttpServletRequest request, | ||||||
|         String authorization = request.getHeader(header); |                                              String headerName, String parameterName) { | ||||||
|         if (!StringUtils.hasText(authorization)) { |         // 1. 获得 Token。优先级:Header > Parameter
 | ||||||
|  |         String token = request.getHeader(headerName); | ||||||
|  |         if (StrUtil.isEmpty(token)) { | ||||||
|  |             token = request.getParameter(parameterName); | ||||||
|  |         } | ||||||
|  |         if (!StringUtils.hasText(token)) { | ||||||
|             return null; |             return null; | ||||||
|         } |         } | ||||||
|         int index = authorization.indexOf(AUTHORIZATION_BEARER + " "); |         // 2. 去除 Token 中带的 Bearer
 | ||||||
|         if (index == -1) { // 未找到
 |         int index = token.indexOf(AUTHORIZATION_BEARER + " "); | ||||||
|             return null; |         return index >= 0 ? token.substring(index + 7).trim() : token; | ||||||
|         } |  | ||||||
|         return authorization.substring(index + 7).trim(); |  | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     /** |     /** | ||||||
|  |  | ||||||
|  | @ -0,0 +1,84 @@ | ||||||
|  | <?xml version="1.0" encoding="UTF-8"?> | ||||||
|  | <project xmlns="http://maven.apache.org/POM/4.0.0" | ||||||
|  |          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | ||||||
|  |          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> | ||||||
|  |     <parent> | ||||||
|  |         <groupId>cn.iocoder.cloud</groupId> | ||||||
|  |         <artifactId>yudao-framework</artifactId> | ||||||
|  |         <version>${revision}</version> | ||||||
|  |     </parent> | ||||||
|  |     <modelVersion>4.0.0</modelVersion> | ||||||
|  |     <artifactId>yudao-spring-boot-starter-websocket</artifactId> | ||||||
|  |     <packaging>jar</packaging> | ||||||
|  | 
 | ||||||
|  |     <name>${project.artifactId}</name> | ||||||
|  |     <description>WebSocket 框架,支持多节点的广播</description> | ||||||
|  |     <url>https://github.com/YunaiV/ruoyi-vue-pro</url> | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  |     <dependencies> | ||||||
|  |         <dependency> | ||||||
|  |             <groupId>cn.iocoder.cloud</groupId> | ||||||
|  |             <artifactId>yudao-common</artifactId> | ||||||
|  |         </dependency> | ||||||
|  | 
 | ||||||
|  |         <!-- Web 相关 --> | ||||||
|  |         <dependency> | ||||||
|  |             <!-- 为什么是 websocket 依赖 security 呢?而不是 security 拓展 websocket 呢? | ||||||
|  |                  因为 websocket 和 LoginUser 当前登录的用户有一定的相关性,具体可见 WebSocketSessionManagerImpl 逻辑。 | ||||||
|  |                  如果让 security 拓展 websocket 的话,会导致 websocket 组件的封装很散,进而增大理解成本。 | ||||||
|  |             --> | ||||||
|  |             <groupId>cn.iocoder.cloud</groupId> | ||||||
|  |             <artifactId>yudao-spring-boot-starter-security</artifactId> | ||||||
|  |             <scope>provided</scope> | ||||||
|  |         </dependency> | ||||||
|  | 
 | ||||||
|  |         <dependency> | ||||||
|  |             <groupId>org.springframework.boot</groupId> | ||||||
|  |             <artifactId>spring-boot-starter-websocket</artifactId> | ||||||
|  |         </dependency> | ||||||
|  | 
 | ||||||
|  |         <!-- Web 相关 --> | ||||||
|  |         <dependency> | ||||||
|  |             <!-- 为什么是 websocket 依赖 security 呢?而不是 security 拓展 websocket 呢? | ||||||
|  |                  因为 websocket 和 LoginUser 当前登录的用户有一定的相关性,具体可见 WebSocketSessionManagerImpl 逻辑。 | ||||||
|  |                  如果让 security 拓展 websocket 的话,会导致 websocket 组件的封装很散,进而增大理解成本。 | ||||||
|  |             --> | ||||||
|  |             <groupId>cn.iocoder.cloud</groupId> | ||||||
|  |             <artifactId>yudao-spring-boot-starter-security</artifactId> | ||||||
|  |             <scope>provided</scope> | ||||||
|  |         </dependency> | ||||||
|  | 
 | ||||||
|  |         <!-- 消息队列相关 --> | ||||||
|  |         <dependency> | ||||||
|  |             <groupId>cn.iocoder.cloud</groupId> | ||||||
|  |             <artifactId>yudao-spring-boot-starter-mq</artifactId> | ||||||
|  |         </dependency> | ||||||
|  |         <dependency> | ||||||
|  |             <groupId>org.springframework.kafka</groupId> | ||||||
|  |             <artifactId>spring-kafka</artifactId> | ||||||
|  |             <optional>true</optional> | ||||||
|  |         </dependency> | ||||||
|  |         <dependency> | ||||||
|  |             <groupId>org.springframework.amqp</groupId> | ||||||
|  |             <artifactId>spring-rabbit</artifactId> | ||||||
|  |             <optional>true</optional> | ||||||
|  |         </dependency> | ||||||
|  |         <dependency> | ||||||
|  |             <groupId>org.apache.rocketmq</groupId> | ||||||
|  |             <artifactId>rocketmq-spring-boot-starter</artifactId> | ||||||
|  |             <optional>true</optional> | ||||||
|  |         </dependency> | ||||||
|  | 
 | ||||||
|  |         <!-- 业务组件 --> | ||||||
|  |         <dependency> | ||||||
|  |             <!-- 为什么要依赖 tenant 组件? | ||||||
|  |                 因为广播某个类型的用户时候,需要根据租户过滤下,避免广播到别的租户! | ||||||
|  |             --> | ||||||
|  |             <groupId>cn.iocoder.cloud</groupId> | ||||||
|  |             <artifactId>yudao-spring-boot-starter-biz-tenant</artifactId> | ||||||
|  |             <scope>provided</scope> | ||||||
|  |         </dependency> | ||||||
|  |     </dependencies> | ||||||
|  | 
 | ||||||
|  | </project> | ||||||
|  | @ -0,0 +1,34 @@ | ||||||
|  | package cn.iocoder.yudao.framework.websocket.config; | ||||||
|  | 
 | ||||||
|  | import lombok.Data; | ||||||
|  | import org.springframework.boot.context.properties.ConfigurationProperties; | ||||||
|  | import org.springframework.validation.annotation.Validated; | ||||||
|  | 
 | ||||||
|  | import javax.validation.constraints.NotEmpty; | ||||||
|  | import javax.validation.constraints.NotNull; | ||||||
|  | 
 | ||||||
|  | /** | ||||||
|  |  * WebSocket 配置项 | ||||||
|  |  * | ||||||
|  |  * @author xingyu4j | ||||||
|  |  */ | ||||||
|  | @ConfigurationProperties("yudao.websocket") | ||||||
|  | @Data | ||||||
|  | @Validated | ||||||
|  | public class WebSocketProperties { | ||||||
|  | 
 | ||||||
|  |     /** | ||||||
|  |      * WebSocket 的连接路径 | ||||||
|  |      */ | ||||||
|  |     @NotEmpty(message = "WebSocket 的连接路径不能为空") | ||||||
|  |     private String path = "/ws"; | ||||||
|  | 
 | ||||||
|  |     /** | ||||||
|  |      * 消息发送器的类型 | ||||||
|  |      * | ||||||
|  |      * 可选值:local、redis、rocketmq、kafka、rabbitmq | ||||||
|  |      */ | ||||||
|  |     @NotNull(message = "WebSocket 的消息发送者不能为空") | ||||||
|  |     private String senderType = "local"; | ||||||
|  | 
 | ||||||
|  | } | ||||||
|  | @ -0,0 +1,177 @@ | ||||||
|  | package cn.iocoder.yudao.framework.websocket.config; | ||||||
|  | 
 | ||||||
|  | import cn.iocoder.yudao.framework.mq.redis.config.YudaoRedisMQConsumerAutoConfiguration; | ||||||
|  | import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate; | ||||||
|  | import cn.iocoder.yudao.framework.websocket.core.handler.JsonWebSocketMessageHandler; | ||||||
|  | import cn.iocoder.yudao.framework.websocket.core.listener.WebSocketMessageListener; | ||||||
|  | import cn.iocoder.yudao.framework.websocket.core.security.LoginUserHandshakeInterceptor; | ||||||
|  | import cn.iocoder.yudao.framework.websocket.core.sender.kafka.KafkaWebSocketMessageConsumer; | ||||||
|  | import cn.iocoder.yudao.framework.websocket.core.sender.kafka.KafkaWebSocketMessageSender; | ||||||
|  | import cn.iocoder.yudao.framework.websocket.core.sender.local.LocalWebSocketMessageSender; | ||||||
|  | import cn.iocoder.yudao.framework.websocket.core.sender.rabbitmq.RabbitMQWebSocketMessageConsumer; | ||||||
|  | import cn.iocoder.yudao.framework.websocket.core.sender.rabbitmq.RabbitMQWebSocketMessageSender; | ||||||
|  | import cn.iocoder.yudao.framework.websocket.core.sender.redis.RedisWebSocketMessageConsumer; | ||||||
|  | import cn.iocoder.yudao.framework.websocket.core.sender.redis.RedisWebSocketMessageSender; | ||||||
|  | import cn.iocoder.yudao.framework.websocket.core.sender.rocketmq.RocketMQWebSocketMessageConsumer; | ||||||
|  | import cn.iocoder.yudao.framework.websocket.core.sender.rocketmq.RocketMQWebSocketMessageSender; | ||||||
|  | import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionHandlerDecorator; | ||||||
|  | import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionManager; | ||||||
|  | import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionManagerImpl; | ||||||
|  | import org.apache.rocketmq.spring.core.RocketMQTemplate; | ||||||
|  | import org.springframework.amqp.core.TopicExchange; | ||||||
|  | import org.springframework.amqp.rabbit.core.RabbitTemplate; | ||||||
|  | import org.springframework.beans.factory.annotation.Value; | ||||||
|  | import org.springframework.boot.autoconfigure.AutoConfiguration; | ||||||
|  | import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; | ||||||
|  | import org.springframework.boot.context.properties.EnableConfigurationProperties; | ||||||
|  | import org.springframework.context.annotation.Bean; | ||||||
|  | import org.springframework.context.annotation.Configuration; | ||||||
|  | import org.springframework.kafka.core.KafkaTemplate; | ||||||
|  | import org.springframework.web.socket.WebSocketHandler; | ||||||
|  | import org.springframework.web.socket.config.annotation.EnableWebSocket; | ||||||
|  | import org.springframework.web.socket.config.annotation.WebSocketConfigurer; | ||||||
|  | import org.springframework.web.socket.server.HandshakeInterceptor; | ||||||
|  | 
 | ||||||
|  | import java.util.List; | ||||||
|  | 
 | ||||||
|  | /** | ||||||
|  |  * WebSocket 自动配置 | ||||||
|  |  * | ||||||
|  |  * @author xingyu4j | ||||||
|  |  */ | ||||||
|  | @AutoConfiguration(before = YudaoRedisMQConsumerAutoConfiguration.class) // before YudaoRedisMQConsumerAutoConfiguration 的原因是,需要保证 RedisWebSocketMessageConsumer 先创建,才能创建 RedisMessageListenerContainer
 | ||||||
|  | @EnableWebSocket // 开启 websocket
 | ||||||
|  | @ConditionalOnProperty(prefix = "yudao.websocket", value = "enable", matchIfMissing = true) // 允许使用 yudao.websocket.enable=false 禁用 websocket
 | ||||||
|  | @EnableConfigurationProperties(WebSocketProperties.class) | ||||||
|  | public class YudaoWebSocketAutoConfiguration { | ||||||
|  | 
 | ||||||
|  |     @Bean | ||||||
|  |     public WebSocketConfigurer webSocketConfigurer(HandshakeInterceptor[] handshakeInterceptors, | ||||||
|  |                                                    WebSocketHandler webSocketHandler, | ||||||
|  |                                                    WebSocketProperties webSocketProperties) { | ||||||
|  |         return registry -> registry | ||||||
|  |                 // 添加 WebSocketHandler
 | ||||||
|  |                 .addHandler(webSocketHandler, webSocketProperties.getPath()) | ||||||
|  |                 .addInterceptors(handshakeInterceptors) | ||||||
|  |                 // 允许跨域,否则前端连接会直接断开
 | ||||||
|  |                 .setAllowedOriginPatterns("*"); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     @Bean | ||||||
|  |     public HandshakeInterceptor handshakeInterceptor() { | ||||||
|  |         return new LoginUserHandshakeInterceptor(); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     @Bean | ||||||
|  |     public WebSocketHandler webSocketHandler(WebSocketSessionManager sessionManager, | ||||||
|  |                                              List<? extends WebSocketMessageListener<?>> messageListeners) { | ||||||
|  |         // 1. 创建 JsonWebSocketMessageHandler 对象,处理消息
 | ||||||
|  |         JsonWebSocketMessageHandler messageHandler = new JsonWebSocketMessageHandler(messageListeners); | ||||||
|  |         // 2. 创建 WebSocketSessionHandlerDecorator 对象,处理连接
 | ||||||
|  |         return new WebSocketSessionHandlerDecorator(messageHandler, sessionManager); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     @Bean | ||||||
|  |     public WebSocketSessionManager webSocketSessionManager() { | ||||||
|  |         return new WebSocketSessionManagerImpl(); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     // ==================== Sender 相关 ====================
 | ||||||
|  | 
 | ||||||
|  |     @Configuration | ||||||
|  |     @ConditionalOnProperty(prefix = "yudao.websocket", name = "sender-type", havingValue = "local", matchIfMissing = true) | ||||||
|  |     public class LocalWebSocketMessageSenderConfiguration { | ||||||
|  | 
 | ||||||
|  |         @Bean | ||||||
|  |         public LocalWebSocketMessageSender localWebSocketMessageSender(WebSocketSessionManager sessionManager) { | ||||||
|  |             return new LocalWebSocketMessageSender(sessionManager); | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     @Configuration | ||||||
|  |     @ConditionalOnProperty(prefix = "yudao.websocket", name = "sender-type", havingValue = "redis", matchIfMissing = true) | ||||||
|  |     public class RedisWebSocketMessageSenderConfiguration { | ||||||
|  | 
 | ||||||
|  |         @Bean | ||||||
|  |         public RedisWebSocketMessageSender redisWebSocketMessageSender(WebSocketSessionManager sessionManager, | ||||||
|  |                                                                        RedisMQTemplate redisMQTemplate) { | ||||||
|  |             return new RedisWebSocketMessageSender(sessionManager, redisMQTemplate); | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  |         @Bean | ||||||
|  |         public RedisWebSocketMessageConsumer redisWebSocketMessageConsumer( | ||||||
|  |                 RedisWebSocketMessageSender redisWebSocketMessageSender) { | ||||||
|  |             return new RedisWebSocketMessageConsumer(redisWebSocketMessageSender); | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     @Configuration | ||||||
|  |     @ConditionalOnProperty(prefix = "yudao.websocket", name = "sender-type", havingValue = "rocketmq", matchIfMissing = true) | ||||||
|  |     public class RocketMQWebSocketMessageSenderConfiguration { | ||||||
|  | 
 | ||||||
|  |         @Bean | ||||||
|  |         public RocketMQWebSocketMessageSender rocketMQWebSocketMessageSender( | ||||||
|  |                 WebSocketSessionManager sessionManager, RocketMQTemplate rocketMQTemplate, | ||||||
|  |                 @Value("${yudao.websocket.sender-rocketmq.topic}") String topic) { | ||||||
|  |             return new RocketMQWebSocketMessageSender(sessionManager, rocketMQTemplate, topic); | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  |         @Bean | ||||||
|  |         public RocketMQWebSocketMessageConsumer rocketMQWebSocketMessageConsumer( | ||||||
|  |                 RocketMQWebSocketMessageSender rocketMQWebSocketMessageSender) { | ||||||
|  |             return new RocketMQWebSocketMessageConsumer(rocketMQWebSocketMessageSender); | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     @Configuration | ||||||
|  |     @ConditionalOnProperty(prefix = "yudao.websocket", name = "sender-type", havingValue = "rabbitmq", matchIfMissing = true) | ||||||
|  |     public class RabbitMQWebSocketMessageSenderConfiguration { | ||||||
|  | 
 | ||||||
|  |         @Bean | ||||||
|  |         public RabbitMQWebSocketMessageSender rabbitMQWebSocketMessageSender( | ||||||
|  |                 WebSocketSessionManager sessionManager, RabbitTemplate rabbitTemplate, | ||||||
|  |                 TopicExchange websocketTopicExchange) { | ||||||
|  |             return new RabbitMQWebSocketMessageSender(sessionManager, rabbitTemplate, websocketTopicExchange); | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  |         @Bean | ||||||
|  |         public RabbitMQWebSocketMessageConsumer rabbitMQWebSocketMessageConsumer( | ||||||
|  |                 RabbitMQWebSocketMessageSender rabbitMQWebSocketMessageSender) { | ||||||
|  |             return new RabbitMQWebSocketMessageConsumer(rabbitMQWebSocketMessageSender); | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  |         /** | ||||||
|  |          * 创建 Topic Exchange | ||||||
|  |          */ | ||||||
|  |         @Bean | ||||||
|  |         public TopicExchange websocketTopicExchange(@Value("${yudao.websocket.sender-rabbitmq.exchange}") String exchange) { | ||||||
|  |             return new TopicExchange(exchange, | ||||||
|  |                     true,  // durable: 是否持久化
 | ||||||
|  |                     false);  // exclusive: 是否排它
 | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     @Configuration | ||||||
|  |     @ConditionalOnProperty(prefix = "yudao.websocket", name = "sender-type", havingValue = "kafka", matchIfMissing = true) | ||||||
|  |     public class KafkaWebSocketMessageSenderConfiguration { | ||||||
|  | 
 | ||||||
|  |         @Bean | ||||||
|  |         public KafkaWebSocketMessageSender kafkaWebSocketMessageSender( | ||||||
|  |                 WebSocketSessionManager sessionManager, KafkaTemplate<Object, Object> kafkaTemplate, | ||||||
|  |                 @Value("${yudao.websocket.sender-kafka.topic}") String topic) { | ||||||
|  |             return new KafkaWebSocketMessageSender(sessionManager, kafkaTemplate, topic); | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  |         @Bean | ||||||
|  |         public KafkaWebSocketMessageConsumer kafkaWebSocketMessageConsumer( | ||||||
|  |                 KafkaWebSocketMessageSender kafkaWebSocketMessageSender) { | ||||||
|  |             return new KafkaWebSocketMessageConsumer(kafkaWebSocketMessageSender); | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  | } | ||||||
|  | @ -0,0 +1,83 @@ | ||||||
|  | package cn.iocoder.yudao.framework.websocket.core.handler; | ||||||
|  | 
 | ||||||
|  | import cn.hutool.core.util.StrUtil; | ||||||
|  | import cn.hutool.core.util.TypeUtil; | ||||||
|  | import cn.iocoder.yudao.framework.common.util.json.JsonUtils; | ||||||
|  | import cn.iocoder.yudao.framework.tenant.core.util.TenantUtils; | ||||||
|  | import cn.iocoder.yudao.framework.websocket.core.listener.WebSocketMessageListener; | ||||||
|  | import cn.iocoder.yudao.framework.websocket.core.message.JsonWebSocketMessage; | ||||||
|  | import cn.iocoder.yudao.framework.websocket.core.util.WebSocketFrameworkUtils; | ||||||
|  | import lombok.extern.slf4j.Slf4j; | ||||||
|  | import org.springframework.web.socket.TextMessage; | ||||||
|  | import org.springframework.web.socket.WebSocketHandler; | ||||||
|  | import org.springframework.web.socket.WebSocketSession; | ||||||
|  | import org.springframework.web.socket.handler.TextWebSocketHandler; | ||||||
|  | 
 | ||||||
|  | import java.lang.reflect.Type; | ||||||
|  | import java.util.HashMap; | ||||||
|  | import java.util.List; | ||||||
|  | import java.util.Map; | ||||||
|  | import java.util.Objects; | ||||||
|  | import java.util.function.Consumer; | ||||||
|  | 
 | ||||||
|  | /** | ||||||
|  |  * JSON 格式 {@link WebSocketHandler} 实现类 | ||||||
|  |  * | ||||||
|  |  * 基于 {@link JsonWebSocketMessage#getType()} 消息类型,调度到对应的 {@link WebSocketMessageListener} 监听器。 | ||||||
|  |  * | ||||||
|  |  * @author 芋道源码 | ||||||
|  |  */ | ||||||
|  | @Slf4j | ||||||
|  | public class JsonWebSocketMessageHandler extends TextWebSocketHandler { | ||||||
|  | 
 | ||||||
|  |     /** | ||||||
|  |      * type 与 WebSocketMessageListener 的映射 | ||||||
|  |      */ | ||||||
|  |     private final Map<String, WebSocketMessageListener<Object>> listeners = new HashMap<>(); | ||||||
|  | 
 | ||||||
|  |     @SuppressWarnings({"rawtypes", "unchecked"}) | ||||||
|  |     public JsonWebSocketMessageHandler(List<? extends WebSocketMessageListener> listenersList) { | ||||||
|  |         listenersList.forEach((Consumer<WebSocketMessageListener>) | ||||||
|  |                 listener -> listeners.put(listener.getType(), listener)); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     @Override | ||||||
|  |     protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { | ||||||
|  |         // 1.1 空消息,跳过
 | ||||||
|  |         if (message.getPayloadLength() == 0) { | ||||||
|  |             return; | ||||||
|  |         } | ||||||
|  |         // 1.2 ping 心跳消息,直接返回 pong 消息。
 | ||||||
|  |         if (message.getPayloadLength() == 4 && Objects.equals(message.getPayload(), "ping")) { | ||||||
|  |             session.sendMessage(new TextMessage("pong")); | ||||||
|  |             return; | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  |         // 2.1 解析消息
 | ||||||
|  |         try { | ||||||
|  |             JsonWebSocketMessage jsonMessage = JsonUtils.parseObject(message.getPayload(), JsonWebSocketMessage.class); | ||||||
|  |             if (jsonMessage == null) { | ||||||
|  |                 log.error("[handleTextMessage][session({}) message({}) 解析为空]", session.getId(), message.getPayload()); | ||||||
|  |                 return; | ||||||
|  |             } | ||||||
|  |             if (StrUtil.isEmpty(jsonMessage.getType())) { | ||||||
|  |                 log.error("[handleTextMessage][session({}) message({}) 类型为空]", session.getId(), message.getPayload()); | ||||||
|  |                 return; | ||||||
|  |             } | ||||||
|  |             // 2.2 获得对应的 WebSocketMessageListener
 | ||||||
|  |             WebSocketMessageListener<Object> messageListener = listeners.get(jsonMessage.getType()); | ||||||
|  |             if (messageListener == null) { | ||||||
|  |                 log.error("[handleTextMessage][session({}) message({}) 监听器为空]", session.getId(), message.getPayload()); | ||||||
|  |                 return; | ||||||
|  |             } | ||||||
|  |             // 2.3 处理消息
 | ||||||
|  |             Type type = TypeUtil.getTypeArgument(messageListener.getClass(), 0); | ||||||
|  |             Object messageObj = JsonUtils.parseObject(jsonMessage.getContent(), type); | ||||||
|  |             Long tenantId = WebSocketFrameworkUtils.getTenantId(session); | ||||||
|  |             TenantUtils.execute(tenantId, () -> messageListener.onMessage(session, messageObj)); | ||||||
|  |         } catch (Throwable ex) { | ||||||
|  |             log.error("[handleTextMessage][session({}) message({}) 处理异常]", session.getId(), message.getPayload()); | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  | } | ||||||
|  | @ -0,0 +1,31 @@ | ||||||
|  | package cn.iocoder.yudao.framework.websocket.core.listener; | ||||||
|  | 
 | ||||||
|  | import cn.iocoder.yudao.framework.websocket.core.message.JsonWebSocketMessage; | ||||||
|  | import org.springframework.web.socket.WebSocketSession; | ||||||
|  | 
 | ||||||
|  | /** | ||||||
|  |  * WebSocket 消息监听器接口 | ||||||
|  |  * | ||||||
|  |  * 目的:前端发送消息给后端后,处理对应 {@link #getType()} 类型的消息 | ||||||
|  |  * | ||||||
|  |  * @param <T> 泛型,消息类型 | ||||||
|  |  */ | ||||||
|  | public interface WebSocketMessageListener<T> { | ||||||
|  | 
 | ||||||
|  |     /** | ||||||
|  |      * 处理消息 | ||||||
|  |      * | ||||||
|  |      * @param session Session | ||||||
|  |      * @param message 消息 | ||||||
|  |      */ | ||||||
|  |     void onMessage(WebSocketSession session, T message); | ||||||
|  | 
 | ||||||
|  |     /** | ||||||
|  |      * 获得消息类型 | ||||||
|  |      * | ||||||
|  |      * @see JsonWebSocketMessage#getType() | ||||||
|  |      * @return 消息类型 | ||||||
|  |      */ | ||||||
|  |     String getType(); | ||||||
|  | 
 | ||||||
|  | } | ||||||
|  | @ -0,0 +1,29 @@ | ||||||
|  | package cn.iocoder.yudao.framework.websocket.core.message; | ||||||
|  | 
 | ||||||
|  | import cn.iocoder.yudao.framework.websocket.core.listener.WebSocketMessageListener; | ||||||
|  | import lombok.Data; | ||||||
|  | 
 | ||||||
|  | import java.io.Serializable; | ||||||
|  | 
 | ||||||
|  | /** | ||||||
|  |  * JSON 格式的 WebSocket 消息帧 | ||||||
|  |  * | ||||||
|  |  * @author 芋道源码 | ||||||
|  |  */ | ||||||
|  | @Data | ||||||
|  | public class JsonWebSocketMessage implements Serializable { | ||||||
|  | 
 | ||||||
|  |     /** | ||||||
|  |      * 消息类型 | ||||||
|  |      * | ||||||
|  |      * 目的:用于分发到对应的 {@link WebSocketMessageListener} 实现类 | ||||||
|  |      */ | ||||||
|  |     private String type; | ||||||
|  |     /** | ||||||
|  |      * 消息内容 | ||||||
|  |      * | ||||||
|  |      * 要求 JSON 对象 | ||||||
|  |      */ | ||||||
|  |     private String content; | ||||||
|  | 
 | ||||||
|  | } | ||||||
|  | @ -0,0 +1,42 @@ | ||||||
|  | package cn.iocoder.yudao.framework.websocket.core.security; | ||||||
|  | 
 | ||||||
|  | import cn.iocoder.yudao.framework.security.core.LoginUser; | ||||||
|  | import cn.iocoder.yudao.framework.security.core.filter.TokenAuthenticationFilter; | ||||||
|  | import cn.iocoder.yudao.framework.security.core.util.SecurityFrameworkUtils; | ||||||
|  | import cn.iocoder.yudao.framework.websocket.core.util.WebSocketFrameworkUtils; | ||||||
|  | import org.springframework.http.server.ServerHttpRequest; | ||||||
|  | import org.springframework.http.server.ServerHttpResponse; | ||||||
|  | import org.springframework.web.socket.WebSocketHandler; | ||||||
|  | import org.springframework.web.socket.WebSocketSession; | ||||||
|  | import org.springframework.web.socket.server.HandshakeInterceptor; | ||||||
|  | 
 | ||||||
|  | import java.util.Map; | ||||||
|  | 
 | ||||||
|  | /** | ||||||
|  |  * 登录用户的 {@link HandshakeInterceptor} 实现类 | ||||||
|  |  * | ||||||
|  |  * 流程如下: | ||||||
|  |  * 1. 前端连接 websocket 时,会通过拼接 ?token={token} 到 ws:// 连接后,这样它可以被 {@link TokenAuthenticationFilter} 所认证通过
 | ||||||
|  |  * 2. {@link LoginUserHandshakeInterceptor} 负责把 {@link LoginUser} 添加到 {@link WebSocketSession} 中 | ||||||
|  |  * | ||||||
|  |  * @author 芋道源码 | ||||||
|  |  */ | ||||||
|  | public class LoginUserHandshakeInterceptor implements HandshakeInterceptor { | ||||||
|  | 
 | ||||||
|  |     @Override | ||||||
|  |     public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, | ||||||
|  |                                    WebSocketHandler wsHandler, Map<String, Object> attributes) { | ||||||
|  |         LoginUser loginUser = SecurityFrameworkUtils.getLoginUser(); | ||||||
|  |         if (loginUser != null) { | ||||||
|  |             WebSocketFrameworkUtils.setLoginUser(loginUser, attributes); | ||||||
|  |         } | ||||||
|  |         return true; | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     @Override | ||||||
|  |     public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, | ||||||
|  |                                WebSocketHandler wsHandler, Exception exception) { | ||||||
|  |         // do nothing
 | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  | } | ||||||
|  | @ -0,0 +1,24 @@ | ||||||
|  | package cn.iocoder.yudao.framework.websocket.core.security; | ||||||
|  | 
 | ||||||
|  | import cn.iocoder.yudao.framework.security.config.AuthorizeRequestsCustomizer; | ||||||
|  | import cn.iocoder.yudao.framework.websocket.config.WebSocketProperties; | ||||||
|  | import lombok.RequiredArgsConstructor; | ||||||
|  | import org.springframework.security.config.annotation.web.builders.HttpSecurity; | ||||||
|  | import org.springframework.security.config.annotation.web.configurers.ExpressionUrlAuthorizationConfigurer; | ||||||
|  | 
 | ||||||
|  | /** | ||||||
|  |  * WebSocket 的权限自定义 | ||||||
|  |  * | ||||||
|  |  * @author 芋道源码 | ||||||
|  |  */ | ||||||
|  | @RequiredArgsConstructor | ||||||
|  | public class WebSocketAuthorizeRequestsCustomizer extends AuthorizeRequestsCustomizer { | ||||||
|  | 
 | ||||||
|  |     private final WebSocketProperties webSocketProperties; | ||||||
|  | 
 | ||||||
|  |     @Override | ||||||
|  |     public void customize(ExpressionUrlAuthorizationConfigurer<HttpSecurity>.ExpressionInterceptUrlRegistry registry) { | ||||||
|  |         registry.antMatchers(webSocketProperties.getPath()).permitAll(); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  | } | ||||||
|  | @ -0,0 +1,104 @@ | ||||||
|  | package cn.iocoder.yudao.framework.websocket.core.sender; | ||||||
|  | 
 | ||||||
|  | import cn.hutool.core.collection.CollUtil; | ||||||
|  | import cn.hutool.core.util.StrUtil; | ||||||
|  | import cn.iocoder.yudao.framework.common.util.json.JsonUtils; | ||||||
|  | import cn.iocoder.yudao.framework.websocket.core.message.JsonWebSocketMessage; | ||||||
|  | import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionManager; | ||||||
|  | import lombok.RequiredArgsConstructor; | ||||||
|  | import lombok.extern.slf4j.Slf4j; | ||||||
|  | import org.springframework.web.socket.TextMessage; | ||||||
|  | import org.springframework.web.socket.WebSocketSession; | ||||||
|  | 
 | ||||||
|  | import java.io.IOException; | ||||||
|  | import java.util.Collection; | ||||||
|  | import java.util.Collections; | ||||||
|  | import java.util.List; | ||||||
|  | 
 | ||||||
|  | /** | ||||||
|  |  * WebSocketMessageSender 实现类 | ||||||
|  |  * | ||||||
|  |  * @author 芋道源码 | ||||||
|  |  */ | ||||||
|  | @Slf4j | ||||||
|  | @RequiredArgsConstructor | ||||||
|  | public abstract class AbstractWebSocketMessageSender implements WebSocketMessageSender { | ||||||
|  | 
 | ||||||
|  |     private final WebSocketSessionManager sessionManager; | ||||||
|  | 
 | ||||||
|  |     @Override | ||||||
|  |     public void send(Integer userType, Long userId, String messageType, String messageContent) { | ||||||
|  |         send(null, userType, userId, messageType, messageContent); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     @Override | ||||||
|  |     public void send(Integer userType, String messageType, String messageContent) { | ||||||
|  |         send(null, userType, null, messageType, messageContent); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     @Override | ||||||
|  |     public void send(String sessionId, String messageType, String messageContent) { | ||||||
|  |         send(sessionId, null, null, messageType, messageContent); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     /** | ||||||
|  |      * 发送消息 | ||||||
|  |      * | ||||||
|  |      * @param sessionId Session 编号 | ||||||
|  |      * @param userType 用户类型 | ||||||
|  |      * @param userId 用户编号 | ||||||
|  |      * @param messageType 消息类型 | ||||||
|  |      * @param messageContent 消息内容 | ||||||
|  |      */ | ||||||
|  |     public void send(String sessionId, Integer userType, Long userId, String messageType, String messageContent) { | ||||||
|  |         // 1. 获得 Session 列表
 | ||||||
|  |         List<WebSocketSession> sessions = Collections.emptyList(); | ||||||
|  |         if (StrUtil.isNotEmpty(sessionId)) { | ||||||
|  |             WebSocketSession session = sessionManager.getSession(sessionId); | ||||||
|  |             if (session != null) { | ||||||
|  |                 sessions = Collections.singletonList(session); | ||||||
|  |             } | ||||||
|  |         } else if (userType != null && userId != null) { | ||||||
|  |             sessions = (List<WebSocketSession>) sessionManager.getSessionList(userType, userId); | ||||||
|  |         } else if (userType != null) { | ||||||
|  |             sessions = (List<WebSocketSession>) sessionManager.getSessionList(userType); | ||||||
|  |         } | ||||||
|  |         if (CollUtil.isEmpty(sessions)) { | ||||||
|  |             log.info("[send][sessionId({}) userType({}) userId({}) messageType({}) messageContent({}) 未匹配到会话]", | ||||||
|  |                     sessionId, userType, userId, messageType, messageContent); | ||||||
|  |         } | ||||||
|  |         // 2. 执行发送
 | ||||||
|  |         doSend(sessions, messageType, messageContent); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     /** | ||||||
|  |      * 发送消息的具体实现 | ||||||
|  |      * | ||||||
|  |      * @param sessions Session 列表 | ||||||
|  |      * @param messageType 消息类型 | ||||||
|  |      * @param messageContent 消息内容 | ||||||
|  |      */ | ||||||
|  |     public void doSend(Collection<WebSocketSession> sessions, String messageType, String messageContent) { | ||||||
|  |         JsonWebSocketMessage message = new JsonWebSocketMessage().setType(messageType).setContent(messageContent); | ||||||
|  |         String payload = JsonUtils.toJsonString(message); // 关键,使用 JSON 序列化
 | ||||||
|  |         sessions.forEach(session -> { | ||||||
|  |             // 1. 各种校验,保证 Session 可以被发送
 | ||||||
|  |             if (session == null) { | ||||||
|  |                 log.error("[doSend][session 为空, message({})]", message); | ||||||
|  |                 return; | ||||||
|  |             } | ||||||
|  |             if (!session.isOpen()) { | ||||||
|  |                 log.error("[doSend][session({}) 已关闭, message({})]", session.getId(), message); | ||||||
|  |                 return; | ||||||
|  |             } | ||||||
|  |             // 2. 执行发送
 | ||||||
|  |             try { | ||||||
|  |                 session.sendMessage(new TextMessage(payload)); | ||||||
|  |                 log.info("[doSend][session({}) 发送消息成功,message({})]", session.getId(), message); | ||||||
|  |             } catch (IOException ex) { | ||||||
|  |                 log.error("[doSend][session({}) 发送消息失败,message({})]", session.getId(), message, ex); | ||||||
|  |             } | ||||||
|  |         }); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  | } | ||||||
|  | @ -0,0 +1,52 @@ | ||||||
|  | package cn.iocoder.yudao.framework.websocket.core.sender; | ||||||
|  | 
 | ||||||
|  | import cn.iocoder.yudao.framework.common.util.json.JsonUtils; | ||||||
|  | 
 | ||||||
|  | /** | ||||||
|  |  * WebSocket 消息的发送器接口 | ||||||
|  |  * | ||||||
|  |  * @author 芋道源码 | ||||||
|  |  */ | ||||||
|  | public interface WebSocketMessageSender { | ||||||
|  | 
 | ||||||
|  |     /** | ||||||
|  |      * 发送消息给指定用户 | ||||||
|  |      * | ||||||
|  |      * @param userType 用户类型 | ||||||
|  |      * @param userId 用户编号 | ||||||
|  |      * @param messageType 消息类型 | ||||||
|  |      * @param messageContent 消息内容,JSON 格式 | ||||||
|  |      */ | ||||||
|  |     void send(Integer userType, Long userId, String messageType, String messageContent); | ||||||
|  | 
 | ||||||
|  |     /** | ||||||
|  |      * 发送消息给指定用户类型 | ||||||
|  |      * | ||||||
|  |      * @param userType 用户类型 | ||||||
|  |      * @param messageType 消息类型 | ||||||
|  |      * @param messageContent 消息内容,JSON 格式 | ||||||
|  |      */ | ||||||
|  |     void send(Integer userType, String messageType, String messageContent); | ||||||
|  | 
 | ||||||
|  |     /** | ||||||
|  |      * 发送消息给指定 Session | ||||||
|  |      * | ||||||
|  |      * @param sessionId Session 编号 | ||||||
|  |      * @param messageType 消息类型 | ||||||
|  |      * @param messageContent 消息内容,JSON 格式 | ||||||
|  |      */ | ||||||
|  |     void send(String sessionId, String messageType, String messageContent); | ||||||
|  | 
 | ||||||
|  |     default void sendObject(Integer userType, Long userId, String messageType, Object messageContent) { | ||||||
|  |         send(userType, userId, messageType, JsonUtils.toJsonString(messageContent)); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     default void sendObject(Integer userType, String messageType, Object messageContent) { | ||||||
|  |         send(userType, messageType, JsonUtils.toJsonString(messageContent)); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     default void sendObject(String sessionId, String messageType, Object messageContent) { | ||||||
|  |         send(sessionId, messageType, JsonUtils.toJsonString(messageContent)); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  | } | ||||||
|  | @ -0,0 +1,35 @@ | ||||||
|  | package cn.iocoder.yudao.framework.websocket.core.sender.kafka; | ||||||
|  | 
 | ||||||
|  | import lombok.Data; | ||||||
|  | 
 | ||||||
|  | /** | ||||||
|  |  * Kafka 广播 WebSocket 的消息 | ||||||
|  |  * | ||||||
|  |  * @author 芋道源码 | ||||||
|  |  */ | ||||||
|  | @Data | ||||||
|  | public class KafkaWebSocketMessage { | ||||||
|  | 
 | ||||||
|  |     /** | ||||||
|  |      * Session 编号 | ||||||
|  |      */ | ||||||
|  |     private String sessionId; | ||||||
|  |     /** | ||||||
|  |      * 用户类型 | ||||||
|  |      */ | ||||||
|  |     private Integer userType; | ||||||
|  |     /** | ||||||
|  |      * 用户编号 | ||||||
|  |      */ | ||||||
|  |     private Long userId; | ||||||
|  | 
 | ||||||
|  |     /** | ||||||
|  |      * 消息类型 | ||||||
|  |      */ | ||||||
|  |     private String messageType; | ||||||
|  |     /** | ||||||
|  |      * 消息内容 | ||||||
|  |      */ | ||||||
|  |     private String messageContent; | ||||||
|  | 
 | ||||||
|  | } | ||||||
|  | @ -0,0 +1,28 @@ | ||||||
|  | package cn.iocoder.yudao.framework.websocket.core.sender.kafka; | ||||||
|  | 
 | ||||||
|  | import lombok.RequiredArgsConstructor; | ||||||
|  | import org.springframework.amqp.rabbit.annotation.RabbitHandler; | ||||||
|  | import org.springframework.kafka.annotation.KafkaListener; | ||||||
|  | 
 | ||||||
|  | /** | ||||||
|  |  * {@link KafkaWebSocketMessage} 广播消息的消费者,真正把消息发送出去 | ||||||
|  |  * | ||||||
|  |  * @author 芋道源码 | ||||||
|  |  */ | ||||||
|  | @RequiredArgsConstructor | ||||||
|  | public class KafkaWebSocketMessageConsumer { | ||||||
|  | 
 | ||||||
|  |     private final KafkaWebSocketMessageSender rabbitMQWebSocketMessageSender; | ||||||
|  | 
 | ||||||
|  |     @RabbitHandler | ||||||
|  |     @KafkaListener( | ||||||
|  |             topics = "${yudao.websocket.sender-kafka.topic}", | ||||||
|  |             // 在 Group 上,使用 UUID 生成其后缀。这样,启动的 Consumer 的 Group 不同,以达到广播消费的目的
 | ||||||
|  |             groupId = "${yudao.websocket.sender-kafka.consumer-group}" + "-" + "#{T(java.util.UUID).randomUUID()}") | ||||||
|  |     public void onMessage(KafkaWebSocketMessage message) { | ||||||
|  |         rabbitMQWebSocketMessageSender.send(message.getSessionId(), | ||||||
|  |                 message.getUserType(), message.getUserId(), | ||||||
|  |                 message.getMessageType(), message.getMessageContent()); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  | } | ||||||
|  | @ -0,0 +1,67 @@ | ||||||
|  | package cn.iocoder.yudao.framework.websocket.core.sender.kafka; | ||||||
|  | 
 | ||||||
|  | import cn.iocoder.yudao.framework.websocket.core.sender.AbstractWebSocketMessageSender; | ||||||
|  | import cn.iocoder.yudao.framework.websocket.core.sender.WebSocketMessageSender; | ||||||
|  | import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionManager; | ||||||
|  | import lombok.extern.slf4j.Slf4j; | ||||||
|  | import org.springframework.kafka.core.KafkaTemplate; | ||||||
|  | 
 | ||||||
|  | import java.util.concurrent.ExecutionException; | ||||||
|  | 
 | ||||||
|  | /** | ||||||
|  |  * 基于 Kafka 的 {@link WebSocketMessageSender} 实现类 | ||||||
|  |  * | ||||||
|  |  * @author 芋道源码 | ||||||
|  |  */ | ||||||
|  | @Slf4j | ||||||
|  | public class KafkaWebSocketMessageSender extends AbstractWebSocketMessageSender { | ||||||
|  | 
 | ||||||
|  |     private final KafkaTemplate<Object, Object> kafkaTemplate; | ||||||
|  | 
 | ||||||
|  |     private final String topic; | ||||||
|  | 
 | ||||||
|  |     public KafkaWebSocketMessageSender(WebSocketSessionManager sessionManager, | ||||||
|  |                                        KafkaTemplate<Object, Object> kafkaTemplate, | ||||||
|  |                                        String topic) { | ||||||
|  |         super(sessionManager); | ||||||
|  |         this.kafkaTemplate = kafkaTemplate; | ||||||
|  |         this.topic = topic; | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     @Override | ||||||
|  |     public void send(Integer userType, Long userId, String messageType, String messageContent) { | ||||||
|  |         sendKafkaMessage(null, userId, userType, messageType, messageContent); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     @Override | ||||||
|  |     public void send(Integer userType, String messageType, String messageContent) { | ||||||
|  |         sendKafkaMessage(null, null, userType, messageType, messageContent); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     @Override | ||||||
|  |     public void send(String sessionId, String messageType, String messageContent) { | ||||||
|  |         sendKafkaMessage(sessionId, null, null, messageType, messageContent); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     /** | ||||||
|  |      * 通过 Kafka 广播消息 | ||||||
|  |      * | ||||||
|  |      * @param sessionId Session 编号 | ||||||
|  |      * @param userId 用户编号 | ||||||
|  |      * @param userType 用户类型 | ||||||
|  |      * @param messageType 消息类型 | ||||||
|  |      * @param messageContent 消息内容 | ||||||
|  |      */ | ||||||
|  |     private void sendKafkaMessage(String sessionId, Long userId, Integer userType, | ||||||
|  |                                   String messageType, String messageContent) { | ||||||
|  |         KafkaWebSocketMessage mqMessage = new KafkaWebSocketMessage() | ||||||
|  |                 .setSessionId(sessionId).setUserId(userId).setUserType(userType) | ||||||
|  |                 .setMessageType(messageType).setMessageContent(messageContent); | ||||||
|  |         try { | ||||||
|  |             kafkaTemplate.send(topic, mqMessage).get(); | ||||||
|  |         } catch (InterruptedException | ExecutionException e) { | ||||||
|  |             log.error("[sendKafkaMessage][发送消息({}) 到 Kafka 失败]", mqMessage, e); | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  | } | ||||||
|  | @ -0,0 +1,20 @@ | ||||||
|  | package cn.iocoder.yudao.framework.websocket.core.sender.local; | ||||||
|  | 
 | ||||||
|  | import cn.iocoder.yudao.framework.websocket.core.sender.AbstractWebSocketMessageSender; | ||||||
|  | import cn.iocoder.yudao.framework.websocket.core.sender.WebSocketMessageSender; | ||||||
|  | import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionManager; | ||||||
|  | 
 | ||||||
|  | /** | ||||||
|  |  * 本地的 {@link WebSocketMessageSender} 实现类 | ||||||
|  |  * | ||||||
|  |  * 注意:仅仅适合单机场景!!! | ||||||
|  |  * | ||||||
|  |  * @author 芋道源码 | ||||||
|  |  */ | ||||||
|  | public class LocalWebSocketMessageSender extends AbstractWebSocketMessageSender { | ||||||
|  | 
 | ||||||
|  |     public LocalWebSocketMessageSender(WebSocketSessionManager sessionManager) { | ||||||
|  |         super(sessionManager); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  | } | ||||||
|  | @ -0,0 +1,37 @@ | ||||||
|  | package cn.iocoder.yudao.framework.websocket.core.sender.rabbitmq; | ||||||
|  | 
 | ||||||
|  | import lombok.Data; | ||||||
|  | 
 | ||||||
|  | import java.io.Serializable; | ||||||
|  | 
 | ||||||
|  | /** | ||||||
|  |  * RabbitMQ 广播 WebSocket 的消息 | ||||||
|  |  * | ||||||
|  |  * @author 芋道源码 | ||||||
|  |  */ | ||||||
|  | @Data | ||||||
|  | public class RabbitMQWebSocketMessage implements Serializable { | ||||||
|  | 
 | ||||||
|  |     /** | ||||||
|  |      * Session 编号 | ||||||
|  |      */ | ||||||
|  |     private String sessionId; | ||||||
|  |     /** | ||||||
|  |      * 用户类型 | ||||||
|  |      */ | ||||||
|  |     private Integer userType; | ||||||
|  |     /** | ||||||
|  |      * 用户编号 | ||||||
|  |      */ | ||||||
|  |     private Long userId; | ||||||
|  | 
 | ||||||
|  |     /** | ||||||
|  |      * 消息类型 | ||||||
|  |      */ | ||||||
|  |     private String messageType; | ||||||
|  |     /** | ||||||
|  |      * 消息内容 | ||||||
|  |      */ | ||||||
|  |     private String messageContent; | ||||||
|  | 
 | ||||||
|  | } | ||||||
|  | @ -0,0 +1,39 @@ | ||||||
|  | package cn.iocoder.yudao.framework.websocket.core.sender.rabbitmq; | ||||||
|  | 
 | ||||||
|  | import lombok.RequiredArgsConstructor; | ||||||
|  | import org.springframework.amqp.core.ExchangeTypes; | ||||||
|  | import org.springframework.amqp.rabbit.annotation.*; | ||||||
|  | 
 | ||||||
|  | /** | ||||||
|  |  * {@link RabbitMQWebSocketMessage} 广播消息的消费者,真正把消息发送出去 | ||||||
|  |  * | ||||||
|  |  * @author 芋道源码 | ||||||
|  |  */ | ||||||
|  | @RabbitListener( | ||||||
|  |         bindings = @QueueBinding( | ||||||
|  |                 value = @Queue( | ||||||
|  |                         // 在 Queue 的名字上,使用 UUID 生成其后缀。这样,启动的 Consumer 的 Queue 不同,以达到广播消费的目的
 | ||||||
|  |                         name = "${yudao.websocket.sender-rabbitmq.queue}" + "-" + "#{T(java.util.UUID).randomUUID()}", | ||||||
|  |                         // Consumer 关闭时,该队列就可以被自动删除了
 | ||||||
|  |                         autoDelete = "true" | ||||||
|  |                 ), | ||||||
|  |                 exchange = @Exchange( | ||||||
|  |                         name = "${yudao.websocket.sender-rabbitmq.exchange}", | ||||||
|  |                         type = ExchangeTypes.TOPIC, | ||||||
|  |                         declare = "false" | ||||||
|  |                 ) | ||||||
|  |         ) | ||||||
|  | ) | ||||||
|  | @RequiredArgsConstructor | ||||||
|  | public class RabbitMQWebSocketMessageConsumer { | ||||||
|  | 
 | ||||||
|  |     private final RabbitMQWebSocketMessageSender rabbitMQWebSocketMessageSender; | ||||||
|  | 
 | ||||||
|  |     @RabbitHandler | ||||||
|  |     public void onMessage(RabbitMQWebSocketMessage message) { | ||||||
|  |         rabbitMQWebSocketMessageSender.send(message.getSessionId(), | ||||||
|  |                 message.getUserType(), message.getUserId(), | ||||||
|  |                 message.getMessageType(), message.getMessageContent()); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  | } | ||||||
|  | @ -0,0 +1,62 @@ | ||||||
|  | package cn.iocoder.yudao.framework.websocket.core.sender.rabbitmq; | ||||||
|  | 
 | ||||||
|  | import cn.iocoder.yudao.framework.websocket.core.sender.AbstractWebSocketMessageSender; | ||||||
|  | import cn.iocoder.yudao.framework.websocket.core.sender.WebSocketMessageSender; | ||||||
|  | import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionManager; | ||||||
|  | import lombok.extern.slf4j.Slf4j; | ||||||
|  | import org.springframework.amqp.core.TopicExchange; | ||||||
|  | import org.springframework.amqp.rabbit.core.RabbitTemplate; | ||||||
|  | 
 | ||||||
|  | /** | ||||||
|  |  * 基于 RabbitMQ 的 {@link WebSocketMessageSender} 实现类 | ||||||
|  |  * | ||||||
|  |  * @author 芋道源码 | ||||||
|  |  */ | ||||||
|  | @Slf4j | ||||||
|  | public class RabbitMQWebSocketMessageSender extends AbstractWebSocketMessageSender { | ||||||
|  | 
 | ||||||
|  |     private final RabbitTemplate rabbitTemplate; | ||||||
|  | 
 | ||||||
|  |     private final TopicExchange topicExchange; | ||||||
|  | 
 | ||||||
|  |     public RabbitMQWebSocketMessageSender(WebSocketSessionManager sessionManager, | ||||||
|  |                                           RabbitTemplate rabbitTemplate, | ||||||
|  |                                           TopicExchange topicExchange) { | ||||||
|  |         super(sessionManager); | ||||||
|  |         this.rabbitTemplate = rabbitTemplate; | ||||||
|  |         this.topicExchange = topicExchange; | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     @Override | ||||||
|  |     public void send(Integer userType, Long userId, String messageType, String messageContent) { | ||||||
|  |         sendRabbitMQMessage(null, userId, userType, messageType, messageContent); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     @Override | ||||||
|  |     public void send(Integer userType, String messageType, String messageContent) { | ||||||
|  |         sendRabbitMQMessage(null, null, userType, messageType, messageContent); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     @Override | ||||||
|  |     public void send(String sessionId, String messageType, String messageContent) { | ||||||
|  |         sendRabbitMQMessage(sessionId, null, null, messageType, messageContent); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     /** | ||||||
|  |      * 通过 RabbitMQ 广播消息 | ||||||
|  |      * | ||||||
|  |      * @param sessionId Session 编号 | ||||||
|  |      * @param userId 用户编号 | ||||||
|  |      * @param userType 用户类型 | ||||||
|  |      * @param messageType 消息类型 | ||||||
|  |      * @param messageContent 消息内容 | ||||||
|  |      */ | ||||||
|  |     private void sendRabbitMQMessage(String sessionId, Long userId, Integer userType, | ||||||
|  |                                      String messageType, String messageContent) { | ||||||
|  |         RabbitMQWebSocketMessage mqMessage = new RabbitMQWebSocketMessage() | ||||||
|  |                 .setSessionId(sessionId).setUserId(userId).setUserType(userType) | ||||||
|  |                 .setMessageType(messageType).setMessageContent(messageContent); | ||||||
|  |         rabbitTemplate.convertAndSend(topicExchange.getName(), null, mqMessage); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  | } | ||||||
|  | @ -0,0 +1,34 @@ | ||||||
|  | package cn.iocoder.yudao.framework.websocket.core.sender.redis; | ||||||
|  | 
 | ||||||
|  | import cn.iocoder.yudao.framework.mq.redis.core.pubsub.AbstractRedisChannelMessage; | ||||||
|  | import lombok.Data; | ||||||
|  | 
 | ||||||
|  | /** | ||||||
|  |  * Redis 广播 WebSocket 的消息 | ||||||
|  |  */ | ||||||
|  | @Data | ||||||
|  | public class RedisWebSocketMessage extends AbstractRedisChannelMessage { | ||||||
|  | 
 | ||||||
|  |     /** | ||||||
|  |      * Session 编号 | ||||||
|  |      */ | ||||||
|  |     private String sessionId; | ||||||
|  |     /** | ||||||
|  |      * 用户类型 | ||||||
|  |      */ | ||||||
|  |     private Integer userType; | ||||||
|  |     /** | ||||||
|  |      * 用户编号 | ||||||
|  |      */ | ||||||
|  |     private Long userId; | ||||||
|  | 
 | ||||||
|  |     /** | ||||||
|  |      * 消息类型 | ||||||
|  |      */ | ||||||
|  |     private String messageType; | ||||||
|  |     /** | ||||||
|  |      * 消息内容 | ||||||
|  |      */ | ||||||
|  |     private String messageContent; | ||||||
|  | 
 | ||||||
|  | } | ||||||
|  | @ -0,0 +1,23 @@ | ||||||
|  | package cn.iocoder.yudao.framework.websocket.core.sender.redis; | ||||||
|  | 
 | ||||||
|  | import cn.iocoder.yudao.framework.mq.redis.core.pubsub.AbstractRedisChannelMessageListener; | ||||||
|  | import lombok.RequiredArgsConstructor; | ||||||
|  | 
 | ||||||
|  | /** | ||||||
|  |  * {@link RedisWebSocketMessage} 广播消息的消费者,真正把消息发送出去 | ||||||
|  |  * | ||||||
|  |  * @author 芋道源码 | ||||||
|  |  */ | ||||||
|  | @RequiredArgsConstructor | ||||||
|  | public class RedisWebSocketMessageConsumer extends AbstractRedisChannelMessageListener<RedisWebSocketMessage> { | ||||||
|  | 
 | ||||||
|  |     private final RedisWebSocketMessageSender redisWebSocketMessageSender; | ||||||
|  | 
 | ||||||
|  |     @Override | ||||||
|  |     public void onMessage(RedisWebSocketMessage message) { | ||||||
|  |         redisWebSocketMessageSender.send(message.getSessionId(), | ||||||
|  |                 message.getUserType(), message.getUserId(), | ||||||
|  |                 message.getMessageType(), message.getMessageContent()); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  | } | ||||||
|  | @ -0,0 +1,57 @@ | ||||||
|  | package cn.iocoder.yudao.framework.websocket.core.sender.redis; | ||||||
|  | 
 | ||||||
|  | import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate; | ||||||
|  | import cn.iocoder.yudao.framework.websocket.core.sender.AbstractWebSocketMessageSender; | ||||||
|  | import cn.iocoder.yudao.framework.websocket.core.sender.WebSocketMessageSender; | ||||||
|  | import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionManager; | ||||||
|  | import lombok.extern.slf4j.Slf4j; | ||||||
|  | 
 | ||||||
|  | /** | ||||||
|  |  * 基于 Redis 的 {@link WebSocketMessageSender} 实现类 | ||||||
|  |  * | ||||||
|  |  * @author 芋道源码 | ||||||
|  |  */ | ||||||
|  | @Slf4j | ||||||
|  | public class RedisWebSocketMessageSender extends AbstractWebSocketMessageSender { | ||||||
|  | 
 | ||||||
|  |     private final RedisMQTemplate redisMQTemplate; | ||||||
|  | 
 | ||||||
|  |     public RedisWebSocketMessageSender(WebSocketSessionManager sessionManager, | ||||||
|  |                                        RedisMQTemplate redisMQTemplate) { | ||||||
|  |         super(sessionManager); | ||||||
|  |         this.redisMQTemplate = redisMQTemplate; | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     @Override | ||||||
|  |     public void send(Integer userType, Long userId, String messageType, String messageContent) { | ||||||
|  |         sendRedisMessage(null, userId, userType, messageType, messageContent); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     @Override | ||||||
|  |     public void send(Integer userType, String messageType, String messageContent) { | ||||||
|  |         sendRedisMessage(null, null, userType, messageType, messageContent); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     @Override | ||||||
|  |     public void send(String sessionId, String messageType, String messageContent) { | ||||||
|  |         sendRedisMessage(sessionId, null, null, messageType, messageContent); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     /** | ||||||
|  |      * 通过 Redis 广播消息 | ||||||
|  |      * | ||||||
|  |      * @param sessionId Session 编号 | ||||||
|  |      * @param userId 用户编号 | ||||||
|  |      * @param userType 用户类型 | ||||||
|  |      * @param messageType 消息类型 | ||||||
|  |      * @param messageContent 消息内容 | ||||||
|  |      */ | ||||||
|  |     private void sendRedisMessage(String sessionId, Long userId, Integer userType, | ||||||
|  |                                   String messageType, String messageContent) { | ||||||
|  |         RedisWebSocketMessage mqMessage = new RedisWebSocketMessage() | ||||||
|  |                 .setSessionId(sessionId).setUserId(userId).setUserType(userType) | ||||||
|  |                 .setMessageType(messageType).setMessageContent(messageContent); | ||||||
|  |         redisMQTemplate.send(mqMessage); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  | } | ||||||
|  | @ -0,0 +1,35 @@ | ||||||
|  | package cn.iocoder.yudao.framework.websocket.core.sender.rocketmq; | ||||||
|  | 
 | ||||||
|  | import lombok.Data; | ||||||
|  | 
 | ||||||
|  | /** | ||||||
|  |  * RocketMQ 广播 WebSocket 的消息 | ||||||
|  |  * | ||||||
|  |  * @author 芋道源码 | ||||||
|  |  */ | ||||||
|  | @Data | ||||||
|  | public class RocketMQWebSocketMessage { | ||||||
|  | 
 | ||||||
|  |     /** | ||||||
|  |      * Session 编号 | ||||||
|  |      */ | ||||||
|  |     private String sessionId; | ||||||
|  |     /** | ||||||
|  |      * 用户类型 | ||||||
|  |      */ | ||||||
|  |     private Integer userType; | ||||||
|  |     /** | ||||||
|  |      * 用户编号 | ||||||
|  |      */ | ||||||
|  |     private Long userId; | ||||||
|  | 
 | ||||||
|  |     /** | ||||||
|  |      * 消息类型 | ||||||
|  |      */ | ||||||
|  |     private String messageType; | ||||||
|  |     /** | ||||||
|  |      * 消息内容 | ||||||
|  |      */ | ||||||
|  |     private String messageContent; | ||||||
|  | 
 | ||||||
|  | } | ||||||
|  | @ -0,0 +1,30 @@ | ||||||
|  | package cn.iocoder.yudao.framework.websocket.core.sender.rocketmq; | ||||||
|  | 
 | ||||||
|  | import lombok.RequiredArgsConstructor; | ||||||
|  | import org.apache.rocketmq.spring.annotation.MessageModel; | ||||||
|  | import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; | ||||||
|  | import org.apache.rocketmq.spring.core.RocketMQListener; | ||||||
|  | 
 | ||||||
|  | /** | ||||||
|  |  * {@link RocketMQWebSocketMessage} 广播消息的消费者,真正把消息发送出去 | ||||||
|  |  * | ||||||
|  |  * @author 芋道源码 | ||||||
|  |  */ | ||||||
|  | @RocketMQMessageListener( // 重点:添加 @RocketMQMessageListener 注解,声明消费的 topic
 | ||||||
|  |         topic = "${yudao.websocket.sender-rocketmq.topic}", | ||||||
|  |         consumerGroup = "${yudao.websocket.sender-rocketmq.consumer-group}", | ||||||
|  |         messageModel = MessageModel.BROADCASTING // 设置为广播模式,保证每个实例都能收到消息
 | ||||||
|  | ) | ||||||
|  | @RequiredArgsConstructor | ||||||
|  | public class RocketMQWebSocketMessageConsumer implements RocketMQListener<RocketMQWebSocketMessage> { | ||||||
|  | 
 | ||||||
|  |     private final RocketMQWebSocketMessageSender rocketMQWebSocketMessageSender; | ||||||
|  | 
 | ||||||
|  |     @Override | ||||||
|  |     public void onMessage(RocketMQWebSocketMessage message) { | ||||||
|  |         rocketMQWebSocketMessageSender.send(message.getSessionId(), | ||||||
|  |                 message.getUserType(), message.getUserId(), | ||||||
|  |                 message.getMessageType(), message.getMessageContent()); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  | } | ||||||
|  | @ -0,0 +1,61 @@ | ||||||
|  | package cn.iocoder.yudao.framework.websocket.core.sender.rocketmq; | ||||||
|  | 
 | ||||||
|  | import cn.iocoder.yudao.framework.websocket.core.sender.AbstractWebSocketMessageSender; | ||||||
|  | import cn.iocoder.yudao.framework.websocket.core.sender.WebSocketMessageSender; | ||||||
|  | import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionManager; | ||||||
|  | import lombok.extern.slf4j.Slf4j; | ||||||
|  | import org.apache.rocketmq.spring.core.RocketMQTemplate; | ||||||
|  | 
 | ||||||
|  | /** | ||||||
|  |  * 基于 RocketMQ 的 {@link WebSocketMessageSender} 实现类 | ||||||
|  |  * | ||||||
|  |  * @author 芋道源码 | ||||||
|  |  */ | ||||||
|  | @Slf4j | ||||||
|  | public class RocketMQWebSocketMessageSender extends AbstractWebSocketMessageSender { | ||||||
|  | 
 | ||||||
|  |     private final RocketMQTemplate rocketMQTemplate; | ||||||
|  | 
 | ||||||
|  |     private final String topic; | ||||||
|  | 
 | ||||||
|  |     public RocketMQWebSocketMessageSender(WebSocketSessionManager sessionManager, | ||||||
|  |                                           RocketMQTemplate rocketMQTemplate, | ||||||
|  |                                           String topic) { | ||||||
|  |         super(sessionManager); | ||||||
|  |         this.rocketMQTemplate = rocketMQTemplate; | ||||||
|  |         this.topic = topic; | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     @Override | ||||||
|  |     public void send(Integer userType, Long userId, String messageType, String messageContent) { | ||||||
|  |         sendRocketMQMessage(null, userId, userType, messageType, messageContent); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     @Override | ||||||
|  |     public void send(Integer userType, String messageType, String messageContent) { | ||||||
|  |         sendRocketMQMessage(null, null, userType, messageType, messageContent); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     @Override | ||||||
|  |     public void send(String sessionId, String messageType, String messageContent) { | ||||||
|  |         sendRocketMQMessage(sessionId, null, null, messageType, messageContent); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     /** | ||||||
|  |      * 通过 RocketMQ 广播消息 | ||||||
|  |      * | ||||||
|  |      * @param sessionId Session 编号 | ||||||
|  |      * @param userId 用户编号 | ||||||
|  |      * @param userType 用户类型 | ||||||
|  |      * @param messageType 消息类型 | ||||||
|  |      * @param messageContent 消息内容 | ||||||
|  |      */ | ||||||
|  |     private void sendRocketMQMessage(String sessionId, Long userId, Integer userType, | ||||||
|  |                                      String messageType, String messageContent) { | ||||||
|  |         RocketMQWebSocketMessage mqMessage = new RocketMQWebSocketMessage() | ||||||
|  |                 .setSessionId(sessionId).setUserId(userId).setUserType(userType) | ||||||
|  |                 .setMessageType(messageType).setMessageContent(messageContent); | ||||||
|  |         rocketMQTemplate.syncSend(topic, mqMessage); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  | } | ||||||
|  | @ -0,0 +1,49 @@ | ||||||
|  | package cn.iocoder.yudao.framework.websocket.core.session; | ||||||
|  | 
 | ||||||
|  | import org.springframework.web.socket.CloseStatus; | ||||||
|  | import org.springframework.web.socket.WebSocketHandler; | ||||||
|  | import org.springframework.web.socket.WebSocketSession; | ||||||
|  | import org.springframework.web.socket.handler.ConcurrentWebSocketSessionDecorator; | ||||||
|  | import org.springframework.web.socket.handler.WebSocketHandlerDecorator; | ||||||
|  | 
 | ||||||
|  | /** | ||||||
|  |  * {@link WebSocketHandler} 的装饰类,实现了以下功能: | ||||||
|  |  * | ||||||
|  |  * 1. {@link WebSocketSession} 连接或关闭时,使用 {@link #sessionManager} 进行管理 | ||||||
|  |  * 2. 封装 {@link WebSocketSession} 支持并发操作 | ||||||
|  |  * | ||||||
|  |  * @author 芋道源码 | ||||||
|  |  */ | ||||||
|  | public class WebSocketSessionHandlerDecorator extends WebSocketHandlerDecorator { | ||||||
|  | 
 | ||||||
|  |     /** | ||||||
|  |      * 发送时间的限制,单位:毫秒 | ||||||
|  |      */ | ||||||
|  |     private static final Integer SEND_TIME_LIMIT = 1000 * 5; | ||||||
|  |     /** | ||||||
|  |      * 发送消息缓冲上线,单位:bytes | ||||||
|  |      */ | ||||||
|  |     private static final Integer BUFFER_SIZE_LIMIT = 1024 * 100; | ||||||
|  | 
 | ||||||
|  |     private final WebSocketSessionManager sessionManager; | ||||||
|  | 
 | ||||||
|  |     public WebSocketSessionHandlerDecorator(WebSocketHandler delegate, | ||||||
|  |                                             WebSocketSessionManager sessionManager) { | ||||||
|  |         super(delegate); | ||||||
|  |         this.sessionManager = sessionManager; | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     @Override | ||||||
|  |     public void afterConnectionEstablished(WebSocketSession session) { | ||||||
|  |         // 实现 session 支持并发,可参考 https://blog.csdn.net/abu935009066/article/details/131218149
 | ||||||
|  |         session = new ConcurrentWebSocketSessionDecorator(session, SEND_TIME_LIMIT, BUFFER_SIZE_LIMIT); | ||||||
|  |         // 添加到 WebSocketSessionManager 中
 | ||||||
|  |         sessionManager.addSession(session); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     @Override | ||||||
|  |     public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) { | ||||||
|  |         sessionManager.removeSession(session); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  | } | ||||||
|  | @ -0,0 +1,53 @@ | ||||||
|  | package cn.iocoder.yudao.framework.websocket.core.session; | ||||||
|  | 
 | ||||||
|  | import org.springframework.web.socket.WebSocketSession; | ||||||
|  | 
 | ||||||
|  | import java.util.Collection; | ||||||
|  | 
 | ||||||
|  | /** | ||||||
|  |  * {@link WebSocketSession} 管理器的接口 | ||||||
|  |  * | ||||||
|  |  * @author 芋道源码 | ||||||
|  |  */ | ||||||
|  | public interface WebSocketSessionManager { | ||||||
|  | 
 | ||||||
|  |     /** | ||||||
|  |      * 添加 Session | ||||||
|  |      * | ||||||
|  |      * @param session Session | ||||||
|  |      */ | ||||||
|  |     void addSession(WebSocketSession session); | ||||||
|  | 
 | ||||||
|  |     /** | ||||||
|  |      * 移除 Session | ||||||
|  |      * | ||||||
|  |      * @param session Session | ||||||
|  |      */ | ||||||
|  |     void removeSession(WebSocketSession session); | ||||||
|  | 
 | ||||||
|  |     /** | ||||||
|  |      * 获得指定编号的 Session | ||||||
|  |      * | ||||||
|  |      * @param id Session 编号 | ||||||
|  |      * @return Session | ||||||
|  |      */ | ||||||
|  |     WebSocketSession getSession(String id); | ||||||
|  | 
 | ||||||
|  |     /** | ||||||
|  |      * 获得指定用户类型的 Session 列表 | ||||||
|  |      * | ||||||
|  |      * @param userType 用户类型 | ||||||
|  |      * @return Session 列表 | ||||||
|  |      */ | ||||||
|  |     Collection<WebSocketSession> getSessionList(Integer userType); | ||||||
|  | 
 | ||||||
|  |     /** | ||||||
|  |      * 获得指定用户编号的 Session 列表 | ||||||
|  |      * | ||||||
|  |      * @param userType 用户类型 | ||||||
|  |      * @param userId 用户编号 | ||||||
|  |      * @return Session 列表 | ||||||
|  |      */ | ||||||
|  |     Collection<WebSocketSession> getSessionList(Integer userType, Long userId); | ||||||
|  | 
 | ||||||
|  | } | ||||||
|  | @ -0,0 +1,125 @@ | ||||||
|  | package cn.iocoder.yudao.framework.websocket.core.session; | ||||||
|  | 
 | ||||||
|  | import cn.hutool.core.collection.CollUtil; | ||||||
|  | import cn.iocoder.yudao.framework.security.core.LoginUser; | ||||||
|  | import cn.iocoder.yudao.framework.tenant.core.context.TenantContextHolder; | ||||||
|  | import cn.iocoder.yudao.framework.websocket.core.util.WebSocketFrameworkUtils; | ||||||
|  | import org.springframework.web.socket.WebSocketSession; | ||||||
|  | 
 | ||||||
|  | import java.util.ArrayList; | ||||||
|  | import java.util.Collection; | ||||||
|  | import java.util.LinkedList; | ||||||
|  | import java.util.List; | ||||||
|  | import java.util.concurrent.ConcurrentHashMap; | ||||||
|  | import java.util.concurrent.ConcurrentMap; | ||||||
|  | import java.util.concurrent.CopyOnWriteArrayList; | ||||||
|  | 
 | ||||||
|  | /** | ||||||
|  |  * 默认的 {@link WebSocketSessionManager} 实现类 | ||||||
|  |  * | ||||||
|  |  * @author 芋道源码 | ||||||
|  |  */ | ||||||
|  | public class WebSocketSessionManagerImpl implements WebSocketSessionManager { | ||||||
|  | 
 | ||||||
|  |     /** | ||||||
|  |      * id 与 WebSocketSession 映射 | ||||||
|  |      * | ||||||
|  |      * key:Session 编号 | ||||||
|  |      */ | ||||||
|  |     private final ConcurrentMap<String, WebSocketSession> idSessions = new ConcurrentHashMap<>(); | ||||||
|  | 
 | ||||||
|  |     /** | ||||||
|  |      * user 与 WebSocketSession 映射 | ||||||
|  |      * | ||||||
|  |      * key1:用户类型 | ||||||
|  |      * key2:用户编号 | ||||||
|  |      */ | ||||||
|  |     private final ConcurrentMap<Integer, ConcurrentMap<Long, CopyOnWriteArrayList<WebSocketSession>>> userSessions | ||||||
|  |             = new ConcurrentHashMap<>(); | ||||||
|  | 
 | ||||||
|  |     @Override | ||||||
|  |     public void addSession(WebSocketSession session) { | ||||||
|  |         // 添加到 idSessions 中
 | ||||||
|  |         idSessions.put(session.getId(), session); | ||||||
|  |         // 添加到 userSessions 中
 | ||||||
|  |         LoginUser user = WebSocketFrameworkUtils.getLoginUser(session); | ||||||
|  |         if (user == null) { | ||||||
|  |             return; | ||||||
|  |         } | ||||||
|  |         ConcurrentMap<Long, CopyOnWriteArrayList<WebSocketSession>> userSessionsMap = userSessions.get(user.getUserType()); | ||||||
|  |         if (userSessionsMap == null) { | ||||||
|  |             userSessionsMap = new ConcurrentHashMap<>(); | ||||||
|  |             if (userSessions.putIfAbsent(user.getUserType(), userSessionsMap) != null) { | ||||||
|  |                 userSessionsMap = userSessions.get(user.getUserType()); | ||||||
|  |             } | ||||||
|  |         } | ||||||
|  |         CopyOnWriteArrayList<WebSocketSession> sessions = userSessionsMap.get(user.getId()); | ||||||
|  |         if (sessions == null) { | ||||||
|  |             sessions = new CopyOnWriteArrayList<>(); | ||||||
|  |             if (userSessionsMap.putIfAbsent(user.getId(), sessions) != null) { | ||||||
|  |                 sessions = userSessionsMap.get(user.getId()); | ||||||
|  |             } | ||||||
|  |         } | ||||||
|  |         sessions.add(session); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     @Override | ||||||
|  |     public void removeSession(WebSocketSession session) { | ||||||
|  |         // 移除从 idSessions 中
 | ||||||
|  |         idSessions.remove(session.getId(), session); | ||||||
|  |         // 移除从 idSessions 中
 | ||||||
|  |         LoginUser user = WebSocketFrameworkUtils.getLoginUser(session); | ||||||
|  |         if (user == null) { | ||||||
|  |             return; | ||||||
|  |         } | ||||||
|  |         ConcurrentMap<Long, CopyOnWriteArrayList<WebSocketSession>> userSessionsMap = userSessions.get(user.getUserType()); | ||||||
|  |         if (userSessionsMap == null) { | ||||||
|  |             return; | ||||||
|  |         } | ||||||
|  |         CopyOnWriteArrayList<WebSocketSession> sessions = userSessionsMap.get(user.getId()); | ||||||
|  |         sessions.removeIf(session0 -> session0.getId().equals(session.getId())); | ||||||
|  |         if (CollUtil.isEmpty(sessions)) { | ||||||
|  |             userSessionsMap.remove(user.getId(), sessions); | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     @Override | ||||||
|  |     public WebSocketSession getSession(String id) { | ||||||
|  |         return idSessions.get(id); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     @Override | ||||||
|  |     public Collection<WebSocketSession> getSessionList(Integer userType) { | ||||||
|  |         ConcurrentMap<Long, CopyOnWriteArrayList<WebSocketSession>> userSessionsMap = userSessions.get(userType); | ||||||
|  |         if (CollUtil.isEmpty(userSessionsMap)) { | ||||||
|  |             return new ArrayList<>(); | ||||||
|  |         } | ||||||
|  |         LinkedList<WebSocketSession> result = new LinkedList<>(); // 避免扩容
 | ||||||
|  |         Long contextTenantId = TenantContextHolder.getTenantId(); | ||||||
|  |         for (List<WebSocketSession> sessions : userSessionsMap.values()) { | ||||||
|  |             if (CollUtil.isEmpty(sessions)) { | ||||||
|  |                 continue; | ||||||
|  |             } | ||||||
|  |             // 特殊:如果租户不匹配,则直接排除
 | ||||||
|  |             if (contextTenantId != null) { | ||||||
|  |                 Long userTenantId = WebSocketFrameworkUtils.getTenantId(sessions.get(0)); | ||||||
|  |                 if (!contextTenantId.equals(userTenantId)) { | ||||||
|  |                     continue; | ||||||
|  |                 } | ||||||
|  |             } | ||||||
|  |             result.addAll(sessions); | ||||||
|  |         } | ||||||
|  |         return result; | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     @Override | ||||||
|  |     public Collection<WebSocketSession> getSessionList(Integer userType, Long userId) { | ||||||
|  |         ConcurrentMap<Long, CopyOnWriteArrayList<WebSocketSession>> userSessionsMap = userSessions.get(userType); | ||||||
|  |         if (CollUtil.isEmpty(userSessionsMap)) { | ||||||
|  |             return new ArrayList<>(); | ||||||
|  |         } | ||||||
|  |         CopyOnWriteArrayList<WebSocketSession> sessions = userSessionsMap.get(userId); | ||||||
|  |         return CollUtil.isNotEmpty(sessions) ? new ArrayList<>(sessions) : new ArrayList<>(); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  | } | ||||||
|  | @ -0,0 +1,67 @@ | ||||||
|  | package cn.iocoder.yudao.framework.websocket.core.util; | ||||||
|  | 
 | ||||||
|  | import cn.iocoder.yudao.framework.security.core.LoginUser; | ||||||
|  | import org.springframework.web.socket.WebSocketSession; | ||||||
|  | 
 | ||||||
|  | import java.util.Map; | ||||||
|  | 
 | ||||||
|  | /** | ||||||
|  |  * 专属于 web 包的工具类 | ||||||
|  |  * | ||||||
|  |  * @author 芋道源码 | ||||||
|  |  */ | ||||||
|  | public class WebSocketFrameworkUtils { | ||||||
|  | 
 | ||||||
|  |     public static final String ATTRIBUTE_LOGIN_USER = "LOGIN_USER"; | ||||||
|  | 
 | ||||||
|  |     /** | ||||||
|  |      * 设置当前用户 | ||||||
|  |      * | ||||||
|  |      * @param loginUser 登录用户 | ||||||
|  |      * @param attributes Session | ||||||
|  |      */ | ||||||
|  |     public static void setLoginUser(LoginUser loginUser, Map<String, Object> attributes) { | ||||||
|  |         attributes.put(ATTRIBUTE_LOGIN_USER, loginUser); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     /** | ||||||
|  |      * 获取当前用户 | ||||||
|  |      * | ||||||
|  |      * @return 当前用户 | ||||||
|  |      */ | ||||||
|  |     public static LoginUser getLoginUser(WebSocketSession session) { | ||||||
|  |         return (LoginUser) session.getAttributes().get(ATTRIBUTE_LOGIN_USER); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     /** | ||||||
|  |      * 获得当前用户的编号 | ||||||
|  |      * | ||||||
|  |      * @return 用户编号 | ||||||
|  |      */ | ||||||
|  |     public static Long getLoginUserId(WebSocketSession session) { | ||||||
|  |         LoginUser loginUser = getLoginUser(session); | ||||||
|  |         return loginUser != null ? loginUser.getId() : null; | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     /** | ||||||
|  |      * 获得当前用户的类型 | ||||||
|  |      * | ||||||
|  |      * @return 用户编号 | ||||||
|  |      */ | ||||||
|  |     public static Integer getLoginUserType(WebSocketSession session) { | ||||||
|  |         LoginUser loginUser = getLoginUser(session); | ||||||
|  |         return loginUser != null ? loginUser.getUserType() : null; | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     /** | ||||||
|  |      * 获得当前用户的租户编号 | ||||||
|  |      * | ||||||
|  |      * @param session Session | ||||||
|  |      * @return 租户编号 | ||||||
|  |      */ | ||||||
|  |     public static Long getTenantId(WebSocketSession session) { | ||||||
|  |         LoginUser loginUser = getLoginUser(session); | ||||||
|  |         return loginUser != null ? loginUser.getTenantId() : null; | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  | } | ||||||
|  | @ -0,0 +1,4 @@ | ||||||
|  | /** | ||||||
|  |  * WebSocket 框架,支持多节点的广播 | ||||||
|  |  */ | ||||||
|  | package cn.iocoder.yudao.framework.websocket; | ||||||
|  | @ -0,0 +1 @@ | ||||||
|  | cn.iocoder.yudao.framework.websocket.config.YudaoWebSocketAutoConfiguration | ||||||
|  | @ -0,0 +1 @@ | ||||||
|  | <http://www.iocoder.cn/Spring-Boot/WebSocket/?yudao> | ||||||
|  | @ -37,6 +37,10 @@ spring: | ||||||
|           uri: grayLb://infra-server |           uri: grayLb://infra-server | ||||||
|           predicates: # 断言,作为路由的匹配条件,对应 RouteDefinition 数组 |           predicates: # 断言,作为路由的匹配条件,对应 RouteDefinition 数组 | ||||||
|             - Path=/admin/** |             - Path=/admin/** | ||||||
|  |         - id: infra-websocket # 路由的编号(WebSocket) | ||||||
|  |           uri: grayLb://infra-server | ||||||
|  |           predicates: # 断言,作为路由的匹配条件,对应 RouteDefinition 数组 | ||||||
|  |             - Path=/infra/ws/** | ||||||
|         ## member-server 服务 |         ## member-server 服务 | ||||||
|         - id: member-admin-api # 路由的编号 |         - id: member-admin-api # 路由的编号 | ||||||
|           uri: grayLb://member-server |           uri: grayLb://member-server | ||||||
|  |  | ||||||
|  | @ -0,0 +1,74 @@ | ||||||
|  | package cn.iocoder.yudao.module.infra.api.websocket; | ||||||
|  | 
 | ||||||
|  | import cn.iocoder.yudao.framework.common.pojo.CommonResult; | ||||||
|  | import cn.iocoder.yudao.framework.common.util.json.JsonUtils; | ||||||
|  | import cn.iocoder.yudao.module.infra.api.websocket.dto.WebSocketSendReqDTO; | ||||||
|  | import cn.iocoder.yudao.module.infra.enums.ApiConstants; | ||||||
|  | import io.swagger.v3.oas.annotations.Operation; | ||||||
|  | import io.swagger.v3.oas.annotations.tags.Tag; | ||||||
|  | import org.springframework.cloud.openfeign.FeignClient; | ||||||
|  | import org.springframework.web.bind.annotation.PostMapping; | ||||||
|  | import org.springframework.web.bind.annotation.RequestBody; | ||||||
|  | 
 | ||||||
|  | import javax.validation.Valid; | ||||||
|  | 
 | ||||||
|  | @FeignClient(name = ApiConstants.NAME) // TODO 芋艿:fallbackFactory =
 | ||||||
|  | @Tag(name = "RPC 服务 - WebSocket 发送器的") // 对 WebSocketMessageSender 进行封装,提供给其它模块使用
 | ||||||
|  | public interface WebSocketSenderApi { | ||||||
|  | 
 | ||||||
|  |     String PREFIX = ApiConstants.PREFIX + "/websocket"; | ||||||
|  | 
 | ||||||
|  |     @PostMapping(PREFIX + "/send") | ||||||
|  |     @Operation(summary = "发送 WebSocket 消息") | ||||||
|  |     CommonResult<Boolean> send(@Valid @RequestBody WebSocketSendReqDTO message); | ||||||
|  | 
 | ||||||
|  |     /** | ||||||
|  |      * 发送消息给指定用户 | ||||||
|  |      * | ||||||
|  |      * @param userType 用户类型 | ||||||
|  |      * @param userId 用户编号 | ||||||
|  |      * @param messageType 消息类型 | ||||||
|  |      * @param messageContent 消息内容,JSON 格式 | ||||||
|  |      */ | ||||||
|  |     default void send(Integer userType, Long userId, String messageType, String messageContent) { | ||||||
|  |         send(new WebSocketSendReqDTO().setUserType(userType).setUserId(userId) | ||||||
|  |                 .setMessageType(messageType).setMessageContent(messageContent)).checkError(); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     /** | ||||||
|  |      * 发送消息给指定用户类型 | ||||||
|  |      * | ||||||
|  |      * @param userType 用户类型 | ||||||
|  |      * @param messageType 消息类型 | ||||||
|  |      * @param messageContent 消息内容,JSON 格式 | ||||||
|  |      */ | ||||||
|  |     default void send(Integer userType, String messageType, String messageContent) { | ||||||
|  |         send(new WebSocketSendReqDTO().setUserType(userType) | ||||||
|  |                 .setMessageType(messageType).setMessageContent(messageContent)).checkError(); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     /** | ||||||
|  |      * 发送消息给指定 Session | ||||||
|  |      * | ||||||
|  |      * @param sessionId Session 编号 | ||||||
|  |      * @param messageType 消息类型 | ||||||
|  |      * @param messageContent 消息内容,JSON 格式 | ||||||
|  |      */ | ||||||
|  |     default void send(String sessionId, String messageType, String messageContent) { | ||||||
|  |         send(new WebSocketSendReqDTO().setSessionId(sessionId) | ||||||
|  |                 .setMessageType(messageType).setMessageContent(messageContent)).checkError(); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     default void sendObject(Integer userType, Long userId, String messageType, Object messageContent) { | ||||||
|  |         send(userType, userId, messageType, JsonUtils.toJsonString(messageContent)); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     default void sendObject(Integer userType, String messageType, Object messageContent) { | ||||||
|  |         send(userType, messageType, JsonUtils.toJsonString(messageContent)); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     default void sendObject(String sessionId, String messageType, Object messageContent) { | ||||||
|  |         send(sessionId, messageType, JsonUtils.toJsonString(messageContent)); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  | } | ||||||
|  | @ -0,0 +1,26 @@ | ||||||
|  | package cn.iocoder.yudao.module.infra.api.websocket.dto; | ||||||
|  | 
 | ||||||
|  | import io.swagger.v3.oas.annotations.media.Schema; | ||||||
|  | import lombok.Data; | ||||||
|  | 
 | ||||||
|  | import javax.validation.constraints.NotEmpty; | ||||||
|  | 
 | ||||||
|  | @Schema(description = "RPC 服务 - WebSocket 消息发送 Request DTO") | ||||||
|  | @Data | ||||||
|  | public class WebSocketSendReqDTO { | ||||||
|  | 
 | ||||||
|  |     @Schema(description = "Session 编号", example = "abc") | ||||||
|  |     private String sessionId; | ||||||
|  |     @Schema(description = "用户编号", example = "1024") | ||||||
|  |     private Long userId; | ||||||
|  |     @Schema(description = "用户类型", example = "1") | ||||||
|  |     private Integer userType; | ||||||
|  | 
 | ||||||
|  |     @Schema(description = "消息类型", example = "demo-message") | ||||||
|  |     @NotEmpty(message = "消息类型不能为空") | ||||||
|  |     private String messageType; | ||||||
|  |     @Schema(description = "消息内容", example = "{\"name\":\"李四\"}}") | ||||||
|  |     @NotEmpty(message = "消息内容不能为空") | ||||||
|  |     private String messageContent; | ||||||
|  | 
 | ||||||
|  | } | ||||||
|  | @ -61,6 +61,11 @@ | ||||||
|             <artifactId>yudao-spring-boot-starter-security</artifactId> |             <artifactId>yudao-spring-boot-starter-security</artifactId> | ||||||
|         </dependency> |         </dependency> | ||||||
| 
 | 
 | ||||||
|  |         <dependency> | ||||||
|  |             <groupId>cn.iocoder.cloud</groupId> | ||||||
|  |             <artifactId>yudao-spring-boot-starter-websocket</artifactId> | ||||||
|  |         </dependency> | ||||||
|  | 
 | ||||||
|         <!-- DB 相关 --> |         <!-- DB 相关 --> | ||||||
|         <dependency> |         <dependency> | ||||||
|             <groupId>cn.iocoder.cloud</groupId> |             <groupId>cn.iocoder.cloud</groupId> | ||||||
|  | @ -101,6 +106,10 @@ | ||||||
|         </dependency> |         </dependency> | ||||||
| 
 | 
 | ||||||
|         <!-- 消息队列相关 --> |         <!-- 消息队列相关 --> | ||||||
|  |         <dependency> | ||||||
|  |             <groupId>cn.iocoder.cloud</groupId> | ||||||
|  |             <artifactId>yudao-spring-boot-starter-mq</artifactId> | ||||||
|  |         </dependency> | ||||||
| 
 | 
 | ||||||
|         <!-- Test 测试相关 --> |         <!-- Test 测试相关 --> | ||||||
|         <dependency> |         <dependency> | ||||||
|  |  | ||||||
|  | @ -0,0 +1,36 @@ | ||||||
|  | package cn.iocoder.yudao.module.infra.api.websocket; | ||||||
|  | 
 | ||||||
|  | import cn.hutool.core.util.StrUtil; | ||||||
|  | import cn.iocoder.yudao.framework.common.pojo.CommonResult; | ||||||
|  | import cn.iocoder.yudao.framework.websocket.core.sender.WebSocketMessageSender; | ||||||
|  | import cn.iocoder.yudao.module.infra.api.websocket.dto.WebSocketSendReqDTO; | ||||||
|  | import org.springframework.validation.annotation.Validated; | ||||||
|  | import org.springframework.web.bind.annotation.RestController; | ||||||
|  | 
 | ||||||
|  | import javax.annotation.Resource; | ||||||
|  | 
 | ||||||
|  | import static cn.iocoder.yudao.framework.common.pojo.CommonResult.success; | ||||||
|  | 
 | ||||||
|  | @RestController // 提供 RESTful API 接口,给 Feign 调用
 | ||||||
|  | @Validated | ||||||
|  | public class WebSocketSenderApiImpl implements WebSocketSenderApi { | ||||||
|  | 
 | ||||||
|  |     @Resource | ||||||
|  |     private WebSocketMessageSender webSocketMessageSender; | ||||||
|  | 
 | ||||||
|  |     @Override | ||||||
|  |     public CommonResult<Boolean> send(WebSocketSendReqDTO message) { | ||||||
|  |         if (StrUtil.isNotEmpty(message.getSessionId())) { | ||||||
|  |             webSocketMessageSender.send(message.getSessionId(), | ||||||
|  |                     message.getMessageType(), message.getMessageContent()); | ||||||
|  |         } else if (message.getUserType() != null && message.getUserId() != null) { | ||||||
|  |             webSocketMessageSender.send(message.getUserType(), message.getUserId(), | ||||||
|  |                     message.getMessageType(), message.getMessageContent()); | ||||||
|  |         } else if (message.getUserType() != null) { | ||||||
|  |             webSocketMessageSender.send(message.getUserType(), | ||||||
|  |                     message.getMessageType(), message.getMessageContent()); | ||||||
|  |         } | ||||||
|  |         return success(true); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  | } | ||||||
|  | @ -64,7 +64,7 @@ public class CodegenBuilder { | ||||||
|      */ |      */ | ||||||
|     public static final String TENANT_ID_FIELD = "tenantId"; |     public static final String TENANT_ID_FIELD = "tenantId"; | ||||||
|     /** |     /** | ||||||
|      * {@link BaseDO} 的字段 |      * {@link cn.iocoder.yudao.framework.mybatis.core.dataobject.BaseDO} 的字段 | ||||||
|      */ |      */ | ||||||
|     public static final Set<String> BASE_DO_FIELDS = new HashSet<>(); |     public static final Set<String> BASE_DO_FIELDS = new HashSet<>(); | ||||||
|     /** |     /** | ||||||
|  |  | ||||||
|  | @ -0,0 +1,48 @@ | ||||||
|  | package cn.iocoder.yudao.module.infra.websocket; | ||||||
|  | 
 | ||||||
|  | import cn.iocoder.yudao.framework.common.enums.UserTypeEnum; | ||||||
|  | import cn.iocoder.yudao.framework.websocket.core.listener.WebSocketMessageListener; | ||||||
|  | import cn.iocoder.yudao.framework.websocket.core.sender.WebSocketMessageSender; | ||||||
|  | import cn.iocoder.yudao.framework.websocket.core.util.WebSocketFrameworkUtils; | ||||||
|  | import cn.iocoder.yudao.module.infra.websocket.message.DemoReceiveMessage; | ||||||
|  | import cn.iocoder.yudao.module.infra.websocket.message.DemoSendMessage; | ||||||
|  | import org.springframework.stereotype.Component; | ||||||
|  | import org.springframework.web.socket.WebSocketSession; | ||||||
|  | 
 | ||||||
|  | import javax.annotation.Resource; | ||||||
|  | 
 | ||||||
|  | /** | ||||||
|  |  * WebSocket 示例:单发消息 | ||||||
|  |  * | ||||||
|  |  * @author 芋道源码 | ||||||
|  |  */ | ||||||
|  | @Component | ||||||
|  | public class DemoWebSocketMessageListener implements WebSocketMessageListener<DemoSendMessage> { | ||||||
|  | 
 | ||||||
|  |     @Resource | ||||||
|  |     private WebSocketMessageSender webSocketMessageSender; | ||||||
|  | 
 | ||||||
|  |     @Override | ||||||
|  |     public void onMessage(WebSocketSession session, DemoSendMessage message) { | ||||||
|  |         Long fromUserId = WebSocketFrameworkUtils.getLoginUserId(session); | ||||||
|  |         // 情况一:单发
 | ||||||
|  |         if (message.getToUserId() != null) { | ||||||
|  |             DemoReceiveMessage toMessage = new DemoReceiveMessage().setFromUserId(fromUserId) | ||||||
|  |                     .setText(message.getText()).setSingle(true); | ||||||
|  |             webSocketMessageSender.sendObject(UserTypeEnum.ADMIN.getValue(), message.getToUserId(), // 给指定用户
 | ||||||
|  |                     "demo-message-receive", toMessage); | ||||||
|  |             return; | ||||||
|  |         } | ||||||
|  |         // 情况二:群发
 | ||||||
|  |         DemoReceiveMessage toMessage = new DemoReceiveMessage().setFromUserId(fromUserId) | ||||||
|  |                 .setText(message.getText()).setSingle(false); | ||||||
|  |         webSocketMessageSender.sendObject(UserTypeEnum.ADMIN.getValue(), // 给所有用户
 | ||||||
|  |                 "demo-message-receive", toMessage); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     @Override | ||||||
|  |     public String getType() { | ||||||
|  |         return "demo-message-send"; | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  | } | ||||||
|  | @ -0,0 +1,27 @@ | ||||||
|  | package cn.iocoder.yudao.module.infra.websocket.message; | ||||||
|  | 
 | ||||||
|  | import lombok.Data; | ||||||
|  | 
 | ||||||
|  | /** | ||||||
|  |  * 示例:server -> client 同步消息 | ||||||
|  |  * | ||||||
|  |  * @author 芋道源码 | ||||||
|  |  */ | ||||||
|  | @Data | ||||||
|  | public class DemoReceiveMessage { | ||||||
|  | 
 | ||||||
|  |     /** | ||||||
|  |      * 接收人的编号 | ||||||
|  |      */ | ||||||
|  |     private Long fromUserId; | ||||||
|  |     /** | ||||||
|  |      * 内容 | ||||||
|  |      */ | ||||||
|  |     private String text; | ||||||
|  | 
 | ||||||
|  |     /** | ||||||
|  |      * 是否单聊 | ||||||
|  |      */ | ||||||
|  |     private Boolean single; | ||||||
|  | 
 | ||||||
|  | } | ||||||
|  | @ -0,0 +1,24 @@ | ||||||
|  | package cn.iocoder.yudao.module.infra.websocket.message; | ||||||
|  | 
 | ||||||
|  | import lombok.Data; | ||||||
|  | 
 | ||||||
|  | /** | ||||||
|  |  * 示例:client -> server 发送消息 | ||||||
|  |  * | ||||||
|  |  * @author 芋道源码 | ||||||
|  |  */ | ||||||
|  | @Data | ||||||
|  | public class DemoSendMessage { | ||||||
|  | 
 | ||||||
|  |     /** | ||||||
|  |      * 发送给谁 | ||||||
|  |      * | ||||||
|  |      * 如果为空,说明发送给所有人 | ||||||
|  |      */ | ||||||
|  |     private Long toUserId; | ||||||
|  |     /** | ||||||
|  |      * 内容 | ||||||
|  |      */ | ||||||
|  |     private String text; | ||||||
|  | 
 | ||||||
|  | } | ||||||
|  | @ -60,6 +60,21 @@ spring: | ||||||
| 
 | 
 | ||||||
| --- #################### MQ 消息队列相关配置 #################### | --- #################### MQ 消息队列相关配置 #################### | ||||||
| 
 | 
 | ||||||
|  | # rocketmq 配置项,对应 RocketMQProperties 配置类 | ||||||
|  | rocketmq: | ||||||
|  |   name-server: 127.0.0.1:9876 # RocketMQ Namesrv | ||||||
|  | 
 | ||||||
|  | spring: | ||||||
|  |   # RabbitMQ 配置项,对应 RabbitProperties 配置类 | ||||||
|  |   rabbitmq: | ||||||
|  |     host: 127.0.0.1 # RabbitMQ 服务的地址 | ||||||
|  |     port: 5672 # RabbitMQ 服务的端口 | ||||||
|  |     username: guest # RabbitMQ 服务的账号 | ||||||
|  |     password: guest # RabbitMQ 服务的密码 | ||||||
|  |   # Kafka 配置项,对应 KafkaProperties 配置类 | ||||||
|  |   kafka: | ||||||
|  |     bootstrap-servers: 127.0.0.1:9092 # 指定 Kafka Broker 地址,可以设置多个,以逗号分隔 | ||||||
|  | 
 | ||||||
| --- #################### 定时任务相关配置 #################### | --- #################### 定时任务相关配置 #################### | ||||||
| xxl: | xxl: | ||||||
|   job: |   job: | ||||||
|  |  | ||||||
|  | @ -75,6 +75,21 @@ spring: | ||||||
| 
 | 
 | ||||||
| --- #################### MQ 消息队列相关配置 #################### | --- #################### MQ 消息队列相关配置 #################### | ||||||
| 
 | 
 | ||||||
|  | # rocketmq 配置项,对应 RocketMQProperties 配置类 | ||||||
|  | rocketmq: | ||||||
|  |   name-server: 127.0.0.1:9876 # RocketMQ Namesrv | ||||||
|  | 
 | ||||||
|  | spring: | ||||||
|  |   # RabbitMQ 配置项,对应 RabbitProperties 配置类 | ||||||
|  |   rabbitmq: | ||||||
|  |     host: 127.0.0.1 # RabbitMQ 服务的地址 | ||||||
|  |     port: 5672 # RabbitMQ 服务的端口 | ||||||
|  |     username: guest # RabbitMQ 服务的账号 | ||||||
|  |     password: guest # RabbitMQ 服务的密码 | ||||||
|  |   # Kafka 配置项,对应 KafkaProperties 配置类 | ||||||
|  |   kafka: | ||||||
|  |     bootstrap-servers: 127.0.0.1:9092 # 指定 Kafka Broker 地址,可以设置多个,以逗号分隔 | ||||||
|  | 
 | ||||||
| --- #################### 定时任务相关配置 #################### | --- #################### 定时任务相关配置 #################### | ||||||
| xxl: | xxl: | ||||||
|   job: |   job: | ||||||
|  |  | ||||||
|  | @ -72,7 +72,31 @@ spring: | ||||||
| 
 | 
 | ||||||
| --- #################### RPC 远程调用相关配置 #################### | --- #################### RPC 远程调用相关配置 #################### | ||||||
| 
 | 
 | ||||||
| --- #################### MQ 消息队列相关配置 #################### | --- #################### 消息队列相关 #################### | ||||||
|  | 
 | ||||||
|  | # rocketmq 配置项,对应 RocketMQProperties 配置类 | ||||||
|  | rocketmq: | ||||||
|  |   # Producer 配置项 | ||||||
|  |   producer: | ||||||
|  |     group: ${spring.application.name}_PRODUCER # 生产者分组 | ||||||
|  | 
 | ||||||
|  | spring: | ||||||
|  |   # Kafka 配置项,对应 KafkaProperties 配置类 | ||||||
|  |   kafka: | ||||||
|  |     # Kafka Producer 配置项 | ||||||
|  |     producer: | ||||||
|  |       acks: 1 # 0-不应答。1-leader 应答。all-所有 leader 和 follower 应答。 | ||||||
|  |       retries: 3 # 发送失败时,重试发送的次数 | ||||||
|  |       value-serializer: org.springframework.kafka.support.serializer.JsonSerializer # 消息的 value 的序列化 | ||||||
|  |     # Kafka Consumer 配置项 | ||||||
|  |     consumer: | ||||||
|  |       auto-offset-reset: earliest # 设置消费者分组最初的消费进度为 earliest 。可参考博客 https://blog.csdn.net/lishuangzhe7047/article/details/74530417 理解 | ||||||
|  |       value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer | ||||||
|  |       properties: | ||||||
|  |         spring.json.trusted.packages: '*' | ||||||
|  |     # Kafka Consumer Listener 监听器配置 | ||||||
|  |     listener: | ||||||
|  |       missing-topics-fatal: false # 消费监听接口监听的主题不存在时,默认会报错。所以通过设置为 false ,解决报错 | ||||||
| 
 | 
 | ||||||
| --- #################### 定时任务相关配置 #################### | --- #################### 定时任务相关配置 #################### | ||||||
| 
 | 
 | ||||||
|  | @ -92,6 +116,19 @@ yudao: | ||||||
|   web: |   web: | ||||||
|     admin-ui: |     admin-ui: | ||||||
|       url: http://dashboard.yudao.iocoder.cn # Admin 管理后台 UI 的地址 |       url: http://dashboard.yudao.iocoder.cn # Admin 管理后台 UI 的地址 | ||||||
|  |   websocket: | ||||||
|  |     enable: true # websocket的开关 | ||||||
|  |     path: /infra/ws # 路径 | ||||||
|  |     sender-type: redis # 消息发送的类型,可选值为 local、redis、rocketmq、kafka、rabbitmq | ||||||
|  |     sender-rocketmq: | ||||||
|  |       topic: ${spring.application.name}-websocket # 消息发送的 RocketMQ Topic | ||||||
|  |       consumer-group: ${spring.application.name}-websocket-consumer # 消息发送的 RocketMQ Consumer Group | ||||||
|  |     sender-rabbitmq: | ||||||
|  |       exchange: ${spring.application.name}-websocket-exchange # 消息发送的 RabbitMQ Exchange | ||||||
|  |       queue: ${spring.application.name}-websocket-queue # 消息发送的 RabbitMQ Queue | ||||||
|  |     sender-kafka: | ||||||
|  |       topic: ${spring.application.name}-websocket # 消息发送的 Kafka Topic | ||||||
|  |       consumer-group: ${spring.application.name}-websocket-consumer # 消息发送的 Kafka Consumer Group | ||||||
|   swagger: |   swagger: | ||||||
|     title: 管理后台 |     title: 管理后台 | ||||||
|     description: 提供管理员管理的所有功能 |     description: 提供管理员管理的所有功能 | ||||||
|  |  | ||||||
|  | @ -53,7 +53,8 @@ public class AppAuthController { | ||||||
|     @PermitAll |     @PermitAll | ||||||
|     @Operation(summary = "登出系统") |     @Operation(summary = "登出系统") | ||||||
|     public CommonResult<Boolean> logout(HttpServletRequest request) { |     public CommonResult<Boolean> logout(HttpServletRequest request) { | ||||||
|         String token = SecurityFrameworkUtils.obtainAuthorization(request, securityProperties.getTokenHeader()); |         String token = SecurityFrameworkUtils.obtainAuthorization(request, | ||||||
|  |                 securityProperties.getTokenHeader(), securityProperties.getTokenParameter()); | ||||||
|         if (StrUtil.isNotBlank(token)) { |         if (StrUtil.isNotBlank(token)) { | ||||||
|             authService.logout(token); |             authService.logout(token); | ||||||
|         } |         } | ||||||
|  |  | ||||||
|  | @ -7,6 +7,7 @@ import cn.iocoder.yudao.framework.common.enums.UserTypeEnum; | ||||||
| import cn.iocoder.yudao.framework.common.pojo.CommonResult; | import cn.iocoder.yudao.framework.common.pojo.CommonResult; | ||||||
| import cn.iocoder.yudao.framework.operatelog.core.annotations.OperateLog; | import cn.iocoder.yudao.framework.operatelog.core.annotations.OperateLog; | ||||||
| import cn.iocoder.yudao.framework.security.config.SecurityProperties; | import cn.iocoder.yudao.framework.security.config.SecurityProperties; | ||||||
|  | import cn.iocoder.yudao.framework.security.core.util.SecurityFrameworkUtils; | ||||||
| import cn.iocoder.yudao.module.system.controller.admin.auth.vo.*; | import cn.iocoder.yudao.module.system.controller.admin.auth.vo.*; | ||||||
| import cn.iocoder.yudao.module.system.convert.auth.AuthConvert; | import cn.iocoder.yudao.module.system.convert.auth.AuthConvert; | ||||||
| import cn.iocoder.yudao.module.system.dal.dataobject.permission.MenuDO; | import cn.iocoder.yudao.module.system.dal.dataobject.permission.MenuDO; | ||||||
|  | @ -38,7 +39,6 @@ import java.util.Set; | ||||||
| import static cn.iocoder.yudao.framework.common.pojo.CommonResult.success; | import static cn.iocoder.yudao.framework.common.pojo.CommonResult.success; | ||||||
| import static cn.iocoder.yudao.framework.common.util.collection.CollectionUtils.convertSet; | import static cn.iocoder.yudao.framework.common.util.collection.CollectionUtils.convertSet; | ||||||
| import static cn.iocoder.yudao.framework.security.core.util.SecurityFrameworkUtils.getLoginUserId; | import static cn.iocoder.yudao.framework.security.core.util.SecurityFrameworkUtils.getLoginUserId; | ||||||
| import static cn.iocoder.yudao.framework.security.core.util.SecurityFrameworkUtils.obtainAuthorization; |  | ||||||
| 
 | 
 | ||||||
| @Tag(name = "管理后台 - 认证") | @Tag(name = "管理后台 - 认证") | ||||||
| @RestController | @RestController | ||||||
|  | @ -76,7 +76,8 @@ public class AuthController { | ||||||
|     @Operation(summary = "登出系统") |     @Operation(summary = "登出系统") | ||||||
|     @OperateLog(enable = false) // 避免 Post 请求被记录操作日志
 |     @OperateLog(enable = false) // 避免 Post 请求被记录操作日志
 | ||||||
|     public CommonResult<Boolean> logout(HttpServletRequest request) { |     public CommonResult<Boolean> logout(HttpServletRequest request) { | ||||||
|         String token = obtainAuthorization(request, securityProperties.getTokenHeader()); |         String token = SecurityFrameworkUtils.obtainAuthorization(request, | ||||||
|  |                 securityProperties.getTokenHeader(), securityProperties.getTokenParameter()); | ||||||
|         if (StrUtil.isNotBlank(token)) { |         if (StrUtil.isNotBlank(token)) { | ||||||
|             authService.logout(token, LoginLogTypeEnum.LOGOUT_SELF.getType()); |             authService.logout(token, LoginLogTypeEnum.LOGOUT_SELF.getType()); | ||||||
|         } |         } | ||||||
|  |  | ||||||
|  | @ -1,12 +1,16 @@ | ||||||
| package cn.iocoder.yudao.module.system.controller.admin.notice; | package cn.iocoder.yudao.module.system.controller.admin.notice; | ||||||
| 
 | 
 | ||||||
|  | import cn.hutool.core.lang.Assert; | ||||||
|  | import cn.iocoder.yudao.framework.common.enums.UserTypeEnum; | ||||||
| import cn.iocoder.yudao.framework.common.pojo.CommonResult; | import cn.iocoder.yudao.framework.common.pojo.CommonResult; | ||||||
| import cn.iocoder.yudao.framework.common.pojo.PageResult; | import cn.iocoder.yudao.framework.common.pojo.PageResult; | ||||||
|  | import cn.iocoder.yudao.module.infra.api.websocket.WebSocketSenderApi; | ||||||
| import cn.iocoder.yudao.module.system.controller.admin.notice.vo.NoticeCreateReqVO; | import cn.iocoder.yudao.module.system.controller.admin.notice.vo.NoticeCreateReqVO; | ||||||
| import cn.iocoder.yudao.module.system.controller.admin.notice.vo.NoticePageReqVO; | import cn.iocoder.yudao.module.system.controller.admin.notice.vo.NoticePageReqVO; | ||||||
| import cn.iocoder.yudao.module.system.controller.admin.notice.vo.NoticeRespVO; | import cn.iocoder.yudao.module.system.controller.admin.notice.vo.NoticeRespVO; | ||||||
| import cn.iocoder.yudao.module.system.controller.admin.notice.vo.NoticeUpdateReqVO; | import cn.iocoder.yudao.module.system.controller.admin.notice.vo.NoticeUpdateReqVO; | ||||||
| import cn.iocoder.yudao.module.system.convert.notice.NoticeConvert; | import cn.iocoder.yudao.module.system.convert.notice.NoticeConvert; | ||||||
|  | import cn.iocoder.yudao.module.system.dal.dataobject.notice.NoticeDO; | ||||||
| import cn.iocoder.yudao.module.system.service.notice.NoticeService; | import cn.iocoder.yudao.module.system.service.notice.NoticeService; | ||||||
| import io.swagger.v3.oas.annotations.tags.Tag; | import io.swagger.v3.oas.annotations.tags.Tag; | ||||||
| import io.swagger.v3.oas.annotations.Parameter; | import io.swagger.v3.oas.annotations.Parameter; | ||||||
|  | @ -29,6 +33,9 @@ public class NoticeController { | ||||||
|     @Resource |     @Resource | ||||||
|     private NoticeService noticeService; |     private NoticeService noticeService; | ||||||
| 
 | 
 | ||||||
|  |     @Resource | ||||||
|  |     private WebSocketSenderApi webSocketSenderApi; | ||||||
|  | 
 | ||||||
|     @PostMapping("/create") |     @PostMapping("/create") | ||||||
|     @Operation(summary = "创建通知公告") |     @Operation(summary = "创建通知公告") | ||||||
|     @PreAuthorize("@ss.hasPermission('system:notice:create')") |     @PreAuthorize("@ss.hasPermission('system:notice:create')") | ||||||
|  | @ -69,4 +76,16 @@ public class NoticeController { | ||||||
|         return success(NoticeConvert.INSTANCE.convert(noticeService.getNotice(id))); |         return success(NoticeConvert.INSTANCE.convert(noticeService.getNotice(id))); | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  |     @PostMapping("/push") | ||||||
|  |     @Operation(summary = "推送通知公告", description = "只发送给 websocket 连接在线的用户") | ||||||
|  |     @Parameter(name = "id", description = "编号", required = true, example = "1024") | ||||||
|  |     @PreAuthorize("@ss.hasPermission('system:notice:update')") | ||||||
|  |     public CommonResult<Boolean> push(@RequestParam("id") Long id) { | ||||||
|  |         NoticeDO notice = noticeService.getNotice(id); | ||||||
|  |         Assert.notNull(notice, "公告不能为空"); | ||||||
|  |         // 通过 websocket 推送给在线的用户
 | ||||||
|  |         webSocketSenderApi.sendObject(UserTypeEnum.ADMIN.getValue(), "notice-push", notice); | ||||||
|  |         return success(true); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
| } | } | ||||||
|  |  | ||||||
|  | @ -1,10 +1,11 @@ | ||||||
| package cn.iocoder.yudao.module.system.framework.rpc.config; | package cn.iocoder.yudao.module.system.framework.rpc.config; | ||||||
| 
 | 
 | ||||||
| import cn.iocoder.yudao.module.infra.api.file.FileApi; | import cn.iocoder.yudao.module.infra.api.file.FileApi; | ||||||
|  | import cn.iocoder.yudao.module.infra.api.websocket.WebSocketSenderApi; | ||||||
| import org.springframework.cloud.openfeign.EnableFeignClients; | import org.springframework.cloud.openfeign.EnableFeignClients; | ||||||
| import org.springframework.context.annotation.Configuration; | import org.springframework.context.annotation.Configuration; | ||||||
| 
 | 
 | ||||||
| @Configuration(proxyBeanMethods = false) | @Configuration(proxyBeanMethods = false) | ||||||
| @EnableFeignClients(clients = FileApi.class) | @EnableFeignClients(clients = {FileApi.class, WebSocketSenderApi.class}) | ||||||
| public class RpcConfiguration { | public class RpcConfiguration { | ||||||
| } | } | ||||||
|  |  | ||||||
|  | @ -72,8 +72,6 @@ spring: | ||||||
| 
 | 
 | ||||||
| --- #################### RPC 远程调用相关配置 #################### | --- #################### RPC 远程调用相关配置 #################### | ||||||
| 
 | 
 | ||||||
| --- #################### MQ 消息队列相关配置 #################### |  | ||||||
| 
 |  | ||||||
| --- #################### 消息队列相关 #################### | --- #################### 消息队列相关 #################### | ||||||
| 
 | 
 | ||||||
| # rocketmq 配置项,对应 RocketMQProperties 配置类 | # rocketmq 配置项,对应 RocketMQProperties 配置类 | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue
	
	 YunaiV
						YunaiV