Merge remote-tracking branch 'origin/master'
# Conflicts: # yudao-module-member/yudao-module-member-server/src/main/java/cn/iocoder/yudao/module/member/controller/admin/user/vo/MemberUserBaseVO.java # yudao-module-member/yudao-module-member-server/src/main/java/cn/iocoder/yudao/module/member/service/user/MemberUserServiceImpl.javapull/254/head
commit
ffb4e8c158
|
|
@ -128,6 +128,26 @@ public class IotMessageBusAutoConfiguration {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
// ==================== RabbitMQ 实现 ====================
|
||||||
|
|
||||||
|
@Configuration
|
||||||
|
@ConditionalOnProperty(prefix = "yudao.iot.message-bus", name = "type", havingValue = "rabbitmq")
|
||||||
|
@ConditionalOnClass(RabbitTemplate.class)
|
||||||
|
public static class IotRabbitMQMessageBusConfiguration {
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
@ConditionalOnMissingBean
|
||||||
|
public RabbitAdmin rabbitAdmin(RabbitTemplate rabbitTemplate) {
|
||||||
|
return new RabbitAdmin(rabbitTemplate);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public IotRabbitMQMessageBus iotRabbitMQMessageBus(RabbitTemplate rabbitTemplate, RabbitAdmin rabbitAdmin) {
|
||||||
|
log.info("[iotRabbitMQMessageBus][创建 IoT RabbitMQ 消息总线]");
|
||||||
|
return new IotRabbitMQMessageBus(rabbitTemplate, rabbitAdmin);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ==================== RabbitMQ 实现 ====================
|
// ==================== RabbitMQ 实现 ====================
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,101 @@
|
||||||
|
package cn.iocoder.yudao.module.iot.core.messagebus.core.rabbitmq;
|
||||||
|
|
||||||
|
import cn.hutool.core.util.TypeUtil;
|
||||||
|
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
|
||||||
|
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
|
||||||
|
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageSubscriber;
|
||||||
|
import lombok.Getter;
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.amqp.core.*;
|
||||||
|
import org.springframework.amqp.rabbit.core.RabbitAdmin;
|
||||||
|
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||||
|
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
|
||||||
|
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
|
||||||
|
|
||||||
|
import javax.annotation.PreDestroy;
|
||||||
|
import java.lang.reflect.Type;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 基于 RabbitMQ 的 {@link IotMessageBus} 实现类
|
||||||
|
*
|
||||||
|
* @author ywc
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
public class IotRabbitMQMessageBus implements IotMessageBus {
|
||||||
|
|
||||||
|
private final RabbitTemplate rabbitTemplate;
|
||||||
|
|
||||||
|
private final RabbitAdmin rabbitAdmin;
|
||||||
|
|
||||||
|
@Getter
|
||||||
|
private final List<IotMessageSubscriber<?>> subscribers = new ArrayList<>();
|
||||||
|
|
||||||
|
private final List<SimpleMessageListenerContainer> containers = new ArrayList<>();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void post(String topic, Object message) {
|
||||||
|
String json = JsonUtils.toJsonString(message);
|
||||||
|
rabbitTemplate.send(topic, "#", new Message(json.getBytes()));
|
||||||
|
log.info("[post][topic({}) 发送消息({})]", topic, message);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void register(IotMessageSubscriber<?> subscriber) {
|
||||||
|
Type type = TypeUtil.getTypeArgument(subscriber.getClass(), 0);
|
||||||
|
if (type == null) {
|
||||||
|
throw new IllegalStateException(String.format("类型(%s) 需要设置消息类型", getClass().getName()));
|
||||||
|
}
|
||||||
|
|
||||||
|
String topic = subscriber.getTopic();
|
||||||
|
String group = subscriber.getGroup();
|
||||||
|
|
||||||
|
Queue queue = new Queue(group, true, false, false);
|
||||||
|
rabbitAdmin.declareQueue(queue);
|
||||||
|
|
||||||
|
TopicExchange exchange = new TopicExchange(topic);
|
||||||
|
rabbitAdmin.declareExchange(exchange);
|
||||||
|
|
||||||
|
Binding binding = BindingBuilder.bind(queue).to(exchange).with("#");
|
||||||
|
rabbitAdmin.declareBinding(binding);
|
||||||
|
|
||||||
|
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(rabbitTemplate.getConnectionFactory());
|
||||||
|
container.setQueues(queue);
|
||||||
|
container.setConcurrentConsumers(1);
|
||||||
|
container.setMaxConcurrentConsumers(10);
|
||||||
|
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
|
||||||
|
|
||||||
|
container.setMessageListener((ChannelAwareMessageListener) (message, channel) -> {
|
||||||
|
String body = new String(message.getBody());
|
||||||
|
try {
|
||||||
|
subscriber.onMessage(JsonUtils.parseObject(body, type));
|
||||||
|
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
|
||||||
|
} catch (Exception ex) {
|
||||||
|
log.error("[onMessage][topic({}/{}) message({}) 处理异常]", topic, group, body, ex);
|
||||||
|
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
container.start();
|
||||||
|
|
||||||
|
containers.add(container);
|
||||||
|
subscribers.add(subscriber);
|
||||||
|
}
|
||||||
|
|
||||||
|
@PreDestroy
|
||||||
|
public void destroy() {
|
||||||
|
for (SimpleMessageListenerContainer container : containers) {
|
||||||
|
try {
|
||||||
|
container.stop();
|
||||||
|
container.destroy();
|
||||||
|
log.info("[destroy][关闭 RabbitMQ 消费者容器成功]");
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("[destroy]关闭 RabbitMQ 消费者容器异常]", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -24,6 +24,9 @@ public class MemberUserRespDTO {
|
||||||
@Schema(description = "手机号", example = "15601691300")
|
@Schema(description = "手机号", example = "15601691300")
|
||||||
private String mobile;
|
private String mobile;
|
||||||
|
|
||||||
|
@Schema(description = "邮箱", example = "member@iocoder.cn")
|
||||||
|
private String email;
|
||||||
|
|
||||||
@Schema(description = "创建时间", requiredMode = Schema.RequiredMode.REQUIRED)
|
@Schema(description = "创建时间", requiredMode = Schema.RequiredMode.REQUIRED)
|
||||||
private LocalDateTime createTime;
|
private LocalDateTime createTime;
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -6,6 +6,9 @@ import jakarta.validation.constraints.Size;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
import org.hibernate.validator.constraints.URL;
|
import org.hibernate.validator.constraints.URL;
|
||||||
|
|
||||||
|
import javax.validation.constraints.Email;
|
||||||
|
import javax.validation.constraints.Size;
|
||||||
|
|
||||||
@Schema(description = "用户 App - 会员用户更新 Request VO")
|
@Schema(description = "用户 App - 会员用户更新 Request VO")
|
||||||
@Data
|
@Data
|
||||||
public class AppMemberUserUpdateReqVO {
|
public class AppMemberUserUpdateReqVO {
|
||||||
|
|
|
||||||
|
|
@ -3,7 +3,6 @@ package cn.iocoder.yudao.module.system.service.member;
|
||||||
import cn.hutool.core.util.ClassUtil;
|
import cn.hutool.core.util.ClassUtil;
|
||||||
import cn.hutool.core.util.ReflectUtil;
|
import cn.hutool.core.util.ReflectUtil;
|
||||||
import cn.hutool.extra.spring.SpringUtil;
|
import cn.hutool.extra.spring.SpringUtil;
|
||||||
import org.springframework.beans.factory.annotation.Value;
|
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -14,8 +13,7 @@ import org.springframework.stereotype.Service;
|
||||||
@Service
|
@Service
|
||||||
public class MemberServiceImpl implements MemberService {
|
public class MemberServiceImpl implements MemberService {
|
||||||
|
|
||||||
@Value("${yudao.info.base-package}")
|
private static final String MEMBER_USER_API_CLASS_NAME = "cn.iocoder.yudao.module.member.api.user.MemberUserApi";
|
||||||
private String basePackage;
|
|
||||||
|
|
||||||
private volatile Object memberUserApi;
|
private volatile Object memberUserApi;
|
||||||
|
|
||||||
|
|
@ -46,7 +44,7 @@ public class MemberServiceImpl implements MemberService {
|
||||||
|
|
||||||
private Object getMemberUserApi() {
|
private Object getMemberUserApi() {
|
||||||
if (memberUserApi == null) {
|
if (memberUserApi == null) {
|
||||||
memberUserApi = SpringUtil.getBean(ClassUtil.loadClass(String.format("%s.module.member.api.user.MemberUserApi", basePackage)));
|
memberUserApi = SpringUtil.getBean(ClassUtil.loadClass(MEMBER_USER_API_CLASS_NAME));
|
||||||
}
|
}
|
||||||
return memberUserApi;
|
return memberUserApi;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue