简介

分布式系统中,由多个服务构成,每个请求路由过来后,会在多个服务中留下追踪ID,可以基于此追踪ID排查问题,分析请求的执行链路。

业界也有比较成熟的链路追踪ID方案,比如Skywalking,它基于动态字节码技术,本身会增加系统的复杂性,以及它需要单独部署服务对信息进行采集,这对于ToB资源敏感的场景下并不适用。traceID是基于自身必需框架开发的,主要技术是日志MDC、跨线程包装、中间件拦截器,所以可以几乎不增加任何额外资源开销。

本次traceId支持dubbo,kafka,http请求,线程池

接入步骤

1、日志中增加traceId变量

logback.xml日志配置,当然你也可以自定义格式,变量%X{traceId}是追踪ID替换值:

若框架使用log4j,在日志格式中增加%X{traceId}变量

2、代码实现

2.0 依赖,不同实现按需添加依赖,例如kafka相关接入traceId,项目本身已经引入kafka

org.springframework.boot

spring-boot-starter-web

org.springframework.boot

spring-boot-starter-aop

2.1 http请求

编写拦截器

public class TraceWebInterceptor extends HandlerInterceptorAdapter {

private static final Logger LOGGER = LoggerFactory.getLogger(TraceWebInterceptor.class);

@Override

public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {

request.setAttribute("startTime", System.currentTimeMillis());

//traceOrigin、traceCaller、traceId

String traceOrigin = request.getHeader(TraceConstants.LOG_TRACE_ORIGIN);

String traceCaller = request.getHeader(TraceConstants.LOG_TRACE_CALLER);

String traceId = request.getHeader(TraceConstants.LOG_TRACE_ID);

//如果不存在traceId需要生成

if (StringUtils.isBlank(traceId)) {

boolean generate = TraceUtil.loadTraceInfo();

if(generate) {

LOGGER.debug("[生成追踪信息]" + TraceUtil.getTraceInfoString());

}

}else {

//设置MDC

MDC.put(TraceConstants.LOG_TRACE_ORIGIN, traceOrigin);

MDC.put(TraceConstants.LOG_TRACE_CALLER, traceCaller);

MDC.put(TraceConstants.LOG_TRACE_ID, traceId);

}

//IP

String traceIp = IpUtil.getIp(request);

MDC.put(TraceConstants.LOG_TRACE_IP, traceIp);

//响应返回

response.setHeader(TraceConstants.LOG_TRACE_ID, TraceUtil.getTraceId());

return super.preHandle(request, response, handler);

}

@Override

public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws IOException {

if (LOGGER.isInfoEnabled()) {

long upmsStartTime = (long) request.getAttribute("startTime");

long upmsEndTime = System.currentTimeMillis();

long upmsIntervalTime = upmsEndTime - upmsStartTime;

LOGGER.info("{} {}接口耗时{}毫秒", request.getRequestURL(), request.getMethod(), upmsIntervalTime);

}

MDC.clear();

}

编写Config类, 将拦截器TraceWebInterceptor添加到容器

@Configuration

@ConditionalOnClass({HandlerInterceptorAdapter.class, MDC.class, HttpServletRequest.class})

public class TraceWebAutoConfiguration implements WebMvcConfigurer {

private static List EXCLUDE_PATHS = new ArrayList<>();

@Value("${" + TraceConstants.CONFIG_TRACE_EXCLUDE_PATHS + ":}")

private String excludePaths;

@Bean

public TraceWebInterceptor traceWebInterceptor() {

return new TraceWebInterceptor();

}

@Override

public void addInterceptors(InterceptorRegistry registry) {

EXCLUDE_PATHS.add("/error");

EXCLUDE_PATHS.add("/actuator/**");

if (StringUtils.isNotBlank(excludePaths)) {

if (excludePaths.contains(",")) {

String[] split = excludePaths.split(",");

EXCLUDE_PATHS.addAll(Arrays.asList(split));

} else {

EXCLUDE_PATHS.add(excludePaths);

}

}

//该方式不能过全部过滤掉

registry.addInterceptor(traceWebInterceptor()).order(-100).excludePathPatterns(EXCLUDE_PATHS);

}

}

工具类

public class IpUtil {

private static final String UNKNOWN = "unknown";

public static String getIp(HttpServletRequest request) {

if (request == null) {

return UNKNOWN;

}

String ip = request.getHeader("x-forwarded-for");

if (ip == null || ip.length() == 0 || UNKNOWN.equalsIgnoreCase(ip)) {

ip = request.getHeader("Proxy-Client-IP");

}

if (ip == null || ip.length() == 0 || UNKNOWN.equalsIgnoreCase(ip)) {

ip = request.getHeader("X-Forwarded-For");

}

if (ip == null || ip.length() == 0 || UNKNOWN.equalsIgnoreCase(ip)) {

ip = request.getHeader("WL-Proxy-Client-IP");

}

if (ip == null || ip.length() == 0 || UNKNOWN.equalsIgnoreCase(ip)) {

ip = request.getHeader("X-Real-IP");

}

if (ip == null || ip.length() == 0 || UNKNOWN.equalsIgnoreCase(ip)) {

ip = request.getRemoteAddr();

}

return "0:0:0:0:0:0:0:1".equals(ip) ? "127.0.0.1" : ip;

}

}

public class TraceConstants {

public static final String LOG_TRACE_ORIGIN = "traceOrigin";

public static final String LOG_TRACE_CALLER = "traceCaller";

public static final String LOG_TRACE_IP = "traceIp";

public static final String LOG_TRACE_ID = "traceId";

public static final String CONFIG_TRACE_EXCLUDE_PATHS = "trace.exclude.paths";

public TraceConstants() {

}

}

import java.util.UUID;

import org.apache.commons.lang3.StringUtils;

import org.slf4j.MDC;

public class TraceUtil {

private static boolean simbaHttpClientInterceptorFlag = true;

private static boolean sdkInterceptorFlag = false;

private static String applicationName;

public TraceUtil() {

}

public static void setApplicationName(String applicationName) {

TraceUtil.applicationName = applicationName;

}

public static String getApplicationName() {

return applicationName;

}

public static boolean getSimbaHttpClientInterceptorFlag() {

return simbaHttpClientInterceptorFlag;

}

public static void setSimbaHttpClientInterceptorFlag(boolean simbaHttpClientInterceptorFlag) {

TraceUtil.simbaHttpClientInterceptorFlag = simbaHttpClientInterceptorFlag;

}

public static boolean getSdkInterceptorFlag() {

return sdkInterceptorFlag;

}

public static void setSdkInterceptorFlag(boolean sdkInterceptorFlag) {

TraceUtil.sdkInterceptorFlag = sdkInterceptorFlag;

}

public static void setTraceCaller(String traceCaller) {

MDC.put("traceCaller", traceCaller);

}

public static String getTraceCaller() {

return MDC.get("traceCaller");

}

public static void setTraceOrigin(String traceOrigin) {

MDC.put("traceOrigin", traceOrigin);

}

public static String getTraceOrigin() {

return MDC.get("traceOrigin");

}

public static void setTraceId(String traceId) {

MDC.put("traceId", traceId);

}

public static void removeTraceId() {

MDC.remove("traceId");

}

public static void clearMdc() {

MDC.clear();

}

public static String getTraceId() {

return MDC.get("traceId");

}

public static String genTraceId() {

return UUID.randomUUID().toString().replace("-", "");

}

public static String getTraceIp() {

return MDC.get("traceIp");

}

public static void setTraceIp(String traceIp) {

MDC.put("traceIp", traceIp);

}

public static boolean loadTraceInfo() {

boolean generate = false;

String traceId = getTraceId();

if (StringUtils.isBlank(traceId)) {

traceId = genTraceId();

generate = true;

}

setTraceId(traceId);

return generate;

}

public static String getTraceInfoString() {

return "TraceId:" + getTraceId() + ". traceCaller:" + getTraceCaller() + ". traceOrigin:" + getTraceOrigin();

}

}

import org.slf4j.MDC;

import java.util.Map;

import java.util.concurrent.BlockingQueue;

import java.util.concurrent.Callable;

import java.util.concurrent.ForkJoinPool;

import java.util.concurrent.ForkJoinTask;

import java.util.concurrent.Future;

import java.util.concurrent.RejectedExecutionHandler;

import java.util.concurrent.ThreadFactory;

import java.util.concurrent.ThreadPoolExecutor;

import java.util.concurrent.TimeUnit;

public class WrapUtil {

public static Callable wrap(final Callable callable, final Map context) {

return () -> {

if (context == null) {

MDC.clear();

} else {

MDC.setContextMap(context);

}

TraceUtil.loadTraceInfo();

try {

return callable.call();

} finally {

MDC.clear();

}

};

}

public static Callable wrap(final Callable callable) {

return wrap(callable, MDC.getCopyOfContextMap());

}

public static Runnable wrap(final Runnable runnable, final Map context) {

return () -> {

if (context == null) {

MDC.clear();

} else {

MDC.setContextMap(context);

}

TraceUtil.loadTraceInfo();

try {

runnable.run();

} finally {

MDC.clear();

}

};

}

public static Runnable wrap(final Runnable runnable) {

return wrap(runnable, MDC.getCopyOfContextMap());

}

public static ThreadPoolExecutor newThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,

BlockingQueue workQueue, ThreadFactory threadFactory,

RejectedExecutionHandler handler) {

return new ThreadPoolExecutorMdcWrapper(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);

}

public static ThreadPoolExecutor newThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,

BlockingQueue workQueue, ThreadFactory threadFactory) {

return new ThreadPoolExecutorMdcWrapper(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);

}

