messages = context.getMsgList();
+ Assert.isTrue(messages.size() == 1, "消息条数({})不正确", messages.size());
+ // 设置租户编号
+ String tenantId = messages.get(0).getUserProperty(HEADER_TENANT_ID);
+ if (StrUtil.isNotEmpty(tenantId)) {
+ TenantContextHolder.setTenantId(Long.parseLong(tenantId));
+ }
+ }
+
+ @Override
+ public void consumeMessageAfter(ConsumeMessageContext context) {
+ TenantContextHolder.clear();
+ }
+
+}
diff --git a/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/rocketmq/TenantRocketMQInitializer.java b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/rocketmq/TenantRocketMQInitializer.java
new file mode 100644
index 000000000..7f12ac520
--- /dev/null
+++ b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/rocketmq/TenantRocketMQInitializer.java
@@ -0,0 +1,53 @@
+package cn.iocoder.yudao.framework.tenant.core.mq.rocketmq;
+
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
+import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.spring.core.RocketMQTemplate;
+import org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer;
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.config.BeanPostProcessor;
+
+/**
+ * 多租户的 RocketMQ 初始化器
+ *
+ * @author 芋道源码
+ */
+public class TenantRocketMQInitializer implements BeanPostProcessor {
+
+ @Override
+ public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
+ if (bean instanceof DefaultRocketMQListenerContainer) {
+ DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer) bean;
+ initTenantConsumer(container.getConsumer());
+ } else if (bean instanceof RocketMQTemplate) {
+ RocketMQTemplate template = (RocketMQTemplate) bean;
+ initTenantProducer(template.getProducer());
+ }
+ return bean;
+ }
+
+ private void initTenantProducer(DefaultMQProducer producer) {
+ if (producer == null) {
+ return;
+ }
+ DefaultMQProducerImpl producerImpl = producer.getDefaultMQProducerImpl();
+ if (producerImpl == null) {
+ return;
+ }
+ producerImpl.registerSendMessageHook(new TenantRocketMQSendMessageHook());
+ }
+
+ private void initTenantConsumer(DefaultMQPushConsumer consumer) {
+ if (consumer == null) {
+ return;
+ }
+ DefaultMQPushConsumerImpl consumerImpl = consumer.getDefaultMQPushConsumerImpl();
+ if (consumerImpl == null) {
+ return;
+ }
+ consumerImpl.registerConsumeMessageHook(new TenantRocketMQConsumeMessageHook());
+ }
+
+}
\ No newline at end of file
diff --git a/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/rocketmq/TenantRocketMQSendMessageHook.java b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/rocketmq/TenantRocketMQSendMessageHook.java
new file mode 100644
index 000000000..4f0307465
--- /dev/null
+++ b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/rocketmq/TenantRocketMQSendMessageHook.java
@@ -0,0 +1,36 @@
+package cn.iocoder.yudao.framework.tenant.core.mq.rocketmq;
+
+import cn.iocoder.yudao.framework.tenant.core.context.TenantContextHolder;
+import org.apache.rocketmq.client.hook.SendMessageContext;
+import org.apache.rocketmq.client.hook.SendMessageHook;
+
+import static cn.iocoder.yudao.framework.web.core.util.WebFrameworkUtils.HEADER_TENANT_ID;
+
+/**
+ * RocketMQ 消息队列的多租户 {@link SendMessageHook} 实现类
+ *
+ * Producer 发送消息时,将 {@link TenantContextHolder} 租户编号,添加到消息的 Header 中
+ *
+ * @author 芋道源码
+ */
+public class TenantRocketMQSendMessageHook implements SendMessageHook {
+
+ @Override
+ public String hookName() {
+ return getClass().getSimpleName();
+ }
+
+ @Override
+ public void sendMessageBefore(SendMessageContext sendMessageContext) {
+ Long tenantId = TenantContextHolder.getTenantId();
+ if (tenantId == null) {
+ return;
+ }
+ sendMessageContext.getMessage().putUserProperty(HEADER_TENANT_ID, tenantId.toString());
+ }
+
+ @Override
+ public void sendMessageAfter(SendMessageContext sendMessageContext) {
+ }
+
+}
diff --git a/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/org/springframework/messaging/handler/invocation/InvocableHandlerMethod.java b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/org/springframework/messaging/handler/invocation/InvocableHandlerMethod.java
new file mode 100644
index 000000000..059d8f97f
--- /dev/null
+++ b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/org/springframework/messaging/handler/invocation/InvocableHandlerMethod.java
@@ -0,0 +1,269 @@
+/*
+ * Copyright 2002-2021 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.messaging.handler.invocation;
+
+import cn.iocoder.yudao.framework.tenant.core.context.TenantContextHolder;
+import cn.iocoder.yudao.framework.tenant.core.util.TenantUtils;
+import org.springframework.core.DefaultParameterNameDiscoverer;
+import org.springframework.core.MethodParameter;
+import org.springframework.core.ParameterNameDiscoverer;
+import org.springframework.core.ResolvableType;
+import org.springframework.lang.Nullable;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.handler.HandlerMethod;
+import org.springframework.util.ObjectUtils;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Type;
+import java.util.Arrays;
+
+import static cn.iocoder.yudao.framework.web.core.util.WebFrameworkUtils.HEADER_TENANT_ID;
+
+/**
+ * Extension of {@link HandlerMethod} that invokes the underlying method with
+ * argument values resolved from the current HTTP request through a list of
+ * {@link HandlerMethodArgumentResolver}.
+ *
+ * 针对 rabbitmq-spring 和 kafka-spring,不存在合适的拓展点,可以实现 Consumer 消费前,读取 Header 中的 tenant-id 设置到 {@link TenantContextHolder} 中
+ * TODO 芋艿:持续跟进,看看有没新的拓展点
+ *
+ * @author Rossen Stoyanchev
+ * @author Juergen Hoeller
+ * @since 4.0
+ */
+public class InvocableHandlerMethod extends HandlerMethod {
+
+ private static final Object[] EMPTY_ARGS = new Object[0];
+
+ private HandlerMethodArgumentResolverComposite resolvers = new HandlerMethodArgumentResolverComposite();
+
+ private ParameterNameDiscoverer parameterNameDiscoverer = new DefaultParameterNameDiscoverer();
+
+ /**
+ * Create an instance from a {@code HandlerMethod}.
+ */
+ public InvocableHandlerMethod(HandlerMethod handlerMethod) {
+ super(handlerMethod);
+ }
+
+ /**
+ * Create an instance from a bean instance and a method.
+ */
+ public InvocableHandlerMethod(Object bean, Method method) {
+ super(bean, method);
+ }
+
+ /**
+ * Construct a new handler method with the given bean instance, method name and parameters.
+ * @param bean the object bean
+ * @param methodName the method name
+ * @param parameterTypes the method parameter types
+ * @throws NoSuchMethodException when the method cannot be found
+ */
+ public InvocableHandlerMethod(Object bean, String methodName, Class>... parameterTypes)
+ throws NoSuchMethodException {
+
+ super(bean, methodName, parameterTypes);
+ }
+
+ /**
+ * Set {@link HandlerMethodArgumentResolver HandlerMethodArgumentResolvers} to use for resolving method argument values.
+ */
+ public void setMessageMethodArgumentResolvers(HandlerMethodArgumentResolverComposite argumentResolvers) {
+ this.resolvers = argumentResolvers;
+ }
+
+ /**
+ * Set the ParameterNameDiscoverer for resolving parameter names when needed
+ * (e.g. default request attribute name).
+ * Default is a {@link DefaultParameterNameDiscoverer}.
+ */
+ public void setParameterNameDiscoverer(ParameterNameDiscoverer parameterNameDiscoverer) {
+ this.parameterNameDiscoverer = parameterNameDiscoverer;
+ }
+
+ /**
+ * Invoke the method after resolving its argument values in the context of the given message.
+ *
Argument values are commonly resolved through
+ * {@link HandlerMethodArgumentResolver HandlerMethodArgumentResolvers}.
+ * The {@code providedArgs} parameter however may supply argument values to be used directly,
+ * i.e. without argument resolution.
+ *
Delegates to {@link #getMethodArgumentValues} and calls {@link #doInvoke} with the
+ * resolved arguments.
+ * @param message the current message being processed
+ * @param providedArgs "given" arguments matched by type, not resolved
+ * @return the raw value returned by the invoked method
+ * @throws Exception raised if no suitable argument resolver can be found,
+ * or if the method raised an exception
+ * @see #getMethodArgumentValues
+ * @see #doInvoke
+ */
+ @Nullable
+ public Object invoke(Message> message, Object... providedArgs) throws Exception {
+ Object[] args = getMethodArgumentValues(message, providedArgs);
+ if (logger.isTraceEnabled()) {
+ logger.trace("Arguments: " + Arrays.toString(args));
+ }
+ // 注意:如下是本类的改动点!!!
+ // 情况一:无租户编号的情况
+ Long tenantId= parseTenantId(message);
+ if (tenantId == null) {
+ return doInvoke(args);
+ }
+ // 情况二:有租户的情况下
+ return TenantUtils.execute(tenantId, () -> doInvoke(args));
+ }
+
+ private Long parseTenantId(Message> message) {
+ Object tenantId = message.getHeaders().get(HEADER_TENANT_ID);
+ if (tenantId == null) {
+ return null;
+ }
+ if (tenantId instanceof Long) {
+ return (Long) tenantId;
+ }
+ if (tenantId instanceof Number) {
+ return ((Number) tenantId).longValue();
+ }
+ if (tenantId instanceof String) {
+ return Long.parseLong((String) tenantId);
+ }
+ if (tenantId instanceof byte[]) {
+ return Long.parseLong(new String((byte[]) tenantId));
+ }
+ throw new IllegalArgumentException("未知的数据类型:" + tenantId);
+ }
+
+ /**
+ * Get the method argument values for the current message, checking the provided
+ * argument values and falling back to the configured argument resolvers.
+ *
The resulting array will be passed into {@link #doInvoke}.
+ * @since 5.1.2
+ */
+ protected Object[] getMethodArgumentValues(Message> message, Object... providedArgs) throws Exception {
+ MethodParameter[] parameters = getMethodParameters();
+ if (ObjectUtils.isEmpty(parameters)) {
+ return EMPTY_ARGS;
+ }
+
+ Object[] args = new Object[parameters.length];
+ for (int i = 0; i < parameters.length; i++) {
+ MethodParameter parameter = parameters[i];
+ parameter.initParameterNameDiscovery(this.parameterNameDiscoverer);
+ args[i] = findProvidedArgument(parameter, providedArgs);
+ if (args[i] != null) {
+ continue;
+ }
+ if (!this.resolvers.supportsParameter(parameter)) {
+ throw new MethodArgumentResolutionException(
+ message, parameter, formatArgumentError(parameter, "No suitable resolver"));
+ }
+ try {
+ args[i] = this.resolvers.resolveArgument(parameter, message);
+ }
+ catch (Exception ex) {
+ // Leave stack trace for later, exception may actually be resolved and handled...
+ if (logger.isDebugEnabled()) {
+ String exMsg = ex.getMessage();
+ if (exMsg != null && !exMsg.contains(parameter.getExecutable().toGenericString())) {
+ logger.debug(formatArgumentError(parameter, exMsg));
+ }
+ }
+ throw ex;
+ }
+ }
+ return args;
+ }
+
+ /**
+ * Invoke the handler method with the given argument values.
+ */
+ @Nullable
+ protected Object doInvoke(Object... args) throws Exception {
+ try {
+ return getBridgedMethod().invoke(getBean(), args);
+ }
+ catch (IllegalArgumentException ex) {
+ assertTargetBean(getBridgedMethod(), getBean(), args);
+ String text = (ex.getMessage() != null ? ex.getMessage() : "Illegal argument");
+ throw new IllegalStateException(formatInvokeError(text, args), ex);
+ }
+ catch (InvocationTargetException ex) {
+ // Unwrap for HandlerExceptionResolvers ...
+ Throwable targetException = ex.getTargetException();
+ if (targetException instanceof RuntimeException) {
+ throw (RuntimeException) targetException;
+ }
+ else if (targetException instanceof Error) {
+ throw (Error) targetException;
+ }
+ else if (targetException instanceof Exception) {
+ throw (Exception) targetException;
+ }
+ else {
+ throw new IllegalStateException(formatInvokeError("Invocation failure", args), targetException);
+ }
+ }
+ }
+
+ MethodParameter getAsyncReturnValueType(@Nullable Object returnValue) {
+ return new AsyncResultMethodParameter(returnValue);
+ }
+
+ private class AsyncResultMethodParameter extends HandlerMethodParameter {
+
+ @Nullable
+ private final Object returnValue;
+
+ private final ResolvableType returnType;
+
+ public AsyncResultMethodParameter(@Nullable Object returnValue) {
+ super(-1);
+ this.returnValue = returnValue;
+ this.returnType = ResolvableType.forType(super.getGenericParameterType()).getGeneric();
+ }
+
+ protected AsyncResultMethodParameter(AsyncResultMethodParameter original) {
+ super(original);
+ this.returnValue = original.returnValue;
+ this.returnType = original.returnType;
+ }
+
+ @Override
+ public Class> getParameterType() {
+ if (this.returnValue != null) {
+ return this.returnValue.getClass();
+ }
+ if (!ResolvableType.NONE.equals(this.returnType)) {
+ return this.returnType.toClass();
+ }
+ return super.getParameterType();
+ }
+
+ @Override
+ public Type getGenericParameterType() {
+ return this.returnType.getType();
+ }
+
+ @Override
+ public AsyncResultMethodParameter clone() {
+ return new AsyncResultMethodParameter(this);
+ }
+ }
+
+}
diff --git a/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/resources/META-INF/spring.factories b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/resources/META-INF/spring.factories
new file mode 100644
index 000000000..a495842a0
--- /dev/null
+++ b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/resources/META-INF/spring.factories
@@ -0,0 +1,2 @@
+org.springframework.boot.env.EnvironmentPostProcessor=\
+ cn.iocoder.yudao.framework.tenant.core.mq.kafka.TenantKafkaEnvironmentPostProcessor
diff --git a/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
index 1a18bae89..26f472e4d 100644
--- a/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
+++ b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
@@ -1,3 +1,2 @@
cn.iocoder.yudao.framework.tenant.config.YudaoTenantRpcAutoConfiguration
cn.iocoder.yudao.framework.tenant.config.YudaoTenantAutoConfiguration
-cn.iocoder.yudao.framework.tenant.config.YudaoTenantMQAutoConfiguration
diff --git a/yudao-framework/yudao-spring-boot-starter-mq/pom.xml b/yudao-framework/yudao-spring-boot-starter-mq/pom.xml
index 5a6b4b1ec..504d2711c 100644
--- a/yudao-framework/yudao-spring-boot-starter-mq/pom.xml
+++ b/yudao-framework/yudao-spring-boot-starter-mq/pom.xml
@@ -12,25 +12,31 @@
jar
${project.artifactId}
-
- 消息队列:
- 1. 基于 Spring Cloud Stream 实现异步消息
- 2. 基于 Spring Cloud Bus 实现事件总线
-
+ 消息队列,支持 Redis、RocketMQ、RabbitMQ、Kafka 四种
https://github.com/YunaiV/ruoyi-vue-pro
-
+
- com.alibaba.cloud
-
- spring-cloud-starter-stream-rocketmq
+ cn.iocoder.cloud
+ yudao-spring-boot-starter-redis
+
- com.alibaba.cloud
-
- spring-cloud-starter-bus-rocketmq
+ org.springframework.kafka
+ spring-kafka
+ true
+
+
+ org.springframework.amqp
+ spring-rabbit
+ true
+
+
+ org.apache.rocketmq
+ rocketmq-spring-boot-starter
+ true
diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/config/YudaoMQAutoConfiguration.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/config/YudaoMQAutoConfiguration.java
deleted file mode 100644
index 32ff82c8c..000000000
--- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/config/YudaoMQAutoConfiguration.java
+++ /dev/null
@@ -1,37 +0,0 @@
-package cn.iocoder.yudao.framework.mq.config;
-
-import com.alibaba.cloud.stream.binder.rocketmq.convert.RocketMQMessageConverter;
-import org.springframework.boot.autoconfigure.AutoConfiguration;
-import org.springframework.boot.autoconfigure.AutoConfigureAfter;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.messaging.converter.*;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * 消息队列配置类
- *
- * @author 芋道源码
- */
-@AutoConfiguration
-public class YudaoMQAutoConfiguration {
-
- /**
- * 覆盖 {@link RocketMQMessageConverter} 的配置,去掉 fastjson 的转换器,解决不兼容的问题
- */
- @Bean(RocketMQMessageConverter.DEFAULT_NAME)
- @ConditionalOnMissingBean(name = { RocketMQMessageConverter.DEFAULT_NAME })
- public CompositeMessageConverter rocketMQMessageConverter() {
- List messageConverters = new ArrayList<>();
- ByteArrayMessageConverter byteArrayMessageConverter = new ByteArrayMessageConverter();
- byteArrayMessageConverter.setContentTypeResolver(null);
- messageConverters.add(byteArrayMessageConverter);
- messageConverters.add(new StringMessageConverter());
- messageConverters.add(new MappingJackson2MessageConverter());
- return new CompositeMessageConverter(messageConverters);
- }
-
-}
diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/bus/AbstractBusProducer.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/bus/AbstractBusProducer.java
deleted file mode 100644
index 9c85299fa..000000000
--- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/bus/AbstractBusProducer.java
+++ /dev/null
@@ -1,41 +0,0 @@
-package cn.iocoder.yudao.framework.mq.core.bus;
-
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.cloud.bus.ServiceMatcher;
-import org.springframework.cloud.bus.event.RemoteApplicationEvent;
-import org.springframework.context.ApplicationEventPublisher;
-
-import javax.annotation.Resource;
-
-/**
- * 基于 Spring Cloud Bus 实现的 Producer 抽象类
- *
- * @author 芋道源码
- */
-public abstract class AbstractBusProducer {
-
- @Resource
- protected ApplicationEventPublisher applicationEventPublisher;
-
- @Resource
- protected ServiceMatcher serviceMatcher;
-
- @Value("${spring.application.name}")
- protected String applicationName;
-
- protected void publishEvent(RemoteApplicationEvent event) {
- applicationEventPublisher.publishEvent(event);
- }
-
- /**
- * @return 只广播给自己服务的实例
- */
- protected String selfDestinationService() {
- return applicationName + ":**";
- }
-
- protected String getBusId() {
- return serviceMatcher.getBusId();
- }
-
-}
diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/package-info.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/package-info.java
deleted file mode 100644
index 9953ae6e0..000000000
--- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/package-info.java
+++ /dev/null
@@ -1,4 +0,0 @@
-/**
- * TODO 芋艿,后续删除,临时占位
- */
-package cn.iocoder.yudao.framework.mq.core;
diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/package-info.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/package-info.java
index 48eaf2386..3b716cb77 100644
--- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/package-info.java
+++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/package-info.java
@@ -1,6 +1,4 @@
/**
- * 消息队列,基于 Redis 提供:
- * 1. 基于 Pub/Sub 实现广播消费
- * 2. 基于 Stream 实现集群消费
+ * 消息队列,支持 Redis、RocketMQ、RabbitMQ、Kafka 四种
*/
package cn.iocoder.yudao.framework.mq;
diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/rabbitmq/config/YudaoRabbitMQAutoConfiguration.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/rabbitmq/config/YudaoRabbitMQAutoConfiguration.java
new file mode 100644
index 000000000..770c50ff7
--- /dev/null
+++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/rabbitmq/config/YudaoRabbitMQAutoConfiguration.java
@@ -0,0 +1,29 @@
+package cn.iocoder.yudao.framework.mq.rabbitmq.config;
+
+import cn.hutool.core.util.ReflectUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.amqp.utils.SerializationUtils;
+import org.springframework.boot.autoconfigure.AutoConfiguration;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
+
+import java.lang.reflect.Field;
+
+/**
+ * RabbitMQ 消息队列配置类
+ *
+ * @author 芋道源码
+ */
+@AutoConfiguration
+@Slf4j
+@ConditionalOnClass(name = "org.springframework.amqp.rabbit.core.RabbitTemplate")
+public class YudaoRabbitMQAutoConfiguration {
+
+ static {
+ // 强制设置 SerializationUtils 的 TRUST_ALL 为 true,避免 RabbitMQ Consumer 反序列化消息报错
+ // 为什么不通过设置 spring.amqp.deserialization.trust.all 呢?因为可能在 SerializationUtils static 初始化后
+ Field trustAllField = ReflectUtil.getField(SerializationUtils.class, "TRUST_ALL");
+ ReflectUtil.removeFinalModify(trustAllField);
+ ReflectUtil.setFieldValue(SerializationUtils.class, trustAllField, true);
+ }
+
+}
diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/rabbitmq/core/package-info.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/rabbitmq/core/package-info.java
new file mode 100644
index 000000000..2773b5828
--- /dev/null
+++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/rabbitmq/core/package-info.java
@@ -0,0 +1,4 @@
+/**
+ * 占位符,无特殊逻辑
+ */
+package cn.iocoder.yudao.framework.mq.rabbitmq.core;
\ No newline at end of file
diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/rabbitmq/package-info.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/rabbitmq/package-info.java
new file mode 100644
index 000000000..9f6032c92
--- /dev/null
+++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/rabbitmq/package-info.java
@@ -0,0 +1,4 @@
+/**
+ * 消息队列,基于 RabbitMQ 提供
+ */
+package cn.iocoder.yudao.framework.mq.rabbitmq;
diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/config/YudaoRedisMQAutoConfiguration.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/config/YudaoRedisMQAutoConfiguration.java
new file mode 100644
index 000000000..bbc63b719
--- /dev/null
+++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/config/YudaoRedisMQAutoConfiguration.java
@@ -0,0 +1,164 @@
+package cn.iocoder.yudao.framework.mq.redis.config;
+
+import cn.hutool.core.map.MapUtil;
+import cn.hutool.core.util.StrUtil;
+import cn.hutool.system.SystemUtil;
+import cn.iocoder.yudao.framework.common.enums.DocumentEnum;
+import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate;
+import cn.iocoder.yudao.framework.mq.redis.core.interceptor.RedisMessageInterceptor;
+import cn.iocoder.yudao.framework.mq.redis.core.job.RedisPendingMessageResendJob;
+import cn.iocoder.yudao.framework.mq.redis.core.pubsub.AbstractRedisChannelMessageListener;
+import cn.iocoder.yudao.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener;
+import cn.iocoder.yudao.framework.redis.config.YudaoRedisAutoConfiguration;
+import lombok.extern.slf4j.Slf4j;
+import org.redisson.api.RedissonClient;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.AutoConfiguration;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
+import org.springframework.context.annotation.Bean;
+import org.springframework.data.redis.connection.RedisServerCommands;
+import org.springframework.data.redis.connection.stream.Consumer;
+import org.springframework.data.redis.connection.stream.ObjectRecord;
+import org.springframework.data.redis.connection.stream.ReadOffset;
+import org.springframework.data.redis.connection.stream.StreamOffset;
+import org.springframework.data.redis.core.RedisCallback;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.data.redis.core.StringRedisTemplate;
+import org.springframework.data.redis.listener.ChannelTopic;
+import org.springframework.data.redis.listener.RedisMessageListenerContainer;
+import org.springframework.data.redis.stream.StreamMessageListenerContainer;
+import org.springframework.scheduling.annotation.EnableScheduling;
+
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * 消息队列配置类
+ *
+ * @author 芋道源码
+ */
+@Slf4j
+@EnableScheduling // 启用定时任务,用于 RedisPendingMessageResendJob 重发消息
+@AutoConfiguration(after = YudaoRedisAutoConfiguration.class)
+public class YudaoRedisMQAutoConfiguration {
+
+ @Bean
+ public RedisMQTemplate redisMQTemplate(StringRedisTemplate redisTemplate,
+ List interceptors) {
+ RedisMQTemplate redisMQTemplate = new RedisMQTemplate(redisTemplate);
+ // 添加拦截器
+ interceptors.forEach(redisMQTemplate::addInterceptor);
+ return redisMQTemplate;
+ }
+
+ // ========== 消费者相关 ==========
+
+ /**
+ * 创建 Redis Pub/Sub 广播消费的容器
+ */
+ @Bean(initMethod = "start", destroyMethod = "stop")
+ @ConditionalOnBean(AbstractRedisChannelMessageListener.class) // 只有 AbstractChannelMessageListener 存在的时候,才需要注册 Redis pubsub 监听
+ public RedisMessageListenerContainer redisMessageListenerContainer(
+ RedisMQTemplate redisMQTemplate, List> listeners) {
+ // 创建 RedisMessageListenerContainer 对象
+ RedisMessageListenerContainer container = new RedisMessageListenerContainer();
+ // 设置 RedisConnection 工厂。
+ container.setConnectionFactory(redisMQTemplate.getRedisTemplate().getRequiredConnectionFactory());
+ // 添加监听器
+ listeners.forEach(listener -> {
+ listener.setRedisMQTemplate(redisMQTemplate);
+ container.addMessageListener(listener, new ChannelTopic(listener.getChannel()));
+ log.info("[redisMessageListenerContainer][注册 Channel({}) 对应的监听器({})]",
+ listener.getChannel(), listener.getClass().getName());
+ });
+ return container;
+ }
+
+ /**
+ * 创建 Redis Stream 重新消费的任务
+ */
+ @Bean
+ @ConditionalOnBean(AbstractRedisStreamMessageListener.class) // 只有 AbstractStreamMessageListener 存在的时候,才需要注册 Redis pubsub 监听
+ public RedisPendingMessageResendJob redisPendingMessageResendJob(List> listeners,
+ RedisMQTemplate redisTemplate,
+ @Value("${spring.application.name}") String groupName,
+ RedissonClient redissonClient) {
+ return new RedisPendingMessageResendJob(listeners, redisTemplate, groupName, redissonClient);
+ }
+
+ /**
+ * 创建 Redis Stream 集群消费的容器
+ *
+ * 基础知识:Redis Stream 的 xreadgroup 命令
+ */
+ @Bean(initMethod = "start", destroyMethod = "stop")
+ @ConditionalOnBean(AbstractRedisStreamMessageListener.class) // 只有 AbstractStreamMessageListener 存在的时候,才需要注册 Redis pubsub 监听
+ public StreamMessageListenerContainer> redisStreamMessageListenerContainer(
+ RedisMQTemplate redisMQTemplate, List> listeners) {
+ RedisTemplate redisTemplate = redisMQTemplate.getRedisTemplate();
+ checkRedisVersion(redisTemplate);
+ // 第一步,创建 StreamMessageListenerContainer 容器
+ // 创建 options 配置
+ StreamMessageListenerContainer.StreamMessageListenerContainerOptions> containerOptions =
+ StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
+ .batchSize(10) // 一次性最多拉取多少条消息
+ .targetType(String.class) // 目标类型。统一使用 String,通过自己封装的 AbstractStreamMessageListener 去反序列化
+ .build();
+ // 创建 container 对象
+ StreamMessageListenerContainer> container =
+ StreamMessageListenerContainer.create(redisMQTemplate.getRedisTemplate().getRequiredConnectionFactory(), containerOptions);
+
+ // 第二步,注册监听器,消费对应的 Stream 主题
+ String consumerName = buildConsumerName();
+ listeners.parallelStream().forEach(listener -> {
+ log.info("[redisStreamMessageListenerContainer][开始注册 StreamKey({}) 对应的监听器({})]",
+ listener.getStreamKey(), listener.getClass().getName());
+ // 创建 listener 对应的消费者分组
+ try {
+ redisTemplate.opsForStream().createGroup(listener.getStreamKey(), listener.getGroup());
+ } catch (Exception ignore) {
+ }
+ // 设置 listener 对应的 redisTemplate
+ listener.setRedisMQTemplate(redisMQTemplate);
+ // 创建 Consumer 对象
+ Consumer consumer = Consumer.from(listener.getGroup(), consumerName);
+ // 设置 Consumer 消费进度,以最小消费进度为准
+ StreamOffset streamOffset = StreamOffset.create(listener.getStreamKey(), ReadOffset.lastConsumed());
+ // 设置 Consumer 监听
+ StreamMessageListenerContainer.StreamReadRequestBuilder builder = StreamMessageListenerContainer.StreamReadRequest
+ .builder(streamOffset).consumer(consumer)
+ .autoAcknowledge(false) // 不自动 ack
+ .cancelOnError(throwable -> false); // 默认配置,发生异常就取消消费,显然不符合预期;因此,我们设置为 false
+ container.register(builder.build(), listener);
+ log.info("[redisStreamMessageListenerContainer][完成注册 StreamKey({}) 对应的监听器({})]",
+ listener.getStreamKey(), listener.getClass().getName());
+ });
+ return container;
+ }
+
+ /**
+ * 构建消费者名字,使用本地 IP + 进程编号的方式。
+ * 参考自 RocketMQ clientId 的实现
+ *
+ * @return 消费者名字
+ */
+ private static String buildConsumerName() {
+ return String.format("%s@%d", SystemUtil.getHostInfo().getAddress(), SystemUtil.getCurrentPID());
+ }
+
+ /**
+ * 校验 Redis 版本号,是否满足最低的版本号要求!
+ */
+ private static void checkRedisVersion(RedisTemplate redisTemplate) {
+ // 获得 Redis 版本
+ Properties info = redisTemplate.execute((RedisCallback) RedisServerCommands::info);
+ String version = MapUtil.getStr(info, "redis_version");
+ // 校验最低版本必须大于等于 5.0.0
+ int majorVersion = Integer.parseInt(StrUtil.subBefore(version, '.', false));
+ if (majorVersion < 5) {
+ throw new IllegalStateException(StrUtil.format("您当前的 Redis 版本为 {},小于最低要求的 5.0.0 版本!" +
+ "请参考 {} 文档进行安装。", version, DocumentEnum.REDIS_INSTALL.getUrl()));
+ }
+ }
+
+}
diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/RedisMQTemplate.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/RedisMQTemplate.java
new file mode 100644
index 000000000..5755ffa51
--- /dev/null
+++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/RedisMQTemplate.java
@@ -0,0 +1,87 @@
+package cn.iocoder.yudao.framework.mq.redis.core;
+
+import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
+import cn.iocoder.yudao.framework.mq.redis.core.interceptor.RedisMessageInterceptor;
+import cn.iocoder.yudao.framework.mq.redis.core.message.AbstractRedisMessage;
+import cn.iocoder.yudao.framework.mq.redis.core.pubsub.AbstractRedisChannelMessage;
+import cn.iocoder.yudao.framework.mq.redis.core.stream.AbstractRedisStreamMessage;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import org.springframework.data.redis.connection.stream.RecordId;
+import org.springframework.data.redis.connection.stream.StreamRecords;
+import org.springframework.data.redis.core.RedisTemplate;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Redis MQ 操作模板类
+ *
+ * @author 芋道源码
+ */
+@AllArgsConstructor
+public class RedisMQTemplate {
+
+ @Getter
+ private final RedisTemplate redisTemplate;
+ /**
+ * 拦截器数组
+ */
+ @Getter
+ private final List interceptors = new ArrayList<>();
+
+ /**
+ * 发送 Redis 消息,基于 Redis pub/sub 实现
+ *
+ * @param message 消息
+ */
+ public void send(T message) {
+ try {
+ sendMessageBefore(message);
+ // 发送消息
+ redisTemplate.convertAndSend(message.getChannel(), JsonUtils.toJsonString(message));
+ } finally {
+ sendMessageAfter(message);
+ }
+ }
+
+ /**
+ * 发送 Redis 消息,基于 Redis Stream 实现
+ *
+ * @param message 消息
+ * @return 消息记录的编号对象
+ */
+ public RecordId send(T message) {
+ try {
+ sendMessageBefore(message);
+ // 发送消息
+ return redisTemplate.opsForStream().add(StreamRecords.newRecord()
+ .ofObject(JsonUtils.toJsonString(message)) // 设置内容
+ .withStreamKey(message.getStreamKey())); // 设置 stream key
+ } finally {
+ sendMessageAfter(message);
+ }
+ }
+
+ /**
+ * 添加拦截器
+ *
+ * @param interceptor 拦截器
+ */
+ public void addInterceptor(RedisMessageInterceptor interceptor) {
+ interceptors.add(interceptor);
+ }
+
+ private void sendMessageBefore(AbstractRedisMessage message) {
+ // 正序
+ interceptors.forEach(interceptor -> interceptor.sendMessageBefore(message));
+ }
+
+ private void sendMessageAfter(AbstractRedisMessage message) {
+ // 倒序
+ for (int i = interceptors.size() - 1; i >= 0; i--) {
+ interceptors.get(i).sendMessageAfter(message);
+ }
+ }
+
+}
diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/interceptor/RedisMessageInterceptor.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/interceptor/RedisMessageInterceptor.java
new file mode 100644
index 000000000..dbcee7fe2
--- /dev/null
+++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/interceptor/RedisMessageInterceptor.java
@@ -0,0 +1,26 @@
+package cn.iocoder.yudao.framework.mq.redis.core.interceptor;
+
+import cn.iocoder.yudao.framework.mq.redis.core.message.AbstractRedisMessage;
+
+/**
+ * {@link AbstractRedisMessage} 消息拦截器
+ * 通过拦截器,作为插件机制,实现拓展。
+ * 例如说,多租户场景下的 MQ 消息处理
+ *
+ * @author 芋道源码
+ */
+public interface RedisMessageInterceptor {
+
+ default void sendMessageBefore(AbstractRedisMessage message) {
+ }
+
+ default void sendMessageAfter(AbstractRedisMessage message) {
+ }
+
+ default void consumeMessageBefore(AbstractRedisMessage message) {
+ }
+
+ default void consumeMessageAfter(AbstractRedisMessage message) {
+ }
+
+}
diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/job/RedisPendingMessageResendJob.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/job/RedisPendingMessageResendJob.java
new file mode 100644
index 000000000..b84f17c15
--- /dev/null
+++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/job/RedisPendingMessageResendJob.java
@@ -0,0 +1,100 @@
+package cn.iocoder.yudao.framework.mq.redis.core.job;
+
+import cn.hutool.core.collection.CollUtil;
+import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate;
+import cn.iocoder.yudao.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.redisson.api.RLock;
+import org.redisson.api.RedissonClient;
+import org.springframework.data.domain.Range;
+import org.springframework.data.redis.connection.stream.*;
+import org.springframework.data.redis.core.StreamOperations;
+import org.springframework.scheduling.annotation.Scheduled;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * 这个任务用于处理,crash 之后的消费者未消费完的消息
+ */
+@Slf4j
+@AllArgsConstructor
+public class RedisPendingMessageResendJob {
+
+ private static final String LOCK_KEY = "redis:pending:msg:lock";
+
+ /**
+ * 消息超时时间,默认 5 分钟
+ *
+ * 1. 超时的消息才会被重新投递
+ * 2. 由于定时任务 1 分钟一次,消息超时后不会被立即重投,极端情况下消息5分钟过期后,再等 1 分钟才会被扫瞄到
+ */
+ private static final int EXPIRE_TIME = 5 * 60;
+
+ private final List> listeners;
+ private final RedisMQTemplate redisTemplate;
+ private final String groupName;
+ private final RedissonClient redissonClient;
+
+ /**
+ * 一分钟执行一次,这里选择每分钟的35秒执行,是为了避免整点任务过多的问题
+ */
+ @Scheduled(cron = "35 * * * * ?")
+ public void messageResend() {
+ RLock lock = redissonClient.getLock(LOCK_KEY);
+ // 尝试加锁
+ if (lock.tryLock()) {
+ try {
+ execute();
+ } catch (Exception ex) {
+ log.error("[messageResend][执行异常]", ex);
+ } finally {
+ lock.unlock();
+ }
+ }
+ }
+
+ /**
+ * 执行清理逻辑
+ *
+ * @see 讨论
+ */
+ private void execute() {
+ StreamOperations ops = redisTemplate.getRedisTemplate().opsForStream();
+ listeners.forEach(listener -> {
+ PendingMessagesSummary pendingMessagesSummary = Objects.requireNonNull(ops.pending(listener.getStreamKey(), groupName));
+ // 每个消费者的 pending 队列消息数量
+ Map pendingMessagesPerConsumer = pendingMessagesSummary.getPendingMessagesPerConsumer();
+ pendingMessagesPerConsumer.forEach((consumerName, pendingMessageCount) -> {
+ log.info("[processPendingMessage][消费者({}) 消息数量({})]", consumerName, pendingMessageCount);
+ // 每个消费者的 pending消息的详情信息
+ PendingMessages pendingMessages = ops.pending(listener.getStreamKey(), Consumer.from(groupName, consumerName), Range.unbounded(), pendingMessageCount);
+ if (pendingMessages.isEmpty()) {
+ return;
+ }
+ pendingMessages.forEach(pendingMessage -> {
+ // 获取消息上一次传递到 consumer 的时间,
+ long lastDelivery = pendingMessage.getElapsedTimeSinceLastDelivery().getSeconds();
+ if (lastDelivery < EXPIRE_TIME){
+ return;
+ }
+ // 获取指定 id 的消息体
+ List> records = ops.range(listener.getStreamKey(),
+ Range.of(Range.Bound.inclusive(pendingMessage.getIdAsString()), Range.Bound.inclusive(pendingMessage.getIdAsString())));
+ if (CollUtil.isEmpty(records)) {
+ return;
+ }
+ // 重新投递消息
+ redisTemplate.getRedisTemplate().opsForStream().add(StreamRecords.newRecord()
+ .ofObject(records.get(0).getValue()) // 设置内容
+ .withStreamKey(listener.getStreamKey()));
+ // ack 消息消费完成
+ redisTemplate.getRedisTemplate().opsForStream().acknowledge(groupName, records.get(0));
+ log.info("[processPendingMessage][消息({})重新投递成功]", records.get(0).getId());
+ });
+ });
+ });
+ }
+}
diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/message/AbstractRedisMessage.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/message/AbstractRedisMessage.java
new file mode 100644
index 000000000..ee40814dd
--- /dev/null
+++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/message/AbstractRedisMessage.java
@@ -0,0 +1,29 @@
+package cn.iocoder.yudao.framework.mq.redis.core.message;
+
+import lombok.Data;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Redis 消息抽象基类
+ *
+ * @author 芋道源码
+ */
+@Data
+public abstract class AbstractRedisMessage {
+
+ /**
+ * 头
+ */
+ private Map headers = new HashMap<>();
+
+ public String getHeader(String key) {
+ return headers.get(key);
+ }
+
+ public void addHeader(String key, String value) {
+ headers.put(key, value);
+ }
+
+}
diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/pubsub/AbstractRedisChannelMessage.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/pubsub/AbstractRedisChannelMessage.java
new file mode 100644
index 000000000..d5ea5b9d5
--- /dev/null
+++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/pubsub/AbstractRedisChannelMessage.java
@@ -0,0 +1,23 @@
+package cn.iocoder.yudao.framework.mq.redis.core.pubsub;
+
+import cn.iocoder.yudao.framework.mq.redis.core.message.AbstractRedisMessage;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+/**
+ * Redis Channel Message 抽象类
+ *
+ * @author 芋道源码
+ */
+public abstract class AbstractRedisChannelMessage extends AbstractRedisMessage {
+
+ /**
+ * 获得 Redis Channel,默认使用类名
+ *
+ * @return Channel
+ */
+ @JsonIgnore // 避免序列化。原因是,Redis 发布 Channel 消息的时候,已经会指定。
+ public String getChannel() {
+ return getClass().getSimpleName();
+ }
+
+}
diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/pubsub/AbstractRedisChannelMessageListener.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/pubsub/AbstractRedisChannelMessageListener.java
new file mode 100644
index 000000000..fd7c910c9
--- /dev/null
+++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/pubsub/AbstractRedisChannelMessageListener.java
@@ -0,0 +1,103 @@
+package cn.iocoder.yudao.framework.mq.redis.core.pubsub;
+
+import cn.hutool.core.util.TypeUtil;
+import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
+import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate;
+import cn.iocoder.yudao.framework.mq.redis.core.interceptor.RedisMessageInterceptor;
+import cn.iocoder.yudao.framework.mq.redis.core.message.AbstractRedisMessage;
+import lombok.Setter;
+import lombok.SneakyThrows;
+import org.springframework.data.redis.connection.Message;
+import org.springframework.data.redis.connection.MessageListener;
+
+import java.lang.reflect.Type;
+import java.util.List;
+
+/**
+ * Redis Pub/Sub 监听器抽象类,用于实现广播消费
+ *
+ * @param 消息类型。一定要填写噢,不然会报错
+ *
+ * @author 芋道源码
+ */
+public abstract class AbstractRedisChannelMessageListener implements MessageListener {
+
+ /**
+ * 消息类型
+ */
+ private final Class messageType;
+ /**
+ * Redis Channel
+ */
+ private final String channel;
+ /**
+ * RedisMQTemplate
+ */
+ @Setter
+ private RedisMQTemplate redisMQTemplate;
+
+ @SneakyThrows
+ protected AbstractRedisChannelMessageListener() {
+ this.messageType = getMessageClass();
+ this.channel = messageType.getDeclaredConstructor().newInstance().getChannel();
+ }
+
+ /**
+ * 获得 Sub 订阅的 Redis Channel 通道
+ *
+ * @return channel
+ */
+ public final String getChannel() {
+ return channel;
+ }
+
+ @Override
+ public final void onMessage(Message message, byte[] bytes) {
+ T messageObj = JsonUtils.parseObject(message.getBody(), messageType);
+ try {
+ consumeMessageBefore(messageObj);
+ // 消费消息
+ this.onMessage(messageObj);
+ } finally {
+ consumeMessageAfter(messageObj);
+ }
+ }
+
+ /**
+ * 处理消息
+ *
+ * @param message 消息
+ */
+ public abstract void onMessage(T message);
+
+ /**
+ * 通过解析类上的泛型,获得消息类型
+ *
+ * @return 消息类型
+ */
+ @SuppressWarnings("unchecked")
+ private Class getMessageClass() {
+ Type type = TypeUtil.getTypeArgument(getClass(), 0);
+ if (type == null) {
+ throw new IllegalStateException(String.format("类型(%s) 需要设置消息类型", getClass().getName()));
+ }
+ return (Class) type;
+ }
+
+ private void consumeMessageBefore(AbstractRedisMessage message) {
+ assert redisMQTemplate != null;
+ List interceptors = redisMQTemplate.getInterceptors();
+ // 正序
+ interceptors.forEach(interceptor -> interceptor.consumeMessageBefore(message));
+ }
+
+ private void consumeMessageAfter(AbstractRedisMessage message) {
+ assert redisMQTemplate != null;
+ List interceptors = redisMQTemplate.getInterceptors();
+ // 倒序
+ for (int i = interceptors.size() - 1; i >= 0; i--) {
+ interceptors.get(i).consumeMessageAfter(message);
+ }
+ }
+
+}
diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/stream/AbstractRedisStreamMessage.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/stream/AbstractRedisStreamMessage.java
new file mode 100644
index 000000000..9017e0876
--- /dev/null
+++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/stream/AbstractRedisStreamMessage.java
@@ -0,0 +1,23 @@
+package cn.iocoder.yudao.framework.mq.redis.core.stream;
+
+import cn.iocoder.yudao.framework.mq.redis.core.message.AbstractRedisMessage;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+/**
+ * Redis Stream Message 抽象类
+ *
+ * @author 芋道源码
+ */
+public abstract class AbstractRedisStreamMessage extends AbstractRedisMessage {
+
+ /**
+ * 获得 Redis Stream Key,默认使用类名
+ *
+ * @return Channel
+ */
+ @JsonIgnore // 避免序列化
+ public String getStreamKey() {
+ return getClass().getSimpleName();
+ }
+
+}
diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/stream/AbstractRedisStreamMessageListener.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/stream/AbstractRedisStreamMessageListener.java
new file mode 100644
index 000000000..3e656af3f
--- /dev/null
+++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/stream/AbstractRedisStreamMessageListener.java
@@ -0,0 +1,113 @@
+package cn.iocoder.yudao.framework.mq.redis.core.stream;
+
+import cn.hutool.core.util.TypeUtil;
+import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
+import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate;
+import cn.iocoder.yudao.framework.mq.redis.core.interceptor.RedisMessageInterceptor;
+import cn.iocoder.yudao.framework.mq.redis.core.message.AbstractRedisMessage;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.SneakyThrows;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.data.redis.connection.stream.ObjectRecord;
+import org.springframework.data.redis.stream.StreamListener;
+
+import java.lang.reflect.Type;
+import java.util.List;
+
+/**
+ * Redis Stream 监听器抽象类,用于实现集群消费
+ *
+ * @param 消息类型。一定要填写噢,不然会报错
+ *
+ * @author 芋道源码
+ */
+public abstract class AbstractRedisStreamMessageListener
+ implements StreamListener> {
+
+ /**
+ * 消息类型
+ */
+ private final Class messageType;
+ /**
+ * Redis Channel
+ */
+ @Getter
+ private final String streamKey;
+
+ /**
+ * Redis 消费者分组,默认使用 spring.application.name 名字
+ */
+ @Value("${spring.application.name}")
+ @Getter
+ private String group;
+ /**
+ * RedisMQTemplate
+ */
+ @Setter
+ private RedisMQTemplate redisMQTemplate;
+
+ @SneakyThrows
+ protected AbstractRedisStreamMessageListener() {
+ this.messageType = getMessageClass();
+ this.streamKey = messageType.getDeclaredConstructor().newInstance().getStreamKey();
+ }
+
+ @Override
+ public void onMessage(ObjectRecord message) {
+ // 消费消息
+ T messageObj = JsonUtils.parseObject(message.getValue(), messageType);
+ try {
+ consumeMessageBefore(messageObj);
+ // 消费消息
+ this.onMessage(messageObj);
+ // ack 消息消费完成
+ redisMQTemplate.getRedisTemplate().opsForStream().acknowledge(group, message);
+ // TODO 芋艿:需要额外考虑以下几个点:
+ // 1. 处理异常的情况
+ // 2. 发送日志;以及事务的结合
+ // 3. 消费日志;以及通用的幂等性
+ // 4. 消费失败的重试,https://zhuanlan.zhihu.com/p/60501638
+ } finally {
+ consumeMessageAfter(messageObj);
+ }
+ }
+
+ /**
+ * 处理消息
+ *
+ * @param message 消息
+ */
+ public abstract void onMessage(T message);
+
+ /**
+ * 通过解析类上的泛型,获得消息类型
+ *
+ * @return 消息类型
+ */
+ @SuppressWarnings("unchecked")
+ private Class getMessageClass() {
+ Type type = TypeUtil.getTypeArgument(getClass(), 0);
+ if (type == null) {
+ throw new IllegalStateException(String.format("类型(%s) 需要设置消息类型", getClass().getName()));
+ }
+ return (Class) type;
+ }
+
+ private void consumeMessageBefore(AbstractRedisMessage message) {
+ assert redisMQTemplate != null;
+ List interceptors = redisMQTemplate.getInterceptors();
+ // 正序
+ interceptors.forEach(interceptor -> interceptor.consumeMessageBefore(message));
+ }
+
+ private void consumeMessageAfter(AbstractRedisMessage message) {
+ assert redisMQTemplate != null;
+ List interceptors = redisMQTemplate.getInterceptors();
+ // 倒序
+ for (int i = interceptors.size() - 1; i >= 0; i--) {
+ interceptors.get(i).consumeMessageAfter(message);
+ }
+ }
+
+}
diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/package-info.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/package-info.java
new file mode 100644
index 000000000..6621fc1ea
--- /dev/null
+++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/package-info.java
@@ -0,0 +1,6 @@
+/**
+ * 消息队列,基于 Redis 提供:
+ * 1. 基于 Pub/Sub 实现广播消费
+ * 2. 基于 Stream 实现集群消费
+ */
+package cn.iocoder.yudao.framework.mq.redis;
diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/yudao-framework/yudao-spring-boot-starter-mq/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
index c45ba7600..f0f201707 100644
--- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
+++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
@@ -1 +1,2 @@
-cn.iocoder.yudao.framework.mq.config.YudaoMQAutoConfiguration
+cn.iocoder.yudao.framework.mq.redis.config.YudaoRedisMQAutoConfiguration
+cn.iocoder.yudao.framework.mq.rabbitmq.config.YudaoRabbitMQAutoConfiguration
diff --git a/yudao-framework/yudao-spring-boot-starter-mq/《芋道 Spring Boot 事件机制 Event 入门》.md b/yudao-framework/yudao-spring-boot-starter-mq/《芋道 Spring Boot 事件机制 Event 入门》.md
new file mode 100644
index 000000000..08586b379
--- /dev/null
+++ b/yudao-framework/yudao-spring-boot-starter-mq/《芋道 Spring Boot 事件机制 Event 入门》.md
@@ -0,0 +1 @@
+
diff --git a/yudao-framework/yudao-spring-boot-starter-mq/《芋道 Spring Boot 消息队列 Kafka 入门》.md b/yudao-framework/yudao-spring-boot-starter-mq/《芋道 Spring Boot 消息队列 Kafka 入门》.md
new file mode 100644
index 000000000..b66d6334c
--- /dev/null
+++ b/yudao-framework/yudao-spring-boot-starter-mq/《芋道 Spring Boot 消息队列 Kafka 入门》.md
@@ -0,0 +1 @@
+
diff --git a/yudao-framework/yudao-spring-boot-starter-mq/《芋道 Spring Boot 消息队列 RabbitMQ 入门》.md b/yudao-framework/yudao-spring-boot-starter-mq/《芋道 Spring Boot 消息队列 RabbitMQ 入门》.md
new file mode 100644
index 000000000..eff46e2f7
--- /dev/null
+++ b/yudao-framework/yudao-spring-boot-starter-mq/《芋道 Spring Boot 消息队列 RabbitMQ 入门》.md
@@ -0,0 +1 @@
+
diff --git a/yudao-framework/yudao-spring-boot-starter-mq/《芋道 Spring Boot 消息队列 RocketMQ 入门》.md b/yudao-framework/yudao-spring-boot-starter-mq/《芋道 Spring Boot 消息队列 RocketMQ 入门》.md
new file mode 100644
index 000000000..08586b379
--- /dev/null
+++ b/yudao-framework/yudao-spring-boot-starter-mq/《芋道 Spring Boot 消息队列 RocketMQ 入门》.md
@@ -0,0 +1 @@
+
diff --git a/yudao-module-bpm/yudao-module-bpm-biz/src/main/resources/application-dev.yaml b/yudao-module-bpm/yudao-module-bpm-biz/src/main/resources/application-dev.yaml
index e889708d4..13e35b51c 100644
--- a/yudao-module-bpm/yudao-module-bpm-biz/src/main/resources/application-dev.yaml
+++ b/yudao-module-bpm/yudao-module-bpm-biz/src/main/resources/application-dev.yaml
@@ -59,13 +59,6 @@ spring:
# password: 123456 # 密码,建议生产环境开启
--- #################### MQ 消息队列相关配置 ####################
-spring:
- cloud:
- stream:
- rocketmq:
- # RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类
- binder:
- name-server: 127.0.0.1:9876 # RocketMQ Namesrv 地址
--- #################### 定时任务相关配置 ####################
xxl:
diff --git a/yudao-module-bpm/yudao-module-bpm-biz/src/main/resources/application-local.yaml b/yudao-module-bpm/yudao-module-bpm-biz/src/main/resources/application-local.yaml
index f9c2673b5..6b22d9a50 100644
--- a/yudao-module-bpm/yudao-module-bpm-biz/src/main/resources/application-local.yaml
+++ b/yudao-module-bpm/yudao-module-bpm-biz/src/main/resources/application-local.yaml
@@ -70,14 +70,6 @@ spring:
# password: 123456 # 密码,建议生产环境开启
--- #################### MQ 消息队列相关配置 ####################
-spring:
- cloud:
- stream:
- rocketmq:
- # RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类
- binder:
- name-server: 127.0.0.1:9876 # RocketMQ Namesrv 地址
- binding-retry-interval: 7200 # 消息绑定重试间隔时间,单位:秒,默认为 30 秒。考虑到本地可能不启动 RocketMQ 服务,设置为 2 小时
--- #################### 定时任务相关配置 ####################
diff --git a/yudao-module-bpm/yudao-module-bpm-biz/src/main/resources/application.yaml b/yudao-module-bpm/yudao-module-bpm-biz/src/main/resources/application.yaml
index b72d9de77..2908b5ce3 100644
--- a/yudao-module-bpm/yudao-module-bpm-biz/src/main/resources/application.yaml
+++ b/yudao-module-bpm/yudao-module-bpm-biz/src/main/resources/application.yaml
@@ -85,26 +85,6 @@ spring:
--- #################### MQ 消息队列相关配置 ####################
-spring:
- cloud:
- # Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
- stream:
- # Spring Cloud Stream RocketMQ 配置项
- rocketmq:
- # RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类
- binder:
- name-server: 127.0.0.1:9876 # RocketMQ Namesrv 地址
- default: # 默认 bindings 全局配置
- producer: # RocketMQ Producer 配置项,对应 RocketMQProducerProperties 类
- group: bpm_producer_group # 生产者分组
- send-type: SYNC # 发送模式,SYNC 同步
-
- # Spring Cloud Bus 配置项,对应 BusProperties 类
- bus:
- enabled: true # 是否开启,默认为 true
- id: ${spring.application.name}:${server.port} # 编号,Spring Cloud Alibaba 建议使用“应用:端口”的格式
- destination: springCloudBus # 目标消息队列,默认为 springCloudBus
-
--- #################### 定时任务相关配置 ####################
xxl:
diff --git a/yudao-module-infra/yudao-module-infra-biz/pom.xml b/yudao-module-infra/yudao-module-infra-biz/pom.xml
index 99cd709f6..5df268b29 100644
--- a/yudao-module-infra/yudao-module-infra-biz/pom.xml
+++ b/yudao-module-infra/yudao-module-infra-biz/pom.xml
@@ -101,10 +101,6 @@