!64 增加 RocketMQ、Kafka、RabbitMQ 消息队列的支持

Merge pull request !64 from 芋道源码/feature/mq-optimize
pull/62/MERGE
芋道源码 2023-11-02 12:52:48 +00:00 committed by Gitee
commit 7afe119d72
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
85 changed files with 1518 additions and 641 deletions

View File

@ -17,7 +17,7 @@
<revision>1.8.3-snapshot</revision>
<flatten-maven-plugin.version>1.5.0</flatten-maven-plugin.version>
<!-- 统一依赖管理 -->
<spring.boot.version>2.7.16</spring.boot.version>
<spring.boot.version>2.7.17</spring.boot.version>
<spring.cloud.version>2021.0.5</spring.cloud.version>
<spring.cloud.alibaba.version>2021.0.4.0</spring.cloud.alibaba.version>
<!-- Web 相关 -->
@ -32,6 +32,8 @@
<mybatis-plus-join-boot-starter.version>1.4.6</mybatis-plus-join-boot-starter.version>
<redisson.version>3.18.0</redisson.version>
<dm8.jdbc.version>8.1.2.141</dm8.jdbc.version>
<!-- 消息队列 -->
<rocketmq-spring.version>2.2.3</rocketmq-spring.version>
<!-- RPC 相关 -->
<!-- Config 配置中心相关 -->
<apollo.version>1.9.2</apollo.version>
@ -303,6 +305,12 @@
<version>${revision}</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>${rocketmq-spring.version}</version>
</dependency>
<!-- 服务保障相关 -->
<dependency>
<groupId>cn.iocoder.cloud</groupId>

View File

@ -1,10 +1,11 @@
package cn.iocoder.yudao.framework.common.core;
import com.google.common.base.Objects;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
/**
* Key Value
*
@ -13,7 +14,7 @@ import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class KeyValue<K, V> {
public class KeyValue<K, V> implements Serializable {
private K key;
private V value;

View File

@ -58,6 +58,21 @@
<artifactId>yudao-spring-boot-starter-mq</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<optional>true</optional>
</dependency>
<!-- Test 测试相关 -->
<dependency>

View File

@ -6,6 +6,9 @@ import cn.iocoder.yudao.framework.redis.config.YudaoCacheProperties;
import cn.iocoder.yudao.framework.tenant.core.aop.TenantIgnoreAspect;
import cn.iocoder.yudao.framework.tenant.core.db.TenantDatabaseInterceptor;
import cn.iocoder.yudao.framework.tenant.core.job.TenantJobAspect;
import cn.iocoder.yudao.framework.tenant.core.mq.rabbitmq.TenantRabbitMQInitializer;
import cn.iocoder.yudao.framework.tenant.core.mq.redis.TenantRedisMessageInterceptor;
import cn.iocoder.yudao.framework.tenant.core.mq.rocketmq.TenantRocketMQInitializer;
import cn.iocoder.yudao.framework.tenant.core.redis.TenantRedisCacheManager;
import cn.iocoder.yudao.framework.tenant.core.security.TenantSecurityWebFilter;
import cn.iocoder.yudao.framework.tenant.core.service.TenantFrameworkService;
@ -93,6 +96,25 @@ public class YudaoTenantAutoConfiguration {
return new TenantJobAspect(tenantFrameworkService);
}
// ========== MQ ==========
@Bean
public TenantRedisMessageInterceptor tenantRedisMessageInterceptor() {
return new TenantRedisMessageInterceptor();
}
@Bean
@ConditionalOnClass(name = "org.springframework.amqp.rabbit.core.RabbitTemplate")
public TenantRabbitMQInitializer tenantRabbitMQInitializer() {
return new TenantRabbitMQInitializer();
}
@Bean
@ConditionalOnClass(name = "org.apache.rocketmq.spring.core.RocketMQTemplate")
public TenantRocketMQInitializer tenantRocketMQInitializer() {
return new TenantRocketMQInitializer();
}
// ========== Redis ==========
@Bean

View File

@ -1,33 +0,0 @@
package cn.iocoder.yudao.framework.tenant.config;
import cn.iocoder.yudao.framework.tenant.core.mq.TenantChannelInterceptor;
import cn.iocoder.yudao.framework.tenant.core.mq.TenantFunctionAroundWrapper;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.function.context.catalog.FunctionAroundWrapper;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.config.GlobalChannelInterceptor;
@AutoConfiguration
@ConditionalOnProperty(prefix = "yudao.tenant", value = "enable", matchIfMissing = true) // 允许使用 yudao.tenant.enable=false 禁用多租户
@ConditionalOnClass(name = {
"org.springframework.messaging.support.ChannelInterceptor",
"org.springframework.cloud.function.context.catalog.FunctionAroundWrapper"
})
@EnableConfigurationProperties(TenantProperties.class)
public class YudaoTenantMQAutoConfiguration {
@Bean
@GlobalChannelInterceptor // 必须添加在方法上,否则无法生效
public TenantChannelInterceptor tenantChannelInterceptor() {
return new TenantChannelInterceptor();
}
@Bean
public FunctionAroundWrapper functionAroundWrapper() {
return new TenantFunctionAroundWrapper();
}
}

View File

@ -1,32 +0,0 @@
package cn.iocoder.yudao.framework.tenant.core.mq;
import cn.hutool.core.util.ReflectUtil;
import cn.iocoder.yudao.framework.tenant.core.context.TenantContextHolder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.ChannelInterceptor;
import java.util.Map;
import static cn.iocoder.yudao.framework.web.core.util.WebFrameworkUtils.HEADER_TENANT_ID;
/**
* {@link ChannelInterceptor}
* Header
*
* @author
*/
public class TenantChannelInterceptor implements ChannelInterceptor {
@Override
@SuppressWarnings({"unchecked", "NullableProblems"})
public Message<?> preSend(Message<?> message, MessageChannel channel) {
Long tenantId = TenantContextHolder.getTenantId();
if (tenantId != null) {
Map<String, Object> headers = (Map<String, Object>) ReflectUtil.getFieldValue(message.getHeaders(), "headers");
headers.put(HEADER_TENANT_ID, tenantId);
}
return message;
}
}

View File

@ -1,36 +0,0 @@
package cn.iocoder.yudao.framework.tenant.core.mq;
import cn.hutool.core.map.MapUtil;
import cn.iocoder.yudao.framework.tenant.core.util.TenantUtils;
import org.springframework.cloud.function.context.catalog.FunctionAroundWrapper;
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry;
import org.springframework.messaging.Message;
import static cn.iocoder.yudao.framework.web.core.util.WebFrameworkUtils.HEADER_TENANT_ID;
/**
* FunctionAroundWrapper
* Context
*
* @author
*/
public class TenantFunctionAroundWrapper extends FunctionAroundWrapper {
@Override
protected Object doApply(Object input, SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction) {
// 如果不是 MQ 消息,则直接跳过
if (!(input instanceof Message)) {
return targetFunction.apply(input);
}
// 如果没有多租户,则直接跳过
Message<?> message = (Message<?>) input;
Long tenantId = MapUtil.getLong(message.getHeaders(), HEADER_TENANT_ID);
if (tenantId == null) {
return targetFunction.apply(input);
}
// 如果有多租户,则使用多租户上下文
return TenantUtils.execute(tenantId, () -> targetFunction.apply(input));
}
}

View File

@ -0,0 +1,37 @@
package cn.iocoder.yudao.framework.tenant.core.mq.kafka;
import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.env.EnvironmentPostProcessor;
import org.springframework.core.env.ConfigurableEnvironment;
/**
* Kafka {@link EnvironmentPostProcessor}
*
* Kafka Producer {@link TenantKafkaProducerInterceptor}
*
* @author
*/
@Slf4j
public class TenantKafkaEnvironmentPostProcessor implements EnvironmentPostProcessor {
private static final String PROPERTY_KEY_INTERCEPTOR_CLASSES = "spring.kafka.producer.properties.interceptor.classes";
@Override
public void postProcessEnvironment(ConfigurableEnvironment environment, SpringApplication application) {
// 添加 TenantKafkaProducerInterceptor 拦截器
try {
String value = environment.getProperty(PROPERTY_KEY_INTERCEPTOR_CLASSES);
if (StrUtil.isEmpty(value)) {
value = TenantKafkaProducerInterceptor.class.getName();
} else {
value += "," + TenantKafkaProducerInterceptor.class.getName();
}
environment.getSystemProperties().put(PROPERTY_KEY_INTERCEPTOR_CLASSES, value);
} catch (NoClassDefFoundError ignore) {
// 如果触发 NoClassDefFoundError 异常,说明 TenantKafkaProducerInterceptor 类不存在,即没引入 kafka-spring 依赖
}
}
}

View File

@ -0,0 +1,47 @@
package cn.iocoder.yudao.framework.tenant.core.mq.kafka;
import cn.hutool.core.util.ReflectUtil;
import cn.iocoder.yudao.framework.tenant.core.context.TenantContextHolder;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Headers;
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
import java.util.Map;
import static cn.iocoder.yudao.framework.web.core.util.WebFrameworkUtils.HEADER_TENANT_ID;
/**
* Kafka {@link ProducerInterceptor}
*
* 1. Producer {@link TenantContextHolder} Header
* 2. Consumer Header {@link TenantContextHolder} {@link InvocableHandlerMethod}
*
* @author
*/
public class TenantKafkaProducerInterceptor implements ProducerInterceptor<Object, Object> {
@Override
public ProducerRecord<Object, Object> onSend(ProducerRecord<Object, Object> record) {
Long tenantId = TenantContextHolder.getTenantId();
if (tenantId != null) {
Headers headers = (Headers) ReflectUtil.getFieldValue(record, "headers"); // private 属性,没有 get 方法,智能反射
headers.add(HEADER_TENANT_ID, tenantId.toString().getBytes());
}
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}

View File

@ -0,0 +1,23 @@
package cn.iocoder.yudao.framework.tenant.core.mq.rabbitmq;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
/**
* RabbitMQ
*
* @author
*/
public class TenantRabbitMQInitializer implements BeanPostProcessor {
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof RabbitTemplate) {
RabbitTemplate rabbitTemplate = (RabbitTemplate) bean;
rabbitTemplate.addBeforePublishPostProcessors(new TenantRabbitMQMessagePostProcessor());
}
return bean;
}
}

