diff --git a/yudao-framework/yudao-spring-boot-starter-mq/pom.xml b/yudao-framework/yudao-spring-boot-starter-mq/pom.xml index d567c9634..49afa2a5e 100644 --- a/yudao-framework/yudao-spring-boot-starter-mq/pom.xml +++ b/yudao-framework/yudao-spring-boot-starter-mq/pom.xml @@ -12,7 +12,11 @@ jar ${project.artifactId} - 消息队列,基于 Redis Pub/Sub 实现广播消费,基于 Stream 实现集群消费 + + 消息队列: + 1. 基于 Spring Cloud Stream 实现异步消息 + 2. 基于 Spring Cloud Bus 实现事件总线 + https://github.com/YunaiV/ruoyi-vue-pro @@ -28,6 +32,12 @@ spring-cloud-starter-stream-rocketmq + + + com.alibaba.cloud + + spring-cloud-starter-bus-rocketmq + diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/config/YudaoMQAutoConfiguration.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/config/YudaoMQAutoConfiguration.java index 76e89358d..a3595f859 100644 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/config/YudaoMQAutoConfiguration.java +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/config/YudaoMQAutoConfiguration.java @@ -1,33 +1,19 @@ package cn.iocoder.yudao.framework.mq.config; -import cn.hutool.core.map.MapUtil; -import cn.hutool.core.util.StrUtil; -import cn.hutool.system.SystemUtil; -import cn.iocoder.yudao.framework.common.enums.DocumentEnum; import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate; import cn.iocoder.yudao.framework.mq.core.interceptor.RedisMessageInterceptor; -import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessageListener; -import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessageListener; import cn.iocoder.yudao.framework.redis.config.YudaoRedisAutoConfiguration; +import com.alibaba.cloud.stream.binder.rocketmq.convert.RocketMQMessageConverter; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.autoconfigure.AutoConfigureAfter; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.data.redis.connection.RedisServerCommands; -import org.springframework.data.redis.connection.stream.Consumer; -import org.springframework.data.redis.connection.stream.ObjectRecord; -import org.springframework.data.redis.connection.stream.ReadOffset; -import org.springframework.data.redis.connection.stream.StreamOffset; -import org.springframework.data.redis.core.RedisCallback; -import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.StringRedisTemplate; -import org.springframework.data.redis.listener.ChannelTopic; -import org.springframework.data.redis.listener.RedisMessageListenerContainer; -import org.springframework.data.redis.stream.DefaultStreamMessageListenerContainerX; -import org.springframework.data.redis.stream.StreamMessageListenerContainer; +import org.springframework.messaging.converter.*; +import java.util.ArrayList; import java.util.List; -import java.util.Properties; /** * 消息队列配置类 @@ -48,96 +34,19 @@ public class YudaoMQAutoConfiguration { return redisMQTemplate; } - // ========== 消费者相关 ========== - /** - * 创建 Redis Pub/Sub 广播消费的容器 + * 覆盖 {@link RocketMQMessageConverter} 的配置,去掉 fastjson 的转换器,解决不兼容的问题 */ - @Bean - public RedisMessageListenerContainer redisMessageListenerContainer( - RedisMQTemplate redisMQTemplate, List> listeners) { - // 创建 RedisMessageListenerContainer 对象 - RedisMessageListenerContainer container = new RedisMessageListenerContainer(); - // 设置 RedisConnection 工厂。 - container.setConnectionFactory(redisMQTemplate.getRedisTemplate().getRequiredConnectionFactory()); - // 添加监听器 - listeners.forEach(listener -> { - listener.setRedisMQTemplate(redisMQTemplate); - container.addMessageListener(listener, new ChannelTopic(listener.getChannel())); - log.info("[redisMessageListenerContainer][注册 Channel({}) 对应的监听器({})]", - listener.getChannel(), listener.getClass().getName()); - }); - return container; - } - - /** - * 创建 Redis Stream 集群消费的容器 - * - * Redis Stream 的 xreadgroup 命令:https://www.geek-book.com/src/docs/redis/redis/redis.io/commands/xreadgroup.html - */ - @Bean(initMethod = "start", destroyMethod = "stop") - public StreamMessageListenerContainer> redisStreamMessageListenerContainer( - RedisMQTemplate redisMQTemplate, List> listeners) { - RedisTemplate redisTemplate = redisMQTemplate.getRedisTemplate(); - checkRedisVersion(redisTemplate); - // 第一步,创建 StreamMessageListenerContainer 容器 - // 创建 options 配置 - StreamMessageListenerContainer.StreamMessageListenerContainerOptions> containerOptions = - StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder() - .batchSize(10) // 一次性最多拉取多少条消息 - .targetType(String.class) // 目标类型。统一使用 String,通过自己封装的 AbstractStreamMessageListener 去反序列化 - .build(); - // 创建 container 对象 - StreamMessageListenerContainer> container = -// StreamMessageListenerContainer.create(redisTemplate.getRequiredConnectionFactory(), containerOptions); - DefaultStreamMessageListenerContainerX.create(redisMQTemplate.getRedisTemplate().getRequiredConnectionFactory(), containerOptions); - - // 第二步,注册监听器,消费对应的 Stream 主题 - String consumerName = buildConsumerName(); - listeners.parallelStream().forEach(listener -> { - // 创建 listener 对应的消费者分组 - try { - redisTemplate.opsForStream().createGroup(listener.getStreamKey(), listener.getGroup()); - } catch (Exception ignore) {} - // 设置 listener 对应的 redisTemplate - listener.setRedisMQTemplate(redisMQTemplate); - // 创建 Consumer 对象 - Consumer consumer = Consumer.from(listener.getGroup(), consumerName); - // 设置 Consumer 消费进度,以最小消费进度为准 - StreamOffset streamOffset = StreamOffset.create(listener.getStreamKey(), ReadOffset.lastConsumed()); - // 设置 Consumer 监听 - StreamMessageListenerContainer.StreamReadRequestBuilder builder = StreamMessageListenerContainer.StreamReadRequest - .builder(streamOffset).consumer(consumer) - .autoAcknowledge(false) // 不自动 ack - .cancelOnError(throwable -> false); // 默认配置,发生异常就取消消费,显然不符合预期;因此,我们设置为 false - container.register(builder.build(), listener); - }); - return container; - } - - /** - * 构建消费者名字,使用本地 IP + 进程编号的方式。 - * 参考自 RocketMQ clientId 的实现 - * - * @return 消费者名字 - */ - private static String buildConsumerName() { - return String.format("%s@%d", SystemUtil.getHostInfo().getAddress(), SystemUtil.getCurrentPID()); - } - - /** - * 校验 Redis 版本号,是否满足最低的版本号要求! - */ - private static void checkRedisVersion(RedisTemplate redisTemplate) { - // 获得 Redis 版本 - Properties info = redisTemplate.execute((RedisCallback) RedisServerCommands::info); - String version = MapUtil.getStr(info, "redis_version"); - // 校验最低版本必须大于等于 5.0.0 - int majorVersion = Integer.parseInt(StrUtil.subBefore(version, '.', false)); - if (majorVersion < 5) { - throw new IllegalStateException(StrUtil.format("您当前的 Redis 版本为 {},小于最低要求的 5.0.0 版本!" + - "请参考 {} 文档进行安装。", version, DocumentEnum.REDIS_INSTALL.getUrl())); - } + @Bean(RocketMQMessageConverter.DEFAULT_NAME) + @ConditionalOnMissingBean(name = { RocketMQMessageConverter.DEFAULT_NAME }) + public CompositeMessageConverter rocketMQMessageConverter() { + List messageConverters = new ArrayList<>(); + ByteArrayMessageConverter byteArrayMessageConverter = new ByteArrayMessageConverter(); + byteArrayMessageConverter.setContentTypeResolver(null); + messageConverters.add(byteArrayMessageConverter); + messageConverters.add(new StringMessageConverter()); + messageConverters.add(new MappingJackson2MessageConverter()); + return new CompositeMessageConverter(messageConverters); } } diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/bus/AbstractBusProducer.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/bus/AbstractBusProducer.java new file mode 100644 index 000000000..7a4686405 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/bus/AbstractBusProducer.java @@ -0,0 +1,41 @@ +package cn.iocoder.yudao.framework.mq.core.bus; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.cloud.bus.ServiceMatcher; +import org.springframework.cloud.bus.event.RemoteApplicationEvent; +import org.springframework.context.ApplicationEventPublisher; + +import javax.annotation.Resource; + +/** + * 基于 Spring Cloud Bus 实现的 Producer 抽象类 + * + * @author 芋道源码 + */ +public abstract class AbstractBusProducer { + + @Resource + protected ApplicationEventPublisher applicationEventPublisher; + + @Resource + protected ServiceMatcher serviceMatcher; + + @Value("{spring.application.name}") + protected String applicationName; + + protected void publishEvent(RemoteApplicationEvent event) { + applicationEventPublisher.publishEvent(event); + } + + /** + * @return 只广播给自己服务的实例 + */ + protected String selfDestinationService() { + return applicationName + ":**"; + } + + protected String getBusId() { + return serviceMatcher.getBusId(); + } + +} diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/org/springframework/data/redis/stream/DefaultStreamMessageListenerContainerX.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/org/springframework/data/redis/stream/DefaultStreamMessageListenerContainerX.java deleted file mode 100644 index b4cf4c55e..000000000 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/org/springframework/data/redis/stream/DefaultStreamMessageListenerContainerX.java +++ /dev/null @@ -1,62 +0,0 @@ -package org.springframework.data.redis.stream; - -import cn.hutool.core.util.ReflectUtil; -import org.springframework.data.redis.connection.RedisConnectionFactory; -import org.springframework.data.redis.connection.stream.ByteRecord; -import org.springframework.data.redis.connection.stream.ReadOffset; -import org.springframework.data.redis.connection.stream.Record; -import org.springframework.util.Assert; - -import java.util.Collections; -import java.util.List; -import java.util.function.Function; - -/** - * 拓展 DefaultStreamMessageListenerContainer 实现,解决 Spring Data Redis + Redisson 结合使用时,Redisson 在 Stream 获得不到数据时,返回 null 而不是空 List,导致 NPE 异常。 - * 对应 issue:https://github.com/spring-projects/spring-data-redis/issues/2147 和 https://github.com/redisson/redisson/issues/4006 - * 目前看下来 Spring Data Redis 不肯加 null 判断,Redisson 暂时也没改返回 null 到空 List 的打算,所以暂时只能自己改,哽咽! - * - * @author 芋道源码 - */ -public class DefaultStreamMessageListenerContainerX> extends DefaultStreamMessageListenerContainer { - - /** - * 参考 {@link StreamMessageListenerContainer#create(RedisConnectionFactory, StreamMessageListenerContainerOptions)} 的实现 - */ - public static > StreamMessageListenerContainer create(RedisConnectionFactory connectionFactory, StreamMessageListenerContainer.StreamMessageListenerContainerOptions options) { - Assert.notNull(connectionFactory, "RedisConnectionFactory must not be null!"); - Assert.notNull(options, "StreamMessageListenerContainerOptions must not be null!"); - return new DefaultStreamMessageListenerContainerX<>(connectionFactory, options); - } - - public DefaultStreamMessageListenerContainerX(RedisConnectionFactory connectionFactory, StreamMessageListenerContainerOptions containerOptions) { - super(connectionFactory, containerOptions); - } - - /** - * 参考 {@link DefaultStreamMessageListenerContainer#register(StreamReadRequest, StreamListener)} 的实现 - */ - @Override - public Subscription register(StreamReadRequest streamRequest, StreamListener listener) { - return this.doRegisterX(getReadTaskX(streamRequest, listener)); - } - - @SuppressWarnings("unchecked") - private StreamPollTask getReadTaskX(StreamReadRequest streamRequest, StreamListener listener) { - StreamPollTask task = ReflectUtil.invoke(this, "getReadTask", streamRequest, listener); - // 修改 readFunction 方法 - Function> readFunction = (Function>) ReflectUtil.getFieldValue(task, "readFunction"); - ReflectUtil.setFieldValue(task, "readFunction", (Function>) readOffset -> { - List records = readFunction.apply(readOffset); - //【重点】保证 records 不是空,避免 NPE 的问题!!! - return records != null ? records : Collections.emptyList(); - }); - return task; - } - - private Subscription doRegisterX(Task task) { - return ReflectUtil.invoke(this, "doRegister", task); - } - -} - diff --git a/yudao-framework/yudao-spring-boot-starter-web/src/main/java/cn/iocoder/yudao/framework/web/config/YudaoWebAutoConfiguration.java b/yudao-framework/yudao-spring-boot-starter-web/src/main/java/cn/iocoder/yudao/framework/web/config/YudaoWebAutoConfiguration.java index a7c5b7a53..e44379889 100644 --- a/yudao-framework/yudao-spring-boot-starter-web/src/main/java/cn/iocoder/yudao/framework/web/config/YudaoWebAutoConfiguration.java +++ b/yudao-framework/yudao-spring-boot-starter-web/src/main/java/cn/iocoder/yudao/framework/web/config/YudaoWebAutoConfiguration.java @@ -104,8 +104,8 @@ public class YudaoWebAutoConfiguration implements WebMvcConfigurer { * 创建 XssFilter Bean,解决 Xss 安全问题 */ @Bean - public FilterRegistrationBean xssFilter(XssProperties properties, PathMatcher pathMatcher) { - return createFilterBean(new XssFilter(properties, pathMatcher), WebFilterOrderEnum.XSS_FILTER); + public FilterRegistrationBean xssFilter(XssProperties properties, PathMatcher mvcPathMatcher) { + return createFilterBean(new XssFilter(properties, mvcPathMatcher), WebFilterOrderEnum.XSS_FILTER); } /** diff --git a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/auth/OAuth2ClientRefreshConsumer.java b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/auth/OAuth2ClientRefreshConsumer.java index fc765c425..52763e96e 100644 --- a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/auth/OAuth2ClientRefreshConsumer.java +++ b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/auth/OAuth2ClientRefreshConsumer.java @@ -1,9 +1,9 @@ package cn.iocoder.yudao.module.system.mq.consumer.auth; -import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessageListener; import cn.iocoder.yudao.module.system.mq.message.auth.OAuth2ClientRefreshMessage; import cn.iocoder.yudao.module.system.service.oauth2.OAuth2ClientService; import lombok.extern.slf4j.Slf4j; +import org.springframework.context.event.EventListener; import org.springframework.stereotype.Component; import javax.annotation.Resource; @@ -15,14 +15,14 @@ import javax.annotation.Resource; */ @Component @Slf4j -public class OAuth2ClientRefreshConsumer extends AbstractChannelMessageListener { +public class OAuth2ClientRefreshConsumer { @Resource private OAuth2ClientService oauth2ClientService; - @Override - public void onMessage(OAuth2ClientRefreshMessage message) { - log.info("[onMessage][收到 OAuth2Client 刷新消息]"); + @EventListener + public void execute(OAuth2ClientRefreshMessage message) { + log.info("[execute][收到 OAuth2Client 刷新消息]"); oauth2ClientService.initLocalCache(); } diff --git a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/dept/DeptRefreshConsumer.java b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/dept/DeptRefreshConsumer.java index 981244d90..981a0b9f9 100644 --- a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/dept/DeptRefreshConsumer.java +++ b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/dept/DeptRefreshConsumer.java @@ -1,9 +1,9 @@ package cn.iocoder.yudao.module.system.mq.consumer.dept; -import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessageListener; import cn.iocoder.yudao.module.system.mq.message.dept.DeptRefreshMessage; import cn.iocoder.yudao.module.system.service.dept.DeptService; import lombok.extern.slf4j.Slf4j; +import org.springframework.context.event.EventListener; import org.springframework.stereotype.Component; import javax.annotation.Resource; @@ -15,14 +15,14 @@ import javax.annotation.Resource; */ @Component @Slf4j -public class DeptRefreshConsumer extends AbstractChannelMessageListener { +public class DeptRefreshConsumer { @Resource private DeptService deptService; - @Override - public void onMessage(DeptRefreshMessage message) { - log.info("[onMessage][收到 Dept 刷新消息]"); + @EventListener + public void execute(DeptRefreshMessage message) { + log.info("[execute][收到 Dept 刷新消息]"); deptService.initLocalCache(); } diff --git a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/permission/MenuRefreshConsumer.java b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/permission/MenuRefreshConsumer.java index cf0373e05..6103022b4 100644 --- a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/permission/MenuRefreshConsumer.java +++ b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/permission/MenuRefreshConsumer.java @@ -3,10 +3,10 @@ package cn.iocoder.yudao.module.system.mq.consumer.permission; import cn.iocoder.yudao.module.system.mq.message.permission.MenuRefreshMessage; import cn.iocoder.yudao.module.system.service.permission.MenuService; import lombok.extern.slf4j.Slf4j; +import org.springframework.context.event.EventListener; import org.springframework.stereotype.Component; import javax.annotation.Resource; -import java.util.function.Consumer; /** * 针对 {@link MenuRefreshMessage} 的消费者 @@ -15,14 +15,14 @@ import java.util.function.Consumer; */ @Component @Slf4j -public class MenuRefreshConsumer implements Consumer { +public class MenuRefreshConsumer { @Resource private MenuService menuService; - @Override - public void accept(MenuRefreshMessage menuRefreshMessage) { - log.info("[accept][收到 Menu 刷新消息]"); + @EventListener + public void execute(MenuRefreshMessage menuRefreshMessage) { + log.info("[execute][收到 Menu 刷新消息]"); menuService.initLocalCache(); } } diff --git a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/permission/RoleMenuRefreshConsumer.java b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/permission/RoleMenuRefreshConsumer.java index 1bce693f7..52b53606c 100644 --- a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/permission/RoleMenuRefreshConsumer.java +++ b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/permission/RoleMenuRefreshConsumer.java @@ -1,13 +1,12 @@ package cn.iocoder.yudao.module.system.mq.consumer.permission; -import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessageListener; import cn.iocoder.yudao.module.system.mq.message.permission.RoleMenuRefreshMessage; import cn.iocoder.yudao.module.system.service.permission.PermissionService; import lombok.extern.slf4j.Slf4j; +import org.springframework.context.event.EventListener; import org.springframework.stereotype.Component; import javax.annotation.Resource; -import java.util.function.Consumer; /** * 针对 {@link RoleMenuRefreshMessage} 的消费者 @@ -16,14 +15,14 @@ import java.util.function.Consumer; */ @Component @Slf4j -public class RoleMenuRefreshConsumer implements Consumer { +public class RoleMenuRefreshConsumer { @Resource private PermissionService permissionService; - @Override - public void accept(RoleMenuRefreshMessage roleMenuRefreshMessage) { - log.info("[accept][收到 Role 与 Menu 的关联刷新消息]"); + @EventListener + public void execute(RoleMenuRefreshMessage roleMenuRefreshMessage) { + log.info("[execute][收到 Role 与 Menu 的关联刷新消息]"); permissionService.initLocalCache(); } } diff --git a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/permission/RoleRefreshConsumer.java b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/permission/RoleRefreshConsumer.java index 8c744397b..fa8b48f5e 100644 --- a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/permission/RoleRefreshConsumer.java +++ b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/permission/RoleRefreshConsumer.java @@ -3,10 +3,10 @@ package cn.iocoder.yudao.module.system.mq.consumer.permission; import cn.iocoder.yudao.module.system.mq.message.permission.RoleRefreshMessage; import cn.iocoder.yudao.module.system.service.permission.RoleService; import lombok.extern.slf4j.Slf4j; +import org.springframework.context.event.EventListener; import org.springframework.stereotype.Component; import javax.annotation.Resource; -import java.util.function.Consumer; /** * 针对 {@link RoleRefreshMessage} 的消费者 @@ -15,14 +15,14 @@ import java.util.function.Consumer; */ @Component @Slf4j -public class RoleRefreshConsumer implements Consumer { +public class RoleRefreshConsumer { @Resource private RoleService roleService; - @Override - public void accept(RoleRefreshMessage message) { - log.info("[accept][收到 Role 刷新消息]"); + @EventListener + public void execute(RoleRefreshMessage message) { + log.info("[execute][收到 Role 刷新消息]"); roleService.initLocalCache(); } diff --git a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/permission/UserRoleRefreshConsumer.java b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/permission/UserRoleRefreshConsumer.java index f5d44b36f..e1426797a 100644 --- a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/permission/UserRoleRefreshConsumer.java +++ b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/permission/UserRoleRefreshConsumer.java @@ -3,10 +3,10 @@ package cn.iocoder.yudao.module.system.mq.consumer.permission; import cn.iocoder.yudao.module.system.mq.message.permission.UserRoleRefreshMessage; import cn.iocoder.yudao.module.system.service.permission.PermissionService; import lombok.extern.slf4j.Slf4j; +import org.springframework.context.event.EventListener; import org.springframework.stereotype.Component; import javax.annotation.Resource; -import java.util.function.Consumer; /** * 针对 {@link UserRoleRefreshMessage} 的消费者 @@ -15,14 +15,14 @@ import java.util.function.Consumer; */ @Component @Slf4j -public class UserRoleRefreshConsumer implements Consumer { +public class UserRoleRefreshConsumer { @Resource private PermissionService permissionService; - @Override - public void accept(UserRoleRefreshMessage userRoleRefreshMessage) { - log.info("[accept][收到 User 与 Role 的关联刷新消息]"); + @EventListener + public void execute(UserRoleRefreshMessage userRoleRefreshMessage) { + log.info("[execute][收到 User 与 Role 的关联刷新消息]"); permissionService.initLocalCache(); } } diff --git a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/sensitiveword/SensitiveWordRefreshConsumer.java b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/sensitiveword/SensitiveWordRefreshConsumer.java index dc3a06236..9b5310b98 100644 --- a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/sensitiveword/SensitiveWordRefreshConsumer.java +++ b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/sensitiveword/SensitiveWordRefreshConsumer.java @@ -1,9 +1,9 @@ package cn.iocoder.yudao.module.system.mq.consumer.sensitiveword; -import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessageListener; import cn.iocoder.yudao.module.system.mq.message.sensitiveword.SensitiveWordRefreshMessage; import cn.iocoder.yudao.module.system.service.sensitiveword.SensitiveWordService; import lombok.extern.slf4j.Slf4j; +import org.springframework.context.event.EventListener; import org.springframework.stereotype.Component; import javax.annotation.Resource; @@ -15,14 +15,14 @@ import javax.annotation.Resource; */ @Component @Slf4j -public class SensitiveWordRefreshConsumer extends AbstractChannelMessageListener { +public class SensitiveWordRefreshConsumer { @Resource private SensitiveWordService sensitiveWordService; - @Override - public void onMessage(SensitiveWordRefreshMessage message) { - log.info("[onMessage][收到 SensitiveWord 刷新消息]"); + @EventListener + public void execute(SensitiveWordRefreshMessage message) { + log.info("[execute][收到 SensitiveWord 刷新消息]"); sensitiveWordService.initLocalCache(); } diff --git a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/sms/SmsChannelRefreshConsumer.java b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/sms/SmsChannelRefreshConsumer.java index cc5d83d1e..35c5eb4e0 100644 --- a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/sms/SmsChannelRefreshConsumer.java +++ b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/sms/SmsChannelRefreshConsumer.java @@ -1,9 +1,9 @@ package cn.iocoder.yudao.module.system.mq.consumer.sms; -import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessageListener; import cn.iocoder.yudao.module.system.mq.message.sms.SmsChannelRefreshMessage; import cn.iocoder.yudao.module.system.service.sms.SmsChannelService; import lombok.extern.slf4j.Slf4j; +import org.springframework.context.event.EventListener; import org.springframework.stereotype.Component; import javax.annotation.Resource; @@ -15,14 +15,14 @@ import javax.annotation.Resource; */ @Component @Slf4j -public class SmsChannelRefreshConsumer extends AbstractChannelMessageListener { +public class SmsChannelRefreshConsumer { @Resource private SmsChannelService smsChannelService; - @Override - public void onMessage(SmsChannelRefreshMessage message) { - log.info("[onMessage][收到 SmsChannel 刷新消息]"); + @EventListener + public void execute(SmsChannelRefreshMessage message) { + log.info("[execute][收到 SmsChannel 刷新消息]"); smsChannelService.initSmsClients(); } diff --git a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/sms/SmsTemplateRefreshConsumer.java b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/sms/SmsTemplateRefreshConsumer.java index 02bc59984..737654d31 100644 --- a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/sms/SmsTemplateRefreshConsumer.java +++ b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/sms/SmsTemplateRefreshConsumer.java @@ -1,9 +1,9 @@ package cn.iocoder.yudao.module.system.mq.consumer.sms; import cn.iocoder.yudao.module.system.mq.message.sms.SmsTemplateRefreshMessage; -import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessageListener; import cn.iocoder.yudao.module.system.service.sms.SmsTemplateService; import lombok.extern.slf4j.Slf4j; +import org.springframework.context.event.EventListener; import org.springframework.stereotype.Component; import javax.annotation.Resource; @@ -15,14 +15,14 @@ import javax.annotation.Resource; */ @Component @Slf4j -public class SmsTemplateRefreshConsumer extends AbstractChannelMessageListener { +public class SmsTemplateRefreshConsumer { @Resource private SmsTemplateService smsTemplateService; - @Override - public void onMessage(SmsTemplateRefreshMessage message) { - log.info("[onMessage][收到 SmsTemplate 刷新消息]"); + @EventListener + public void execute(SmsTemplateRefreshMessage message) { + log.info("[execute][收到 SmsTemplate 刷新消息]"); smsTemplateService.initLocalCache(); } diff --git a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/auth/OAuth2ClientRefreshMessage.java b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/auth/OAuth2ClientRefreshMessage.java index 3d18df150..0285664b1 100644 --- a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/auth/OAuth2ClientRefreshMessage.java +++ b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/auth/OAuth2ClientRefreshMessage.java @@ -1,8 +1,8 @@ package cn.iocoder.yudao.module.system.mq.message.auth; -import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessage; import lombok.Data; import lombok.EqualsAndHashCode; +import org.springframework.cloud.bus.event.RemoteApplicationEvent; /** * OAuth 2.0 客户端的数据刷新 Message @@ -11,11 +11,13 @@ import lombok.EqualsAndHashCode; */ @Data @EqualsAndHashCode(callSuper = true) -public class OAuth2ClientRefreshMessage extends AbstractChannelMessage { +public class OAuth2ClientRefreshMessage extends RemoteApplicationEvent { - @Override - public String getChannel() { - return "system.oauth2-client.refresh"; + public OAuth2ClientRefreshMessage() { + } + + public OAuth2ClientRefreshMessage(Object source, String originService, String destinationService) { + super(source, originService, destinationService); } } diff --git a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/dept/DeptRefreshMessage.java b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/dept/DeptRefreshMessage.java index 80d3c8c39..b94e6b81f 100644 --- a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/dept/DeptRefreshMessage.java +++ b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/dept/DeptRefreshMessage.java @@ -1,8 +1,8 @@ package cn.iocoder.yudao.module.system.mq.message.dept; -import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessage; import lombok.Data; import lombok.EqualsAndHashCode; +import org.springframework.cloud.bus.event.RemoteApplicationEvent; /** * 部门数据刷新 Message @@ -11,11 +11,13 @@ import lombok.EqualsAndHashCode; */ @Data @EqualsAndHashCode(callSuper = true) -public class DeptRefreshMessage extends AbstractChannelMessage { +public class DeptRefreshMessage extends RemoteApplicationEvent { - @Override - public String getChannel() { - return "system.dept.refresh"; + public DeptRefreshMessage() { + } + + public DeptRefreshMessage(Object source, String originService, String destinationService) { + super(source, originService, destinationService); } } diff --git a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/permission/MenuRefreshMessage.java b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/permission/MenuRefreshMessage.java index 4805c4bae..e03e26f08 100644 --- a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/permission/MenuRefreshMessage.java +++ b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/permission/MenuRefreshMessage.java @@ -1,14 +1,19 @@ package cn.iocoder.yudao.module.system.mq.message.permission; -import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessage; -import lombok.Data; -import lombok.EqualsAndHashCode; +import org.springframework.cloud.bus.event.RemoteApplicationEvent; /** * 菜单数据刷新 Message * * @author 芋道源码 */ -@Data -public class MenuRefreshMessage { +public class MenuRefreshMessage extends RemoteApplicationEvent { + + public MenuRefreshMessage() { + } + + public MenuRefreshMessage(Object source, String originService, String destinationService) { + super(source, originService, destinationService); + } + } diff --git a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/permission/RoleMenuRefreshMessage.java b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/permission/RoleMenuRefreshMessage.java index 947b4cd18..d2756c2cf 100644 --- a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/permission/RoleMenuRefreshMessage.java +++ b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/permission/RoleMenuRefreshMessage.java @@ -1,8 +1,7 @@ package cn.iocoder.yudao.module.system.mq.message.permission; -import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessage; import lombok.Data; -import lombok.EqualsAndHashCode; +import org.springframework.cloud.bus.event.RemoteApplicationEvent; /** * 角色与菜单数据刷新 Message @@ -10,5 +9,13 @@ import lombok.EqualsAndHashCode; * @author 芋道源码 */ @Data -public class RoleMenuRefreshMessage { +public class RoleMenuRefreshMessage extends RemoteApplicationEvent { + + public RoleMenuRefreshMessage() { + } + + public RoleMenuRefreshMessage(Object source, String originService, String destinationService) { + super(source, originService, destinationService); + } + } diff --git a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/permission/RoleRefreshMessage.java b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/permission/RoleRefreshMessage.java index a1cf81fc3..8078cab47 100644 --- a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/permission/RoleRefreshMessage.java +++ b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/permission/RoleRefreshMessage.java @@ -1,6 +1,7 @@ package cn.iocoder.yudao.module.system.mq.message.permission; import lombok.Data; +import org.springframework.cloud.bus.event.RemoteApplicationEvent; /** * 角色数据刷新 Message @@ -8,5 +9,13 @@ import lombok.Data; * @author 芋道源码 */ @Data -public class RoleRefreshMessage { +public class RoleRefreshMessage extends RemoteApplicationEvent { + + public RoleRefreshMessage() { + } + + public RoleRefreshMessage(Object source, String originService, String destinationService) { + super(source, originService, destinationService); + } + } diff --git a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/permission/UserRoleRefreshMessage.java b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/permission/UserRoleRefreshMessage.java index 8b6566d6f..a675d5187 100644 --- a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/permission/UserRoleRefreshMessage.java +++ b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/permission/UserRoleRefreshMessage.java @@ -1,6 +1,7 @@ package cn.iocoder.yudao.module.system.mq.message.permission; import lombok.Data; +import org.springframework.cloud.bus.event.RemoteApplicationEvent; /** * 用户与角色的数据刷新 Message @@ -8,5 +9,13 @@ import lombok.Data; * @author 芋道源码 */ @Data -public class UserRoleRefreshMessage { +public class UserRoleRefreshMessage extends RemoteApplicationEvent { + + public UserRoleRefreshMessage() { + } + + public UserRoleRefreshMessage(Object source, String originService, String destinationService) { + super(source, originService, destinationService); + } + } diff --git a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/sensitiveword/SensitiveWordRefreshMessage.java b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/sensitiveword/SensitiveWordRefreshMessage.java index 13ebf425f..3dcb3fb7d 100644 --- a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/sensitiveword/SensitiveWordRefreshMessage.java +++ b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/sensitiveword/SensitiveWordRefreshMessage.java @@ -1,19 +1,21 @@ package cn.iocoder.yudao.module.system.mq.message.sensitiveword; -import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessage; import lombok.Data; import lombok.EqualsAndHashCode; +import org.springframework.cloud.bus.event.RemoteApplicationEvent; /** * 敏感词的刷新 Message */ @Data @EqualsAndHashCode(callSuper = true) -public class SensitiveWordRefreshMessage extends AbstractChannelMessage { +public class SensitiveWordRefreshMessage extends RemoteApplicationEvent { - @Override - public String getChannel() { - return "system.sensitive-word.refresh"; + public SensitiveWordRefreshMessage() { + } + + public SensitiveWordRefreshMessage(Object source, String originService, String destinationService) { + super(source, originService, destinationService); } } diff --git a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/sms/SmsChannelRefreshMessage.java b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/sms/SmsChannelRefreshMessage.java index 0841cdee0..11b18aac5 100644 --- a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/sms/SmsChannelRefreshMessage.java +++ b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/sms/SmsChannelRefreshMessage.java @@ -1,8 +1,8 @@ package cn.iocoder.yudao.module.system.mq.message.sms; -import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessage; import lombok.Data; import lombok.EqualsAndHashCode; +import org.springframework.cloud.bus.event.RemoteApplicationEvent; /** * 短信渠道的数据刷新 Message @@ -11,11 +11,13 @@ import lombok.EqualsAndHashCode; */ @Data @EqualsAndHashCode(callSuper = true) -public class SmsChannelRefreshMessage extends AbstractChannelMessage { +public class SmsChannelRefreshMessage extends RemoteApplicationEvent { - @Override - public String getChannel() { - return "system.sms-channel.refresh"; + public SmsChannelRefreshMessage() { + } + + public SmsChannelRefreshMessage(Object source, String originService, String destinationService) { + super(source, originService, destinationService); } } diff --git a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/sms/SmsTemplateRefreshMessage.java b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/sms/SmsTemplateRefreshMessage.java index 4873c06c4..edcc44214 100644 --- a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/sms/SmsTemplateRefreshMessage.java +++ b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/sms/SmsTemplateRefreshMessage.java @@ -1,8 +1,8 @@ package cn.iocoder.yudao.module.system.mq.message.sms; -import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessage; import lombok.Data; import lombok.EqualsAndHashCode; +import org.springframework.cloud.bus.event.RemoteApplicationEvent; /** * 短信模板的数据刷新 Message @@ -11,11 +11,13 @@ import lombok.EqualsAndHashCode; */ @Data @EqualsAndHashCode(callSuper = true) -public class SmsTemplateRefreshMessage extends AbstractChannelMessage { +public class SmsTemplateRefreshMessage extends RemoteApplicationEvent { - @Override - public String getChannel() { - return "system.sms-template.refresh"; + public SmsTemplateRefreshMessage() { + } + + public SmsTemplateRefreshMessage(Object source, String originService, String destinationService) { + super(source, originService, destinationService); } } diff --git a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/producer/auth/OAuth2ClientProducer.java b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/producer/auth/OAuth2ClientProducer.java index 1a849efc6..6d45d43e2 100644 --- a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/producer/auth/OAuth2ClientProducer.java +++ b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/producer/auth/OAuth2ClientProducer.java @@ -1,26 +1,20 @@ package cn.iocoder.yudao.module.system.mq.producer.auth; -import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate; +import cn.iocoder.yudao.framework.mq.core.bus.AbstractBusProducer; import cn.iocoder.yudao.module.system.mq.message.auth.OAuth2ClientRefreshMessage; import org.springframework.stereotype.Component; -import javax.annotation.Resource; - /** * OAuth 2.0 客户端相关消息的 Producer */ @Component -public class OAuth2ClientProducer { - - @Resource - private RedisMQTemplate redisMQTemplate; +public class OAuth2ClientProducer extends AbstractBusProducer { /** * 发送 {@link OAuth2ClientRefreshMessage} 消息 */ public void sendOAuth2ClientRefreshMessage() { - OAuth2ClientRefreshMessage message = new OAuth2ClientRefreshMessage(); - redisMQTemplate.send(message); + publishEvent(new OAuth2ClientRefreshMessage(this, getBusId(), selfDestinationService())); } } diff --git a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/producer/dept/DeptProducer.java b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/producer/dept/DeptProducer.java index 9a2ca1b9c..4b7c1493e 100644 --- a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/producer/dept/DeptProducer.java +++ b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/producer/dept/DeptProducer.java @@ -1,26 +1,20 @@ package cn.iocoder.yudao.module.system.mq.producer.dept; +import cn.iocoder.yudao.framework.mq.core.bus.AbstractBusProducer; import cn.iocoder.yudao.module.system.mq.message.dept.DeptRefreshMessage; -import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate; import org.springframework.stereotype.Component; -import javax.annotation.Resource; - /** * Dept 部门相关消息的 Producer */ @Component -public class DeptProducer { - - @Resource - private RedisMQTemplate redisMQTemplate; +public class DeptProducer extends AbstractBusProducer { /** * 发送 {@link DeptRefreshMessage} 消息 */ public void sendDeptRefreshMessage() { - DeptRefreshMessage message = new DeptRefreshMessage(); - redisMQTemplate.send(message); + publishEvent(new DeptRefreshMessage(this, getBusId(), selfDestinationService())); } } diff --git a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/producer/permission/MenuProducer.java b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/producer/permission/MenuProducer.java index ca093c029..598a656b1 100644 --- a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/producer/permission/MenuProducer.java +++ b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/producer/permission/MenuProducer.java @@ -1,27 +1,20 @@ package cn.iocoder.yudao.module.system.mq.producer.permission; +import cn.iocoder.yudao.framework.mq.core.bus.AbstractBusProducer; import cn.iocoder.yudao.module.system.mq.message.permission.MenuRefreshMessage; -import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate; -import org.springframework.cloud.stream.function.StreamBridge; import org.springframework.stereotype.Component; -import javax.annotation.Resource; - /** * Menu 菜单相关消息的 Producer */ @Component -public class MenuProducer { - - @Resource - private StreamBridge streamBridge; +public class MenuProducer extends AbstractBusProducer { /** * 发送 {@link MenuRefreshMessage} 消息 */ public void sendMenuRefreshMessage() { - MenuRefreshMessage message = new MenuRefreshMessage(); - streamBridge.send("menuRefresh-out-0", message); + publishEvent(new MenuRefreshMessage(this, getBusId(), selfDestinationService())); } } diff --git a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/producer/permission/PermissionProducer.java b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/producer/permission/PermissionProducer.java index 3ce7c52a2..5b726a968 100644 --- a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/producer/permission/PermissionProducer.java +++ b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/producer/permission/PermissionProducer.java @@ -1,36 +1,28 @@ package cn.iocoder.yudao.module.system.mq.producer.permission; +import cn.iocoder.yudao.framework.mq.core.bus.AbstractBusProducer; import cn.iocoder.yudao.module.system.mq.message.permission.RoleMenuRefreshMessage; -import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate; import cn.iocoder.yudao.module.system.mq.message.permission.UserRoleRefreshMessage; -import org.springframework.cloud.stream.function.StreamBridge; import org.springframework.stereotype.Component; -import javax.annotation.Resource; - /** * Permission 权限相关消息的 Producer */ @Component -public class PermissionProducer { - - @Resource - private StreamBridge streamBridge; +public class PermissionProducer extends AbstractBusProducer { /** * 发送 {@link RoleMenuRefreshMessage} 消息 */ public void sendRoleMenuRefreshMessage() { - RoleMenuRefreshMessage message = new RoleMenuRefreshMessage(); - streamBridge.send("roleMenuRefresh-out-0", message); + publishEvent(new RoleMenuRefreshMessage(this, getBusId(), selfDestinationService())); } /** * 发送 {@link UserRoleRefreshMessage} 消息 */ public void sendUserRoleRefreshMessage() { - UserRoleRefreshMessage message = new UserRoleRefreshMessage(); - streamBridge.send("userRoleRefresh-out-0", message); + publishEvent(new UserRoleRefreshMessage(this, getBusId(), selfDestinationService())); } } diff --git a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/producer/permission/RoleProducer.java b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/producer/permission/RoleProducer.java index 0c43fa47f..e157ef625 100644 --- a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/producer/permission/RoleProducer.java +++ b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/producer/permission/RoleProducer.java @@ -1,32 +1,22 @@ package cn.iocoder.yudao.module.system.mq.producer.permission; +import cn.iocoder.yudao.framework.mq.core.bus.AbstractBusProducer; import cn.iocoder.yudao.module.system.mq.message.permission.RoleRefreshMessage; -import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.cloud.stream.function.StreamBridge; -import org.springframework.messaging.Message; -import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; -import javax.annotation.Resource; - /** * Role 角色相关消息的 Producer * * @author 芋道源码 */ @Component -public class RoleProducer { - - @Resource - private StreamBridge streamBridge; +public class RoleProducer extends AbstractBusProducer { /** * 发送 {@link RoleRefreshMessage} 消息 */ public void sendRoleRefreshMessage() { - RoleRefreshMessage message = new RoleRefreshMessage(); - streamBridge.send("roleRefresh-out-0", message); + publishEvent(new RoleRefreshMessage(this, getBusId(), selfDestinationService())); } } diff --git a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/producer/sensitiveword/SensitiveWordProducer.java b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/producer/sensitiveword/SensitiveWordProducer.java index 3c43eca3b..c148865da 100644 --- a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/producer/sensitiveword/SensitiveWordProducer.java +++ b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/producer/sensitiveword/SensitiveWordProducer.java @@ -1,26 +1,21 @@ package cn.iocoder.yudao.module.system.mq.producer.sensitiveword; -import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate; +import cn.iocoder.yudao.framework.mq.core.bus.AbstractBusProducer; import cn.iocoder.yudao.module.system.mq.message.sensitiveword.SensitiveWordRefreshMessage; import org.springframework.stereotype.Component; -import javax.annotation.Resource; - /** * 敏感词相关的 Producer */ @Component -public class SensitiveWordProducer { - - @Resource - private RedisMQTemplate redisMQTemplate; +public class SensitiveWordProducer extends AbstractBusProducer {{ +} /** * 发送 {@link SensitiveWordRefreshMessage} 消息 */ public void sendSensitiveWordRefreshMessage() { - SensitiveWordRefreshMessage message = new SensitiveWordRefreshMessage(); - redisMQTemplate.send(message); + publishEvent(new SensitiveWordRefreshMessage(this, getBusId(), selfDestinationService())); } } diff --git a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/producer/sms/SmsProducer.java b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/producer/sms/SmsProducer.java index e14fb953b..1bcac4be3 100644 --- a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/producer/sms/SmsProducer.java +++ b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/producer/sms/SmsProducer.java @@ -1,10 +1,11 @@ package cn.iocoder.yudao.module.system.mq.producer.sms; import cn.iocoder.yudao.framework.common.core.KeyValue; +import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate; +import cn.iocoder.yudao.framework.mq.core.bus.AbstractBusProducer; import cn.iocoder.yudao.module.system.mq.message.sms.SmsChannelRefreshMessage; import cn.iocoder.yudao.module.system.mq.message.sms.SmsSendMessage; import cn.iocoder.yudao.module.system.mq.message.sms.SmsTemplateRefreshMessage; -import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @@ -19,7 +20,7 @@ import java.util.List; */ @Slf4j @Component -public class SmsProducer { +public class SmsProducer extends AbstractBusProducer { @Resource private RedisMQTemplate redisMQTemplate; @@ -28,16 +29,14 @@ public class SmsProducer { * 发送 {@link SmsChannelRefreshMessage} 消息 */ public void sendSmsChannelRefreshMessage() { - SmsChannelRefreshMessage message = new SmsChannelRefreshMessage(); - redisMQTemplate.send(message); + publishEvent(new SmsChannelRefreshMessage(this, getBusId(), selfDestinationService())); } /** * 发送 {@link SmsTemplateRefreshMessage} 消息 */ public void sendSmsTemplateRefreshMessage() { - SmsTemplateRefreshMessage message = new SmsTemplateRefreshMessage(); - redisMQTemplate.send(message); + publishEvent(new SmsTemplateRefreshMessage(this, getBusId(), selfDestinationService())); } /** diff --git a/yudao-module-system/yudao-module-system-biz/src/main/resources/application.yaml b/yudao-module-system/yudao-module-system-biz/src/main/resources/application.yaml index 8be1339e8..0003cb5e6 100644 --- a/yudao-module-system/yudao-module-system-biz/src/main/resources/application.yaml +++ b/yudao-module-system/yudao-module-system-biz/src/main/resources/application.yaml @@ -51,12 +51,13 @@ dubbo: address: spring-cloud://localhost # 设置使用 Spring Cloud 注册中心 --- #################### MQ 消息队列相关配置 #################### + spring: cloud: # Spring Cloud Stream 配置项,对应 BindingServiceProperties 类 stream: - function: - definition: roleRefreshConsumer;menuRefreshConsumer;roleMenuRefreshConsumer;userRoleRefreshConsumer; +# function: +# definition: roleRefreshConsumer;roleMenuRefreshConsumer;userRoleRefreshConsumer; # Binding 配置项,对应 BindingProperties Map bindings: roleRefresh-out-0: @@ -64,21 +65,6 @@ spring: roleRefreshConsumer-in-0: destination: system_role_refresh group: system_role_refresh_consumer_group - menuRefresh-out-0: - destination: system_menu_refresh - menuRefreshConsumer-in-0: - destination: system_menu_refresh - group: system_menu_refresh_consumer_group - roleMenuRefresh-out-0: - destination: system_role_menu_refresh - roleMenuRefreshConsumer-in-0: - destination: system_role_menu_refresh - group: system_role_menu_refresh_consumer_group - userRoleRefresh-out-0: - destination: system_user_role_refresh - userRoleRefreshConsumer-in-0: - destination: system_user_role_refresh - group: system_user_role_refresh_consumer_group # Spring Cloud Stream RocketMQ 配置项 rocketmq: # RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类 @@ -88,20 +74,12 @@ spring: producer: # RocketMQ Producer 配置项,对应 RocketMQProducerProperties 类 group: system_producer_group # 生产者分组 send-type: SYNC # 发送模式,SYNC 同步 - # RocketMQ 自定义 Binding 配置项,对应 RocketMQBindingProperties Map - bindings: - roleRefreshConsumer-in-0: - consumer: - message-model: BROADCASTING # 广播消费 - menuRefreshConsumer-in-0: - consumer: - message-model: BROADCASTING # 广播消费 - roleMenuRefreshConsumer-in-0: - consumer: - message-model: BROADCASTING # 广播消费 - userRoleRefreshConsumer-in-0: - consumer: - message-model: BROADCASTING # 广播消费 + + # Spring Cloud Bus 配置项,对应 BusProperties 类 + bus: + enabled: true # 是否开启,默认为 true + id: ${spring.application.name}:${server.port} # 编号,Spring Cloud Alibaba 建议使用“应用:端口”的格式 + destination: springCloudBus2 # 目标消息队列,默认为 springCloudBus --- #################### 芋道相关配置 ####################