集成 spring-cloud-starter-stream-rocketmq 组件

pull/4/head
YunaiV 2022-06-18 17:46:11 +08:00
parent d150a8333d
commit 6471c4641d
7 changed files with 66 additions and 21 deletions

View File

@ -21,6 +21,13 @@
<groupId>cn.iocoder.cloud</groupId> <groupId>cn.iocoder.cloud</groupId>
<artifactId>yudao-spring-boot-starter-redis</artifactId> <artifactId>yudao-spring-boot-starter-redis</artifactId>
</dependency> </dependency>
<!-- MQ 相关 -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<!-- 引入 Spring Cloud Alibaba Stream RocketMQ 相关依赖,将 RocketMQ 作为消息队列,并实现对其的自动配置 -->
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -1,12 +1,12 @@
package cn.iocoder.yudao.module.system.mq.consumer.permission; package cn.iocoder.yudao.module.system.mq.consumer.permission;
import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessageListener;
import cn.iocoder.yudao.module.system.mq.message.permission.MenuRefreshMessage; import cn.iocoder.yudao.module.system.mq.message.permission.MenuRefreshMessage;
import cn.iocoder.yudao.module.system.service.permission.MenuService; import cn.iocoder.yudao.module.system.service.permission.MenuService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.function.Consumer;
/** /**
* {@link MenuRefreshMessage} * {@link MenuRefreshMessage}
@ -15,15 +15,14 @@ import javax.annotation.Resource;
*/ */
@Component @Component
@Slf4j @Slf4j
public class MenuRefreshConsumer extends AbstractChannelMessageListener<MenuRefreshMessage> { public class MenuRefreshConsumer implements Consumer<MenuRefreshMessage> {
@Resource @Resource
private MenuService menuService; private MenuService menuService;
@Override @Override
public void onMessage(MenuRefreshMessage message) { public void accept(MenuRefreshMessage menuRefreshMessage) {
log.info("[onMessage][收到 Menu 刷新消息]"); log.info("[onMessage][收到 Menu 刷新消息]");
menuService.initLocalCache(); menuService.initLocalCache();
} }
} }

View File

@ -1,12 +1,12 @@
package cn.iocoder.yudao.module.system.mq.consumer.permission; package cn.iocoder.yudao.module.system.mq.consumer.permission;
import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessageListener;
import cn.iocoder.yudao.module.system.mq.message.permission.RoleRefreshMessage; import cn.iocoder.yudao.module.system.mq.message.permission.RoleRefreshMessage;
import cn.iocoder.yudao.module.system.service.permission.RoleService; import cn.iocoder.yudao.module.system.service.permission.RoleService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.function.Consumer;
/** /**
* {@link RoleRefreshMessage} * {@link RoleRefreshMessage}
@ -15,13 +15,13 @@ import javax.annotation.Resource;
*/ */
@Component @Component
@Slf4j @Slf4j
public class RoleRefreshConsumer extends AbstractChannelMessageListener<RoleRefreshMessage> { public class RoleRefreshConsumer implements Consumer<RoleRefreshMessage> {
@Resource @Resource
private RoleService roleService; private RoleService roleService;
@Override @Override
public void onMessage(RoleRefreshMessage message) { public void accept(RoleRefreshMessage message) {
log.info("[onMessage][收到 Role 刷新消息]"); log.info("[onMessage][收到 Role 刷新消息]");
roleService.initLocalCache(); roleService.initLocalCache();
} }

View File

@ -1,8 +1,6 @@
package cn.iocoder.yudao.module.system.mq.message.permission; package cn.iocoder.yudao.module.system.mq.message.permission;
import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessage;
import lombok.Data; import lombok.Data;
import lombok.EqualsAndHashCode;
/** /**
* Message * Message
@ -10,12 +8,5 @@ import lombok.EqualsAndHashCode;
* @author * @author
*/ */
@Data @Data
@EqualsAndHashCode(callSuper = true) public class RoleRefreshMessage {
public class RoleRefreshMessage extends AbstractChannelMessage {
@Override
public String getChannel() {
return "system.role.refresh";
}
} }

View File

@ -2,6 +2,7 @@ package cn.iocoder.yudao.module.system.mq.producer.permission;
import cn.iocoder.yudao.module.system.mq.message.permission.MenuRefreshMessage; import cn.iocoder.yudao.module.system.mq.message.permission.MenuRefreshMessage;
import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate; import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.Resource; import javax.annotation.Resource;
@ -13,14 +14,14 @@ import javax.annotation.Resource;
public class MenuProducer { public class MenuProducer {
@Resource @Resource
private RedisMQTemplate redisMQTemplate; private StreamBridge streamBridge;
/** /**
* {@link MenuRefreshMessage} * {@link MenuRefreshMessage}
*/ */
public void sendMenuRefreshMessage() { public void sendMenuRefreshMessage() {
MenuRefreshMessage message = new MenuRefreshMessage(); MenuRefreshMessage message = new MenuRefreshMessage();
redisMQTemplate.send(message); streamBridge.send("demo02-output", message);
} }
} }

View File

@ -2,6 +2,10 @@ package cn.iocoder.yudao.module.system.mq.producer.permission;
import cn.iocoder.yudao.module.system.mq.message.permission.RoleRefreshMessage; import cn.iocoder.yudao.module.system.mq.message.permission.RoleRefreshMessage;
import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate; import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.Resource; import javax.annotation.Resource;
@ -15,14 +19,14 @@ import javax.annotation.Resource;
public class RoleProducer { public class RoleProducer {
@Resource @Resource
private RedisMQTemplate redisMQTemplate; private StreamBridge streamBridge;
/** /**
* {@link RoleRefreshMessage} * {@link RoleRefreshMessage}
*/ */
public void sendRoleRefreshMessage() { public void sendRoleRefreshMessage() {
RoleRefreshMessage message = new RoleRefreshMessage(); RoleRefreshMessage message = new RoleRefreshMessage();
redisMQTemplate.send(message); streamBridge.send("demo01-output", message);
} }
} }

View File

@ -50,6 +50,49 @@ dubbo:
registry: registry:
address: spring-cloud://localhost # 设置使用 Spring Cloud 注册中心 address: spring-cloud://localhost # 设置使用 Spring Cloud 注册中心
--- #################### MQ 消息队列相关配置 ####################
spring:
cloud:
# Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
stream:
function:
definition: roleRefreshConsumer;menuRefreshConsumer;
# Binding 配置项,对应 BindingProperties Map
bindings:
demo01-output:
destination: TEST
roleRefreshConsumer-in-0:
destination: TEST
group: roleRefreshConsumer
demo02-output:
destination: TEST2
menuRefreshConsumer-in-0:
destination: TEST2
group: menuRefreshConsumer
# Spring Cloud Stream RocketMQ 配置项
rocketmq:
# RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类
binder:
name-server: 127.0.0.1:9876 # RocketMQ Namesrv 地址
# RocketMQ 自定义 Binding 配置项,对应 RocketMQBindingProperties Map
bindings:
demo01-output:
# RocketMQ Producer 配置项,对应 RocketMQProducerProperties 类
producer:
group: test # 生产者分组
sync: true # 是否同步发送消息,默认为 false 异步。
roleRefreshConsumer-in-0:
consumer:
message-model: BROADCASTING
demo02-output:
# RocketMQ Producer 配置项,对应 RocketMQProducerProperties 类
producer:
group: test # 生产者分组
sync: true # 是否同步发送消息,默认为 false 异步。
menuRefreshConsumer-in-0:
consumer:
message-model: BROADCASTING
--- #################### 芋道相关配置 #################### --- #################### 芋道相关配置 ####################
yudao: yudao: