fix messageModel=BROADCASTING

Signed-off-by: Fanjc <271366833@qq.com>
pull/43/head
Fanjc 2023-07-17 08:01:08 +00:00 committed by Gitee
parent d68fa54e55
commit 5484ae14de
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
1 changed files with 21 additions and 0 deletions

View File

@ -28,6 +28,27 @@ public class EnvEnvironmentPostProcessor implements EnvironmentPostProcessor {
@Override @Override
public void postProcessEnvironment(ConfigurableEnvironment environment, SpringApplication application) { public void postProcessEnvironment(ConfigurableEnvironment environment, SpringApplication application) {
// 兼容RocketMQBus
// spring-cloud-starter-bus-rocketmq 2021.0.4.0中固定了springcloudbus的input名为springCloudBusInput
// 但是并没有设置为广播模式导致bus总线依旧为集群消费
// 解决方案 在defaultProperties中添加messageModel=BROADCASTING
// 参考https://github.com/alibaba/spring-cloud-alibaba/pull/2785
MutablePropertySources propertySources = environment.getPropertySources();
if (!propertySources.contains("defaultProperties")) {
Map<String, Object> map = new HashMap();
map.put("spring.cloud.stream.rocketmq.bindings.springCloudBusInput.consumer.messageModel","BROADCASTING");
MapPropertySource target = new MapPropertySource("defaultProperties", map);
propertySources.addLast(target);
}else{
PropertySource<?> source = propertySources.get("defaultProperties");
if (source instanceof MapPropertySource) {
MapPropertySource target = (MapPropertySource)source;
target.getSource().put("spring.cloud.stream.rocketmq.bindings.springCloudBusInput.consumer.messageModel", "BROADCASTING");
}
}
// 0. 设置 ${HOST_NAME} 兜底的环境变量 // 0. 设置 ${HOST_NAME} 兜底的环境变量
String hostNameKey = StrUtil.subBetween(HOST_NAME_VALUE, "{", "}"); String hostNameKey = StrUtil.subBetween(HOST_NAME_VALUE, "{", "}");
if (!environment.containsProperty(hostNameKey)) { if (!environment.containsProperty(hostNameKey)) {