public static ThreadPoolExecutor newThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,

BlockingQueue workQueue) {

return new ThreadPoolExecutorMdcWrapper(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);

}

public static ThreadPoolExecutor newThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,

BlockingQueue workQueue,

RejectedExecutionHandler handler) {

return new ThreadPoolExecutorMdcWrapper(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);

}

public static ForkJoinPool newForkJoinPool() {

return new ForkJoinPoolMdcWrapper();

}

public static class ThreadPoolExecutorMdcWrapper extends ThreadPoolExecutor {

public ThreadPoolExecutorMdcWrapper(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,

BlockingQueue workQueue) {

super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);

}

public ThreadPoolExecutorMdcWrapper(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,

BlockingQueue workQueue, ThreadFactory threadFactory) {

super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);

}

public ThreadPoolExecutorMdcWrapper(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,

BlockingQueue workQueue, RejectedExecutionHandler handler) {

super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);

}

public ThreadPoolExecutorMdcWrapper(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,

BlockingQueue workQueue, ThreadFactory threadFactory,

RejectedExecutionHandler handler) {

super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);

}

@Override

public void execute(Runnable task) {

super.execute(WrapUtil.wrap(task, MDC.getCopyOfContextMap()));

}

@Override

public Future submit(Runnable task, T result) {

return super.submit(WrapUtil.wrap(task, MDC.getCopyOfContextMap()), result);

}

@Override

public Future submit(Callable task) {

return super.submit(WrapUtil.wrap(task, MDC.getCopyOfContextMap()));

}

@Override

public Future submit(Runnable task) {

return super.submit(WrapUtil.wrap(task, MDC.getCopyOfContextMap()));

}

}

public static class ForkJoinPoolMdcWrapper extends ForkJoinPool {

public ForkJoinPoolMdcWrapper() {

super();

}

public ForkJoinPoolMdcWrapper(int parallelism) {

super(parallelism);

}

public ForkJoinPoolMdcWrapper(int parallelism, ForkJoinWorkerThreadFactory factory,

Thread.UncaughtExceptionHandler handler, boolean asyncMode) {

super(parallelism, factory, handler, asyncMode);

}

@Override

public void execute(Runnable task) {

super.execute(WrapUtil.wrap(task, MDC.getCopyOfContextMap()));

}

@Override

public ForkJoinTask submit(Runnable task, T result) {

return super.submit(WrapUtil.wrap(task, MDC.getCopyOfContextMap()), result);

}

@Override

public ForkJoinTask submit(Callable task) {

return super.submit(WrapUtil.wrap(task, MDC.getCopyOfContextMap()));

}

}

}

2.2 Spring中的@Async注解中的线程池

@Configuration

