From f4c83c0168d496709d14cbeaea65a70260d2d045 Mon Sep 17 00:00:00 2001 From: YunaiV Date: Mon, 6 Jun 2022 01:04:42 +0800 Subject: [PATCH] =?UTF-8?q?gateway=EF=BC=9A=E5=A2=9E=E5=8A=A0=E8=AE=BF?= =?UTF-8?q?=E9=97=AE=E6=97=A5=E5=BF=97=EF=BC=8C=E5=BE=85=E4=BC=98=E5=8C=96?= =?UTF-8?q?~?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../filter/logging/AccessLogFilter.java | 251 ++++++++++++++++++ .../gateway/filter/logging/GatewayLog.java | 29 ++ .../filter/web/CacheRequestBodyFilter.java | 111 ++++++++ 3 files changed, 391 insertions(+) create mode 100644 yudao-gateway/src/main/java/cn/iocoder/yudao/gateway/filter/logging/AccessLogFilter.java create mode 100644 yudao-gateway/src/main/java/cn/iocoder/yudao/gateway/filter/logging/GatewayLog.java create mode 100644 yudao-gateway/src/main/java/cn/iocoder/yudao/gateway/filter/web/CacheRequestBodyFilter.java diff --git a/yudao-gateway/src/main/java/cn/iocoder/yudao/gateway/filter/logging/AccessLogFilter.java b/yudao-gateway/src/main/java/cn/iocoder/yudao/gateway/filter/logging/AccessLogFilter.java new file mode 100644 index 000000000..08edc6355 --- /dev/null +++ b/yudao-gateway/src/main/java/cn/iocoder/yudao/gateway/filter/logging/AccessLogFilter.java @@ -0,0 +1,251 @@ +package cn.iocoder.yudao.gateway.filter.logging; + +import cn.hutool.core.util.ObjectUtil; +import com.alibaba.nacos.common.utils.StringUtils; +import lombok.extern.slf4j.Slf4j; +import org.reactivestreams.Publisher; +import org.springframework.cloud.gateway.filter.GatewayFilterChain; +import org.springframework.cloud.gateway.filter.GlobalFilter; +import org.springframework.cloud.gateway.filter.factory.rewrite.CachedBodyOutputMessage; +import org.springframework.cloud.gateway.route.Route; +import org.springframework.cloud.gateway.support.BodyInserterContext; +import org.springframework.cloud.gateway.support.ServerWebExchangeUtils; +import org.springframework.core.Ordered; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferFactory; +import org.springframework.core.io.buffer.DataBufferUtils; +import org.springframework.core.io.buffer.DefaultDataBufferFactory; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.http.codec.HttpMessageReader; +import org.springframework.http.server.reactive.ServerHttpRequest; +import org.springframework.http.server.reactive.ServerHttpRequestDecorator; +import org.springframework.http.server.reactive.ServerHttpResponse; +import org.springframework.http.server.reactive.ServerHttpResponseDecorator; +import org.springframework.stereotype.Component; +import org.springframework.util.MultiValueMap; +import org.springframework.web.reactive.function.BodyInserter; +import org.springframework.web.reactive.function.BodyInserters; +import org.springframework.web.reactive.function.server.HandlerStrategies; +import org.springframework.web.reactive.function.server.ServerRequest; +import org.springframework.web.server.ServerWebExchange; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.nio.charset.StandardCharsets; +import java.util.Date; +import java.util.List; +import java.util.Map; + +@Slf4j +@Component +public class AccessLogFilter implements GlobalFilter, Ordered { + + private final List> messageReaders = HandlerStrategies.withDefaults().messageReaders(); + + @Override + public int getOrder() { + return -100; + } + + @Override + @SuppressWarnings("unchecked") + public Mono filter(ServerWebExchange exchange, GatewayFilterChain chain) { + + ServerHttpRequest request = exchange.getRequest(); + + // 请求路径 + String requestPath = request.getPath().pathWithinApplication().value(); + + Route route = getGatewayRoute(exchange); + + +// String ipAddress = WebUtils.getServerHttpRequestIpAddress(request); + String ipAddress = "127.0.0.1"; + + GatewayLog gatewayLog = new GatewayLog(); + gatewayLog.setSchema(request.getURI().getScheme()); + gatewayLog.setRequestMethod(request.getMethodValue()); + gatewayLog.setRequestPath(requestPath); + gatewayLog.setTargetServer(route.getId()); + gatewayLog.setRequestTime(new Date()); + gatewayLog.setIp(ipAddress); + + MediaType mediaType = request.getHeaders().getContentType(); + + if(MediaType.APPLICATION_FORM_URLENCODED.isCompatibleWith(mediaType) || MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)){ + return writeBodyLog(exchange, chain, gatewayLog); + }else{ + return writeBasicLog(exchange, chain, gatewayLog); + } + } + + private Mono writeBasicLog(ServerWebExchange exchange, GatewayFilterChain chain, GatewayLog accessLog) { + StringBuilder builder = new StringBuilder(); + MultiValueMap queryParams = exchange.getRequest().getQueryParams(); + for (Map.Entry> entry : queryParams.entrySet()) { + builder.append(entry.getKey()).append("=").append(StringUtils.join(entry.getValue(), ",")); + } + accessLog.setRequestBody(builder.toString()); + + //获取响应体 + ServerHttpResponseDecorator decoratedResponse = recordResponseLog(exchange, accessLog); + + return chain.filter(exchange.mutate().response(decoratedResponse).build()) + .then(Mono.fromRunnable(() -> { + // 打印日志 + writeAccessLog(accessLog); + })); + } + + + /** + * 解决 request body 只能读取一次问题, + * 参考: org.springframework.cloud.gateway.filter.factory.rewrite.ModifyRequestBodyGatewayFilterFactory + * @param exchange + * @param chain + * @param gatewayLog + * @return + */ + @SuppressWarnings("unchecked") + private Mono writeBodyLog(ServerWebExchange exchange, GatewayFilterChain chain, GatewayLog gatewayLog) { + ServerRequest serverRequest = ServerRequest.create(exchange,messageReaders); + + Mono modifiedBody = serverRequest.bodyToMono(String.class) + .flatMap(body ->{ + gatewayLog.setRequestBody(body); + return Mono.just(body); + }); + + // 通过 BodyInserter 插入 body(支持修改body), 避免 request body 只能获取一次 + BodyInserter bodyInserter = BodyInserters.fromPublisher(modifiedBody, String.class); + HttpHeaders headers = new HttpHeaders(); + headers.putAll(exchange.getRequest().getHeaders()); + // the new content type will be computed by bodyInserter + // and then set in the request decorator + headers.remove(HttpHeaders.CONTENT_LENGTH); + + CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange, headers); + + return bodyInserter.insert(outputMessage,new BodyInserterContext()) + .then(Mono.defer(() -> { + // 重新封装请求 + ServerHttpRequest decoratedRequest = requestDecorate(exchange, headers, outputMessage); + + // 记录响应日志 + ServerHttpResponseDecorator decoratedResponse = recordResponseLog(exchange, gatewayLog); + + // 记录普通的 + return chain.filter(exchange.mutate().request(decoratedRequest).response(decoratedResponse).build()) + .then(Mono.fromRunnable(() -> { + // 打印日志 + writeAccessLog(gatewayLog); + })); + })); + } + + /** + * 打印日志 + * @author javadaily + * @date 2021/3/24 14:53 + * @param gatewayLog 网关日志 + */ + private void writeAccessLog(GatewayLog gatewayLog) { + log.info(gatewayLog.toString()); + } + + + + private Route getGatewayRoute(ServerWebExchange exchange) { + return exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR); + } + + + /** + * 请求装饰器,重新计算 headers + * @param exchange + * @param headers + * @param outputMessage + * @return + */ + private ServerHttpRequestDecorator requestDecorate(ServerWebExchange exchange, HttpHeaders headers, + CachedBodyOutputMessage outputMessage) { + return new ServerHttpRequestDecorator(exchange.getRequest()) { + @Override + public HttpHeaders getHeaders() { + long contentLength = headers.getContentLength(); + HttpHeaders httpHeaders = new HttpHeaders(); + httpHeaders.putAll(super.getHeaders()); + if (contentLength > 0) { + httpHeaders.setContentLength(contentLength); + } else { + // TODO: this causes a 'HTTP/1.1 411 Length Required' // on + // httpbin.org + httpHeaders.set(HttpHeaders.TRANSFER_ENCODING, "chunked"); + } + return httpHeaders; + } + + @Override + public Flux getBody() { + return outputMessage.getBody(); + } + }; + } + + + /** + * 记录响应日志 + * 通过 DataBufferFactory 解决响应体分段传输问题。 + */ + private ServerHttpResponseDecorator recordResponseLog(ServerWebExchange exchange, GatewayLog gatewayLog) { + ServerHttpResponse response = exchange.getResponse(); + DataBufferFactory bufferFactory = response.bufferFactory(); + + return new ServerHttpResponseDecorator(response) { + @Override + public Mono writeWith(Publisher body) { + if (body instanceof Flux) { + Date responseTime = new Date(); + gatewayLog.setResponseTime(responseTime); + // 计算执行时间 + long executeTime = (responseTime.getTime() - gatewayLog.getRequestTime().getTime()); + + gatewayLog.setExecuteTime(executeTime); + + // 获取响应类型,如果是 json 就打印 + String originalResponseContentType = exchange.getAttribute(ServerWebExchangeUtils.ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR); + + + if (ObjectUtil.equal(this.getStatusCode(), HttpStatus.OK) + && StringUtils.isNotBlank(originalResponseContentType) + && originalResponseContentType.contains("application/json")) { + + Flux fluxBody = Flux.from(body); + return super.writeWith(fluxBody.buffer().map(dataBuffers -> { + + // 合并多个流集合,解决返回体分段传输 + DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory(); + DataBuffer join = dataBufferFactory.join(dataBuffers); + byte[] content = new byte[join.readableByteCount()]; + join.read(content); + + // 释放掉内存 + DataBufferUtils.release(join); + String responseResult = new String(content, StandardCharsets.UTF_8); + + + + gatewayLog.setResponseData(responseResult); + + return bufferFactory.wrap(content); + })); + } + } + // if body is not a flux. never got there. + return super.writeWith(body); + } + }; + } +} diff --git a/yudao-gateway/src/main/java/cn/iocoder/yudao/gateway/filter/logging/GatewayLog.java b/yudao-gateway/src/main/java/cn/iocoder/yudao/gateway/filter/logging/GatewayLog.java new file mode 100644 index 000000000..1f8793e50 --- /dev/null +++ b/yudao-gateway/src/main/java/cn/iocoder/yudao/gateway/filter/logging/GatewayLog.java @@ -0,0 +1,29 @@ +package cn.iocoder.yudao.gateway.filter.logging; + +import lombok.Data; + +import java.util.Date; + +@Data +public class GatewayLog { + /**访问实例*/ + private String targetServer; + /**请求路径*/ + private String requestPath; + /**请求方法*/ + private String requestMethod; + /**协议 */ + private String schema; + /**请求体*/ + private String requestBody; + /**响应体*/ + private String responseData; + /**请求ip*/ + private String ip; + /**请求时间*/ + private Date requestTime; + /**响应时间*/ + private Date responseTime; + /**执行时间*/ + private long executeTime; +} diff --git a/yudao-gateway/src/main/java/cn/iocoder/yudao/gateway/filter/web/CacheRequestBodyFilter.java b/yudao-gateway/src/main/java/cn/iocoder/yudao/gateway/filter/web/CacheRequestBodyFilter.java new file mode 100644 index 000000000..2b617818b --- /dev/null +++ b/yudao-gateway/src/main/java/cn/iocoder/yudao/gateway/filter/web/CacheRequestBodyFilter.java @@ -0,0 +1,111 @@ +package cn.iocoder.yudao.gateway.filter.web; + +import cn.hutool.core.util.ReflectUtil; +import org.springframework.cloud.gateway.filter.GatewayFilterChain; +import org.springframework.cloud.gateway.filter.GlobalFilter; +import org.springframework.cloud.gateway.filter.factory.rewrite.CachedBodyOutputMessage; +import org.springframework.cloud.gateway.support.BodyInserterContext; +import org.springframework.core.Ordered; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferUtils; +import org.springframework.http.HttpHeaders; +import org.springframework.http.codec.HttpMessageReader; +import org.springframework.http.server.reactive.ServerHttpRequest; +import org.springframework.http.server.reactive.ServerHttpRequestDecorator; +import org.springframework.stereotype.Component; +import org.springframework.web.reactive.function.BodyInserter; +import org.springframework.web.reactive.function.BodyInserters; +import org.springframework.web.reactive.function.server.HandlerStrategies; +import org.springframework.web.reactive.function.server.ServerRequest; +import org.springframework.web.server.ServerWebExchange; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.List; +import java.util.function.Function; + +/** + * 缓存 Request Body 和 的过滤器 + * + * 小知识:Request Body 都是无法重复读取的,所以需要实现一个缓存的。 + * 从功能上,它类似 yudao-spring-boot-starter-web 的 CacheRequestBodyFilter 过滤器 + * + * 实现基本是拷贝 {@link org.springframework.cloud.gateway.filter.factory.rewrite.ModifyRequestBodyGatewayFilterFactory} 类 + * + * @author 芋道源码 + */ +@Component +public class CacheRequestBodyFilter implements GlobalFilter, Ordered { + + private final List> messageReaders = HandlerStrategies.withDefaults().messageReaders(); + + @Override + public Mono filter(ServerWebExchange exchange, GatewayFilterChain chain) { + ServerRequest serverRequest = ServerRequest.create(exchange, messageReaders); + + // TODO: flux or mono + Mono modifiedBody = serverRequest.bodyToMono(String.class); +// .flatMap() +// .switchIfEmpty(Mono.just("")); + + BodyInserter bodyInserter = BodyInserters.fromPublisher(modifiedBody, String.class); + HttpHeaders headers = new HttpHeaders(); + headers.putAll(exchange.getRequest().getHeaders()); + + // the new content type will be computed by bodyInserter + // and then set in the request decorator + headers.remove(HttpHeaders.CONTENT_LENGTH); + + CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange, headers); + return bodyInserter.insert(outputMessage, new BodyInserterContext()) + // .log("modify_request", Level.INFO) + .then(Mono.defer(() -> { + ServerHttpRequest decorator = decorate(exchange, headers, outputMessage); + return chain.filter(exchange.mutate().request(decorator).build()); + })).onErrorResume((Function>) throwable -> release(exchange, + outputMessage, throwable)); + } + + @Override + public int getOrder() { + // 必须小于等于 -2,否则无法获取响应结果. 因为 WebClientWriteResponseFilter 和 NettyWriteResponseFilter 是 -1,优先级要高于它们 + return -2; + } + + protected Mono release(ServerWebExchange exchange, CachedBodyOutputMessage outputMessage, + Throwable throwable) { + // add by 芋道源码:由于 CachedBodyOutputMessage 的 isCached 非 public 方法,所以只能反射调用 + if ((boolean) ReflectUtil.getFieldValue(outputMessage, "cached")) { + return outputMessage.getBody().map(DataBufferUtils::release).then(Mono.error(throwable)); + } + return Mono.error(throwable); + } + + ServerHttpRequestDecorator decorate(ServerWebExchange exchange, HttpHeaders headers, + CachedBodyOutputMessage outputMessage) { + return new ServerHttpRequestDecorator(exchange.getRequest()) { + + @Override + public HttpHeaders getHeaders() { + long contentLength = headers.getContentLength(); + HttpHeaders httpHeaders = new HttpHeaders(); + httpHeaders.putAll(headers); + if (contentLength > 0) { + httpHeaders.setContentLength(contentLength); + } + else { + // TODO: this causes a 'HTTP/1.1 411 Length Required' // on + // httpbin.org + httpHeaders.set(HttpHeaders.TRANSFER_ENCODING, "chunked"); + } + return httpHeaders; + } + + @Override + public Flux getBody() { + return outputMessage.getBody(); + } + }; + } + +}