View File

@ -0,0 +1,31 @@
package cn.iocoder.yudao.framework.tenant.core.mq.rabbitmq;
import cn.iocoder.yudao.framework.tenant.core.context.TenantContextHolder;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
import static cn.iocoder.yudao.framework.web.core.util.WebFrameworkUtils.HEADER_TENANT_ID;
/**
* RabbitMQ {@link ProducerInterceptor}
*
* 1. Producer {@link TenantContextHolder} Header
* 2. Consumer Header {@link TenantContextHolder} {@link InvocableHandlerMethod}
*
* @author
*/
public class TenantRabbitMQMessagePostProcessor implements MessagePostProcessor {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
Long tenantId = TenantContextHolder.getTenantId();
if (tenantId != null) {
message.getMessageProperties().getHeaders().put(HEADER_TENANT_ID, tenantId);
}
return message;
}
}

View File

@ -0,0 +1,42 @@
package cn.iocoder.yudao.framework.tenant.core.mq.redis;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.framework.mq.redis.core.interceptor.RedisMessageInterceptor;
import cn.iocoder.yudao.framework.mq.redis.core.message.AbstractRedisMessage;
import cn.iocoder.yudao.framework.tenant.core.context.TenantContextHolder;
import static cn.iocoder.yudao.framework.web.core.util.WebFrameworkUtils.HEADER_TENANT_ID;
/**
* {@link AbstractRedisMessage}
*
* 1. Producer {@link TenantContextHolder} Header
* 2. Consumer Header {@link TenantContextHolder}
*
* @author
*/
public class TenantRedisMessageInterceptor implements RedisMessageInterceptor {
@Override
public void sendMessageBefore(AbstractRedisMessage message) {
Long tenantId = TenantContextHolder.getTenantId();
if (tenantId != null) {
message.addHeader(HEADER_TENANT_ID, tenantId.toString());
}
}
@Override
public void consumeMessageBefore(AbstractRedisMessage message) {
String tenantIdStr = message.getHeader(HEADER_TENANT_ID);
if (StrUtil.isNotEmpty(tenantIdStr)) {
TenantContextHolder.setTenantId(Long.valueOf(tenantIdStr));
}
}
@Override
public void consumeMessageAfter(AbstractRedisMessage message) {
// 注意Consumer 是一个逻辑的入口,所以不考虑原本上下文就存在租户编号的情况
TenantContextHolder.clear();
}
}

View File

@ -0,0 +1,46 @@
package cn.iocoder.yudao.framework.tenant.core.mq.rocketmq;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.framework.tenant.core.context.TenantContextHolder;
import org.apache.rocketmq.client.hook.ConsumeMessageContext;
import org.apache.rocketmq.client.hook.ConsumeMessageHook;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
import java.util.List;
import static cn.iocoder.yudao.framework.web.core.util.WebFrameworkUtils.HEADER_TENANT_ID;
/**
* RocketMQ {@link ConsumeMessageHook}
*
* Consumer Header {@link TenantContextHolder} {@link InvocableHandlerMethod}
*
* @author
*/
public class TenantRocketMQConsumeMessageHook implements ConsumeMessageHook {
@Override
public String hookName() {
return getClass().getSimpleName();
}
@Override
public void consumeMessageBefore(ConsumeMessageContext context) {
// 校验,消息必须是单条,不然设置租户可能不正确
List<MessageExt> messages = context.getMsgList();
Assert.isTrue(messages.size() == 1, "消息条数({})不正确", messages.size());
// 设置租户编号
String tenantId = messages.get(0).getUserProperty(HEADER_TENANT_ID);
if (StrUtil.isNotEmpty(tenantId)) {
TenantContextHolder.setTenantId(Long.parseLong(tenantId));
}
}
@Override
public void consumeMessageAfter(ConsumeMessageContext context) {
TenantContextHolder.clear();
}
}

View File

@ -0,0 +1,53 @@
package cn.iocoder.yudao.framework.tenant.core.mq.rocketmq;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
/**
* RocketMQ
*
* @author
*/
public class TenantRocketMQInitializer implements BeanPostProcessor {
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof DefaultRocketMQListenerContainer) {
DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer) bean;
initTenantConsumer(container.getConsumer());
} else if (bean instanceof RocketMQTemplate) {
RocketMQTemplate template = (RocketMQTemplate) bean;
initTenantProducer(template.getProducer());
}
return bean;
}
private void initTenantProducer(DefaultMQProducer producer) {
if (producer == null) {
return;
}
DefaultMQProducerImpl producerImpl = producer.getDefaultMQProducerImpl();
if (producerImpl == null) {
return;
}
producerImpl.registerSendMessageHook(new TenantRocketMQSendMessageHook());
}
private void initTenantConsumer(DefaultMQPushConsumer consumer) {
if (consumer == null) {
return;
}
DefaultMQPushConsumerImpl consumerImpl = consumer.getDefaultMQPushConsumerImpl();
if (consumerImpl == null) {
return;
}
consumerImpl.registerConsumeMessageHook(new TenantRocketMQConsumeMessageHook());
}
}

View File

@ -0,0 +1,36 @@
package cn.iocoder.yudao.framework.tenant.core.mq.rocketmq;
import cn.iocoder.yudao.framework.tenant.core.context.TenantContextHolder;
import org.apache.rocketmq.client.hook.SendMessageContext;
import org.apache.rocketmq.client.hook.SendMessageHook;
import static cn.iocoder.yudao.framework.web.core.util.WebFrameworkUtils.HEADER_TENANT_ID;
/**
* RocketMQ {@link SendMessageHook}
*
* Producer {@link TenantContextHolder} Header
*
* @author
*/
public class TenantRocketMQSendMessageHook implements SendMessageHook {
@Override
public String hookName() {
return getClass().getSimpleName();
}
@Override
public void sendMessageBefore(SendMessageContext sendMessageContext) {
Long tenantId = TenantContextHolder.getTenantId();
if (tenantId == null) {
return;
}
sendMessageContext.getMessage().putUserProperty(HEADER_TENANT_ID, tenantId.toString());
}
@Override
public void sendMessageAfter(SendMessageContext sendMessageContext) {
}
}

View File

@ -0,0 +1,269 @@
/*
* Copyright 2002-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.handler.invocation;
import cn.iocoder.yudao.framework.tenant.core.context.TenantContextHolder;
import cn.iocoder.yudao.framework.tenant.core.util.TenantUtils;
import org.springframework.core.DefaultParameterNameDiscoverer;
import org.springframework.core.MethodParameter;
import org.springframework.core.ParameterNameDiscoverer;
import org.springframework.core.ResolvableType;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.HandlerMethod;
import org.springframework.util.ObjectUtils;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Type;
import java.util.Arrays;
import static cn.iocoder.yudao.framework.web.core.util.WebFrameworkUtils.HEADER_TENANT_ID;
/**
* Extension of {@link HandlerMethod} that invokes the underlying method with
* argument values resolved from the current HTTP request through a list of
* {@link HandlerMethodArgumentResolver}.
*
* rabbitmq-spring kafka-spring Consumer Header tenant-id {@link TenantContextHolder}
* TODO
*
* @author Rossen Stoyanchev
* @author Juergen Hoeller
* @since 4.0
*/
public class InvocableHandlerMethod extends HandlerMethod {
private static final Object[] EMPTY_ARGS = new Object[0];
private HandlerMethodArgumentResolverComposite resolvers = new HandlerMethodArgumentResolverComposite();
private ParameterNameDiscoverer parameterNameDiscoverer = new DefaultParameterNameDiscoverer();
/**
* Create an instance from a {@code HandlerMethod}.
*/
public InvocableHandlerMethod(HandlerMethod handlerMethod) {
super(handlerMethod);
}
/**
* Create an instance from a bean instance and a method.
*/
public InvocableHandlerMethod(Object bean, Method method) {
super(bean, method);
}
/**
* Construct a new handler method with the given bean instance, method name and parameters.
* @param bean the object bean
* @param methodName the method name
* @param parameterTypes the method parameter types
* @throws NoSuchMethodException when the method cannot be found
*/
public InvocableHandlerMethod(Object bean, String methodName, Class<?>... parameterTypes)
throws NoSuchMethodException {
super(bean, methodName, parameterTypes);
}
/**
* Set {@link HandlerMethodArgumentResolver HandlerMethodArgumentResolvers} to use for resolving method argument values.
*/
public void setMessageMethodArgumentResolvers(HandlerMethodArgumentResolverComposite argumentResolvers) {
this.resolvers = argumentResolvers;
}
/**
* Set the ParameterNameDiscoverer for resolving parameter names when needed
* (e.g. default request attribute name).
* <p>Default is a {@link DefaultParameterNameDiscoverer}.
*/
public void setParameterNameDiscoverer(ParameterNameDiscoverer parameterNameDiscoverer) {
this.parameterNameDiscoverer = parameterNameDiscoverer;
}
/**
* Invoke the method after resolving its argument values in the context of the given message.
* <p>Argument values are commonly resolved through
* {@link HandlerMethodArgumentResolver HandlerMethodArgumentResolvers}.
* The {@code providedArgs} parameter however may supply argument values to be used directly,
* i.e. without argument resolution.
* <p>Delegates to {@link #getMethodArgumentValues} and calls {@link #doInvoke} with the
* resolved arguments.
* @param message the current message being processed
* @param providedArgs "given" arguments matched by type, not resolved
* @return the raw value returned by the invoked method
* @throws Exception raised if no suitable argument resolver can be found,
* or if the method raised an exception
* @see #getMethodArgumentValues
* @see #doInvoke
*/
@Nullable
public Object invoke(Message<?> message, Object... providedArgs) throws Exception {
Object[] args = getMethodArgumentValues(message, providedArgs);
if (logger.isTraceEnabled()) {
logger.trace("Arguments: " + Arrays.toString(args));
}
// 注意:如下是本类的改动点!!!
// 情况一:无租户编号的情况
Long tenantId= parseTenantId(message);
if (tenantId == null) {
return doInvoke(args);
}
// 情况二:有租户的情况下
return TenantUtils.execute(tenantId, () -> doInvoke(args));
}
private Long parseTenantId(Message<?> message) {
Object tenantId = message.getHeaders().get(HEADER_TENANT_ID);
if (tenantId == null) {
return null;
}
if (tenantId instanceof Long) {
return (Long) tenantId;
}
if (tenantId instanceof Number) {
return ((Number) tenantId).longValue();
}
if (tenantId instanceof String) {
return Long.parseLong((String) tenantId);
}
if (tenantId instanceof byte[]) {
return Long.parseLong(new String((byte[]) tenantId));
}
throw new IllegalArgumentException("未知的数据类型:" + tenantId);
}
/**
* Get the method argument values for the current message, checking the provided
* argument values and falling back to the configured argument resolvers.
* <p>The resulting array will be passed into {@link #doInvoke}.
* @since 5.1.2
*/
protected Object[] getMethodArgumentValues(Message<?> message, Object... providedArgs) throws Exception {
MethodParameter[] parameters = getMethodParameters();
if (ObjectUtils.isEmpty(parameters)) {
return EMPTY_ARGS;
}
Object[] args = new Object[parameters.length];
for (int i = 0; i < parameters.length; i++) {
MethodParameter parameter = parameters[i];
parameter.initParameterNameDiscovery(this.parameterNameDiscoverer);
args[i] = findProvidedArgument(parameter, providedArgs);
if (args[i] != null) {
continue;
}
if (!this.resolvers.supportsParameter(parameter)) {
throw new MethodArgumentResolutionException(
message, parameter, formatArgumentError(parameter, "No suitable resolver"));
}
try {
args[i] = this.resolvers.resolveArgument(parameter, message);
}
catch (Exception ex) {
// Leave stack trace for later, exception may actually be resolved and handled...
if (logger.isDebugEnabled()) {
String exMsg = ex.getMessage();
if (exMsg != null && !exMsg.contains(parameter.getExecutable().toGenericString())) {
logger.debug(formatArgumentError(parameter, exMsg));
}
}
throw ex;
}
}
return args;
}
/**
* Invoke the handler method with the given argument values.
*/
@Nullable
protected Object doInvoke(Object... args) throws Exception {
try {
return getBridgedMethod().invoke(getBean(), args);
}
catch (IllegalArgumentException ex) {
assertTargetBean(getBridgedMethod(), getBean(), args);
String text = (ex.getMessage() != null ? ex.getMessage() : "Illegal argument");
throw new IllegalStateException(formatInvokeError(text, args), ex);
}
catch (InvocationTargetException ex) {
// Unwrap for HandlerExceptionResolvers ...
Throwable targetException = ex.getTargetException();
if (targetException instanceof RuntimeException) {
throw (RuntimeException) targetException;
}
else if (targetException instanceof Error) {
throw (Error) targetException;
}
else if (targetException instanceof Exception) {
throw (Exception) targetException;
}
else {
throw new IllegalStateException(formatInvokeError("Invocation failure", args), targetException);
}
}
}
MethodParameter getAsyncReturnValueType(@Nullable Object returnValue) {
return new AsyncResultMethodParameter(returnValue);
}
private class AsyncResultMethodParameter extends HandlerMethodParameter {
@Nullable
private final Object returnValue;
private final ResolvableType returnType;
public AsyncResultMethodParameter(@Nullable Object returnValue) {
super(-1);
this.returnValue = returnValue;
this.returnType = ResolvableType.forType(super.getGenericParameterType()).getGeneric();
}
protected AsyncResultMethodParameter(AsyncResultMethodParameter original) {
super(original);
this.returnValue = original.returnValue;
this.returnType = original.returnType;
}
@Override
public Class<?> getParameterType() {
if (this.returnValue != null) {
return this.returnValue.getClass();
}
if (!ResolvableType.NONE.equals(this.returnType)) {
return this.returnType.toClass();
}
return super.getParameterType();
}
@Override
public Type getGenericParameterType() {
return this.returnType.getType();
}
@Override
public AsyncResultMethodParameter clone() {
return new AsyncResultMethodParameter(this);
}
}
}

View File

@ -0,0 +1,2 @@
org.springframework.boot.env.EnvironmentPostProcessor=\
cn.iocoder.yudao.framework.tenant.core.mq.kafka.TenantKafkaEnvironmentPostProcessor

View File

@ -1,3 +1,2 @@
cn.iocoder.yudao.framework.tenant.config.YudaoTenantRpcAutoConfiguration
cn.iocoder.yudao.framework.tenant.config.YudaoTenantAutoConfiguration
cn.iocoder.yudao.framework.tenant.config.YudaoTenantMQAutoConfiguration

View File

@ -12,25 +12,31 @@
<packaging>jar</packaging>
<name>${project.artifactId}</name>
<description>
消息队列:
1. 基于 Spring Cloud Stream 实现异步消息
2. 基于 Spring Cloud Bus 实现事件总线
</description>
<description>消息队列,支持 Redis、RocketMQ、RabbitMQ、Kafka 四种</description>
<url>https://github.com/YunaiV/ruoyi-vue-pro</url>
<dependencies>
<!-- MQ 相关 -->
<!-- DB 相关 -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<!-- 引入 Spring Cloud Alibaba Stream RocketMQ 相关依赖,将 RocketMQ 作为消息队列,并实现对其的自动配置 -->
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
<groupId>cn.iocoder.cloud</groupId>
<artifactId>yudao-spring-boot-starter-redis</artifactId>
</dependency>
<!-- 消息队列相关 -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<!-- 引入基于 RocketMQ 的 Spring Cloud Bus 的实现的依赖,并实现对其的自动配置 -->
<artifactId>spring-cloud-starter-bus-rocketmq</artifactId>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<optional>true</optional>
</dependency>
</dependencies>

View File

@ -1,37 +0,0 @@
package cn.iocoder.yudao.framework.mq.config;
import com.alibaba.cloud.stream.binder.rocketmq.convert.RocketMQMessageConverter;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.*;
import java.util.ArrayList;
import java.util.List;
/**
*
*
* @author
*/
@AutoConfiguration
public class YudaoMQAutoConfiguration {
/**
* {@link RocketMQMessageConverter} fastjson
*/
@Bean(RocketMQMessageConverter.DEFAULT_NAME)
@ConditionalOnMissingBean(name = { RocketMQMessageConverter.DEFAULT_NAME })
public CompositeMessageConverter rocketMQMessageConverter() {
List<MessageConverter> messageConverters = new ArrayList<>();
ByteArrayMessageConverter byteArrayMessageConverter = new ByteArrayMessageConverter();
byteArrayMessageConverter.setContentTypeResolver(null);
messageConverters.add(byteArrayMessageConverter);
messageConverters.add(new StringMessageConverter());
messageConverters.add(new MappingJackson2MessageConverter());
return new CompositeMessageConverter(messageConverters);
}
}

View File

@ -1,41 +0,0 @@
package cn.iocoder.yudao.framework.mq.core.bus;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.bus.ServiceMatcher;
import org.springframework.cloud.bus.event.RemoteApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import javax.annotation.Resource;
/**
* Spring Cloud Bus Producer
*
* @author
*/
public abstract class AbstractBusProducer {
@Resource
protected ApplicationEventPublisher applicationEventPublisher;
@Resource
protected ServiceMatcher serviceMatcher;
@Value("${spring.application.name}")
protected String applicationName;
protected void publishEvent(RemoteApplicationEvent event) {
applicationEventPublisher.publishEvent(event);
}
/**
* @return 广
*/
protected String selfDestinationService() {
return applicationName + ":**";
}
protected String getBusId() {
return serviceMatcher.getBusId();
}
}

View File

@ -1,4 +0,0 @@
/**
* TODO
*/
package cn.iocoder.yudao.framework.mq.core;

View File

@ -1,6 +1,4 @@
/**
* Redis
* 1. Pub/Sub 广
* 2. Stream
* RedisRocketMQRabbitMQKafka
*/
package cn.iocoder.yudao.framework.mq;

View File

@ -0,0 +1,29 @@
package cn.iocoder.yudao.framework.mq.rabbitmq.config;
import cn.hutool.core.util.ReflectUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.utils.SerializationUtils;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import java.lang.reflect.Field;
/**
* RabbitMQ
*
* @author
*/
@AutoConfiguration
@Slf4j
@ConditionalOnClass(name = "org.springframework.amqp.rabbit.core.RabbitTemplate")
public class YudaoRabbitMQAutoConfiguration {
static {
// 强制设置 SerializationUtils 的 TRUST_ALL 为 true避免 RabbitMQ Consumer 反序列化消息报错
// 为什么不通过设置 spring.amqp.deserialization.trust.all 呢?因为可能在 SerializationUtils static 初始化后
Field trustAllField = ReflectUtil.getField(SerializationUtils.class, "TRUST_ALL");
ReflectUtil.removeFinalModify(trustAllField);
ReflectUtil.setFieldValue(SerializationUtils.class, trustAllField, true);
}
}

View File

@ -0,0 +1,4 @@
/**
*
*/
package cn.iocoder.yudao.framework.mq.rabbitmq.core;

View File

@ -0,0 +1,4 @@
/**
* RabbitMQ
*/
package cn.iocoder.yudao.framework.mq.rabbitmq;

View File

@ -0,0 +1,164 @@
package cn.iocoder.yudao.framework.mq.redis.config;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.system.SystemUtil;
import cn.iocoder.yudao.framework.common.enums.DocumentEnum;
import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate;
import cn.iocoder.yudao.framework.mq.redis.core.interceptor.RedisMessageInterceptor;
import cn.iocoder.yudao.framework.mq.redis.core.job.RedisPendingMessageResendJob;
import cn.iocoder.yudao.framework.mq.redis.core.pubsub.AbstractRedisChannelMessageListener;
import cn.iocoder.yudao.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener;
import cn.iocoder.yudao.framework.redis.config.YudaoRedisAutoConfiguration;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.RedisServerCommands;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.scheduling.annotation.EnableScheduling;
import java.util.List;
import java.util.Properties;
/**
*
*
* @author
*/
@Slf4j
@EnableScheduling // 启用定时任务,用于 RedisPendingMessageResendJob 重发消息
@AutoConfiguration(after = YudaoRedisAutoConfiguration.class)
public class YudaoRedisMQAutoConfiguration {
@Bean
public RedisMQTemplate redisMQTemplate(StringRedisTemplate redisTemplate,
List<RedisMessageInterceptor> interceptors) {
RedisMQTemplate redisMQTemplate = new RedisMQTemplate(redisTemplate);
// 添加拦截器
interceptors.forEach(redisMQTemplate::addInterceptor);
return redisMQTemplate;
}
// ========== 消费者相关 ==========
/**
* Redis Pub/Sub 广
*/
@Bean(initMethod = "start", destroyMethod = "stop")
@ConditionalOnBean(AbstractRedisChannelMessageListener.class) // 只有 AbstractChannelMessageListener 存在的时候,才需要注册 Redis pubsub 监听
public RedisMessageListenerContainer redisMessageListenerContainer(
RedisMQTemplate redisMQTemplate, List<AbstractRedisChannelMessageListener<?>> listeners) {
// 创建 RedisMessageListenerContainer 对象
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
// 设置 RedisConnection 工厂。
container.setConnectionFactory(redisMQTemplate.getRedisTemplate().getRequiredConnectionFactory());
// 添加监听器
listeners.forEach(listener -> {
listener.setRedisMQTemplate(redisMQTemplate);
container.addMessageListener(listener, new ChannelTopic(listener.getChannel()));
log.info("[redisMessageListenerContainer][注册 Channel({}) 对应的监听器({})]",
listener.getChannel(), listener.getClass().getName());
});
return container;
}
/**
* Redis Stream
*/
@Bean
@ConditionalOnBean(AbstractRedisStreamMessageListener.class) // 只有 AbstractStreamMessageListener 存在的时候,才需要注册 Redis pubsub 监听
public RedisPendingMessageResendJob redisPendingMessageResendJob(List<AbstractRedisStreamMessageListener<?>> listeners,
RedisMQTemplate redisTemplate,
@Value("${spring.application.name}") String groupName,
RedissonClient redissonClient) {
return new RedisPendingMessageResendJob(listeners, redisTemplate, groupName, redissonClient);
}
/**
* Redis Stream
*
* <a href="https://www.geek-book.com/src/docs/redis/redis/redis.io/commands/xreadgroup.html">Redis Stream xreadgroup </a>
*/
@Bean(initMethod = "start", destroyMethod = "stop")
@ConditionalOnBean(AbstractRedisStreamMessageListener.class) // 只有 AbstractStreamMessageListener 存在的时候,才需要注册 Redis pubsub 监听
public StreamMessageListenerContainer<String, ObjectRecord<String, String>> redisStreamMessageListenerContainer(
RedisMQTemplate redisMQTemplate, List<AbstractRedisStreamMessageListener<?>> listeners) {
RedisTemplate<String, ?> redisTemplate = redisMQTemplate.getRedisTemplate();
checkRedisVersion(redisTemplate);
// 第一步,创建 StreamMessageListenerContainer 容器
// 创建 options 配置
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>> containerOptions =
StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
.batchSize(10) // 一次性最多拉取多少条消息
.targetType(String.class) // 目标类型。统一使用 String通过自己封装的 AbstractStreamMessageListener 去反序列化
.build();
// 创建 container 对象
StreamMessageListenerContainer<String, ObjectRecord<String, String>> container =
StreamMessageListenerContainer.create(redisMQTemplate.getRedisTemplate().getRequiredConnectionFactory(), containerOptions);
// 第二步,注册监听器,消费对应的 Stream 主题
String consumerName = buildConsumerName();
listeners.parallelStream().forEach(listener -> {
log.info("[redisStreamMessageListenerContainer][开始注册 StreamKey({}) 对应的监听器({})]",
listener.getStreamKey(), listener.getClass().getName());
// 创建 listener 对应的消费者分组
try {
redisTemplate.opsForStream().createGroup(listener.getStreamKey(), listener.getGroup());
} catch (Exception ignore) {
}
// 设置 listener 对应的 redisTemplate
listener.setRedisMQTemplate(redisMQTemplate);
// 创建 Consumer 对象
Consumer consumer = Consumer.from(listener.getGroup(), consumerName);
// 设置 Consumer 消费进度,以最小消费进度为准
StreamOffset<String> streamOffset = StreamOffset.create(listener.getStreamKey(), ReadOffset.lastConsumed());
// 设置 Consumer 监听
StreamMessageListenerContainer.StreamReadRequestBuilder<String> builder = StreamMessageListenerContainer.StreamReadRequest
.builder(streamOffset).consumer(consumer)
.autoAcknowledge(false) // 不自动 ack
.cancelOnError(throwable -> false); // 默认配置,发生异常就取消消费,显然不符合预期;因此,我们设置为 false
container.register(builder.build(), listener);
log.info("[redisStreamMessageListenerContainer][完成注册 StreamKey({}) 对应的监听器({})]",
listener.getStreamKey(), listener.getClass().getName());
});
return container;
}
/**
* 使 IP +
* RocketMQ clientId
*
* @return
*/
private static String buildConsumerName() {
return String.format("%s@%d", SystemUtil.getHostInfo().getAddress(), SystemUtil.getCurrentPID());
}
/**
* Redis
*/
private static void checkRedisVersion(RedisTemplate<String, ?> redisTemplate) {
// 获得 Redis 版本
Properties info = redisTemplate.execute((RedisCallback<Properties>) RedisServerCommands::info);
String version = MapUtil.getStr(info, "redis_version");
// 校验最低版本必须大于等于 5.0.0
int majorVersion = Integer.parseInt(StrUtil.subBefore(version, '.', false));
if (majorVersion < 5) {
throw new IllegalStateException(StrUtil.format("您当前的 Redis 版本为 {},小于最低要求的 5.0.0 版本!" +
"请参考 {} 文档进行安装。", version, DocumentEnum.REDIS_INSTALL.getUrl()));
}
}
}

View File

@ -0,0 +1,87 @@
package cn.iocoder.yudao.framework.mq.redis.core;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.framework.mq.redis.core.interceptor.RedisMessageInterceptor;
import cn.iocoder.yudao.framework.mq.redis.core.message.AbstractRedisMessage;
import cn.iocoder.yudao.framework.mq.redis.core.pubsub.AbstractRedisChannelMessage;
import cn.iocoder.yudao.framework.mq.redis.core.stream.AbstractRedisStreamMessage;
import lombok.AllArgsConstructor;
import lombok.Getter;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.core.RedisTemplate;
import java.util.ArrayList;
import java.util.List;
/**
* Redis MQ
*
* @author
*/
@AllArgsConstructor
public class RedisMQTemplate {
@Getter
private final RedisTemplate<String, ?> redisTemplate;
/**
*
*/
@Getter
private final List<RedisMessageInterceptor> interceptors = new ArrayList<>();
/**
* Redis Redis pub/sub
*
* @param message
*/
public <T extends AbstractRedisChannelMessage> void send(T message) {
try {
sendMessageBefore(message);
// 发送消息
redisTemplate.convertAndSend(message.getChannel(), JsonUtils.toJsonString(message));
} finally {
sendMessageAfter(message);
}
}
/**
* Redis Redis Stream
*
* @param message
* @return
*/
public <T extends AbstractRedisStreamMessage> RecordId send(T message) {
try {
sendMessageBefore(message);
// 发送消息
return redisTemplate.opsForStream().add(StreamRecords.newRecord()
.ofObject(JsonUtils.toJsonString(message)) // 设置内容
.withStreamKey(message.getStreamKey())); // 设置 stream key
} finally {
sendMessageAfter(message);
}
}
/**
*
*
* @param interceptor
*/
public void addInterceptor(RedisMessageInterceptor interceptor) {
interceptors.add(interceptor);
}
private void sendMessageBefore(AbstractRedisMessage message) {
// 正序
interceptors.forEach(interceptor -> interceptor.sendMessageBefore(message));
}
private void sendMessageAfter(AbstractRedisMessage message) {
// 倒序
for (int i = interceptors.size() - 1; i >= 0; i--) {
interceptors.get(i).sendMessageAfter(message);
}
}
}

View File

@ -0,0 +1,26 @@
package cn.iocoder.yudao.framework.mq.redis.core.interceptor;
import cn.iocoder.yudao.framework.mq.redis.core.message.AbstractRedisMessage;
/**
* {@link AbstractRedisMessage}
*
* MQ
*
* @author
*/
public interface RedisMessageInterceptor {
default void sendMessageBefore(AbstractRedisMessage message) {
}
default void sendMessageAfter(AbstractRedisMessage message) {
}
default void consumeMessageBefore(AbstractRedisMessage message) {
}
default void consumeMessageAfter(AbstractRedisMessage message) {
}
}

View File

@ -0,0 +1,100 @@
package cn.iocoder.yudao.framework.mq.redis.core.job;
import cn.hutool.core.collection.CollUtil;
import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate;
import cn.iocoder.yudao.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.core.StreamOperations;
import org.springframework.scheduling.annotation.Scheduled;
import java.util.List;
import java.util.Map;
import java.util.Objects;
/**
* crash
*/
@Slf4j
@AllArgsConstructor
public class RedisPendingMessageResendJob {
private static final String LOCK_KEY = "redis:pending:msg:lock";
/**
* 5
*
* 1.
* 2. 1 5 1
*/
private static final int EXPIRE_TIME = 5 * 60;
private final List<AbstractRedisStreamMessageListener<?>> listeners;
private final RedisMQTemplate redisTemplate;
private final String groupName;
private final RedissonClient redissonClient;
/**
* ,35
*/
@Scheduled(cron = "35 * * * * ?")
public void messageResend() {
RLock lock = redissonClient.getLock(LOCK_KEY);
// 尝试加锁
if (lock.tryLock()) {
try {
execute();
} catch (Exception ex) {
log.error("[messageResend][执行异常]", ex);
} finally {
lock.unlock();
}
}
}
/**
*
*
* @see <a href="https://gitee.com/zhijiantianya/ruoyi-vue-pro/pulls/480/files"></a>
*/
private void execute() {
StreamOperations<String, Object, Object> ops = redisTemplate.getRedisTemplate().opsForStream();
listeners.forEach(listener -> {
PendingMessagesSummary pendingMessagesSummary = Objects.requireNonNull(ops.pending(listener.getStreamKey(), groupName));
// 每个消费者的 pending 队列消息数量
Map<String, Long> pendingMessagesPerConsumer = pendingMessagesSummary.getPendingMessagesPerConsumer();
pendingMessagesPerConsumer.forEach((consumerName, pendingMessageCount) -> {
log.info("[processPendingMessage][消费者({}) 消息数量({})]", consumerName, pendingMessageCount);
// 每个消费者的 pending消息的详情信息
PendingMessages pendingMessages = ops.pending(listener.getStreamKey(), Consumer.from(groupName, consumerName), Range.unbounded(), pendingMessageCount);
if (pendingMessages.isEmpty()) {
return;
}
pendingMessages.forEach(pendingMessage -> {
// 获取消息上一次传递到 consumer 的时间,
long lastDelivery = pendingMessage.getElapsedTimeSinceLastDelivery().getSeconds();
if (lastDelivery < EXPIRE_TIME){
return;
}
// 获取指定 id 的消息体
List<MapRecord<String, Object, Object>> records = ops.range(listener.getStreamKey(),
Range.of(Range.Bound.inclusive(pendingMessage.getIdAsString()), Range.Bound.inclusive(pendingMessage.getIdAsString())));
if (CollUtil.isEmpty(records)) {
return;
}
// 重新投递消息
redisTemplate.getRedisTemplate().opsForStream().add(StreamRecords.newRecord()
.ofObject(records.get(0).getValue()) // 设置内容
.withStreamKey(listener.getStreamKey()));
// ack 消息消费完成
redisTemplate.getRedisTemplate().opsForStream().acknowledge(groupName, records.get(0));
log.info("[processPendingMessage][消息({})重新投递成功]", records.get(0).getId());
});
});
});
}
}

View File

@ -0,0 +1,29 @@
package cn.iocoder.yudao.framework.mq.redis.core.message;
import lombok.Data;
import java.util.HashMap;
import java.util.Map;
/**
* Redis
*
* @author
*/
@Data
public abstract class AbstractRedisMessage {
/**
*
*/
private Map<String, String> headers = new HashMap<>();
public String getHeader(String key) {
return headers.get(key);
}
public void addHeader(String key, String value) {
headers.put(key, value);
}
}

View File

@ -0,0 +1,23 @@
package cn.iocoder.yudao.framework.mq.redis.core.pubsub;
import cn.iocoder.yudao.framework.mq.redis.core.message.AbstractRedisMessage;
import com.fasterxml.jackson.annotation.JsonIgnore;
/**
* Redis Channel Message
*
* @author
*/
public abstract class AbstractRedisChannelMessage extends AbstractRedisMessage {
/**
* Redis Channel使
*
* @return Channel
*/
@JsonIgnore // 避免序列化。原因是Redis 发布 Channel 消息的时候,已经会指定。
public String getChannel() {
return getClass().getSimpleName();
}
}

View File

@ -0,0 +1,103 @@
package cn.iocoder.yudao.framework.mq.redis.core.pubsub;
import cn.hutool.core.util.TypeUtil;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate;
import cn.iocoder.yudao.framework.mq.redis.core.interceptor.RedisMessageInterceptor;
import cn.iocoder.yudao.framework.mq.redis.core.message.AbstractRedisMessage;
import lombok.Setter;
import lombok.SneakyThrows;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import java.lang.reflect.Type;
import java.util.List;
/**
* Redis Pub/Sub 广
*
* @param <T>
*
* @author
*/
public abstract class AbstractRedisChannelMessageListener<T extends AbstractRedisChannelMessage> implements MessageListener {
/**
*
*/
private final Class<T> messageType;
/**
* Redis Channel
*/
private final String channel;
/**
* RedisMQTemplate
*/
@Setter
private RedisMQTemplate redisMQTemplate;
@SneakyThrows
protected AbstractRedisChannelMessageListener() {
this.messageType = getMessageClass();
this.channel = messageType.getDeclaredConstructor().newInstance().getChannel();
}
/**
* Sub Redis Channel
*
* @return channel
*/
public final String getChannel() {
return channel;
}
@Override
public final void onMessage(Message message, byte[] bytes) {
T messageObj = JsonUtils.parseObject(message.getBody(), messageType);
try {
consumeMessageBefore(messageObj);
// 消费消息
this.onMessage(messageObj);
} finally {
consumeMessageAfter(messageObj);
}
}
/**
*
*
* @param message
*/
public abstract void onMessage(T message);
/**
*
*
* @return
*/
@SuppressWarnings("unchecked")
private Class<T> getMessageClass() {
Type type = TypeUtil.getTypeArgument(getClass(), 0);
if (type == null) {
throw new IllegalStateException(String.format("类型(%s) 需要设置消息类型", getClass().getName()));
}
return (Class<T>) type;
}
private void consumeMessageBefore(AbstractRedisMessage message) {
assert redisMQTemplate != null;
List<RedisMessageInterceptor> interceptors = redisMQTemplate.getInterceptors();
// 正序
interceptors.forEach(interceptor -> interceptor.consumeMessageBefore(message));
}
private void consumeMessageAfter(AbstractRedisMessage message) {
assert redisMQTemplate != null;
List<RedisMessageInterceptor> interceptors = redisMQTemplate.getInterceptors();
// 倒序
for (int i = interceptors.size() - 1; i >= 0; i--) {
interceptors.get(i).consumeMessageAfter(message);
}
}
}

View File

@ -0,0 +1,23 @@
package cn.iocoder.yudao.framework.mq.redis.core.stream;
import cn.iocoder.yudao.framework.mq.redis.core.message.AbstractRedisMessage;
import com.fasterxml.jackson.annotation.JsonIgnore;
/**
* Redis Stream Message
*
* @author
*/
public abstract class AbstractRedisStreamMessage extends AbstractRedisMessage {
/**
* Redis Stream Key使
*
* @return Channel
*/
@JsonIgnore // 避免序列化
public String getStreamKey() {
return getClass().getSimpleName();
}
}

View File

@ -0,0 +1,113 @@
package cn.iocoder.yudao.framework.mq.redis.core.stream;
import cn.hutool.core.util.TypeUtil;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate;
import cn.iocoder.yudao.framework.mq.redis.core.interceptor.RedisMessageInterceptor;
import cn.iocoder.yudao.framework.mq.redis.core.message.AbstractRedisMessage;
import lombok.Getter;
import lombok.Setter;
import lombok.SneakyThrows;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.stream.StreamListener;
import java.lang.reflect.Type;
import java.util.List;
/**
* Redis Stream
*
* @param <T>
*
* @author
*/
public abstract class AbstractRedisStreamMessageListener<T extends AbstractRedisStreamMessage>
implements StreamListener<String, ObjectRecord<String, String>> {
/**
*
*/
private final Class<T> messageType;
/**
* Redis Channel
*/
@Getter
private final String streamKey;
/**
* Redis 使 spring.application.name
*/
@Value("${spring.application.name}")
@Getter
private String group;
/**
* RedisMQTemplate
*/
@Setter
private RedisMQTemplate redisMQTemplate;
@SneakyThrows
protected AbstractRedisStreamMessageListener() {
this.messageType = getMessageClass();
this.streamKey = messageType.getDeclaredConstructor().newInstance().getStreamKey();
}
@Override
public void onMessage(ObjectRecord<String, String> message) {
// 消费消息
T messageObj = JsonUtils.parseObject(message.getValue(), messageType);
try {
consumeMessageBefore(messageObj);
// 消费消息
this.onMessage(messageObj);
// ack 消息消费完成
redisMQTemplate.getRedisTemplate().opsForStream().acknowledge(group, message);
// TODO 芋艿:需要额外考虑以下几个点:
// 1. 处理异常的情况
// 2. 发送日志;以及事务的结合
// 3. 消费日志;以及通用的幂等性
// 4. 消费失败的重试https://zhuanlan.zhihu.com/p/60501638
} finally {
consumeMessageAfter(messageObj);
}
}
/**
*
*
* @param message
*/
public abstract void onMessage(T message);
/**
*
*
* @return
*/
@SuppressWarnings("unchecked")
private Class<T> getMessageClass() {
Type type = TypeUtil.getTypeArgument(getClass(), 0);
if (type == null) {
throw new IllegalStateException(String.format("类型(%s) 需要设置消息类型", getClass().getName()));
}
return (Class<T>) type;
}
private void consumeMessageBefore(AbstractRedisMessage message) {
assert redisMQTemplate != null;
List<RedisMessageInterceptor> interceptors = redisMQTemplate.getInterceptors();
// 正序
interceptors.forEach(interceptor -> interceptor.consumeMessageBefore(message));
}
private void consumeMessageAfter(AbstractRedisMessage message) {
assert redisMQTemplate != null;
List<RedisMessageInterceptor> interceptors = redisMQTemplate.getInterceptors();
// 倒序
for (int i = interceptors.size() - 1; i >= 0; i--) {
interceptors.get(i).consumeMessageAfter(message);
}
}
}

View File

@ -0,0 +1,6 @@
/**
* Redis
* 1. Pub/Sub 广
* 2. Stream
*/
package cn.iocoder.yudao.framework.mq.redis;

View File

@ -1 +1,2 @@
cn.iocoder.yudao.framework.mq.config.YudaoMQAutoConfiguration
cn.iocoder.yudao.framework.mq.redis.config.YudaoRedisMQAutoConfiguration
cn.iocoder.yudao.framework.mq.rabbitmq.config.YudaoRabbitMQAutoConfiguration

View File

@ -0,0 +1 @@
<http://www.iocoder.cn/Spring-Boot/RocketMQ/?yudao>

View File

@ -0,0 +1 @@
<http://www.iocoder.cn/Spring-Boot/Kafka/?yudao>

View File

@ -0,0 +1 @@
<http://www.iocoder.cn/Spring-Boot/RabbitMQ/?yudao>

View File

@ -0,0 +1 @@
<http://www.iocoder.cn/Spring-Boot/RocketMQ/?yudao>

View File

@ -59,13 +59,6 @@ spring:
# password: 123456 # 密码,建议生产环境开启
--- #################### MQ 消息队列相关配置 ####################
spring:
cloud:
stream:
rocketmq:
# RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类
binder:
name-server: 127.0.0.1:9876 # RocketMQ Namesrv 地址
--- #################### 定时任务相关配置 ####################
xxl:

View File

@ -70,14 +70,6 @@ spring:
# password: 123456 # 密码,建议生产环境开启
--- #################### MQ 消息队列相关配置 ####################
spring:
cloud:
stream:
rocketmq:
# RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类
binder:
name-server: 127.0.0.1:9876 # RocketMQ Namesrv 地址
binding-retry-interval: 7200 # 消息绑定重试间隔时间,单位:秒,默认为 30 秒。考虑到本地可能不启动 RocketMQ 服务,设置为 2 小时
--- #################### 定时任务相关配置 ####################

View File

@ -85,26 +85,6 @@ spring:
--- #################### MQ 消息队列相关配置 ####################
spring:
cloud:
# Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
stream:
# Spring Cloud Stream RocketMQ 配置项
rocketmq:
# RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类
binder:
name-server: 127.0.0.1:9876 # RocketMQ Namesrv 地址
default: # 默认 bindings 全局配置
producer: # RocketMQ Producer 配置项,对应 RocketMQProducerProperties 类
group: bpm_producer_group # 生产者分组
send-type: SYNC # 发送模式SYNC 同步
# Spring Cloud Bus 配置项,对应 BusProperties 类
bus:
enabled: true # 是否开启,默认为 true
id: ${spring.application.name}:${server.port} # 编号Spring Cloud Alibaba 建议使用“应用:端口”的格式
destination: springCloudBus # 目标消息队列,默认为 springCloudBus
--- #################### 定时任务相关配置 ####################
xxl:

View File

@ -101,10 +101,6 @@
</dependency>
<!-- 消息队列相关 -->
<dependency>
<groupId>cn.iocoder.cloud</groupId>
<artifactId>yudao-spring-boot-starter-mq</artifactId>
</dependency>
<!-- Test 测试相关 -->
<dependency>

View File

@ -59,13 +59,6 @@ spring:
# password: 123456 # 密码,建议生产环境开启
--- #################### MQ 消息队列相关配置 ####################
spring:
cloud:
stream:
rocketmq:
# RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类
binder:
name-server: 127.0.0.1:9876 # RocketMQ Namesrv 地址
--- #################### 定时任务相关配置 ####################
xxl:

View File

@ -74,14 +74,6 @@ spring:
# password: 123456 # 密码,建议生产环境开启
--- #################### MQ 消息队列相关配置 ####################
spring:
cloud:
stream:
rocketmq:
# RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类
binder:
name-server: 127.0.0.1:9876 # RocketMQ Namesrv 地址
binding-retry-interval: 7200 # 消息绑定重试间隔时间,单位:秒,默认为 30 秒。考虑到本地可能不启动 RocketMQ 服务,设置为 2 小时
--- #################### 定时任务相关配置 ####################
xxl:

View File

@ -74,34 +74,6 @@ spring:
--- #################### MQ 消息队列相关配置 ####################
spring:
cloud:
# Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
stream:
function:
definition: busConsumer
# Binding 配置项,对应 BindingProperties Map
# bindings:
# Spring Cloud Stream RocketMQ 配置项
rocketmq:
# RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类
binder:
name-server: 127.0.0.1:9876 # RocketMQ Namesrv 地址
default: # 默认 bindings 全局配置
producer: # RocketMQ Producer 配置项,对应 RocketMQProducerProperties 类
group: infra_producer_group # 生产者分组
send-type: SYNC # 发送模式SYNC 同步
bindings:
springCloudBusInput:
consumer:
message-model: BROADCASTING # 重要,解决 Spring Cloud Bus RocketMQ 默认不是 BROADCASTING 广播消费的问题
# Spring Cloud Bus 配置项,对应 BusProperties 类
bus:
enabled: true # 是否开启,默认为 true
id: ${spring.application.name}:${server.port} # 编号Spring Cloud Alibaba 建议使用“应用:端口”的格式
destination: springCloudBus # 目标消息队列,默认为 springCloudBus
--- #################### 定时任务相关配置 ####################
xxl:

View File

@ -59,13 +59,6 @@ spring:
# password: 123456 # 密码,建议生产环境开启
--- #################### MQ 消息队列相关配置 ####################
spring:
cloud:
stream:
rocketmq:
# RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类
binder:
name-server: 127.0.0.1:9876 # RocketMQ Namesrv 地址
--- #################### 定时任务相关配置 ####################
xxl:

View File

@ -70,14 +70,6 @@ spring:
# password: 123456 # 密码,建议生产环境开启
--- #################### MQ 消息队列相关配置 ####################
spring:
cloud:
stream:
rocketmq:
# RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类
binder:
name-server: 127.0.0.1:9876 # RocketMQ Namesrv 地址
binding-retry-interval: 7200 # 消息绑定重试间隔时间,单位:秒,默认为 30 秒。考虑到本地可能不启动 RocketMQ 服务,设置为 2 小时
--- #################### 定时任务相关配置 ####################

View File

@ -74,33 +74,6 @@ spring:
--- #################### MQ 消息队列相关配置 ####################
spring:
cloud:
# Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
stream:
function:
definition: busConsumer;
# Binding 配置项,对应 BindingProperties Map
# Spring Cloud Stream RocketMQ 配置项
rocketmq:
# RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类
binder:
name-server: 127.0.0.1:9876 # RocketMQ Namesrv 地址
default: # 默认 bindings 全局配置
producer: # RocketMQ Producer 配置项,对应 RocketMQProducerProperties 类
group: product_producer_group # 生产者分组
send-type: SYNC # 发送模式SYNC 同步
bindings:
springCloudBusInput:
consumer:
message-model: BROADCASTING # 重要,解决 Spring Cloud Bus RocketMQ 默认不是 BROADCASTING 广播消费的问题
# Spring Cloud Bus 配置项,对应 BusProperties 类
bus:
enabled: true # 是否开启,默认为 true
id: ${spring.application.name}:${server.port} # 编号Spring Cloud Alibaba 建议使用“应用:端口”的格式
destination: springCloudBus # 目标消息队列,默认为 springCloudBus
--- #################### 定时任务相关配置 ####################
xxl:

View File

@ -0,0 +1,31 @@
package cn.iocoder.yudao.module.promotion.mq.consumer.coupon;
import cn.iocoder.yudao.module.member.message.user.MemberUserCreateMessage;
import cn.iocoder.yudao.module.promotion.service.coupon.CouponService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* {@link MemberUserCreateMessage}
*
* @author owen
*/
@Component
@Slf4j
public class CouponTakeByRegisterConsumer {
@Resource
private CouponService couponService;
@EventListener
@Async // Spring Event 默认在 Producer 发送的线程,通过 @Async 实现异步
public void onMessage(MemberUserCreateMessage message) {
log.info("[onMessage][消息内容({})]", message);
couponService.takeCouponByRegister(message.getUserId());
}
}

View File

@ -1,29 +0,0 @@
package cn.iocoder.yudao.module.promotion.mq.consumer.coupon;
import cn.iocoder.yudao.module.promotion.mq.message.coupon.UserCreateMessage;
import cn.iocoder.yudao.module.promotion.service.coupon.CouponService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.function.Consumer;
/**
* {@link UserCreateMessage}
*
* @author owen
*/
@Component
@Slf4j
public class UserCreateConsumer implements Consumer<UserCreateMessage> {
@Resource
private CouponService couponService;
@Override
public void accept(UserCreateMessage message) {
log.info("[onMessage][消息内容({})]", message);
couponService.takeCouponByRegister(message.getUserId());
}
}

View File

@ -0,0 +1,4 @@
/**
*
*/
package cn.iocoder.yudao.module.promotion.mq.consumer;

View File

@ -1,21 +0,0 @@
package cn.iocoder.yudao.module.promotion.mq.message.coupon;
import lombok.Data;
import javax.validation.constraints.NotNull;
/**
*
*
* @author owen
*/
@Data
public class UserCreateMessage {
/**
*
*/
@NotNull(message = "用户编号不能为空")
private Long userId;
}

View File

@ -0,0 +1,4 @@
/**
*
*/
package cn.iocoder.yudao.module.promotion.mq.message;

View File

@ -0,0 +1,4 @@
/**
*
*/
package cn.iocoder.yudao.module.promotion.mq.producer;

View File

@ -59,13 +59,6 @@ spring:
# password: 123456 # 密码,建议生产环境开启
--- #################### MQ 消息队列相关配置 ####################
spring:
cloud:
stream:
rocketmq:
# RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类
binder:
name-server: 127.0.0.1:9876 # RocketMQ Namesrv 地址
--- #################### 定时任务相关配置 ####################
xxl:

View File

@ -70,14 +70,6 @@ spring:
# password: 123456 # 密码,建议生产环境开启
--- #################### MQ 消息队列相关配置 ####################
spring:
cloud:
stream:
rocketmq:
# RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类
binder:
name-server: 127.0.0.1:9876 # RocketMQ Namesrv 地址
binding-retry-interval: 7200 # 消息绑定重试间隔时间,单位:秒,默认为 30 秒。考虑到本地可能不启动 RocketMQ 服务,设置为 2 小时
--- #################### 定时任务相关配置 ####################

View File

@ -74,33 +74,6 @@ spring:
--- #################### MQ 消息队列相关配置 ####################
spring:
cloud:
# Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
stream:
function:
definition: busConsumer;
# Binding 配置项,对应 BindingProperties Map
# Spring Cloud Stream RocketMQ 配置项
rocketmq:
# RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类
binder:
name-server: 127.0.0.1:9876 # RocketMQ Namesrv 地址
default: # 默认 bindings 全局配置
producer: # RocketMQ Producer 配置项,对应 RocketMQProducerProperties 类
group: promotion_producer_group # 生产者分组
send-type: SYNC # 发送模式SYNC 同步
bindings:
springCloudBusInput:
consumer:
message-model: BROADCASTING # 重要,解决 Spring Cloud Bus RocketMQ 默认不是 BROADCASTING 广播消费的问题
# Spring Cloud Bus 配置项,对应 BusProperties 类
bus:
enabled: true # 是否开启,默认为 true
id: ${spring.application.name}:${server.port} # 编号Spring Cloud Alibaba 建议使用“应用:端口”的格式
destination: springCloudBus # 目标消息队列,默认为 springCloudBus
--- #################### 定时任务相关配置 ####################
xxl:

View File

@ -59,13 +59,6 @@ spring:
# password: 123456 # 密码,建议生产环境开启
--- #################### MQ 消息队列相关配置 ####################
spring:
cloud:
stream:
rocketmq:
# RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类
binder:
name-server: 127.0.0.1:9876 # RocketMQ Namesrv 地址
--- #################### 定时任务相关配置 ####################
xxl:

View File

@ -70,14 +70,6 @@ spring:
# password: 123456 # 密码,建议生产环境开启
--- #################### MQ 消息队列相关配置 ####################
spring:
cloud:
stream:
rocketmq:
# RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类
binder:
name-server: 127.0.0.1:9876 # RocketMQ Namesrv 地址
binding-retry-interval: 7200 # 消息绑定重试间隔时间,单位:秒,默认为 30 秒。考虑到本地可能不启动 RocketMQ 服务,设置为 2 小时
--- #################### 定时任务相关配置 ####################

View File

@ -74,33 +74,6 @@ spring:
--- #################### MQ 消息队列相关配置 ####################
spring:
cloud:
# Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
stream:
function:
definition: busConsumer;
# Binding 配置项,对应 BindingProperties Map
# Spring Cloud Stream RocketMQ 配置项
rocketmq:
# RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类
binder:
name-server: 127.0.0.1:9876 # RocketMQ Namesrv 地址
default: # 默认 bindings 全局配置
producer: # RocketMQ Producer 配置项,对应 RocketMQProducerProperties 类
group: statistics_producer_group # 生产者分组
send-type: SYNC # 发送模式SYNC 同步
bindings:
springCloudBusInput:
consumer:
message-model: BROADCASTING # 重要,解决 Spring Cloud Bus RocketMQ 默认不是 BROADCASTING 广播消费的问题
# Spring Cloud Bus 配置项,对应 BusProperties 类
bus:
enabled: true # 是否开启,默认为 true
id: ${spring.application.name}:${server.port} # 编号Spring Cloud Alibaba 建议使用“应用:端口”的格式
destination: springCloudBus # 目标消息队列,默认为 springCloudBus
--- #################### 定时任务相关配置 ####################
xxl:

View File

@ -59,13 +59,6 @@ spring:
# password: 123456 # 密码,建议生产环境开启
--- #################### MQ 消息队列相关配置 ####################
spring:
cloud:
stream:
rocketmq:
# RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类
binder:
name-server: 127.0.0.1:9876 # RocketMQ Namesrv 地址
--- #################### 定时任务相关配置 ####################
xxl:

View File

@ -70,14 +70,6 @@ spring:
# password: 123456 # 密码,建议生产环境开启
--- #################### MQ 消息队列相关配置 ####################
spring:
cloud:
stream:
rocketmq:
# RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类
binder:
name-server: 127.0.0.1:9876 # RocketMQ Namesrv 地址
binding-retry-interval: 7200 # 消息绑定重试间隔时间,单位:秒,默认为 30 秒。考虑到本地可能不启动 RocketMQ 服务,设置为 2 小时
--- #################### 定时任务相关配置 ####################

View File

@ -74,33 +74,6 @@ spring:
--- #################### MQ 消息队列相关配置 ####################
spring:
cloud:
# Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
stream:
function:
definition: busConsumer;
# Binding 配置项,对应 BindingProperties Map
# Spring Cloud Stream RocketMQ 配置项
rocketmq:
# RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类
binder:
name-server: 127.0.0.1:9876 # RocketMQ Namesrv 地址
default: # 默认 bindings 全局配置
producer: # RocketMQ Producer 配置项,对应 RocketMQProducerProperties 类
group: trade_producer_group # 生产者分组
send-type: SYNC # 发送模式SYNC 同步
bindings:
springCloudBusInput:
consumer:
message-model: BROADCASTING # 重要,解决 Spring Cloud Bus RocketMQ 默认不是 BROADCASTING 广播消费的问题
# Spring Cloud Bus 配置项,对应 BusProperties 类
bus:
enabled: true # 是否开启,默认为 true
id: ${spring.application.name}:${server.port} # 编号Spring Cloud Alibaba 建议使用“应用:端口”的格式
destination: springCloudBus # 目标消息队列,默认为 springCloudBus
--- #################### 定时任务相关配置 ####################
xxl:

View File

@ -0,0 +1,4 @@
/**
*
*/
package cn.iocoder.yudao.module.member.message;

View File

@ -1,4 +1,4 @@
package cn.iocoder.yudao.module.member.mq.message.user;
package cn.iocoder.yudao.module.member.message.user;
import lombok.Data;
@ -10,7 +10,7 @@ import javax.validation.constraints.NotNull;
* @author owen
*/
@Data
public class UserCreateMessage {
public class MemberUserCreateMessage {
/**
*

View File

@ -0,0 +1,4 @@
/**
*
*/
package cn.iocoder.yudao.module.member.mq.consumer;

View File

@ -0,0 +1,4 @@
/**
*
*/
package cn.iocoder.yudao.module.member.mq.message;

View File

@ -0,0 +1,4 @@
/**
*
*/
package cn.iocoder.yudao.module.member.mq.producer;

View File

@ -1,9 +1,8 @@
package cn.iocoder.yudao.module.member.mq.producer.user;
import cn.iocoder.yudao.framework.mq.core.bus.AbstractBusProducer;
import cn.iocoder.yudao.module.member.mq.message.user.UserCreateMessage;
import cn.iocoder.yudao.module.member.message.user.MemberUserCreateMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@ -15,19 +14,18 @@ import javax.annotation.Resource;
*/
@Slf4j
@Component
public class MemberUserProducer extends AbstractBusProducer {
public class MemberUserProducer {
@Resource
private StreamBridge streamBridge;
private ApplicationContext applicationContext;
// TODO 芋艿:后续要在细看下;
/**
* {@link UserCreateMessage}
* {@link MemberUserCreateMessage}
*
* @param userId
*/
public void sendUserCreateMessage(Long userId) {
streamBridge.send("member-create-out-0",new UserCreateMessage().setUserId(userId));
applicationContext.publishEvent(new MemberUserCreateMessage().setUserId(userId));
}
}

View File

@ -59,13 +59,6 @@ spring:
# password: 123456 # 密码,建议生产环境开启
--- #################### MQ 消息队列相关配置 ####################
spring:
cloud:
stream:
rocketmq:
# RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类
binder:
name-server: 127.0.0.1:9876 # RocketMQ Namesrv 地址
--- #################### 定时任务相关配置 ####################
xxl:

View File

@ -70,14 +70,6 @@ spring:
# password: 123456 # 密码,建议生产环境开启
--- #################### MQ 消息队列相关配置 ####################
spring:
cloud:
stream:
rocketmq:
# RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类
binder:
name-server: 127.0.0.1:9876 # RocketMQ Namesrv 地址
binding-retry-interval: 7200 # 消息绑定重试间隔时间,单位:秒,默认为 30 秒。考虑到本地可能不启动 RocketMQ 服务,设置为 2 小时
--- #################### 定时任务相关配置 ####################

View File

@ -74,33 +74,6 @@ spring:
--- #################### MQ 消息队列相关配置 ####################
spring:
cloud:
# Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
stream:
function:
definition: busConsumer;
# Binding 配置项,对应 BindingProperties Map
# Spring Cloud Stream RocketMQ 配置项
rocketmq:
# RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类
binder:
name-server: 127.0.0.1:9876 # RocketMQ Namesrv 地址
default: # 默认 bindings 全局配置
producer: # RocketMQ Producer 配置项,对应 RocketMQProducerProperties 类
group: member_producer_group # 生产者分组
send-type: SYNC # 发送模式SYNC 同步
bindings:
springCloudBusInput:
consumer:
message-model: BROADCASTING # 重要,解决 Spring Cloud Bus RocketMQ 默认不是 BROADCASTING 广播消费的问题
# Spring Cloud Bus 配置项,对应 BusProperties 类
bus:
enabled: true # 是否开启,默认为 true
id: ${spring.application.name}:${server.port} # 编号Spring Cloud Alibaba 建议使用“应用:端口”的格式
destination: springCloudBus # 目标消息队列,默认为 springCloudBus
--- #################### 定时任务相关配置 ####################
xxl:

View File

@ -96,16 +96,8 @@
</dependency>
<!-- Job 定时任务相关 -->
<!-- <dependency>-->
<!-- <groupId>cn.iocoder.cloud</groupId>-->
<!-- <artifactId>yudao-spring-boot-starter-job</artifactId>-->
<!-- </dependency>-->
<!-- 消息队列相关 -->
<!-- <dependency>-->
<!-- <groupId>cn.iocoder.cloud</groupId>-->
<!-- <artifactId>yudao-spring-boot-starter-mq</artifactId>-->
<!-- </dependency>-->
<!-- Test 测试相关 -->
<dependency>

View File

@ -6,9 +6,9 @@ tenant-id: {{adminTenentId}}
{
"templateCode": "test_01",
"mobile": "156016913900",
"params": {
"key01": "value01",
"key02": "value02"
"mobile": "15601691390",
"templateParams": {
"operation": "value01",
"code": "value02"
}
}

View File

@ -3,10 +3,11 @@ package cn.iocoder.yudao.module.system.mq.consumer.mail;
import cn.iocoder.yudao.module.system.mq.message.mail.MailSendMessage;
import cn.iocoder.yudao.module.system.service.mail.MailSendService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.function.Consumer;
/**
* {@link MailSendMessage}
@ -15,14 +16,15 @@ import java.util.function.Consumer;
*/
@Component
@Slf4j
public class MailSendConsumer implements Consumer<MailSendMessage> {
public class MailSendConsumer {
@Resource
private MailSendService mailSendService;
@Override
public void accept(MailSendMessage message) {
log.info("[accept][消息内容({})]", message);
@EventListener
@Async // Spring Event 默认在 Producer 发送的线程,通过 @Async 实现异步
public void onMessage(MailSendMessage message) {
log.info("[onMessage][消息内容({})]", message);
mailSendService.doSendMail(message);
}

View File

@ -3,10 +3,11 @@ package cn.iocoder.yudao.module.system.mq.consumer.sms;
import cn.iocoder.yudao.module.system.mq.message.sms.SmsSendMessage;
import cn.iocoder.yudao.module.system.service.sms.SmsSendService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.function.Consumer;
/**
* {@link SmsSendMessage}
@ -15,14 +16,16 @@ import java.util.function.Consumer;
*/
@Component
@Slf4j
public class SmsSendConsumer implements Consumer<SmsSendMessage> {
public class SmsSendConsumer {
@Resource
private SmsSendService smsSendService;
@Override
public void accept(SmsSendMessage message) {
log.info("[accept][消息内容({})]", message);
@EventListener
@Async // Spring Event 默认在 Producer 发送的线程,通过 @Async 实现异步
public void onMessage(SmsSendMessage message) {
log.info("[onMessage][消息内容({})]", message);
smsSendService.doSendSms(message);
}
}

View File

@ -1,9 +1,8 @@
package cn.iocoder.yudao.module.system.mq.producer.mail;
import cn.iocoder.yudao.framework.mq.core.bus.AbstractBusProducer;
import cn.iocoder.yudao.module.system.mq.message.mail.MailSendMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@ -16,10 +15,10 @@ import javax.annotation.Resource;
*/
@Slf4j
@Component
public class MailProducer extends AbstractBusProducer {
public class MailProducer {
@Resource
private StreamBridge streamBridge;
private ApplicationContext applicationContext;
/**
* {@link MailSendMessage}
@ -36,7 +35,7 @@ public class MailProducer extends AbstractBusProducer {
MailSendMessage message = new MailSendMessage()
.setLogId(sendLogId).setMail(mail).setAccountId(accountId)
.setNickname(nickname).setTitle(title).setContent(content);
streamBridge.send("mailSend-out-0", message);
applicationContext.publishEvent(message);
}
}

View File

@ -1,4 +0,0 @@
/**
*
*/
package cn.iocoder.yudao.module.system.mq.producer;

View File

@ -1,10 +1,9 @@
package cn.iocoder.yudao.module.system.mq.producer.sms;
import cn.iocoder.yudao.framework.common.core.KeyValue;
import cn.iocoder.yudao.framework.mq.core.bus.AbstractBusProducer;
import cn.iocoder.yudao.module.system.mq.message.sms.SmsSendMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@ -14,14 +13,14 @@ import java.util.List;
* Sms Producer
*
* @author zzf
* @date 2021/3/9 16:35
* @since 2021/3/9 16:35
*/
@Slf4j
@Component
public class SmsProducer extends AbstractBusProducer {
public class SmsProducer {
@Resource
private StreamBridge streamBridge;
private ApplicationContext applicationContext;
/**
* {@link SmsSendMessage}
@ -36,7 +35,7 @@ public class SmsProducer extends AbstractBusProducer {
Long channelId, String apiTemplateId, List<KeyValue<String, Object>> templateParams) {
SmsSendMessage message = new SmsSendMessage().setLogId(logId).setMobile(mobile);
message.setChannelId(channelId).setApiTemplateId(apiTemplateId).setTemplateParams(templateParams);
streamBridge.send("smsSend-out-0", message);
applicationContext.publishEvent(message);
}
}

View File

@ -59,13 +59,21 @@ spring:
# password: 123456 # 密码,建议生产环境开启
--- #################### MQ 消息队列相关配置 ####################
spring:
cloud:
stream:
# rocketmq 配置项,对应 RocketMQProperties 配置类
rocketmq:
# RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类
binder:
name-server: 127.0.0.1:9876 # RocketMQ Namesrv 地址
name-server: 127.0.0.1:9876 # RocketMQ Namesrv
spring:
# RabbitMQ 配置项,对应 RabbitProperties 配置类
rabbitmq:
host: 127.0.0.1 # RabbitMQ 服务的地址
port: 5672 # RabbitMQ 服务的端口
username: guest # RabbitMQ 服务的账号
password: guest # RabbitMQ 服务的密码
# Kafka 配置项,对应 KafkaProperties 配置类
kafka:
bootstrap-servers: 127.0.0.1:9092 # 指定 Kafka Broker 地址,可以设置多个,以逗号分隔
--- #################### 定时任务相关配置 ####################
xxl:

View File

@ -70,14 +70,21 @@ spring:
# password: 123456 # 密码,建议生产环境开启
--- #################### MQ 消息队列相关配置 ####################
spring:
cloud:
stream:
# rocketmq 配置项,对应 RocketMQProperties 配置类
rocketmq:
# RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类
binder:
name-server: 127.0.0.1:9876 # RocketMQ Namesrv 地址
binding-retry-interval: 7200 # 消息绑定重试间隔时间,单位:秒,默认为 30 秒。考虑到本地可能不启动 RocketMQ 服务,设置为 2 小时
name-server: 127.0.0.1:9876 # RocketMQ Namesrv
spring:
# RabbitMQ 配置项,对应 RabbitProperties 配置类
rabbitmq:
host: 127.0.0.1 # RabbitMQ 服务的地址
port: 5672 # RabbitMQ 服务的端口
username: guest # RabbitMQ 服务的账号
password: guest # RabbitMQ 服务的密码
# Kafka 配置项,对应 KafkaProperties 配置类
kafka:
bootstrap-servers: 127.0.0.1:9092 # 指定 Kafka Broker 地址,可以设置多个,以逗号分隔
--- #################### 定时任务相关配置 ####################

View File

@ -74,43 +74,31 @@ spring:
--- #################### MQ 消息队列相关配置 ####################
spring:
cloud:
# Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
stream:
function:
definition: busConsumer;smsSendConsumer;mailSendConsumer
# Binding 配置项,对应 BindingProperties Map
bindings:
smsSend-out-0:
destination: system_sms_send
smsSendConsumer-in-0:
destination: system_sms_send
group: system_sms_send_consumer_group
mailSend-out-0:
destination: system_mail_send
mailSendConsumer-in-0:
destination: system_mail_send
group: system_mail_send_consumer_group
# Spring Cloud Stream RocketMQ 配置项
rocketmq:
# RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类
binder:
name-server: 127.0.0.1:9876 # RocketMQ Namesrv 地址
default: # 默认 bindings 全局配置
producer: # RocketMQ Producer 配置项,对应 RocketMQProducerProperties 类
group: system_producer_group # 生产者分组
send-type: SYNC # 发送模式SYNC 同步
bindings:
springCloudBusInput:
consumer:
message-model: BROADCASTING # 重要,解决 Spring Cloud Bus RocketMQ 默认不是 BROADCASTING 广播消费的问题
--- #################### 消息队列相关 ####################
# Spring Cloud Bus 配置项,对应 BusProperties 类
bus:
enabled: true # 是否开启,默认为 true
id: ${spring.application.name}:${server.port} # 编号Spring Cloud Alibaba 建议使用“应用:端口”的格式
destination: springCloudBus # 目标消息队列,默认为 springCloudBus
# rocketmq 配置项,对应 RocketMQProperties 配置类
rocketmq:
# Producer 配置项
producer:
group: ${spring.application.name}_PRODUCER # 生产者分组
spring:
# Kafka 配置项,对应 KafkaProperties 配置类
kafka:
# Kafka Producer 配置项
producer:
acks: 1 # 0-不应答。1-leader 应答。all-所有 leader 和 follower 应答。
retries: 3 # 发送失败时,重试发送的次数
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer # 消息的 value 的序列化
# Kafka Consumer 配置项
consumer:
auto-offset-reset: earliest # 设置消费者分组最初的消费进度为 earliest 。可参考博客 https://blog.csdn.net/lishuangzhe7047/article/details/74530417 理解
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
properties:
spring.json.trusted.packages: '*'
# Kafka Consumer Listener 监听器配置
listener:
missing-topics-fatal: false # 消费监听接口监听的主题不存在时,默认会报错。所以通过设置为 false ,解决报错
--- #################### 定时任务相关配置 ####################