public class TaskExecutorConfig {

@Bean

public TaskExecutor taskExecutor() {

ThreadPoolTaskExecutor executor = TraceSpringTraceWrapUtil.newThreadPoolTaskExecutor();

// 设置核心线程数

executor.setCorePoolSize(Runtime.getRuntime().availableProcessors());

// 设置最大线程数

executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 10);

// 设置队列容量

executor.setQueueCapacity(Runtime.getRuntime().availableProcessors() * 10);

// 设置线程活跃时间(秒)

executor.setKeepAliveSeconds(10);

// 设置默认线程名称

executor.setThreadNamePrefix("scheduled-");

// 设置拒绝策略

executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

// 等待所有任务结束后再关闭线程池

executor.setWaitForTasksToCompleteOnShutdown(true);

executor.initialize();

return executor;

}

}

相关工具类 

import com.startdt.license.util.WrapUtil;

import org.slf4j.MDC;

import org.springframework.scheduling.Trigger;

import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

import org.springframework.util.concurrent.ListenableFuture;

import java.util.Date;

import java.util.concurrent.Callable;

import java.util.concurrent.Future;

import java.util.concurrent.RunnableScheduledFuture;

import java.util.concurrent.ScheduledFuture;

import java.util.concurrent.ScheduledThreadPoolExecutor;

import java.util.concurrent.ThreadFactory;

import java.util.concurrent.TimeUnit;

public class TraceSpringTraceWrapUtil extends WrapUtil {

public static class ThreadPoolTaskExecutorMdcWrapper extends ThreadPoolTaskExecutor {

@Override

public void execute(Runnable task) {

super.execute(WrapUtil.wrap(task, MDC.getCopyOfContextMap()));

}

@Override

public void execute(Runnable task, long startTimeout) {

super.execute(WrapUtil.wrap(task, MDC.getCopyOfContextMap()), startTimeout);

}

@Override

public Future submit(Callable task) {

return super.submit(WrapUtil.wrap(task, MDC.getCopyOfContextMap()));

}

@Override

public Future submit(Runnable task) {

return super.submit(WrapUtil.wrap(task, MDC.getCopyOfContextMap()));

}

@Override

public ListenableFuture submitListenable(Runnable task) {

return super.submitListenable(WrapUtil.wrap(task, MDC.getCopyOfContextMap()));

}

@Override

public ListenableFuture submitListenable(Callable task) {

return super.submitListenable(WrapUtil.wrap(task, MDC.getCopyOfContextMap()));

}

}

public static class ScheduledThreadPoolTaskExecutorMdcWrapper extends ScheduledThreadPoolExecutor {

public ScheduledThreadPoolTaskExecutorMdcWrapper(int corePoolSize) {

super(corePoolSize);

}

public ScheduledThreadPoolTaskExecutorMdcWrapper(int corePoolSize, ThreadFactory threadFactory) {

super(corePoolSize, threadFactory);

}

@Override

protected RunnableScheduledFuture decorateTask(Callable callable, RunnableScheduledFuture task) {

return super.decorateTask(WrapUtil.wrap(callable, MDC.getCopyOfContextMap()), task);

}

@Override

protected RunnableScheduledFuture decorateTask(Runnable runnable, RunnableScheduledFuture task) {

return super.decorateTask(WrapUtil.wrap(runnable, MDC.getCopyOfContextMap()), task);

}

@Override

public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) {

return super.schedule(WrapUtil.wrap(command, MDC.getCopyOfContextMap()), delay, unit);

}

@Override

public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {

return super.scheduleAtFixedRate(WrapUtil.wrap(command, MDC.getCopyOfContextMap()), initialDelay, period, unit);

}

}

public static class ThreadPoolTaskSchedulerWrapper extends ThreadPoolTaskScheduler {

@Override

public ScheduledFuture schedule(Runnable task, Date startTime) {

return super.schedule(WrapUtil.wrap(task, MDC.getCopyOfContextMap()), startTime);

}

@Override

public ScheduledFuture schedule(Runnable task, Trigger trigger) {

return super.schedule(WrapUtil.wrap(task, MDC.getCopyOfContextMap()), trigger);

}

@Override

public ScheduledFuture scheduleAtFixedRate(Runnable task, long period) {

return super.scheduleAtFixedRate(WrapUtil.wrap(task, MDC.getCopyOfContextMap()), period);

}

@Override

public ScheduledFuture scheduleAtFixedRate(Runnable task, Date startTime, long period) {

return super.scheduleAtFixedRate(WrapUtil.wrap(task, MDC.getCopyOfContextMap()), startTime, period);

}

@Override

public ScheduledFuture scheduleWithFixedDelay(Runnable task, long delay) {

return super.scheduleWithFixedDelay(WrapUtil.wrap(task, MDC.getCopyOfContextMap()), delay);

}

}

public static ThreadPoolTaskExecutor newThreadPoolTaskExecutor() {

return new ThreadPoolTaskExecutorMdcWrapper();

}

public static ScheduledThreadPoolExecutor newScheduledThreadPoolExecutor(int corePoolSize) {

return new ScheduledThreadPoolTaskExecutorMdcWrapper(corePoolSize);

}

public static ScheduledThreadPoolExecutor newScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {

return new ScheduledThreadPoolTaskExecutorMdcWrapper(corePoolSize, threadFactory);

}

public static ThreadPoolTaskScheduler newThreadPoolTaskScheduler() {

return new ThreadPoolTaskSchedulerWrapper();

}

}

2.3 spring线程池

import com.startdt.license.util.WrapUtil;

import org.slf4j.MDC;

import org.springframework.scheduling.Trigger;

import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

import org.springframework.util.concurrent.ListenableFuture;

import java.util.Date;

import java.util.concurrent.Callable;

import java.util.concurrent.Future;

import java.util.concurrent.RunnableScheduledFuture;

import java.util.concurrent.ScheduledFuture;

import java.util.concurrent.ScheduledThreadPoolExecutor;

import java.util.concurrent.ThreadFactory;

import java.util.concurrent.TimeUnit;

public class TraceSpringTraceWrapUtil extends WrapUtil {

public static class ThreadPoolTaskExecutorMdcWrapper extends ThreadPoolTaskExecutor {

@Override

public void execute(Runnable task) {

super.execute(WrapUtil.wrap(task, MDC.getCopyOfContextMap()));

}

@Override

public void execute(Runnable task, long startTimeout) {

super.execute(WrapUtil.wrap(task, MDC.getCopyOfContextMap()), startTimeout);

}

@Override

public Future submit(Callable task) {

return super.submit(WrapUtil.wrap(task, MDC.getCopyOfContextMap()));

}

@Override

public Future submit(Runnable task) {

return super.submit(WrapUtil.wrap(task, MDC.getCopyOfContextMap()));

}

@Override

public ListenableFuture submitListenable(Runnable task) {

return super.submitListenable(WrapUtil.wrap(task, MDC.getCopyOfContextMap()));

}

@Override

public ListenableFuture submitListenable(Callable task) {

return super.submitListenable(WrapUtil.wrap(task, MDC.getCopyOfContextMap()));

}

}

public static class ScheduledThreadPoolTaskExecutorMdcWrapper extends ScheduledThreadPoolExecutor {

public ScheduledThreadPoolTaskExecutorMdcWrapper(int corePoolSize) {

super(corePoolSize);

}

public ScheduledThreadPoolTaskExecutorMdcWrapper(int corePoolSize, ThreadFactory threadFactory) {

super(corePoolSize, threadFactory);

}

@Override

protected RunnableScheduledFuture decorateTask(Callable callable, RunnableScheduledFuture task) {

return super.decorateTask(WrapUtil.wrap(callable, MDC.getCopyOfContextMap()), task);

}

@Override

protected RunnableScheduledFuture decorateTask(Runnable runnable, RunnableScheduledFuture task) {

return super.decorateTask(WrapUtil.wrap(runnable, MDC.getCopyOfContextMap()), task);

}

@Override

public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) {

return super.schedule(WrapUtil.wrap(command, MDC.getCopyOfContextMap()), delay, unit);

}

@Override

public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {

return super.scheduleAtFixedRate(WrapUtil.wrap(command, MDC.getCopyOfContextMap()), initialDelay, period, unit);

}

}

