diff --git a/yudao-framework/yudao-spring-boot-starter-mq/pom.xml b/yudao-framework/yudao-spring-boot-starter-mq/pom.xml
index 54b22dc5f..d567c9634 100644
--- a/yudao-framework/yudao-spring-boot-starter-mq/pom.xml
+++ b/yudao-framework/yudao-spring-boot-starter-mq/pom.xml
@@ -21,6 +21,13 @@
cn.iocoder.cloud
yudao-spring-boot-starter-redis
+
+
+
+ com.alibaba.cloud
+
+ spring-cloud-starter-stream-rocketmq
+
diff --git a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/permission/MenuRefreshConsumer.java b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/permission/MenuRefreshConsumer.java
index a4b633512..406e3d0ba 100644
--- a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/permission/MenuRefreshConsumer.java
+++ b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/permission/MenuRefreshConsumer.java
@@ -1,12 +1,12 @@
package cn.iocoder.yudao.module.system.mq.consumer.permission;
-import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessageListener;
import cn.iocoder.yudao.module.system.mq.message.permission.MenuRefreshMessage;
import cn.iocoder.yudao.module.system.service.permission.MenuService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
+import java.util.function.Consumer;
/**
* 针对 {@link MenuRefreshMessage} 的消费者
@@ -15,15 +15,14 @@ import javax.annotation.Resource;
*/
@Component
@Slf4j
-public class MenuRefreshConsumer extends AbstractChannelMessageListener {
+public class MenuRefreshConsumer implements Consumer {
@Resource
private MenuService menuService;
@Override
- public void onMessage(MenuRefreshMessage message) {
+ public void accept(MenuRefreshMessage menuRefreshMessage) {
log.info("[onMessage][收到 Menu 刷新消息]");
menuService.initLocalCache();
}
-
}
diff --git a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/permission/RoleRefreshConsumer.java b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/permission/RoleRefreshConsumer.java
index bb53b7499..5acf367d3 100644
--- a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/permission/RoleRefreshConsumer.java
+++ b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/permission/RoleRefreshConsumer.java
@@ -1,12 +1,12 @@
package cn.iocoder.yudao.module.system.mq.consumer.permission;
-import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessageListener;
import cn.iocoder.yudao.module.system.mq.message.permission.RoleRefreshMessage;
import cn.iocoder.yudao.module.system.service.permission.RoleService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
+import java.util.function.Consumer;
/**
* 针对 {@link RoleRefreshMessage} 的消费者
@@ -15,13 +15,13 @@ import javax.annotation.Resource;
*/
@Component
@Slf4j
-public class RoleRefreshConsumer extends AbstractChannelMessageListener {
+public class RoleRefreshConsumer implements Consumer {
@Resource
private RoleService roleService;
@Override
- public void onMessage(RoleRefreshMessage message) {
+ public void accept(RoleRefreshMessage message) {
log.info("[onMessage][收到 Role 刷新消息]");
roleService.initLocalCache();
}
diff --git a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/permission/RoleRefreshMessage.java b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/permission/RoleRefreshMessage.java
index e80d8f30c..a1cf81fc3 100644
--- a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/permission/RoleRefreshMessage.java
+++ b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/permission/RoleRefreshMessage.java
@@ -1,8 +1,6 @@
package cn.iocoder.yudao.module.system.mq.message.permission;
-import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessage;
import lombok.Data;
-import lombok.EqualsAndHashCode;
/**
* 角色数据刷新 Message
@@ -10,12 +8,5 @@ import lombok.EqualsAndHashCode;
* @author 芋道源码
*/
@Data
-@EqualsAndHashCode(callSuper = true)
-public class RoleRefreshMessage extends AbstractChannelMessage {
-
- @Override
- public String getChannel() {
- return "system.role.refresh";
- }
-
+public class RoleRefreshMessage {
}
diff --git a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/producer/permission/MenuProducer.java b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/producer/permission/MenuProducer.java
index 5764c872a..7f6939116 100644
--- a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/producer/permission/MenuProducer.java
+++ b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/producer/permission/MenuProducer.java
@@ -2,6 +2,7 @@ package cn.iocoder.yudao.module.system.mq.producer.permission;
import cn.iocoder.yudao.module.system.mq.message.permission.MenuRefreshMessage;
import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate;
+import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@@ -13,14 +14,14 @@ import javax.annotation.Resource;
public class MenuProducer {
@Resource
- private RedisMQTemplate redisMQTemplate;
+ private StreamBridge streamBridge;
/**
* 发送 {@link MenuRefreshMessage} 消息
*/
public void sendMenuRefreshMessage() {
MenuRefreshMessage message = new MenuRefreshMessage();
- redisMQTemplate.send(message);
+ streamBridge.send("demo02-output", message);
}
}
diff --git a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/producer/permission/RoleProducer.java b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/producer/permission/RoleProducer.java
index c249d964e..def7e4157 100644
--- a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/producer/permission/RoleProducer.java
+++ b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/producer/permission/RoleProducer.java
@@ -2,6 +2,10 @@ package cn.iocoder.yudao.module.system.mq.producer.permission;
import cn.iocoder.yudao.module.system.mq.message.permission.RoleRefreshMessage;
import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.cloud.stream.function.StreamBridge;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@@ -15,14 +19,14 @@ import javax.annotation.Resource;
public class RoleProducer {
@Resource
- private RedisMQTemplate redisMQTemplate;
+ private StreamBridge streamBridge;
/**
* 发送 {@link RoleRefreshMessage} 消息
*/
public void sendRoleRefreshMessage() {
RoleRefreshMessage message = new RoleRefreshMessage();
- redisMQTemplate.send(message);
+ streamBridge.send("demo01-output", message);
}
}
diff --git a/yudao-module-system/yudao-module-system-biz/src/main/resources/application.yaml b/yudao-module-system/yudao-module-system-biz/src/main/resources/application.yaml
index 5598ef1a5..b0f39ab29 100644
--- a/yudao-module-system/yudao-module-system-biz/src/main/resources/application.yaml
+++ b/yudao-module-system/yudao-module-system-biz/src/main/resources/application.yaml
@@ -50,6 +50,49 @@ dubbo:
registry:
address: spring-cloud://localhost # 设置使用 Spring Cloud 注册中心
+--- #################### MQ 消息队列相关配置 ####################
+spring:
+ cloud:
+ # Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
+ stream:
+ function:
+ definition: roleRefreshConsumer;menuRefreshConsumer;
+ # Binding 配置项,对应 BindingProperties Map
+ bindings:
+ demo01-output:
+ destination: TEST
+ roleRefreshConsumer-in-0:
+ destination: TEST
+ group: roleRefreshConsumer
+ demo02-output:
+ destination: TEST2
+ menuRefreshConsumer-in-0:
+ destination: TEST2
+ group: menuRefreshConsumer
+ # Spring Cloud Stream RocketMQ 配置项
+ rocketmq:
+ # RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类
+ binder:
+ name-server: 127.0.0.1:9876 # RocketMQ Namesrv 地址
+ # RocketMQ 自定义 Binding 配置项,对应 RocketMQBindingProperties Map
+ bindings:
+ demo01-output:
+ # RocketMQ Producer 配置项,对应 RocketMQProducerProperties 类
+ producer:
+ group: test # 生产者分组
+ sync: true # 是否同步发送消息,默认为 false 异步。
+ roleRefreshConsumer-in-0:
+ consumer:
+ message-model: BROADCASTING
+ demo02-output:
+ # RocketMQ Producer 配置项,对应 RocketMQProducerProperties 类
+ producer:
+ group: test # 生产者分组
+ sync: true # 是否同步发送消息,默认为 false 异步。
+ menuRefreshConsumer-in-0:
+ consumer:
+ message-model: BROADCASTING
+
--- #################### 芋道相关配置 ####################
yudao: