diff --git a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/messagebus/config/IotMessageBusAutoConfiguration.java b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/messagebus/config/IotMessageBusAutoConfiguration.java index acf3ad043..e520d7879 100644 --- a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/messagebus/config/IotMessageBusAutoConfiguration.java +++ b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/messagebus/config/IotMessageBusAutoConfiguration.java @@ -7,6 +7,7 @@ import cn.iocoder.yudao.framework.mq.redis.core.stream.AbstractRedisStreamMessag import cn.iocoder.yudao.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener; import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus; import cn.iocoder.yudao.module.iot.core.messagebus.core.local.IotLocalMessageBus; +import cn.iocoder.yudao.module.iot.core.messagebus.core.rabbitmq.IotRabbitMQMessageBus; import cn.iocoder.yudao.module.iot.core.messagebus.core.redis.IotRedisMessageBus; import cn.iocoder.yudao.module.iot.core.messagebus.core.rocketmq.IotRocketMQMessageBus; import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer; @@ -14,8 +15,11 @@ import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.redisson.api.RedissonClient; +import org.springframework.amqp.rabbit.core.RabbitAdmin; +import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.boot.autoconfigure.AutoConfiguration; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.ApplicationContext; @@ -125,5 +129,25 @@ public class IotMessageBusAutoConfiguration { } } + // ==================== RabbitMQ 实现 ==================== + + @Configuration + @ConditionalOnProperty(prefix = "yudao.iot.message-bus", name = "type", havingValue = "rabbitmq") + @ConditionalOnClass(RabbitTemplate.class) + public static class IotRabbitMQMessageBusConfiguration { + + @Bean + @ConditionalOnMissingBean + public RabbitAdmin rabbitAdmin(RabbitTemplate rabbitTemplate) { + return new RabbitAdmin(rabbitTemplate); + } + + @Bean + public IotRabbitMQMessageBus iotRabbitMQMessageBus(RabbitTemplate rabbitTemplate, RabbitAdmin rabbitAdmin) { + log.info("[iotRabbitMQMessageBus][创建 IoT RabbitMQ 消息总线]"); + return new IotRabbitMQMessageBus(rabbitTemplate, rabbitAdmin); + } + + } } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/messagebus/core/rabbitmq/IotRabbitMQMessageBus.java b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/messagebus/core/rabbitmq/IotRabbitMQMessageBus.java new file mode 100644 index 000000000..89fa7de45 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/messagebus/core/rabbitmq/IotRabbitMQMessageBus.java @@ -0,0 +1,101 @@ +package cn.iocoder.yudao.module.iot.core.messagebus.core.rabbitmq; + +import cn.hutool.core.util.TypeUtil; +import cn.iocoder.yudao.framework.common.util.json.JsonUtils; +import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus; +import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageSubscriber; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.core.*; +import org.springframework.amqp.rabbit.core.RabbitAdmin; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; +import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; + +import javax.annotation.PreDestroy; +import java.lang.reflect.Type; +import java.util.ArrayList; +import java.util.List; + +/** + * 基于 RabbitMQ 的 {@link IotMessageBus} 实现类 + * + * @author ywc + */ +@Slf4j +@RequiredArgsConstructor +public class IotRabbitMQMessageBus implements IotMessageBus { + + private final RabbitTemplate rabbitTemplate; + + private final RabbitAdmin rabbitAdmin; + + @Getter + private final List> subscribers = new ArrayList<>(); + + private final List containers = new ArrayList<>(); + + @Override + public void post(String topic, Object message) { + String json = JsonUtils.toJsonString(message); + rabbitTemplate.send(topic, "#", new Message(json.getBytes())); + log.info("[post][topic({}) 发送消息({})]", topic, message); + } + + @Override + public void register(IotMessageSubscriber subscriber) { + Type type = TypeUtil.getTypeArgument(subscriber.getClass(), 0); + if (type == null) { + throw new IllegalStateException(String.format("类型(%s) 需要设置消息类型", getClass().getName())); + } + + String topic = subscriber.getTopic(); + String group = subscriber.getGroup(); + + Queue queue = new Queue(group, true, false, false); + rabbitAdmin.declareQueue(queue); + + TopicExchange exchange = new TopicExchange(topic); + rabbitAdmin.declareExchange(exchange); + + Binding binding = BindingBuilder.bind(queue).to(exchange).with("#"); + rabbitAdmin.declareBinding(binding); + + SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(rabbitTemplate.getConnectionFactory()); + container.setQueues(queue); + container.setConcurrentConsumers(1); + container.setMaxConcurrentConsumers(10); + container.setAcknowledgeMode(AcknowledgeMode.MANUAL); + + container.setMessageListener((ChannelAwareMessageListener) (message, channel) -> { + String body = new String(message.getBody()); + try { + subscriber.onMessage(JsonUtils.parseObject(body, type)); + channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); + } catch (Exception ex) { + log.error("[onMessage][topic({}/{}) message({}) 处理异常]", topic, group, body, ex); + channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); + } + }); + + container.start(); + + containers.add(container); + subscribers.add(subscriber); + } + + @PreDestroy + public void destroy() { + for (SimpleMessageListenerContainer container : containers) { + try { + container.stop(); + container.destroy(); + log.info("[destroy][关闭 RabbitMQ 消费者容器成功]"); + } catch (Exception e) { + log.error("[destroy]关闭 RabbitMQ 消费者容器异常]", e); + } + } + } + +}