public static class ThreadPoolTaskSchedulerWrapper extends ThreadPoolTaskScheduler {

@Override

public ScheduledFuture schedule(Runnable task, Date startTime) {

return super.schedule(WrapUtil.wrap(task, MDC.getCopyOfContextMap()), startTime);

}

@Override

public ScheduledFuture schedule(Runnable task, Trigger trigger) {

return super.schedule(WrapUtil.wrap(task, MDC.getCopyOfContextMap()), trigger);

}

@Override

public ScheduledFuture scheduleAtFixedRate(Runnable task, long period) {

return super.scheduleAtFixedRate(WrapUtil.wrap(task, MDC.getCopyOfContextMap()), period);

}

@Override

public ScheduledFuture scheduleAtFixedRate(Runnable task, Date startTime, long period) {

return super.scheduleAtFixedRate(WrapUtil.wrap(task, MDC.getCopyOfContextMap()), startTime, period);

}

@Override

public ScheduledFuture scheduleWithFixedDelay(Runnable task, long delay) {

return super.scheduleWithFixedDelay(WrapUtil.wrap(task, MDC.getCopyOfContextMap()), delay);

}

}

public static ThreadPoolTaskExecutor newThreadPoolTaskExecutor() {

return new ThreadPoolTaskExecutorMdcWrapper();

}

public static ScheduledThreadPoolExecutor newScheduledThreadPoolExecutor(int corePoolSize) {

return new ScheduledThreadPoolTaskExecutorMdcWrapper(corePoolSize);

}

public static ScheduledThreadPoolExecutor newScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {

return new ScheduledThreadPoolTaskExecutorMdcWrapper(corePoolSize, threadFactory);

}

public static ThreadPoolTaskScheduler newThreadPoolTaskScheduler() {

return new ThreadPoolTaskSchedulerWrapper();

}

}

2.4 dubbo rpc

@Activate(group = {DubboConstants.CONSUMER} , order = -9999)

public class TraceDubboConsumerFilter implements Filter {

private Logger logger = LoggerFactory.getLogger(TraceDubboConsumerFilter.class);

@Override

public Result invoke(Invoker invoker, Invocation invocation) throws RpcException {

String traceId = TraceUtil.getTraceId();

String traceCaller = TraceUtil.getTraceCaller();

String traceOrigin = TraceUtil.getTraceOrigin();

if(StringUtils.isBlank(traceId)) {

traceId = TraceUtil.genTraceId();

traceCaller = TraceUtil.getApplicationName();

traceOrigin = TraceUtil.getApplicationName();

logger.debug("[仅生成追踪信息]traceId:{}. traceCaller:{}. traceOrigin:{}", traceId, traceCaller, traceOrigin);

}

RpcContext.getContext().setAttachment(TraceConstants.LOG_TRACE_ID, traceId);

RpcContext.getContext().setAttachment(TraceConstants.LOG_TRACE_CALLER, traceCaller);

RpcContext.getContext().setAttachment(TraceConstants.LOG_TRACE_ORIGIN, traceOrigin);

return invoker.invoke(invocation);

}

}

@Activate(group = {DubboConstants.PROVIDER}, order = -10000)

public class TraceDubboProviderFilter implements Filter {

private Logger logger = LoggerFactory.getLogger(TraceDubboConsumerFilter.class);

@Override

public Result invoke(Invoker invoker, Invocation invocation) throws RpcException {

//traceOrigin、traceCaller、traceId

String traceId = invocation.getAttachment(TraceConstants.LOG_TRACE_ID);

String traceCaller = invocation.getAttachment(TraceConstants.LOG_TRACE_CALLER);

String traceOrigin = invocation.getAttachment(TraceConstants.LOG_TRACE_ORIGIN);

//如果不存在traceId需要生成

if (StringUtils.isBlank(traceId)) {

boolean generate = TraceUtil.loadTraceInfo();

if(generate) {

logger.info("[生成追踪信息]" + TraceUtil.getTraceInfoString());

}

}else {

//设置MDC

MDC.put(TraceConstants.LOG_TRACE_ORIGIN, traceOrigin);

MDC.put(TraceConstants.LOG_TRACE_CALLER, traceCaller);

MDC.put(TraceConstants.LOG_TRACE_ID, traceId);

}

//IP

String clientIp = RpcContext.getContext().getRemoteHost();

MDC.put(TraceConstants.LOG_TRACE_IP, clientIp);

try {

return invoker.invoke(invocation);

} finally {

MDC.clear();

}

}

}

2.5 kafka

生产者:

a:增加配置项:

spring:

kafka:

producer:

properties:

interceptor:

classes: com.trace.kafka.TraceProducerInterceptor

b.kafkaTemplateAutoConfig代码中增加

if (StringUtils.isNotBlank(producerInterceptors)) {

props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, producerInterceptors);

}

消费者前增加代码:

TraceConsumerUtil.trace(consumerRecord);

 消费者工具类

import com.startdt.simba.boot.common.constants.TraceConstants;

import com.startdt.simba.boot.common.utils.TraceUtil;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.apache.kafka.common.header.Header;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

public class TraceConsumerUtil {

private static final Logger LOGGER = LoggerFactory.getLogger(TraceConsumerUtil.class);

public static void trace(ConsumerRecord consumerRecord) {

try {

if(consumerRecord == null) {

return;

}

Iterable

headers = consumerRecord.headers().headers(TraceConstants.LOG_TRACE_ID);

Header firstHeader = null;

for(Header header : headers) {

if(firstHeader != null) {

break;

}

if(header != null) {

firstHeader = header;

}

}

if(firstHeader == null) {

TraceUtil.setTraceId(TraceUtil.genTraceId());

}else {

TraceUtil.setTraceId(new String(firstHeader.value()));

}

}catch (Exception ex) {

LOGGER.warn("Kafka tracking ID setting failed, reason:" + ex.getMessage(), ex);

}

}

}

 生产者拦截器

public class TraceProducerInterceptor implements ProducerInterceptor {

private static final Logger LOGGER = LoggerFactory.getLogger(TraceProducerInterceptor.class);

@Override

public ProducerRecord onSend(ProducerRecord record) {

try {

TraceUtil.loadTraceInfo();

String traceId = TraceUtil.getTraceId();

if(StrUtil.isNotBlank(traceId)) {

record.headers().add(TraceConstants.LOG_TRACE_ID, traceId.getBytes());

}

}catch (Exception ex) {

LOGGER.warn("Kafka tracking ID setting failed, reason:" + ex.getMessage(), ex);

}

return record;

}

@Override

public void onAcknowledgement(RecordMetadata metadata, Exception exception) {

}

@Override

public void close() {

}

@Override

public void configure(Map configs) {

}

}

2.6 线程池使用步骤

替换ThreadPoolExecutor为包装类

private static ThreadPoolExecutor executor = new TraceWrapUtil.ThreadPoolExecutorMdcWrapper(defaultPoolSize, defaultPoolSize,

10L,TimeUnit.SECONDS,

new ArrayBlockingQueue<>(1000),

new NameThreadFactory("warn.message.handler."), new AbortPolicy());

精彩链接

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: