gateway:增加访问日志,待优化~
parent
f32e43863b
commit
f4c83c0168
|
@ -0,0 +1,251 @@
|
|||
package cn.iocoder.yudao.gateway.filter.logging;
|
||||
|
||||
import cn.hutool.core.util.ObjectUtil;
|
||||
import com.alibaba.nacos.common.utils.StringUtils;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.reactivestreams.Publisher;
|
||||
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
|
||||
import org.springframework.cloud.gateway.filter.GlobalFilter;
|
||||
import org.springframework.cloud.gateway.filter.factory.rewrite.CachedBodyOutputMessage;
|
||||
import org.springframework.cloud.gateway.route.Route;
|
||||
import org.springframework.cloud.gateway.support.BodyInserterContext;
|
||||
import org.springframework.cloud.gateway.support.ServerWebExchangeUtils;
|
||||
import org.springframework.core.Ordered;
|
||||
import org.springframework.core.io.buffer.DataBuffer;
|
||||
import org.springframework.core.io.buffer.DataBufferFactory;
|
||||
import org.springframework.core.io.buffer.DataBufferUtils;
|
||||
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.http.codec.HttpMessageReader;
|
||||
import org.springframework.http.server.reactive.ServerHttpRequest;
|
||||
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
|
||||
import org.springframework.http.server.reactive.ServerHttpResponse;
|
||||
import org.springframework.http.server.reactive.ServerHttpResponseDecorator;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.MultiValueMap;
|
||||
import org.springframework.web.reactive.function.BodyInserter;
|
||||
import org.springframework.web.reactive.function.BodyInserters;
|
||||
import org.springframework.web.reactive.function.server.HandlerStrategies;
|
||||
import org.springframework.web.reactive.function.server.ServerRequest;
|
||||
import org.springframework.web.server.ServerWebExchange;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
public class AccessLogFilter implements GlobalFilter, Ordered {
|
||||
|
||||
private final List<HttpMessageReader<?>> messageReaders = HandlerStrategies.withDefaults().messageReaders();
|
||||
|
||||
@Override
|
||||
public int getOrder() {
|
||||
return -100;
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
|
||||
|
||||
ServerHttpRequest request = exchange.getRequest();
|
||||
|
||||
// 请求路径
|
||||
String requestPath = request.getPath().pathWithinApplication().value();
|
||||
|
||||
Route route = getGatewayRoute(exchange);
|
||||
|
||||
|
||||
// String ipAddress = WebUtils.getServerHttpRequestIpAddress(request);
|
||||
String ipAddress = "127.0.0.1";
|
||||
|
||||
GatewayLog gatewayLog = new GatewayLog();
|
||||
gatewayLog.setSchema(request.getURI().getScheme());
|
||||
gatewayLog.setRequestMethod(request.getMethodValue());
|
||||
gatewayLog.setRequestPath(requestPath);
|
||||
gatewayLog.setTargetServer(route.getId());
|
||||
gatewayLog.setRequestTime(new Date());
|
||||
gatewayLog.setIp(ipAddress);
|
||||
|
||||
MediaType mediaType = request.getHeaders().getContentType();
|
||||
|
||||
if(MediaType.APPLICATION_FORM_URLENCODED.isCompatibleWith(mediaType) || MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)){
|
||||
return writeBodyLog(exchange, chain, gatewayLog);
|
||||
}else{
|
||||
return writeBasicLog(exchange, chain, gatewayLog);
|
||||
}
|
||||
}
|
||||
|
||||
private Mono<Void> writeBasicLog(ServerWebExchange exchange, GatewayFilterChain chain, GatewayLog accessLog) {
|
||||
StringBuilder builder = new StringBuilder();
|
||||
MultiValueMap<String, String> queryParams = exchange.getRequest().getQueryParams();
|
||||
for (Map.Entry<String, List<String>> entry : queryParams.entrySet()) {
|
||||
builder.append(entry.getKey()).append("=").append(StringUtils.join(entry.getValue(), ","));
|
||||
}
|
||||
accessLog.setRequestBody(builder.toString());
|
||||
|
||||
//获取响应体
|
||||
ServerHttpResponseDecorator decoratedResponse = recordResponseLog(exchange, accessLog);
|
||||
|
||||
return chain.filter(exchange.mutate().response(decoratedResponse).build())
|
||||
.then(Mono.fromRunnable(() -> {
|
||||
// 打印日志
|
||||
writeAccessLog(accessLog);
|
||||
}));
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 解决 request body 只能读取一次问题,
|
||||
* 参考: org.springframework.cloud.gateway.filter.factory.rewrite.ModifyRequestBodyGatewayFilterFactory
|
||||
* @param exchange
|
||||
* @param chain
|
||||
* @param gatewayLog
|
||||
* @return
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
private Mono writeBodyLog(ServerWebExchange exchange, GatewayFilterChain chain, GatewayLog gatewayLog) {
|
||||
ServerRequest serverRequest = ServerRequest.create(exchange,messageReaders);
|
||||
|
||||
Mono<String> modifiedBody = serverRequest.bodyToMono(String.class)
|
||||
.flatMap(body ->{
|
||||
gatewayLog.setRequestBody(body);
|
||||
return Mono.just(body);
|
||||
});
|
||||
|
||||
// 通过 BodyInserter 插入 body(支持修改body), 避免 request body 只能获取一次
|
||||
BodyInserter bodyInserter = BodyInserters.fromPublisher(modifiedBody, String.class);
|
||||
HttpHeaders headers = new HttpHeaders();
|
||||
headers.putAll(exchange.getRequest().getHeaders());
|
||||
// the new content type will be computed by bodyInserter
|
||||
// and then set in the request decorator
|
||||
headers.remove(HttpHeaders.CONTENT_LENGTH);
|
||||
|
||||
CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange, headers);
|
||||
|
||||
return bodyInserter.insert(outputMessage,new BodyInserterContext())
|
||||
.then(Mono.defer(() -> {
|
||||
// 重新封装请求
|
||||
ServerHttpRequest decoratedRequest = requestDecorate(exchange, headers, outputMessage);
|
||||
|
||||
// 记录响应日志
|
||||
ServerHttpResponseDecorator decoratedResponse = recordResponseLog(exchange, gatewayLog);
|
||||
|
||||
// 记录普通的
|
||||
return chain.filter(exchange.mutate().request(decoratedRequest).response(decoratedResponse).build())
|
||||
.then(Mono.fromRunnable(() -> {
|
||||
// 打印日志
|
||||
writeAccessLog(gatewayLog);
|
||||
}));
|
||||
}));
|
||||
}
|
||||
|
||||
/**
|
||||
* 打印日志
|
||||
* @author javadaily
|
||||
* @date 2021/3/24 14:53
|
||||
* @param gatewayLog 网关日志
|
||||
*/
|
||||
private void writeAccessLog(GatewayLog gatewayLog) {
|
||||
log.info(gatewayLog.toString());
|
||||
}
|
||||
|
||||
|
||||
|
||||
private Route getGatewayRoute(ServerWebExchange exchange) {
|
||||
return exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 请求装饰器,重新计算 headers
|
||||
* @param exchange
|
||||
* @param headers
|
||||
* @param outputMessage
|
||||
* @return
|
||||
*/
|
||||
private ServerHttpRequestDecorator requestDecorate(ServerWebExchange exchange, HttpHeaders headers,
|
||||
CachedBodyOutputMessage outputMessage) {
|
||||
return new ServerHttpRequestDecorator(exchange.getRequest()) {
|
||||
@Override
|
||||
public HttpHeaders getHeaders() {
|
||||
long contentLength = headers.getContentLength();
|
||||
HttpHeaders httpHeaders = new HttpHeaders();
|
||||
httpHeaders.putAll(super.getHeaders());
|
||||
if (contentLength > 0) {
|
||||
httpHeaders.setContentLength(contentLength);
|
||||
} else {
|
||||
// TODO: this causes a 'HTTP/1.1 411 Length Required' // on
|
||||
// httpbin.org
|
||||
httpHeaders.set(HttpHeaders.TRANSFER_ENCODING, "chunked");
|
||||
}
|
||||
return httpHeaders;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Flux<DataBuffer> getBody() {
|
||||
return outputMessage.getBody();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 记录响应日志
|
||||
* 通过 DataBufferFactory 解决响应体分段传输问题。
|
||||
*/
|
||||
private ServerHttpResponseDecorator recordResponseLog(ServerWebExchange exchange, GatewayLog gatewayLog) {
|
||||
ServerHttpResponse response = exchange.getResponse();
|
||||
DataBufferFactory bufferFactory = response.bufferFactory();
|
||||
|
||||
return new ServerHttpResponseDecorator(response) {
|
||||
@Override
|
||||
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
|
||||
if (body instanceof Flux) {
|
||||
Date responseTime = new Date();
|
||||
gatewayLog.setResponseTime(responseTime);
|
||||
// 计算执行时间
|
||||
long executeTime = (responseTime.getTime() - gatewayLog.getRequestTime().getTime());
|
||||
|
||||
gatewayLog.setExecuteTime(executeTime);
|
||||
|
||||
// 获取响应类型,如果是 json 就打印
|
||||
String originalResponseContentType = exchange.getAttribute(ServerWebExchangeUtils.ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR);
|
||||
|
||||
|
||||
if (ObjectUtil.equal(this.getStatusCode(), HttpStatus.OK)
|
||||
&& StringUtils.isNotBlank(originalResponseContentType)
|
||||
&& originalResponseContentType.contains("application/json")) {
|
||||
|
||||
Flux<? extends DataBuffer> fluxBody = Flux.from(body);
|
||||
return super.writeWith(fluxBody.buffer().map(dataBuffers -> {
|
||||
|
||||
// 合并多个流集合,解决返回体分段传输
|
||||
DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory();
|
||||
DataBuffer join = dataBufferFactory.join(dataBuffers);
|
||||
byte[] content = new byte[join.readableByteCount()];
|
||||
join.read(content);
|
||||
|
||||
// 释放掉内存
|
||||
DataBufferUtils.release(join);
|
||||
String responseResult = new String(content, StandardCharsets.UTF_8);
|
||||
|
||||
|
||||
|
||||
gatewayLog.setResponseData(responseResult);
|
||||
|
||||
return bufferFactory.wrap(content);
|
||||
}));
|
||||
}
|
||||
}
|
||||
// if body is not a flux. never got there.
|
||||
return super.writeWith(body);
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
|
@ -0,0 +1,29 @@
|
|||
package cn.iocoder.yudao.gateway.filter.logging;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
import java.util.Date;
|
||||
|
||||
@Data
|
||||
public class GatewayLog {
|
||||
/**访问实例*/
|
||||
private String targetServer;
|
||||
/**请求路径*/
|
||||
private String requestPath;
|
||||
/**请求方法*/
|
||||
private String requestMethod;
|
||||
/**协议 */
|
||||
private String schema;
|
||||
/**请求体*/
|
||||
private String requestBody;
|
||||
/**响应体*/
|
||||
private String responseData;
|
||||
/**请求ip*/
|
||||
private String ip;
|
||||
/**请求时间*/
|
||||
private Date requestTime;
|
||||
/**响应时间*/
|
||||
private Date responseTime;
|
||||
/**执行时间*/
|
||||
private long executeTime;
|
||||
}
|
|
@ -0,0 +1,111 @@
|
|||
package cn.iocoder.yudao.gateway.filter.web;
|
||||
|
||||
import cn.hutool.core.util.ReflectUtil;
|
||||
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
|
||||
import org.springframework.cloud.gateway.filter.GlobalFilter;
|
||||
import org.springframework.cloud.gateway.filter.factory.rewrite.CachedBodyOutputMessage;
|
||||
import org.springframework.cloud.gateway.support.BodyInserterContext;
|
||||
import org.springframework.core.Ordered;
|
||||
import org.springframework.core.io.buffer.DataBuffer;
|
||||
import org.springframework.core.io.buffer.DataBufferUtils;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.codec.HttpMessageReader;
|
||||
import org.springframework.http.server.reactive.ServerHttpRequest;
|
||||
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.web.reactive.function.BodyInserter;
|
||||
import org.springframework.web.reactive.function.BodyInserters;
|
||||
import org.springframework.web.reactive.function.server.HandlerStrategies;
|
||||
import org.springframework.web.reactive.function.server.ServerRequest;
|
||||
import org.springframework.web.server.ServerWebExchange;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.function.Function;
|
||||
|
||||
/**
|
||||
* 缓存 Request Body 和 的过滤器
|
||||
*
|
||||
* 小知识:Request Body 都是无法重复读取的,所以需要实现一个缓存的。
|
||||
* 从功能上,它类似 yudao-spring-boot-starter-web 的 CacheRequestBodyFilter 过滤器
|
||||
*
|
||||
* 实现基本是拷贝 {@link org.springframework.cloud.gateway.filter.factory.rewrite.ModifyRequestBodyGatewayFilterFactory} 类
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@Component
|
||||
public class CacheRequestBodyFilter implements GlobalFilter, Ordered {
|
||||
|
||||
private final List<HttpMessageReader<?>> messageReaders = HandlerStrategies.withDefaults().messageReaders();
|
||||
|
||||
@Override
|
||||
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
|
||||
ServerRequest serverRequest = ServerRequest.create(exchange, messageReaders);
|
||||
|
||||
// TODO: flux or mono
|
||||
Mono<String> modifiedBody = serverRequest.bodyToMono(String.class);
|
||||
// .flatMap()
|
||||
// .switchIfEmpty(Mono.just(""));
|
||||
|
||||
BodyInserter bodyInserter = BodyInserters.fromPublisher(modifiedBody, String.class);
|
||||
HttpHeaders headers = new HttpHeaders();
|
||||
headers.putAll(exchange.getRequest().getHeaders());
|
||||
|
||||
// the new content type will be computed by bodyInserter
|
||||
// and then set in the request decorator
|
||||
headers.remove(HttpHeaders.CONTENT_LENGTH);
|
||||
|
||||
CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange, headers);
|
||||
return bodyInserter.insert(outputMessage, new BodyInserterContext())
|
||||
// .log("modify_request", Level.INFO)
|
||||
.then(Mono.defer(() -> {
|
||||
ServerHttpRequest decorator = decorate(exchange, headers, outputMessage);
|
||||
return chain.filter(exchange.mutate().request(decorator).build());
|
||||
})).onErrorResume((Function<Throwable, Mono<Void>>) throwable -> release(exchange,
|
||||
outputMessage, throwable));
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getOrder() {
|
||||
// 必须小于等于 -2,否则无法获取响应结果. 因为 WebClientWriteResponseFilter 和 NettyWriteResponseFilter 是 -1,优先级要高于它们
|
||||
return -2;
|
||||
}
|
||||
|
||||
protected Mono<Void> release(ServerWebExchange exchange, CachedBodyOutputMessage outputMessage,
|
||||
Throwable throwable) {
|
||||
// add by 芋道源码:由于 CachedBodyOutputMessage 的 isCached 非 public 方法,所以只能反射调用
|
||||
if ((boolean) ReflectUtil.getFieldValue(outputMessage, "cached")) {
|
||||
return outputMessage.getBody().map(DataBufferUtils::release).then(Mono.error(throwable));
|
||||
}
|
||||
return Mono.error(throwable);
|
||||
}
|
||||
|
||||
ServerHttpRequestDecorator decorate(ServerWebExchange exchange, HttpHeaders headers,
|
||||
CachedBodyOutputMessage outputMessage) {
|
||||
return new ServerHttpRequestDecorator(exchange.getRequest()) {
|
||||
|
||||
@Override
|
||||
public HttpHeaders getHeaders() {
|
||||
long contentLength = headers.getContentLength();
|
||||
HttpHeaders httpHeaders = new HttpHeaders();
|
||||
httpHeaders.putAll(headers);
|
||||
if (contentLength > 0) {
|
||||
httpHeaders.setContentLength(contentLength);
|
||||
}
|
||||
else {
|
||||
// TODO: this causes a 'HTTP/1.1 411 Length Required' // on
|
||||
// httpbin.org
|
||||
httpHeaders.set(HttpHeaders.TRANSFER_ENCODING, "chunked");
|
||||
}
|
||||
return httpHeaders;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Flux<DataBuffer> getBody() {
|
||||
return outputMessage.getBody();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue