mq:改造支持 redis、rocketmq、rabbitmq、kafka 作为消息实现
parent
2450d7afdc
commit
e21b8f977e
|
@ -17,7 +17,7 @@
|
|||
<revision>1.8.3-snapshot</revision>
|
||||
<flatten-maven-plugin.version>1.5.0</flatten-maven-plugin.version>
|
||||
<!-- 统一依赖管理 -->
|
||||
<spring.boot.version>2.7.16</spring.boot.version>
|
||||
<spring.boot.version>2.7.17</spring.boot.version>
|
||||
<spring.cloud.version>2021.0.5</spring.cloud.version>
|
||||
<spring.cloud.alibaba.version>2021.0.4.0</spring.cloud.alibaba.version>
|
||||
<!-- Web 相关 -->
|
||||
|
@ -32,6 +32,8 @@
|
|||
<mybatis-plus-join-boot-starter.version>1.4.6</mybatis-plus-join-boot-starter.version>
|
||||
<redisson.version>3.18.0</redisson.version>
|
||||
<dm8.jdbc.version>8.1.2.141</dm8.jdbc.version>
|
||||
<!-- 消息队列 -->
|
||||
<rocketmq-spring.version>2.2.3</rocketmq-spring.version>
|
||||
<!-- RPC 相关 -->
|
||||
<!-- Config 配置中心相关 -->
|
||||
<apollo.version>1.9.2</apollo.version>
|
||||
|
@ -303,6 +305,12 @@
|
|||
<version>${revision}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.rocketmq</groupId>
|
||||
<artifactId>rocketmq-spring-boot-starter</artifactId>
|
||||
<version>${rocketmq-spring.version}</version>
|
||||
</dependency>
|
||||
|
||||
<!-- 服务保障相关 -->
|
||||
<dependency>
|
||||
<groupId>cn.iocoder.cloud</groupId>
|
||||
|
|
|
@ -58,6 +58,21 @@
|
|||
<artifactId>yudao-spring-boot-starter-mq</artifactId>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.kafka</groupId>
|
||||
<artifactId>spring-kafka</artifactId>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.amqp</groupId>
|
||||
<artifactId>spring-rabbit</artifactId>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.rocketmq</groupId>
|
||||
<artifactId>rocketmq-spring-boot-starter</artifactId>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
|
||||
<!-- Test 测试相关 -->
|
||||
<dependency>
|
||||
|
|
|
@ -6,6 +6,9 @@ import cn.iocoder.yudao.framework.redis.config.YudaoCacheProperties;
|
|||
import cn.iocoder.yudao.framework.tenant.core.aop.TenantIgnoreAspect;
|
||||
import cn.iocoder.yudao.framework.tenant.core.db.TenantDatabaseInterceptor;
|
||||
import cn.iocoder.yudao.framework.tenant.core.job.TenantJobAspect;
|
||||
import cn.iocoder.yudao.framework.tenant.core.mq.rabbitmq.TenantRabbitMQInitializer;
|
||||
import cn.iocoder.yudao.framework.tenant.core.mq.redis.TenantRedisMessageInterceptor;
|
||||
import cn.iocoder.yudao.framework.tenant.core.mq.rocketmq.TenantRocketMQInitializer;
|
||||
import cn.iocoder.yudao.framework.tenant.core.redis.TenantRedisCacheManager;
|
||||
import cn.iocoder.yudao.framework.tenant.core.security.TenantSecurityWebFilter;
|
||||
import cn.iocoder.yudao.framework.tenant.core.service.TenantFrameworkService;
|
||||
|
@ -93,6 +96,25 @@ public class YudaoTenantAutoConfiguration {
|
|||
return new TenantJobAspect(tenantFrameworkService);
|
||||
}
|
||||
|
||||
// ========== MQ ==========
|
||||
|
||||
@Bean
|
||||
public TenantRedisMessageInterceptor tenantRedisMessageInterceptor() {
|
||||
return new TenantRedisMessageInterceptor();
|
||||
}
|
||||
|
||||
@Bean
|
||||
@ConditionalOnClass(name = "org.springframework.amqp.rabbit.core.RabbitTemplate")
|
||||
public TenantRabbitMQInitializer tenantRabbitMQInitializer() {
|
||||
return new TenantRabbitMQInitializer();
|
||||
}
|
||||
|
||||
@Bean
|
||||
@ConditionalOnClass(name = "org.apache.rocketmq.spring.core.RocketMQTemplate")
|
||||
public TenantRocketMQInitializer tenantRocketMQInitializer() {
|
||||
return new TenantRocketMQInitializer();
|
||||
}
|
||||
|
||||
// ========== Redis ==========
|
||||
|
||||
@Bean
|
||||
|
|
|
@ -1,33 +0,0 @@
|
|||
package cn.iocoder.yudao.framework.tenant.config;
|
||||
|
||||
import cn.iocoder.yudao.framework.tenant.core.mq.TenantChannelInterceptor;
|
||||
import cn.iocoder.yudao.framework.tenant.core.mq.TenantFunctionAroundWrapper;
|
||||
import org.springframework.boot.autoconfigure.AutoConfiguration;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||
import org.springframework.cloud.function.context.catalog.FunctionAroundWrapper;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.integration.config.GlobalChannelInterceptor;
|
||||
|
||||
@AutoConfiguration
|
||||
@ConditionalOnProperty(prefix = "yudao.tenant", value = "enable", matchIfMissing = true) // 允许使用 yudao.tenant.enable=false 禁用多租户
|
||||
@ConditionalOnClass(name = {
|
||||
"org.springframework.messaging.support.ChannelInterceptor",
|
||||
"org.springframework.cloud.function.context.catalog.FunctionAroundWrapper"
|
||||
})
|
||||
@EnableConfigurationProperties(TenantProperties.class)
|
||||
public class YudaoTenantMQAutoConfiguration {
|
||||
|
||||
@Bean
|
||||
@GlobalChannelInterceptor // 必须添加在方法上,否则无法生效
|
||||
public TenantChannelInterceptor tenantChannelInterceptor() {
|
||||
return new TenantChannelInterceptor();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public FunctionAroundWrapper functionAroundWrapper() {
|
||||
return new TenantFunctionAroundWrapper();
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,37 @@
|
|||
package cn.iocoder.yudao.framework.tenant.core.mq.kafka;
|
||||
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.env.EnvironmentPostProcessor;
|
||||
import org.springframework.core.env.ConfigurableEnvironment;
|
||||
|
||||
/**
|
||||
* 多租户的 Kafka 的 {@link EnvironmentPostProcessor} 实现类
|
||||
*
|
||||
* Kafka Producer 发送消息时,增加 {@link TenantKafkaProducerInterceptor} 拦截器
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@Slf4j
|
||||
public class TenantKafkaEnvironmentPostProcessor implements EnvironmentPostProcessor {
|
||||
|
||||
private static final String PROPERTY_KEY_INTERCEPTOR_CLASSES = "spring.kafka.producer.properties.interceptor.classes";
|
||||
|
||||
@Override
|
||||
public void postProcessEnvironment(ConfigurableEnvironment environment, SpringApplication application) {
|
||||
// 添加 TenantKafkaProducerInterceptor 拦截器
|
||||
try {
|
||||
String value = environment.getProperty(PROPERTY_KEY_INTERCEPTOR_CLASSES);
|
||||
if (StrUtil.isEmpty(value)) {
|
||||
value = TenantKafkaProducerInterceptor.class.getName();
|
||||
} else {
|
||||
value += "," + TenantKafkaProducerInterceptor.class.getName();
|
||||
}
|
||||
environment.getSystemProperties().put(PROPERTY_KEY_INTERCEPTOR_CLASSES, value);
|
||||
} catch (NoClassDefFoundError ignore) {
|
||||
// 如果触发 NoClassDefFoundError 异常,说明 TenantKafkaProducerInterceptor 类不存在,即没引入 kafka-spring 依赖
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,47 @@
|
|||
package cn.iocoder.yudao.framework.tenant.core.mq.kafka;
|
||||
|
||||
import cn.hutool.core.util.ReflectUtil;
|
||||
import cn.iocoder.yudao.framework.tenant.core.context.TenantContextHolder;
|
||||
import org.apache.kafka.clients.producer.ProducerInterceptor;
|
||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||
import org.apache.kafka.clients.producer.RecordMetadata;
|
||||
import org.apache.kafka.common.header.Headers;
|
||||
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import static cn.iocoder.yudao.framework.web.core.util.WebFrameworkUtils.HEADER_TENANT_ID;
|
||||
|
||||
/**
|
||||
* Kafka 消息队列的多租户 {@link ProducerInterceptor} 实现类
|
||||
*
|
||||
* 1. Producer 发送消息时,将 {@link TenantContextHolder} 租户编号,添加到消息的 Header 中
|
||||
* 2. Consumer 消费消息时,将消息的 Header 的租户编号,添加到 {@link TenantContextHolder} 中,通过 {@link InvocableHandlerMethod} 实现
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
public class TenantKafkaProducerInterceptor implements ProducerInterceptor<Object, Object> {
|
||||
|
||||
@Override
|
||||
public ProducerRecord<Object, Object> onSend(ProducerRecord<Object, Object> record) {
|
||||
Long tenantId = TenantContextHolder.getTenantId();
|
||||
if (tenantId != null) {
|
||||
Headers headers = (Headers) ReflectUtil.getFieldValue(record, "headers"); // private 属性,没有 get 方法,智能反射
|
||||
headers.add(HEADER_TENANT_ID, tenantId.toString().getBytes());
|
||||
}
|
||||
return record;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void configure(Map<String, ?> configs) {
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,23 @@
|
|||
package cn.iocoder.yudao.framework.tenant.core.mq.rabbitmq;
|
||||
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.beans.BeansException;
|
||||
import org.springframework.beans.factory.config.BeanPostProcessor;
|
||||
|
||||
/**
|
||||
* 多租户的 RabbitMQ 初始化器
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
public class TenantRabbitMQInitializer implements BeanPostProcessor {
|
||||
|
||||
@Override
|
||||
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
|
||||
if (bean instanceof RabbitTemplate) {
|
||||
RabbitTemplate rabbitTemplate = (RabbitTemplate) bean;
|
||||
rabbitTemplate.addBeforePublishPostProcessors(new TenantRabbitMQMessagePostProcessor());
|
||||
}
|
||||
return bean;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,31 @@
|
|||
package cn.iocoder.yudao.framework.tenant.core.mq.rabbitmq;
|
||||
|
||||
import cn.iocoder.yudao.framework.tenant.core.context.TenantContextHolder;
|
||||
import org.apache.kafka.clients.producer.ProducerInterceptor;
|
||||
import org.springframework.amqp.AmqpException;
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.core.MessagePostProcessor;
|
||||
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
|
||||
|
||||
import static cn.iocoder.yudao.framework.web.core.util.WebFrameworkUtils.HEADER_TENANT_ID;
|
||||
|
||||
/**
|
||||
* RabbitMQ 消息队列的多租户 {@link ProducerInterceptor} 实现类
|
||||
*
|
||||
* 1. Producer 发送消息时,将 {@link TenantContextHolder} 租户编号,添加到消息的 Header 中
|
||||
* 2. Consumer 消费消息时,将消息的 Header 的租户编号,添加到 {@link TenantContextHolder} 中,通过 {@link InvocableHandlerMethod} 实现
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
public class TenantRabbitMQMessagePostProcessor implements MessagePostProcessor {
|
||||
|
||||
@Override
|
||||
public Message postProcessMessage(Message message) throws AmqpException {
|
||||
Long tenantId = TenantContextHolder.getTenantId();
|
||||
if (tenantId != null) {
|
||||
message.getMessageProperties().getHeaders().put(HEADER_TENANT_ID, tenantId);
|
||||
}
|
||||
return message;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,42 @@
|
|||
package cn.iocoder.yudao.framework.tenant.core.mq.redis;
|
||||
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import cn.iocoder.yudao.framework.mq.redis.core.interceptor.RedisMessageInterceptor;
|
||||
import cn.iocoder.yudao.framework.mq.redis.core.message.AbstractRedisMessage;
|
||||
import cn.iocoder.yudao.framework.tenant.core.context.TenantContextHolder;
|
||||
|
||||
import static cn.iocoder.yudao.framework.web.core.util.WebFrameworkUtils.HEADER_TENANT_ID;
|
||||
|
||||
/**
|
||||
* 多租户 {@link AbstractRedisMessage} 拦截器
|
||||
*
|
||||
* 1. Producer 发送消息时,将 {@link TenantContextHolder} 租户编号,添加到消息的 Header 中
|
||||
* 2. Consumer 消费消息时,将消息的 Header 的租户编号,添加到 {@link TenantContextHolder} 中
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
public class TenantRedisMessageInterceptor implements RedisMessageInterceptor {
|
||||
|
||||
@Override
|
||||
public void sendMessageBefore(AbstractRedisMessage message) {
|
||||
Long tenantId = TenantContextHolder.getTenantId();
|
||||
if (tenantId != null) {
|
||||
message.addHeader(HEADER_TENANT_ID, tenantId.toString());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void consumeMessageBefore(AbstractRedisMessage message) {
|
||||
String tenantIdStr = message.getHeader(HEADER_TENANT_ID);
|
||||
if (StrUtil.isNotEmpty(tenantIdStr)) {
|
||||
TenantContextHolder.setTenantId(Long.valueOf(tenantIdStr));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void consumeMessageAfter(AbstractRedisMessage message) {
|
||||
// 注意,Consumer 是一个逻辑的入口,所以不考虑原本上下文就存在租户编号的情况
|
||||
TenantContextHolder.clear();
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,46 @@
|
|||
package cn.iocoder.yudao.framework.tenant.core.mq.rocketmq;
|
||||
|
||||
import cn.hutool.core.lang.Assert;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import cn.iocoder.yudao.framework.tenant.core.context.TenantContextHolder;
|
||||
import org.apache.rocketmq.client.hook.ConsumeMessageContext;
|
||||
import org.apache.rocketmq.client.hook.ConsumeMessageHook;
|
||||
import org.apache.rocketmq.common.message.MessageExt;
|
||||
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import static cn.iocoder.yudao.framework.web.core.util.WebFrameworkUtils.HEADER_TENANT_ID;
|
||||
|
||||
/**
|
||||
* RocketMQ 消息队列的多租户 {@link ConsumeMessageHook} 实现类
|
||||
*
|
||||
* Consumer 消费消息时,将消息的 Header 的租户编号,添加到 {@link TenantContextHolder} 中,通过 {@link InvocableHandlerMethod} 实现
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
public class TenantRocketMQConsumeMessageHook implements ConsumeMessageHook {
|
||||
|
||||
@Override
|
||||
public String hookName() {
|
||||
return getClass().getSimpleName();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void consumeMessageBefore(ConsumeMessageContext context) {
|
||||
// 校验,消息必须是单条,不然设置租户可能不正确
|
||||
List<MessageExt> messages = context.getMsgList();
|
||||
Assert.isTrue(messages.size() == 1, "消息条数({})不正确", messages.size());
|
||||
// 设置租户编号
|
||||
String tenantId = messages.get(0).getUserProperty(HEADER_TENANT_ID);
|
||||
if (StrUtil.isNotEmpty(tenantId)) {
|
||||
TenantContextHolder.setTenantId(Long.parseLong(tenantId));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void consumeMessageAfter(ConsumeMessageContext context) {
|
||||
TenantContextHolder.clear();
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,53 @@
|
|||
package cn.iocoder.yudao.framework.tenant.core.mq.rocketmq;
|
||||
|
||||
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
|
||||
import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
|
||||
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
|
||||
import org.apache.rocketmq.client.producer.DefaultMQProducer;
|
||||
import org.apache.rocketmq.spring.core.RocketMQTemplate;
|
||||
import org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer;
|
||||
import org.springframework.beans.BeansException;
|
||||
import org.springframework.beans.factory.config.BeanPostProcessor;
|
||||
|
||||
/**
|
||||
* 多租户的 RocketMQ 初始化器
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
public class TenantRocketMQInitializer implements BeanPostProcessor {
|
||||
|
||||
@Override
|
||||
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
|
||||
if (bean instanceof DefaultRocketMQListenerContainer) {
|
||||
DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer) bean;
|
||||
initTenantConsumer(container.getConsumer());
|
||||
} else if (bean instanceof RocketMQTemplate) {
|
||||
RocketMQTemplate template = (RocketMQTemplate) bean;
|
||||
initTenantProducer(template.getProducer());
|
||||
}
|
||||
return bean;
|
||||
}
|
||||
|
||||
private void initTenantProducer(DefaultMQProducer producer) {
|
||||
if (producer == null) {
|
||||
return;
|
||||
}
|
||||
DefaultMQProducerImpl producerImpl = producer.getDefaultMQProducerImpl();
|
||||
if (producerImpl == null) {
|
||||
return;
|
||||
}
|
||||
producerImpl.registerSendMessageHook(new TenantRocketMQSendMessageHook());
|
||||
}
|
||||
|
||||
private void initTenantConsumer(DefaultMQPushConsumer consumer) {
|
||||
if (consumer == null) {
|
||||
return;
|
||||
}
|
||||
DefaultMQPushConsumerImpl consumerImpl = consumer.getDefaultMQPushConsumerImpl();
|
||||
if (consumerImpl == null) {
|
||||
return;
|
||||
}
|
||||
consumerImpl.registerConsumeMessageHook(new TenantRocketMQConsumeMessageHook());
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,36 @@
|
|||
package cn.iocoder.yudao.framework.tenant.core.mq.rocketmq;
|
||||
|
||||
import cn.iocoder.yudao.framework.tenant.core.context.TenantContextHolder;
|
||||
import org.apache.rocketmq.client.hook.SendMessageContext;
|
||||
import org.apache.rocketmq.client.hook.SendMessageHook;
|
||||
|
||||
import static cn.iocoder.yudao.framework.web.core.util.WebFrameworkUtils.HEADER_TENANT_ID;
|
||||
|
||||
/**
|
||||
* RocketMQ 消息队列的多租户 {@link SendMessageHook} 实现类
|
||||
*
|
||||
* Producer 发送消息时,将 {@link TenantContextHolder} 租户编号,添加到消息的 Header 中
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
public class TenantRocketMQSendMessageHook implements SendMessageHook {
|
||||
|
||||
@Override
|
||||
public String hookName() {
|
||||
return getClass().getSimpleName();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendMessageBefore(SendMessageContext sendMessageContext) {
|
||||
Long tenantId = TenantContextHolder.getTenantId();
|
||||
if (tenantId == null) {
|
||||
return;
|
||||
}
|
||||
sendMessageContext.getMessage().putUserProperty(HEADER_TENANT_ID, tenantId.toString());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendMessageAfter(SendMessageContext sendMessageContext) {
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,269 @@
|
|||
/*
|
||||
* Copyright 2002-2021 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.messaging.handler.invocation;
|
||||
|
||||
import cn.iocoder.yudao.framework.tenant.core.context.TenantContextHolder;
|
||||
import cn.iocoder.yudao.framework.tenant.core.util.TenantUtils;
|
||||
import org.springframework.core.DefaultParameterNameDiscoverer;
|
||||
import org.springframework.core.MethodParameter;
|
||||
import org.springframework.core.ParameterNameDiscoverer;
|
||||
import org.springframework.core.ResolvableType;
|
||||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.handler.HandlerMethod;
|
||||
import org.springframework.util.ObjectUtils;
|
||||
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Method;
|
||||
import java.lang.reflect.Type;
|
||||
import java.util.Arrays;
|
||||
|
||||
import static cn.iocoder.yudao.framework.web.core.util.WebFrameworkUtils.HEADER_TENANT_ID;
|
||||
|
||||
/**
|
||||
* Extension of {@link HandlerMethod} that invokes the underlying method with
|
||||
* argument values resolved from the current HTTP request through a list of
|
||||
* {@link HandlerMethodArgumentResolver}.
|
||||
*
|
||||
* 针对 rabbitmq-spring 和 kafka-spring,不存在合适的拓展点,可以实现 Consumer 消费前,读取 Header 中的 tenant-id 设置到 {@link TenantContextHolder} 中
|
||||
* TODO 芋艿:持续跟进,看看有没新的拓展点
|
||||
*
|
||||
* @author Rossen Stoyanchev
|
||||
* @author Juergen Hoeller
|
||||
* @since 4.0
|
||||
*/
|
||||
public class InvocableHandlerMethod extends HandlerMethod {
|
||||
|
||||
private static final Object[] EMPTY_ARGS = new Object[0];
|
||||
|
||||
private HandlerMethodArgumentResolverComposite resolvers = new HandlerMethodArgumentResolverComposite();
|
||||
|
||||
private ParameterNameDiscoverer parameterNameDiscoverer = new DefaultParameterNameDiscoverer();
|
||||
|
||||
/**
|
||||
* Create an instance from a {@code HandlerMethod}.
|
||||
*/
|
||||
public InvocableHandlerMethod(HandlerMethod handlerMethod) {
|
||||
super(handlerMethod);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create an instance from a bean instance and a method.
|
||||
*/
|
||||
public InvocableHandlerMethod(Object bean, Method method) {
|
||||
super(bean, method);
|
||||
}
|
||||
|
||||
/**
|
||||
* Construct a new handler method with the given bean instance, method name and parameters.
|
||||
* @param bean the object bean
|
||||
* @param methodName the method name
|
||||
* @param parameterTypes the method parameter types
|
||||
* @throws NoSuchMethodException when the method cannot be found
|
||||
*/
|
||||
public InvocableHandlerMethod(Object bean, String methodName, Class<?>... parameterTypes)
|
||||
throws NoSuchMethodException {
|
||||
|
||||
super(bean, methodName, parameterTypes);
|
||||
}
|
||||
|
||||
/**
|
||||
* Set {@link HandlerMethodArgumentResolver HandlerMethodArgumentResolvers} to use for resolving method argument values.
|
||||
*/
|
||||
public void setMessageMethodArgumentResolvers(HandlerMethodArgumentResolverComposite argumentResolvers) {
|
||||
this.resolvers = argumentResolvers;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the ParameterNameDiscoverer for resolving parameter names when needed
|
||||
* (e.g. default request attribute name).
|
||||
* <p>Default is a {@link DefaultParameterNameDiscoverer}.
|
||||
*/
|
||||
public void setParameterNameDiscoverer(ParameterNameDiscoverer parameterNameDiscoverer) {
|
||||
this.parameterNameDiscoverer = parameterNameDiscoverer;
|
||||
}
|
||||
|
||||
/**
|
||||
* Invoke the method after resolving its argument values in the context of the given message.
|
||||
* <p>Argument values are commonly resolved through
|
||||
* {@link HandlerMethodArgumentResolver HandlerMethodArgumentResolvers}.
|
||||
* The {@code providedArgs} parameter however may supply argument values to be used directly,
|
||||
* i.e. without argument resolution.
|
||||
* <p>Delegates to {@link #getMethodArgumentValues} and calls {@link #doInvoke} with the
|
||||
* resolved arguments.
|
||||
* @param message the current message being processed
|
||||
* @param providedArgs "given" arguments matched by type, not resolved
|
||||
* @return the raw value returned by the invoked method
|
||||
* @throws Exception raised if no suitable argument resolver can be found,
|
||||
* or if the method raised an exception
|
||||
* @see #getMethodArgumentValues
|
||||
* @see #doInvoke
|
||||
*/
|
||||
@Nullable
|
||||
public Object invoke(Message<?> message, Object... providedArgs) throws Exception {
|
||||
Object[] args = getMethodArgumentValues(message, providedArgs);
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Arguments: " + Arrays.toString(args));
|
||||
}
|
||||
// 注意:如下是本类的改动点!!!
|
||||
// 情况一:无租户编号的情况
|
||||
Long tenantId= parseTenantId(message);
|
||||
if (tenantId == null) {
|
||||
return doInvoke(args);
|
||||
}
|
||||
// 情况二:有租户的情况下
|
||||
return TenantUtils.execute(tenantId, () -> doInvoke(args));
|
||||
}
|
||||
|
||||
private Long parseTenantId(Message<?> message) {
|
||||
Object tenantId = message.getHeaders().get(HEADER_TENANT_ID);
|
||||
if (tenantId == null) {
|
||||
return null;
|
||||
}
|
||||
if (tenantId instanceof Long) {
|
||||
return (Long) tenantId;
|
||||
}
|
||||
if (tenantId instanceof Number) {
|
||||
return ((Number) tenantId).longValue();
|
||||
}
|
||||
if (tenantId instanceof String) {
|
||||
return Long.parseLong((String) tenantId);
|
||||
}
|
||||
if (tenantId instanceof byte[]) {
|
||||
return Long.parseLong(new String((byte[]) tenantId));
|
||||
}
|
||||
throw new IllegalArgumentException("未知的数据类型:" + tenantId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the method argument values for the current message, checking the provided
|
||||
* argument values and falling back to the configured argument resolvers.
|
||||
* <p>The resulting array will be passed into {@link #doInvoke}.
|
||||
* @since 5.1.2
|
||||
*/
|
||||
protected Object[] getMethodArgumentValues(Message<?> message, Object... providedArgs) throws Exception {
|
||||
MethodParameter[] parameters = getMethodParameters();
|
||||
if (ObjectUtils.isEmpty(parameters)) {
|
||||
return EMPTY_ARGS;
|
||||
}
|
||||
|
||||
Object[] args = new Object[parameters.length];
|
||||
for (int i = 0; i < parameters.length; i++) {
|
||||
MethodParameter parameter = parameters[i];
|
||||
parameter.initParameterNameDiscovery(this.parameterNameDiscoverer);
|
||||
args[i] = findProvidedArgument(parameter, providedArgs);
|
||||
if (args[i] != null) {
|
||||
continue;
|
||||
}
|
||||
if (!this.resolvers.supportsParameter(parameter)) {
|
||||
throw new MethodArgumentResolutionException(
|
||||
message, parameter, formatArgumentError(parameter, "No suitable resolver"));
|
||||
}
|
||||
try {
|
||||
args[i] = this.resolvers.resolveArgument(parameter, message);
|
||||
}
|
||||
catch (Exception ex) {
|
||||
// Leave stack trace for later, exception may actually be resolved and handled...
|
||||
if (logger.isDebugEnabled()) {
|
||||
String exMsg = ex.getMessage();
|
||||
if (exMsg != null && !exMsg.contains(parameter.getExecutable().toGenericString())) {
|
||||
logger.debug(formatArgumentError(parameter, exMsg));
|
||||
}
|
||||
}
|
||||
throw ex;
|
||||
}
|
||||
}
|
||||
return args;
|
||||
}
|
||||
|
||||
/**
|
||||
* Invoke the handler method with the given argument values.
|
||||
*/
|
||||
@Nullable
|
||||
protected Object doInvoke(Object... args) throws Exception {
|
||||
try {
|
||||
return getBridgedMethod().invoke(getBean(), args);
|
||||
}
|
||||
catch (IllegalArgumentException ex) {
|
||||
assertTargetBean(getBridgedMethod(), getBean(), args);
|
||||
String text = (ex.getMessage() != null ? ex.getMessage() : "Illegal argument");
|
||||
throw new IllegalStateException(formatInvokeError(text, args), ex);
|
||||
}
|
||||
catch (InvocationTargetException ex) {
|
||||
// Unwrap for HandlerExceptionResolvers ...
|
||||
Throwable targetException = ex.getTargetException();
|
||||
if (targetException instanceof RuntimeException) {
|
||||
throw (RuntimeException) targetException;
|
||||
}
|
||||
else if (targetException instanceof Error) {
|
||||
throw (Error) targetException;
|
||||
}
|
||||
else if (targetException instanceof Exception) {
|
||||
throw (Exception) targetException;
|
||||
}
|
||||
else {
|
||||
throw new IllegalStateException(formatInvokeError("Invocation failure", args), targetException);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
MethodParameter getAsyncReturnValueType(@Nullable Object returnValue) {
|
||||
return new AsyncResultMethodParameter(returnValue);
|
||||
}
|
||||
|
||||
private class AsyncResultMethodParameter extends HandlerMethodParameter {
|
||||
|
||||
@Nullable
|
||||
private final Object returnValue;
|
||||
|
||||
private final ResolvableType returnType;
|
||||
|
||||
public AsyncResultMethodParameter(@Nullable Object returnValue) {
|
||||
super(-1);
|
||||
this.returnValue = returnValue;
|
||||
this.returnType = ResolvableType.forType(super.getGenericParameterType()).getGeneric();
|
||||
}
|
||||
|
||||
protected AsyncResultMethodParameter(AsyncResultMethodParameter original) {
|
||||
super(original);
|
||||
this.returnValue = original.returnValue;
|
||||
this.returnType = original.returnType;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Class<?> getParameterType() {
|
||||
if (this.returnValue != null) {
|
||||
return this.returnValue.getClass();
|
||||
}
|
||||
if (!ResolvableType.NONE.equals(this.returnType)) {
|
||||
return this.returnType.toClass();
|
||||
}
|
||||
return super.getParameterType();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Type getGenericParameterType() {
|
||||
return this.returnType.getType();
|
||||
}
|
||||
|
||||
@Override
|
||||
public AsyncResultMethodParameter clone() {
|
||||
return new AsyncResultMethodParameter(this);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -1,3 +1,2 @@
|
|||
cn.iocoder.yudao.framework.tenant.config.YudaoTenantRpcAutoConfiguration
|
||||
cn.iocoder.yudao.framework.tenant.config.YudaoTenantAutoConfiguration
|
||||
cn.iocoder.yudao.framework.tenant.config.YudaoTenantMQAutoConfiguration
|
||||
|
|
|
@ -0,0 +1,2 @@
|
|||
org.springframework.boot.env.EnvironmentPostProcessor=\
|
||||
cn.iocoder.yudao.framework.tenant.core.mq.kafka.TenantKafkaEnvironmentPostProcessor
|
|
@ -12,15 +12,32 @@
|
|||
<packaging>jar</packaging>
|
||||
|
||||
<name>${project.artifactId}</name>
|
||||
<description>
|
||||
消息队列:
|
||||
1. 基于 Spring Cloud Stream 实现异步消息
|
||||
2. 基于 Spring Cloud Bus 实现事件总线
|
||||
</description>
|
||||
<description>消息队列,支持 Redis、RocketMQ、RabbitMQ、Kafka 四种</description>
|
||||
<url>https://github.com/YunaiV/ruoyi-vue-pro</url>
|
||||
|
||||
<dependencies>
|
||||
<!-- MQ 相关 -->
|
||||
<!-- DB 相关 -->
|
||||
<dependency>
|
||||
<groupId>cn.iocoder.cloud</groupId>
|
||||
<artifactId>yudao-spring-boot-starter-redis</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- 消息队列相关 -->
|
||||
<dependency>
|
||||
<groupId>org.springframework.kafka</groupId>
|
||||
<artifactId>spring-kafka</artifactId>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.amqp</groupId>
|
||||
<artifactId>spring-rabbit</artifactId>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.rocketmq</groupId>
|
||||
<artifactId>rocketmq-spring-boot-starter</artifactId>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
||||
|
|
|
@ -1,13 +0,0 @@
|
|||
package cn.iocoder.yudao.framework.mq.config;
|
||||
|
||||
import org.springframework.boot.autoconfigure.AutoConfiguration;
|
||||
|
||||
/**
|
||||
* 消息队列配置类
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@AutoConfiguration
|
||||
public class YudaoMQAutoConfiguration {
|
||||
|
||||
}
|
|
@ -1,4 +0,0 @@
|
|||
/**
|
||||
* TODO 芋艿,后续删除,临时占位
|
||||
*/
|
||||
package cn.iocoder.yudao.framework.mq.core;
|
|
@ -1,6 +1,4 @@
|
|||
/**
|
||||
* 消息队列,基于 Redis 提供:
|
||||
* 1. 基于 Pub/Sub 实现广播消费
|
||||
* 2. 基于 Stream 实现集群消费
|
||||
* 消息队列,支持 Redis、RocketMQ、RabbitMQ、Kafka 四种
|
||||
*/
|
||||
package cn.iocoder.yudao.framework.mq;
|
||||
|
|
|
@ -0,0 +1,29 @@
|
|||
package cn.iocoder.yudao.framework.mq.rabbitmq.config;
|
||||
|
||||
import cn.hutool.core.util.ReflectUtil;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.amqp.utils.SerializationUtils;
|
||||
import org.springframework.boot.autoconfigure.AutoConfiguration;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||
|
||||
import java.lang.reflect.Field;
|
||||
|
||||
/**
|
||||
* RabbitMQ 消息队列配置类
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@AutoConfiguration
|
||||
@Slf4j
|
||||
@ConditionalOnClass(name = "org.springframework.amqp.rabbit.core.RabbitTemplate")
|
||||
public class YudaoRabbitMQAutoConfiguration {
|
||||
|
||||
static {
|
||||
// 强制设置 SerializationUtils 的 TRUST_ALL 为 true,避免 RabbitMQ Consumer 反序列化消息报错
|
||||
// 为什么不通过设置 spring.amqp.deserialization.trust.all 呢?因为可能在 SerializationUtils static 初始化后
|
||||
Field trustAllField = ReflectUtil.getField(SerializationUtils.class, "TRUST_ALL");
|
||||
ReflectUtil.removeFinalModify(trustAllField);
|
||||
ReflectUtil.setFieldValue(SerializationUtils.class, trustAllField, true);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,4 @@
|
|||
/**
|
||||
* 占位符,无特殊逻辑
|
||||
*/
|
||||
package cn.iocoder.yudao.framework.mq.rabbitmq.core;
|
|
@ -0,0 +1,4 @@
|
|||
/**
|
||||
* 消息队列,基于 RabbitMQ 提供
|
||||
*/
|
||||
package cn.iocoder.yudao.framework.mq.rabbitmq;
|
|
@ -0,0 +1,164 @@
|
|||
package cn.iocoder.yudao.framework.mq.redis.config;
|
||||
|
||||
import cn.hutool.core.map.MapUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import cn.hutool.system.SystemUtil;
|
||||
import cn.iocoder.yudao.framework.common.enums.DocumentEnum;
|
||||
import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate;
|
||||
import cn.iocoder.yudao.framework.mq.redis.core.interceptor.RedisMessageInterceptor;
|
||||
import cn.iocoder.yudao.framework.mq.redis.core.job.RedisPendingMessageResendJob;
|
||||
import cn.iocoder.yudao.framework.mq.redis.core.pubsub.AbstractRedisChannelMessageListener;
|
||||
import cn.iocoder.yudao.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener;
|
||||
import cn.iocoder.yudao.framework.redis.config.YudaoRedisAutoConfiguration;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.redisson.api.RedissonClient;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.boot.autoconfigure.AutoConfiguration;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.data.redis.connection.RedisServerCommands;
|
||||
import org.springframework.data.redis.connection.stream.Consumer;
|
||||
import org.springframework.data.redis.connection.stream.ObjectRecord;
|
||||
import org.springframework.data.redis.connection.stream.ReadOffset;
|
||||
import org.springframework.data.redis.connection.stream.StreamOffset;
|
||||
import org.springframework.data.redis.core.RedisCallback;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
import org.springframework.data.redis.listener.ChannelTopic;
|
||||
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
|
||||
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
|
||||
import org.springframework.scheduling.annotation.EnableScheduling;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
|
||||
/**
|
||||
* 消息队列配置类
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@Slf4j
|
||||
@EnableScheduling // 启用定时任务,用于 RedisPendingMessageResendJob 重发消息
|
||||
@AutoConfiguration(after = YudaoRedisAutoConfiguration.class)
|
||||
public class YudaoRedisMQAutoConfiguration {
|
||||
|
||||
@Bean
|
||||
public RedisMQTemplate redisMQTemplate(StringRedisTemplate redisTemplate,
|
||||
List<RedisMessageInterceptor> interceptors) {
|
||||
RedisMQTemplate redisMQTemplate = new RedisMQTemplate(redisTemplate);
|
||||
// 添加拦截器
|
||||
interceptors.forEach(redisMQTemplate::addInterceptor);
|
||||
return redisMQTemplate;
|
||||
}
|
||||
|
||||
// ========== 消费者相关 ==========
|
||||
|
||||
/**
|
||||
* 创建 Redis Pub/Sub 广播消费的容器
|
||||
*/
|
||||
@Bean(initMethod = "start", destroyMethod = "stop")
|
||||
@ConditionalOnBean(AbstractRedisChannelMessageListener.class) // 只有 AbstractChannelMessageListener 存在的时候,才需要注册 Redis pubsub 监听
|
||||
public RedisMessageListenerContainer redisMessageListenerContainer(
|
||||
RedisMQTemplate redisMQTemplate, List<AbstractRedisChannelMessageListener<?>> listeners) {
|
||||
// 创建 RedisMessageListenerContainer 对象
|
||||
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
|
||||
// 设置 RedisConnection 工厂。
|
||||
container.setConnectionFactory(redisMQTemplate.getRedisTemplate().getRequiredConnectionFactory());
|
||||
// 添加监听器
|
||||
listeners.forEach(listener -> {
|
||||
listener.setRedisMQTemplate(redisMQTemplate);
|
||||
container.addMessageListener(listener, new ChannelTopic(listener.getChannel()));
|
||||
log.info("[redisMessageListenerContainer][注册 Channel({}) 对应的监听器({})]",
|
||||
listener.getChannel(), listener.getClass().getName());
|
||||
});
|
||||
return container;
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建 Redis Stream 重新消费的任务
|
||||
*/
|
||||
@Bean
|
||||
@ConditionalOnBean(AbstractRedisStreamMessageListener.class) // 只有 AbstractStreamMessageListener 存在的时候,才需要注册 Redis pubsub 监听
|
||||
public RedisPendingMessageResendJob redisPendingMessageResendJob(List<AbstractRedisStreamMessageListener<?>> listeners,
|
||||
RedisMQTemplate redisTemplate,
|
||||
@Value("${spring.application.name}") String groupName,
|
||||
RedissonClient redissonClient) {
|
||||
return new RedisPendingMessageResendJob(listeners, redisTemplate, groupName, redissonClient);
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建 Redis Stream 集群消费的容器
|
||||
*
|
||||
* 基础知识:<a href="https://www.geek-book.com/src/docs/redis/redis/redis.io/commands/xreadgroup.html">Redis Stream 的 xreadgroup 命令</a>
|
||||
*/
|
||||
@Bean(initMethod = "start", destroyMethod = "stop")
|
||||
@ConditionalOnBean(AbstractRedisStreamMessageListener.class) // 只有 AbstractStreamMessageListener 存在的时候,才需要注册 Redis pubsub 监听
|
||||
public StreamMessageListenerContainer<String, ObjectRecord<String, String>> redisStreamMessageListenerContainer(
|
||||
RedisMQTemplate redisMQTemplate, List<AbstractRedisStreamMessageListener<?>> listeners) {
|
||||
RedisTemplate<String, ?> redisTemplate = redisMQTemplate.getRedisTemplate();
|
||||
checkRedisVersion(redisTemplate);
|
||||
// 第一步,创建 StreamMessageListenerContainer 容器
|
||||
// 创建 options 配置
|
||||
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>> containerOptions =
|
||||
StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
|
||||
.batchSize(10) // 一次性最多拉取多少条消息
|
||||
.targetType(String.class) // 目标类型。统一使用 String,通过自己封装的 AbstractStreamMessageListener 去反序列化
|
||||
.build();
|
||||
// 创建 container 对象
|
||||
StreamMessageListenerContainer<String, ObjectRecord<String, String>> container =
|
||||
StreamMessageListenerContainer.create(redisMQTemplate.getRedisTemplate().getRequiredConnectionFactory(), containerOptions);
|
||||
|
||||
// 第二步,注册监听器,消费对应的 Stream 主题
|
||||
String consumerName = buildConsumerName();
|
||||
listeners.parallelStream().forEach(listener -> {
|
||||
log.info("[redisStreamMessageListenerContainer][开始注册 StreamKey({}) 对应的监听器({})]",
|
||||
listener.getStreamKey(), listener.getClass().getName());
|
||||
// 创建 listener 对应的消费者分组
|
||||
try {
|
||||
redisTemplate.opsForStream().createGroup(listener.getStreamKey(), listener.getGroup());
|
||||
} catch (Exception ignore) {
|
||||
}
|
||||
// 设置 listener 对应的 redisTemplate
|
||||
listener.setRedisMQTemplate(redisMQTemplate);
|
||||
// 创建 Consumer 对象
|
||||
Consumer consumer = Consumer.from(listener.getGroup(), consumerName);
|
||||
// 设置 Consumer 消费进度,以最小消费进度为准
|
||||
StreamOffset<String> streamOffset = StreamOffset.create(listener.getStreamKey(), ReadOffset.lastConsumed());
|
||||
// 设置 Consumer 监听
|
||||
StreamMessageListenerContainer.StreamReadRequestBuilder<String> builder = StreamMessageListenerContainer.StreamReadRequest
|
||||
.builder(streamOffset).consumer(consumer)
|
||||
.autoAcknowledge(false) // 不自动 ack
|
||||
.cancelOnError(throwable -> false); // 默认配置,发生异常就取消消费,显然不符合预期;因此,我们设置为 false
|
||||
container.register(builder.build(), listener);
|
||||
log.info("[redisStreamMessageListenerContainer][完成注册 StreamKey({}) 对应的监听器({})]",
|
||||
listener.getStreamKey(), listener.getClass().getName());
|
||||
});
|
||||
return container;
|
||||
}
|
||||
|
||||
/**
|
||||
* 构建消费者名字,使用本地 IP + 进程编号的方式。
|
||||
* 参考自 RocketMQ clientId 的实现
|
||||
*
|
||||
* @return 消费者名字
|
||||
*/
|
||||
private static String buildConsumerName() {
|
||||
return String.format("%s@%d", SystemUtil.getHostInfo().getAddress(), SystemUtil.getCurrentPID());
|
||||
}
|
||||
|
||||
/**
|
||||
* 校验 Redis 版本号,是否满足最低的版本号要求!
|
||||
*/
|
||||
private static void checkRedisVersion(RedisTemplate<String, ?> redisTemplate) {
|
||||
// 获得 Redis 版本
|
||||
Properties info = redisTemplate.execute((RedisCallback<Properties>) RedisServerCommands::info);
|
||||
String version = MapUtil.getStr(info, "redis_version");
|
||||
// 校验最低版本必须大于等于 5.0.0
|
||||
int majorVersion = Integer.parseInt(StrUtil.subBefore(version, '.', false));
|
||||
if (majorVersion < 5) {
|
||||
throw new IllegalStateException(StrUtil.format("您当前的 Redis 版本为 {},小于最低要求的 5.0.0 版本!" +
|
||||
"请参考 {} 文档进行安装。", version, DocumentEnum.REDIS_INSTALL.getUrl()));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,87 @@
|
|||
package cn.iocoder.yudao.framework.mq.redis.core;
|
||||
|
||||
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
|
||||
import cn.iocoder.yudao.framework.mq.redis.core.interceptor.RedisMessageInterceptor;
|
||||
import cn.iocoder.yudao.framework.mq.redis.core.message.AbstractRedisMessage;
|
||||
import cn.iocoder.yudao.framework.mq.redis.core.pubsub.AbstractRedisChannelMessage;
|
||||
import cn.iocoder.yudao.framework.mq.redis.core.stream.AbstractRedisStreamMessage;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Getter;
|
||||
import org.springframework.data.redis.connection.stream.RecordId;
|
||||
import org.springframework.data.redis.connection.stream.StreamRecords;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Redis MQ 操作模板类
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@AllArgsConstructor
|
||||
public class RedisMQTemplate {
|
||||
|
||||
@Getter
|
||||
private final RedisTemplate<String, ?> redisTemplate;
|
||||
/**
|
||||
* 拦截器数组
|
||||
*/
|
||||
@Getter
|
||||
private final List<RedisMessageInterceptor> interceptors = new ArrayList<>();
|
||||
|
||||
/**
|
||||
* 发送 Redis 消息,基于 Redis pub/sub 实现
|
||||
*
|
||||
* @param message 消息
|
||||
*/
|
||||
public <T extends AbstractRedisChannelMessage> void send(T message) {
|
||||
try {
|
||||
sendMessageBefore(message);
|
||||
// 发送消息
|
||||
redisTemplate.convertAndSend(message.getChannel(), JsonUtils.toJsonString(message));
|
||||
} finally {
|
||||
sendMessageAfter(message);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送 Redis 消息,基于 Redis Stream 实现
|
||||
*
|
||||
* @param message 消息
|
||||
* @return 消息记录的编号对象
|
||||
*/
|
||||
public <T extends AbstractRedisStreamMessage> RecordId send(T message) {
|
||||
try {
|
||||
sendMessageBefore(message);
|
||||
// 发送消息
|
||||
return redisTemplate.opsForStream().add(StreamRecords.newRecord()
|
||||
.ofObject(JsonUtils.toJsonString(message)) // 设置内容
|
||||
.withStreamKey(message.getStreamKey())); // 设置 stream key
|
||||
} finally {
|
||||
sendMessageAfter(message);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 添加拦截器
|
||||
*
|
||||
* @param interceptor 拦截器
|
||||
*/
|
||||
public void addInterceptor(RedisMessageInterceptor interceptor) {
|
||||
interceptors.add(interceptor);
|
||||
}
|
||||
|
||||
private void sendMessageBefore(AbstractRedisMessage message) {
|
||||
// 正序
|
||||
interceptors.forEach(interceptor -> interceptor.sendMessageBefore(message));
|
||||
}
|
||||
|
||||
private void sendMessageAfter(AbstractRedisMessage message) {
|
||||
// 倒序
|
||||
for (int i = interceptors.size() - 1; i >= 0; i--) {
|
||||
interceptors.get(i).sendMessageAfter(message);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,26 @@
|
|||
package cn.iocoder.yudao.framework.mq.redis.core.interceptor;
|
||||
|
||||
import cn.iocoder.yudao.framework.mq.redis.core.message.AbstractRedisMessage;
|
||||
|
||||
/**
|
||||
* {@link AbstractRedisMessage} 消息拦截器
|
||||
* 通过拦截器,作为插件机制,实现拓展。
|
||||
* 例如说,多租户场景下的 MQ 消息处理
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
public interface RedisMessageInterceptor {
|
||||
|
||||
default void sendMessageBefore(AbstractRedisMessage message) {
|
||||
}
|
||||
|
||||
default void sendMessageAfter(AbstractRedisMessage message) {
|
||||
}
|
||||
|
||||
default void consumeMessageBefore(AbstractRedisMessage message) {
|
||||
}
|
||||
|
||||
default void consumeMessageAfter(AbstractRedisMessage message) {
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,100 @@
|
|||
package cn.iocoder.yudao.framework.mq.redis.core.job;
|
||||
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate;
|
||||
import cn.iocoder.yudao.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.redisson.api.RLock;
|
||||
import org.redisson.api.RedissonClient;
|
||||
import org.springframework.data.domain.Range;
|
||||
import org.springframework.data.redis.connection.stream.*;
|
||||
import org.springframework.data.redis.core.StreamOperations;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* 这个任务用于处理,crash 之后的消费者未消费完的消息
|
||||
*/
|
||||
@Slf4j
|
||||
@AllArgsConstructor
|
||||
public class RedisPendingMessageResendJob {
|
||||
|
||||
private static final String LOCK_KEY = "redis:pending:msg:lock";
|
||||
|
||||
/**
|
||||
* 消息超时时间,默认 5 分钟
|
||||
*
|
||||
* 1. 超时的消息才会被重新投递
|
||||
* 2. 由于定时任务 1 分钟一次,消息超时后不会被立即重投,极端情况下消息5分钟过期后,再等 1 分钟才会被扫瞄到
|
||||
*/
|
||||
private static final int EXPIRE_TIME = 5 * 60;
|
||||
|
||||
private final List<AbstractRedisStreamMessageListener<?>> listeners;
|
||||
private final RedisMQTemplate redisTemplate;
|
||||
private final String groupName;
|
||||
private final RedissonClient redissonClient;
|
||||
|
||||
/**
|
||||
* 一分钟执行一次,这里选择每分钟的35秒执行,是为了避免整点任务过多的问题
|
||||
*/
|
||||
@Scheduled(cron = "35 * * * * ?")
|
||||
public void messageResend() {
|
||||
RLock lock = redissonClient.getLock(LOCK_KEY);
|
||||
// 尝试加锁
|
||||
if (lock.tryLock()) {
|
||||
try {
|
||||
execute();
|
||||
} catch (Exception ex) {
|
||||
log.error("[messageResend][执行异常]", ex);
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 执行清理逻辑
|
||||
*
|
||||
* @see <a href="https://gitee.com/zhijiantianya/ruoyi-vue-pro/pulls/480/files">讨论</a>
|
||||
*/
|
||||
private void execute() {
|
||||
StreamOperations<String, Object, Object> ops = redisTemplate.getRedisTemplate().opsForStream();
|
||||
listeners.forEach(listener -> {
|
||||
PendingMessagesSummary pendingMessagesSummary = Objects.requireNonNull(ops.pending(listener.getStreamKey(), groupName));
|
||||
// 每个消费者的 pending 队列消息数量
|
||||
Map<String, Long> pendingMessagesPerConsumer = pendingMessagesSummary.getPendingMessagesPerConsumer();
|
||||
pendingMessagesPerConsumer.forEach((consumerName, pendingMessageCount) -> {
|
||||
log.info("[processPendingMessage][消费者({}) 消息数量({})]", consumerName, pendingMessageCount);
|
||||
// 每个消费者的 pending消息的详情信息
|
||||
PendingMessages pendingMessages = ops.pending(listener.getStreamKey(), Consumer.from(groupName, consumerName), Range.unbounded(), pendingMessageCount);
|
||||
if (pendingMessages.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
pendingMessages.forEach(pendingMessage -> {
|
||||
// 获取消息上一次传递到 consumer 的时间,
|
||||
long lastDelivery = pendingMessage.getElapsedTimeSinceLastDelivery().getSeconds();
|
||||
if (lastDelivery < EXPIRE_TIME){
|
||||
return;
|
||||
}
|
||||
// 获取指定 id 的消息体
|
||||
List<MapRecord<String, Object, Object>> records = ops.range(listener.getStreamKey(),
|
||||
Range.of(Range.Bound.inclusive(pendingMessage.getIdAsString()), Range.Bound.inclusive(pendingMessage.getIdAsString())));
|
||||
if (CollUtil.isEmpty(records)) {
|
||||
return;
|
||||
}
|
||||
// 重新投递消息
|
||||
redisTemplate.getRedisTemplate().opsForStream().add(StreamRecords.newRecord()
|
||||
.ofObject(records.get(0).getValue()) // 设置内容
|
||||
.withStreamKey(listener.getStreamKey()));
|
||||
// ack 消息消费完成
|
||||
redisTemplate.getRedisTemplate().opsForStream().acknowledge(groupName, records.get(0));
|
||||
log.info("[processPendingMessage][消息({})重新投递成功]", records.get(0).getId());
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
|
@ -0,0 +1,29 @@
|
|||
package cn.iocoder.yudao.framework.mq.redis.core.message;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Redis 消息抽象基类
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@Data
|
||||
public abstract class AbstractRedisMessage {
|
||||
|
||||
/**
|
||||
* 头
|
||||
*/
|
||||
private Map<String, String> headers = new HashMap<>();
|
||||
|
||||
public String getHeader(String key) {
|
||||
return headers.get(key);
|
||||
}
|
||||
|
||||
public void addHeader(String key, String value) {
|
||||
headers.put(key, value);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,23 @@
|
|||
package cn.iocoder.yudao.framework.mq.redis.core.pubsub;
|
||||
|
||||
import cn.iocoder.yudao.framework.mq.redis.core.message.AbstractRedisMessage;
|
||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||
|
||||
/**
|
||||
* Redis Channel Message 抽象类
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
public abstract class AbstractRedisChannelMessage extends AbstractRedisMessage {
|
||||
|
||||
/**
|
||||
* 获得 Redis Channel,默认使用类名
|
||||
*
|
||||
* @return Channel
|
||||
*/
|
||||
@JsonIgnore // 避免序列化。原因是,Redis 发布 Channel 消息的时候,已经会指定。
|
||||
public String getChannel() {
|
||||
return getClass().getSimpleName();
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,103 @@
|
|||
package cn.iocoder.yudao.framework.mq.redis.core.pubsub;
|
||||
|
||||
import cn.hutool.core.util.TypeUtil;
|
||||
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
|
||||
import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate;
|
||||
import cn.iocoder.yudao.framework.mq.redis.core.interceptor.RedisMessageInterceptor;
|
||||
import cn.iocoder.yudao.framework.mq.redis.core.message.AbstractRedisMessage;
|
||||
import lombok.Setter;
|
||||
import lombok.SneakyThrows;
|
||||
import org.springframework.data.redis.connection.Message;
|
||||
import org.springframework.data.redis.connection.MessageListener;
|
||||
|
||||
import java.lang.reflect.Type;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Redis Pub/Sub 监听器抽象类,用于实现广播消费
|
||||
*
|
||||
* @param <T> 消息类型。一定要填写噢,不然会报错
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
public abstract class AbstractRedisChannelMessageListener<T extends AbstractRedisChannelMessage> implements MessageListener {
|
||||
|
||||
/**
|
||||
* 消息类型
|
||||
*/
|
||||
private final Class<T> messageType;
|
||||
/**
|
||||
* Redis Channel
|
||||
*/
|
||||
private final String channel;
|
||||
/**
|
||||
* RedisMQTemplate
|
||||
*/
|
||||
@Setter
|
||||
private RedisMQTemplate redisMQTemplate;
|
||||
|
||||
@SneakyThrows
|
||||
protected AbstractRedisChannelMessageListener() {
|
||||
this.messageType = getMessageClass();
|
||||
this.channel = messageType.getDeclaredConstructor().newInstance().getChannel();
|
||||
}
|
||||
|
||||
/**
|
||||
* 获得 Sub 订阅的 Redis Channel 通道
|
||||
*
|
||||
* @return channel
|
||||
*/
|
||||
public final String getChannel() {
|
||||
return channel;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void onMessage(Message message, byte[] bytes) {
|
||||
T messageObj = JsonUtils.parseObject(message.getBody(), messageType);
|
||||
try {
|
||||
consumeMessageBefore(messageObj);
|
||||
// 消费消息
|
||||
this.onMessage(messageObj);
|
||||
} finally {
|
||||
consumeMessageAfter(messageObj);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理消息
|
||||
*
|
||||
* @param message 消息
|
||||
*/
|
||||
public abstract void onMessage(T message);
|
||||
|
||||
/**
|
||||
* 通过解析类上的泛型,获得消息类型
|
||||
*
|
||||
* @return 消息类型
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
private Class<T> getMessageClass() {
|
||||
Type type = TypeUtil.getTypeArgument(getClass(), 0);
|
||||
if (type == null) {
|
||||
throw new IllegalStateException(String.format("类型(%s) 需要设置消息类型", getClass().getName()));
|
||||
}
|
||||
return (Class<T>) type;
|
||||
}
|
||||
|
||||
private void consumeMessageBefore(AbstractRedisMessage message) {
|
||||
assert redisMQTemplate != null;
|
||||
List<RedisMessageInterceptor> interceptors = redisMQTemplate.getInterceptors();
|
||||
// 正序
|
||||
interceptors.forEach(interceptor -> interceptor.consumeMessageBefore(message));
|
||||
}
|
||||
|
||||
private void consumeMessageAfter(AbstractRedisMessage message) {
|
||||
assert redisMQTemplate != null;
|
||||
List<RedisMessageInterceptor> interceptors = redisMQTemplate.getInterceptors();
|
||||
// 倒序
|
||||
for (int i = interceptors.size() - 1; i >= 0; i--) {
|
||||
interceptors.get(i).consumeMessageAfter(message);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,23 @@
|
|||
package cn.iocoder.yudao.framework.mq.redis.core.stream;
|
||||
|
||||
import cn.iocoder.yudao.framework.mq.redis.core.message.AbstractRedisMessage;
|
||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||
|
||||
/**
|
||||
* Redis Stream Message 抽象类
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
public abstract class AbstractRedisStreamMessage extends AbstractRedisMessage {
|
||||
|
||||
/**
|
||||
* 获得 Redis Stream Key,默认使用类名
|
||||
*
|
||||
* @return Channel
|
||||
*/
|
||||
@JsonIgnore // 避免序列化
|
||||
public String getStreamKey() {
|
||||
return getClass().getSimpleName();
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,113 @@
|
|||
package cn.iocoder.yudao.framework.mq.redis.core.stream;
|
||||
|
||||
import cn.hutool.core.util.TypeUtil;
|
||||
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
|
||||
import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate;
|
||||
import cn.iocoder.yudao.framework.mq.redis.core.interceptor.RedisMessageInterceptor;
|
||||
import cn.iocoder.yudao.framework.mq.redis.core.message.AbstractRedisMessage;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import lombok.SneakyThrows;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.data.redis.connection.stream.ObjectRecord;
|
||||
import org.springframework.data.redis.stream.StreamListener;
|
||||
|
||||
import java.lang.reflect.Type;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Redis Stream 监听器抽象类,用于实现集群消费
|
||||
*
|
||||
* @param <T> 消息类型。一定要填写噢,不然会报错
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
public abstract class AbstractRedisStreamMessageListener<T extends AbstractRedisStreamMessage>
|
||||
implements StreamListener<String, ObjectRecord<String, String>> {
|
||||
|
||||
/**
|
||||
* 消息类型
|
||||
*/
|
||||
private final Class<T> messageType;
|
||||
/**
|
||||
* Redis Channel
|
||||
*/
|
||||
@Getter
|
||||
private final String streamKey;
|
||||
|
||||
/**
|
||||
* Redis 消费者分组,默认使用 spring.application.name 名字
|
||||
*/
|
||||
@Value("${spring.application.name}")
|
||||
@Getter
|
||||
private String group;
|
||||
/**
|
||||
* RedisMQTemplate
|
||||
*/
|
||||
@Setter
|
||||
private RedisMQTemplate redisMQTemplate;
|
||||
|
||||
@SneakyThrows
|
||||
protected AbstractRedisStreamMessageListener() {
|
||||
this.messageType = getMessageClass();
|
||||
this.streamKey = messageType.getDeclaredConstructor().newInstance().getStreamKey();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMessage(ObjectRecord<String, String> message) {
|
||||
// 消费消息
|
||||
T messageObj = JsonUtils.parseObject(message.getValue(), messageType);
|
||||
try {
|
||||
consumeMessageBefore(messageObj);
|
||||
// 消费消息
|
||||
this.onMessage(messageObj);
|
||||
// ack 消息消费完成
|
||||
redisMQTemplate.getRedisTemplate().opsForStream().acknowledge(group, message);
|
||||
// TODO 芋艿:需要额外考虑以下几个点:
|
||||
// 1. 处理异常的情况
|
||||
// 2. 发送日志;以及事务的结合
|
||||
// 3. 消费日志;以及通用的幂等性
|
||||
// 4. 消费失败的重试,https://zhuanlan.zhihu.com/p/60501638
|
||||
} finally {
|
||||
consumeMessageAfter(messageObj);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理消息
|
||||
*
|
||||
* @param message 消息
|
||||
*/
|
||||
public abstract void onMessage(T message);
|
||||
|
||||
/**
|
||||
* 通过解析类上的泛型,获得消息类型
|
||||
*
|
||||
* @return 消息类型
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
private Class<T> getMessageClass() {
|
||||
Type type = TypeUtil.getTypeArgument(getClass(), 0);
|
||||
if (type == null) {
|
||||
throw new IllegalStateException(String.format("类型(%s) 需要设置消息类型", getClass().getName()));
|
||||
}
|
||||
return (Class<T>) type;
|
||||
}
|
||||
|
||||
private void consumeMessageBefore(AbstractRedisMessage message) {
|
||||
assert redisMQTemplate != null;
|
||||
List<RedisMessageInterceptor> interceptors = redisMQTemplate.getInterceptors();
|
||||
// 正序
|
||||
interceptors.forEach(interceptor -> interceptor.consumeMessageBefore(message));
|
||||
}
|
||||
|
||||
private void consumeMessageAfter(AbstractRedisMessage message) {
|
||||
assert redisMQTemplate != null;
|
||||
List<RedisMessageInterceptor> interceptors = redisMQTemplate.getInterceptors();
|
||||
// 倒序
|
||||
for (int i = interceptors.size() - 1; i >= 0; i--) {
|
||||
interceptors.get(i).consumeMessageAfter(message);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,6 @@
|
|||
/**
|
||||
* 消息队列,基于 Redis 提供:
|
||||
* 1. 基于 Pub/Sub 实现广播消费
|
||||
* 2. 基于 Stream 实现集群消费
|
||||
*/
|
||||
package cn.iocoder.yudao.framework.mq.redis;
|
|
@ -1 +1,2 @@
|
|||
cn.iocoder.yudao.framework.mq.config.YudaoMQAutoConfiguration
|
||||
cn.iocoder.yudao.framework.mq.redis.config.YudaoRedisMQAutoConfiguration
|
||||
cn.iocoder.yudao.framework.mq.rabbitmq.config.YudaoRabbitMQAutoConfiguration
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
<http://www.iocoder.cn/Spring-Boot/RocketMQ/?yudao>
|
|
@ -0,0 +1 @@
|
|||
<http://www.iocoder.cn/Spring-Boot/Kafka/?yudao>
|
|
@ -0,0 +1 @@
|
|||
<http://www.iocoder.cn/Spring-Boot/RabbitMQ/?yudao>
|
|
@ -0,0 +1 @@
|
|||
<http://www.iocoder.cn/Spring-Boot/RocketMQ/?yudao>
|
Loading…
Reference in New Issue