跳至主要內容

Skywalking

HeChuangJun约 4283 字大约 14 分钟

Skywalking

Skywalking是专为微服务、云原生和基于容器的(Kubernetes)架构设计的分布式系统性能监控工具。

Skywalking关键特性

  • 分布式跟踪:端到端分布式跟踪。服务拓扑分析、以服务为中心的可观察性和API仪表板。
  • 针对您的技术栈的代理:Java、.Net Core、PHP、NodeJS、Golang、LUA、Rust、C++、客户端JavaScript和Python代理,持续积极开发和维护。
  • 早期采用eBPF技术:Rover代理作为指标收集器和性能分析工具,采用eBPF技术来诊断CPU和网络性能问题。
  • 扩展能力:从一个SkyWalking集群中可以收集和分析1000亿级别的遥测数据。
  • 成熟的遥测生态系统支持:支持成熟的遥测生态系统的指标、跟踪和日志,例如Zipkin、OpenTelemetry、Prometheus、Zabbix、Fluentd等。
  • 原生APM数据库:BanyanDB是一个2022年创建的可观察性数据库,旨在接收、分析和存储遥测/可观察性数据。
  • 一致的指标聚合:SkyWalking本机的度量格式和广泛使用的度量格式(如OpenCensus、OTLP、Telegraf、Zabbix等)通过相同的脚本管道进行处理。
  • 日志管理管道:通过脚本管道支持日志格式化、提取指标和各种采样策略,具有高性能。
  • 报警和遥测管道:支持基于服务、基于部署和基于API的报警规则设置。支持将报警和所有遥测数据转发给第三方。

Skywalking搭建

  1. 安装Skywalking
mkdir -p /etc/sca/skywalking
cd /etc/sca/skywalking
cat > docker-compose.yaml <<-'EOF'
version: '3'
services:
  mysql:
    container_name: mysql
    image: mysql:8
    restart: always
    environment: 
      - MYSQL_ROOT_PASSWORD=root
    ports:
      - "3306:3306"
  nacos:
    image: nacos/nacos-server:v2.2.1-slim
    container_name: nacos
    restart: always
    environment:
      - PREFER_HOST_MODE=hostname
      - MODE=standalone
      - NACOS_AUTH_IDENTITY_KEY=2222
      - NACOS_AUTH_IDENTITY_VALUE=2xxx
      - NACOS_AUTH_TOKEN=SecretKey012345678901234567890123456789012345678901234567890123456789
    depends_on:
      - mysql
    ports:
      - "7848:7848"
      - "8848:8848"
      - "9848:9848"
  oap:
    image: apache/skywalking-oap-server:9.4.0
    container_name: oap
    restart: always
    ports:
      - "11800:11800"
      - "12800:12800"

  oap-ui:
    image: apache/skywalking-ui:9.4.0
    container_name: oap-ui
    restart: always
    environment:
      SW_OAP_ADDRESS: http://oap:12800
    ports:
      - "8080:8080"
    depends_on:
      - oap
EOF
docker-compose up -d

  1. 安装Java Agent
  • 下载后apache-skywalking-java-agent-8.15.0.tgz后解压缩到D:\java-agent
    打开D:\java-agent\config\agent.config,将104行127.0.0.1:11800改成 宿主机IP:11800
collector.backend_service=${SW_AGENT_COLLECTOR_BACKEND_SERVICES:192.168.31.230:11800}
java jar \
-javaagent:D:\java-agent\skywalking-agent.jar \
-Dskywalking.agent.service_name=c-service \
-Dskywalking.logging.file_name=c-service-api.log \
xxx.jar

idea的配置
agentidea.png

通过WebUI获取监控数据
http://192.168.31.230:8080/open in new window
skywalking.png

Springboot整合+日志

  1. pom.xml,注意版本号与Skywalking的一致
<!-- 该引用用于logback获取tranceId,也就是tid -->
<dependency>
    <groupId>org.apache.skywalking</groupId>
    <artifactId>apm-toolkit-logback-1.x</artifactId>
    <version>9.2.0</version>
</dependency>

<!-- 该引用用于代码获取tranceId -->
<dependency>
    <groupId>org.apache.skywalking</groupId>
    <artifactId>apm-toolkit-trace</artifactId>
    <version>9.2.0</version>
</dependency>
  1. logback配置logback.xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration scan="true" scanPeriod=" 5 seconds">

    <appender name="stdout" class="ch.qos.logback.core.ConsoleAppender">
        <encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder">
            <layout class="org.apache.skywalking.apm.toolkit.log.logback.v1.x.mdc.TraceIdMDCPatternLogbackLayout">
                <Pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%X{tid}] [%thread] %-5level %logger{36} -%msg%n</Pattern>
            </layout>
        </encoder>
    </appender>

    <appender name="grpc-log" class="org.apache.skywalking.apm.toolkit.log.logback.v1.x.log.GRPCLogClientAppender">
        <encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder">
            <layout class="org.apache.skywalking.apm.toolkit.log.logback.v1.x.mdc.TraceIdMDCPatternLogbackLayout">
                <Pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%X{tid}] [%thread] %-5level %logger{36} -%msg%n</Pattern>
            </layout>
        </encoder>
    </appender>

    <appender name="fileAppender" class="ch.qos.logback.core.FileAppender">
        <file>/tmp/skywalking-logs/logback/e2e-service-provider.log</file>
        <encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder">
            <layout class="org.apache.skywalking.apm.toolkit.log.logback.v1.x.TraceIdPatternLogbackLayout">
                <Pattern>[%sw_ctx] [%level] %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %logger:%line - %msg%n</Pattern>
            </layout>
        </encoder>
    </appender>

    <!--需要将日志单独输出的包路径-->
    <logger name="com.itlaoqi.aservice" additivity="false" level="INFO">
        <appender-ref ref="stdout"/>
        <appender-ref ref="grpc-log"/>
        <appender-ref ref="fileAppender"/>
    </logger>

    <root level="INFO">
        <appender-ref ref="grpc-log"/>
        <appender-ref ref="stdout"/>
    </root>
    <logger name="fileLogger" level="INFO">
        <appender-ref ref="fileAppender"/>
    </logger>
</configuration>
  1. 配置agent配置文件,config/agent.config
agent.service_name=my-application
collector.backend_service=oap-server-host:11800
logging.level=DEBUG

plugin.logging=true
  1. 其他的跟上面差不多
  2. 应用里面使用@Sl4j打印日志即可
  3. 查看日志收集结果
    skywalkinglog.png

Sky源码

  • controller中traceId产生:SkyWalkingAgent的premain方法,访问每个插件的skywalking-plugin.def文件加载所有插件包括接口插件(org.apache.skywalking.apm.plugin.spring.mvc.v5.define.ControllerInstrumentation,指定拦截org.springframework.stereotype.Controller类,而它的父类AbstractControllerInstrumentation,已经拦截了RequestMapping,拦截类是RestMappingMethodInterceptor),同时调用installClassTransformer方法增强对应插件类名的类,里面调用了静态内部类Transformer的transform方法,调用所有插件的define方法,而所有插件继承了AbstractClassEnhancePluginDefine类,它的默认define方法调用子类ClassEnhancePluginDefine的enhanceClass和enhanceInstance分别增强了构造方法、实例方法和静态方法,都是调用子类插件的getConstructorsInterceptPoints(),getInstanceMethodsInterceptPoints(),getStaticMethodsInterceptPoints()3个方法返回拦截器的全类名实现调用拦截器接口实现类InstanceMethodsAroundInterceptor的beforeMethod目的,而traceId的产生由RestMappingMethodInterceptor的父类AbstractMethodInterceptor调用beforeMethod,然后里面再调用ContextManager的createEntrySpan方法创建Span,同时采集其他请求信息,服务间调用的时候通过HttpClientDoExecuteInterceptor的beforeMethod方法将trace信息添加到请求头
public class SkyWalkingAgent {
    //主入口。使用byte-buddy transform 来增强所有在插件中定义的类。
    public static void premain(String agentArgs, Instrumentation instrumentation) throws PluginException {
        final PluginFinder pluginFinder;
        try {
            //加载配置
            SnifferConfigInitializer.initializeCoreConfig(agentArgs);
        } catch (Exception e) {
            // try to resolve a new logger, and use the new logger to write the error log here
            LogManager.getLogger(SkyWalkingAgent.class)
                    .error(e, "SkyWalking agent initialized failure. Shutting down.");
            return;
        } finally {
            // refresh logger again after initialization finishes
            LOGGER = LogManager.getLogger(SkyWalkingAgent.class);
        }

        if (!Config.Agent.ENABLE) {
            LOGGER.warn("SkyWalking agent is disabled.");
            return;
        }

        try {
            //加载插件
            pluginFinder = new PluginFinder(new PluginBootstrap().loadPlugins());
        } catch (AgentPackageNotFoundException ape) {
            LOGGER.error(ape, "Locate agent.jar failure. Shutting down.");
            return;
        } catch (Exception e) {
            LOGGER.error(e, "SkyWalking agent initialized failure. Shutting down.");
            return;
        }

        try {
            //方法增强
            installClassTransformer(instrumentation, pluginFinder);
        } catch (Exception e) {
            LOGGER.error(e, "Skywalking agent installed class transformer failure.");
        }

        try {
            //启动服务
            ServiceManager.INSTANCE.boot();
        } catch (Exception e) {
            LOGGER.error(e, "Skywalking agent boot failure.");
        }

        Runtime.getRuntime()
               .addShutdownHook(new Thread(ServiceManager.INSTANCE::shutdown, "skywalking service shutdown thread"));
    }

    static void installClassTransformer(Instrumentation instrumentation, PluginFinder pluginFinder) throws Exception {
        LOGGER.info("Skywalking agent begin to install transformer ...");

        AgentBuilder agentBuilder = newAgentBuilder().ignore(
            nameStartsWith("net.bytebuddy.")
                .or(nameStartsWith("org.slf4j."))
                .or(nameStartsWith("org.groovy."))
                .or(nameContains("javassist"))
                .or(nameContains(".asm."))
                .or(nameContains(".reflectasm."))
                .or(nameStartsWith("sun.reflect"))
                .or(allSkyWalkingAgentExcludeToolkit())
                .or(ElementMatchers.isSynthetic()));

        JDK9ModuleExporter.EdgeClasses edgeClasses = new JDK9ModuleExporter.EdgeClasses();
        try {
            agentBuilder = BootstrapInstrumentBoost.inject(pluginFinder, instrumentation, agentBuilder, edgeClasses);
        } catch (Exception e) {
            throw new Exception("SkyWalking agent inject bootstrap instrumentation failure. Shutting down.", e);
        }

        try {
            agentBuilder = JDK9ModuleExporter.openReadEdge(instrumentation, agentBuilder, edgeClasses);
        } catch (Exception e) {
            throw new Exception("SkyWalking agent open read edge in JDK 9+ failure. Shutting down.", e);
        }
        //核心。agentBuilder#transform,
        agentBuilder.type(pluginFinder.buildMatch())
                    .transform(new Transformer(pluginFinder))
                    .with(AgentBuilder.RedefinitionStrategy.RETRANSFORMATION)
                    .with(new RedefinitionListener())
                    .with(new Listener())
                    .installOn(instrumentation);

        PluginFinder.pluginInitCompleted();

        LOGGER.info("Skywalking agent transformer has installed.");
    }

    private static class Transformer implements AgentBuilder.Transformer {
        private PluginFinder pluginFinder;

        Transformer(PluginFinder pluginFinder) {
            this.pluginFinder = pluginFinder;
        }

        @Override
        //调用所有插件的define
        public DynamicType.Builder<?> transform(final DynamicType.Builder<?> builder,
                                                final TypeDescription typeDescription,
                                                final ClassLoader classLoader,
                                                final JavaModule javaModule,
                                                final ProtectionDomain protectionDomain) {
            LoadedLibraryCollector.registerURLClassLoader(classLoader);
            List<AbstractClassEnhancePluginDefine> pluginDefines = pluginFinder.find(typeDescription);
            if (pluginDefines.size() > 0) {
                DynamicType.Builder<?> newBuilder = builder;
                EnhanceContext context = new EnhanceContext();
                for (AbstractClassEnhancePluginDefine define : pluginDefines) {
                    DynamicType.Builder<?> possibleNewBuilder = define.define(
                        typeDescription, newBuilder, classLoader, context);
                    if (possibleNewBuilder != null) {
                        newBuilder = possibleNewBuilder;
                    }
                }
                if (context.isEnhanced()) {
                    LOGGER.debug("Finish the prepare stage for {}.", typeDescription.getName());
                }

                return newBuilder;
            }

            LOGGER.debug("Matched class {}, but ignore by finding mechanism.", typeDescription.getTypeName());
            return builder;
        }
    }
}
public abstract class AbstractClassEnhancePluginDefine {
    /**
     * Main entrance of enhancing the class.
     *
     * @param typeDescription target class description.
     * @param builder         byte-buddy's builder to manipulate target class's bytecode.
     * @param classLoader     load the given transformClass
     * @return the new builder, or <code>null</code> if not be enhanced.
     * @throws PluginException when set builder failure.
     */
    public DynamicType.Builder<?> define(TypeDescription typeDescription, DynamicType.Builder<?> builder,
        ClassLoader classLoader, EnhanceContext context) throws PluginException {
        String interceptorDefineClassName = this.getClass().getName();
        String transformClassName = typeDescription.getTypeName();
        if (StringUtil.isEmpty(transformClassName)) {
            LOGGER.warn("classname of being intercepted is not defined by {}.", interceptorDefineClassName);
            return null;
        }

        LOGGER.debug("prepare to enhance class {} by {}.", transformClassName, interceptorDefineClassName);
        WitnessFinder finder = WitnessFinder.INSTANCE;
        /**
         * find witness classes for enhance class
         */
        String[] witnessClasses = witnessClasses();
        if (witnessClasses != null) {
            for (String witnessClass : witnessClasses) {
                if (!finder.exist(witnessClass, classLoader)) {
                    LOGGER.warn("enhance class {} by plugin {} is not activated. Witness class {} does not exist.", transformClassName, interceptorDefineClassName, witnessClass);
                    return null;
                }
            }
        }
        List<WitnessMethod> witnessMethods = witnessMethods();
        if (!CollectionUtil.isEmpty(witnessMethods)) {
            for (WitnessMethod witnessMethod : witnessMethods) {
                if (!finder.exist(witnessMethod, classLoader)) {
                    LOGGER.warn("enhance class {} by plugin {} is not activated. Witness method {} does not exist.", transformClassName, interceptorDefineClassName, witnessMethod);
                    return null;
                }
            }
        }

        /**
         * find origin class source code for interceptor
         */
        DynamicType.Builder<?> newClassBuilder = this.enhance(typeDescription, builder, classLoader, context);

        context.initializationStageCompleted();
        LOGGER.debug("enhance class {} by {} completely.", transformClassName, interceptorDefineClassName);

        return newClassBuilder;
    }

    /**
     * Begin to define how to enhance class. After invoke this method, only means definition is finished.
     *
     * @param typeDescription target class description
     * @param newClassBuilder byte-buddy's builder to manipulate class bytecode.
     * @return new byte-buddy's builder for further manipulation.
     */
    protected DynamicType.Builder<?> enhance(TypeDescription typeDescription, DynamicType.Builder<?> newClassBuilder,
                                             ClassLoader classLoader, EnhanceContext context) throws PluginException {
        newClassBuilder = this.enhanceClass(typeDescription, newClassBuilder, classLoader);

        newClassBuilder = this.enhanceInstance(typeDescription, newClassBuilder, classLoader, context);

        return newClassBuilder;
    }

    /**
     * Enhance a class to intercept constructors and class instance methods.
     *
     * @param typeDescription target class description
     * @param newClassBuilder byte-buddy's builder to manipulate class bytecode.
     * @return new byte-buddy's builder for further manipulation.
     */
    protected abstract DynamicType.Builder<?> enhanceInstance(TypeDescription typeDescription,
                                                     DynamicType.Builder<?> newClassBuilder, ClassLoader classLoader,
                                                     EnhanceContext context) throws PluginException;

    /**
     * Enhance a class to intercept class static methods.
     *
     * @param typeDescription target class description
     * @param newClassBuilder byte-buddy's builder to manipulate class bytecode.
     * @return new byte-buddy's builder for further manipulation.
     */
    protected abstract DynamicType.Builder<?> enhanceClass(TypeDescription typeDescription, DynamicType.Builder<?> newClassBuilder,
                                                  ClassLoader classLoader) throws PluginException;
}
public abstract class ClassEnhancePluginDefine extends AbstractClassEnhancePluginDefine {
    private static final ILog LOGGER = LogManager.getLogger(ClassEnhancePluginDefine.class);

    /**
     * Enhance a class to intercept constructors and class instance methods.
     *
     * @param typeDescription target class description
     * @param newClassBuilder byte-buddy's builder to manipulate class bytecode.
     * @return new byte-buddy's builder for further manipulation.
     */
    @Override
    protected DynamicType.Builder<?> enhanceInstance(TypeDescription typeDescription,
        DynamicType.Builder<?> newClassBuilder, ClassLoader classLoader,
        EnhanceContext context) throws PluginException {
        ConstructorInterceptPoint[] constructorInterceptPoints = getConstructorsInterceptPoints();
        InstanceMethodsInterceptPoint[] instanceMethodsInterceptPoints = getInstanceMethodsInterceptPoints();
        String enhanceOriginClassName = typeDescription.getTypeName();
        boolean existedConstructorInterceptPoint = false;
        if (constructorInterceptPoints != null && constructorInterceptPoints.length > 0) {
            existedConstructorInterceptPoint = true;
        }
        boolean existedMethodsInterceptPoints = false;
        if (instanceMethodsInterceptPoints != null && instanceMethodsInterceptPoints.length > 0) {
            existedMethodsInterceptPoints = true;
        }
        DelegateNamingResolver delegateNamingResolver = new DelegateNamingResolver(typeDescription.getTypeName(), this);

        /**
         * nothing need to be enhanced in class instance, maybe need enhance static methods.
         */
        if (!existedConstructorInterceptPoint && !existedMethodsInterceptPoints) {
            return newClassBuilder;
        }

        /**
         * Manipulate class source code.<br/>
         *
         * new class need:<br/>
         * 1.Add field, name {@link #CONTEXT_ATTR_NAME}.
         * 2.Add a field accessor for this field.
         *
         * And make sure the source codes manipulation only occurs once.
         *
         */
        if (!typeDescription.isAssignableTo(EnhancedInstance.class)) {
            if (!context.isObjectExtended()) {
                newClassBuilder = newClassBuilder.defineField(
                    CONTEXT_ATTR_NAME, Object.class, ACC_PRIVATE | ACC_VOLATILE)
                                                 .implement(EnhancedInstance.class)
                                                 .intercept(FieldAccessor.ofField(CONTEXT_ATTR_NAME));
                context.extendObjectCompleted();
            }
        }

        /**
         * 2. enhance constructors
         */
        if (existedConstructorInterceptPoint) {
            for (ConstructorInterceptPoint constructorInterceptPoint : constructorInterceptPoints) {
                if (isBootstrapInstrumentation()) {
                    newClassBuilder = newClassBuilder.constructor(constructorInterceptPoint.getConstructorMatcher())
                                                     .intercept(SuperMethodCall.INSTANCE.andThen(MethodDelegation.withDefaultConfiguration()
                                                                                                                 .to(BootstrapInstrumentBoost
                                                                                                                     .forInternalDelegateClass(constructorInterceptPoint
                                                                                                                         .getConstructorInterceptor()))));
                } else {
                    newClassBuilder = newClassBuilder.constructor(constructorInterceptPoint.getConstructorMatcher())
                                                     .intercept(SuperMethodCall.INSTANCE.andThen(MethodDelegation.withDefaultConfiguration()
                                                                                                                 .to(new ConstructorInter(constructorInterceptPoint
                                                                                                                     .getConstructorInterceptor(), classLoader), delegateNamingResolver.resolve(constructorInterceptPoint))));
                }
            }
        }

        /**
         * 3. enhance instance methods
         */
        if (existedMethodsInterceptPoints) {
            for (InstanceMethodsInterceptPoint instanceMethodsInterceptPoint : instanceMethodsInterceptPoints) {
                String interceptor = instanceMethodsInterceptPoint.getMethodsInterceptor();
                if (StringUtil.isEmpty(interceptor)) {
                    throw new EnhanceException("no InstanceMethodsAroundInterceptor define to enhance class " + enhanceOriginClassName);
                }
                ElementMatcher.Junction<MethodDescription> junction = not(isStatic()).and(instanceMethodsInterceptPoint.getMethodsMatcher());
                if (instanceMethodsInterceptPoint instanceof DeclaredInstanceMethodsInterceptPoint) {
                    junction = junction.and(ElementMatchers.<MethodDescription>isDeclaredBy(typeDescription));
                }
                if (instanceMethodsInterceptPoint.isOverrideArgs()) {
                    if (isBootstrapInstrumentation()) {
                        newClassBuilder = newClassBuilder.method(junction)
                                                         .intercept(MethodDelegation.withDefaultConfiguration()
                                                                                    .withBinders(Morph.Binder.install(OverrideCallable.class))
                                                                                    .to(BootstrapInstrumentBoost.forInternalDelegateClass(interceptor)));
                    } else {
                        newClassBuilder = newClassBuilder.method(junction)
                                                         .intercept(MethodDelegation.withDefaultConfiguration()
                                                                                    .withBinders(Morph.Binder.install(OverrideCallable.class))
                                                                                    .to(new InstMethodsInterWithOverrideArgs(interceptor, classLoader), delegateNamingResolver.resolve(instanceMethodsInterceptPoint)));
                    }
                } else {
                    if (isBootstrapInstrumentation()) {
                        newClassBuilder = newClassBuilder.method(junction)
                                                         .intercept(MethodDelegation.withDefaultConfiguration()
                                                                                    .to(BootstrapInstrumentBoost.forInternalDelegateClass(interceptor)));
                    } else {
                        newClassBuilder = newClassBuilder.method(junction)
                                                         .intercept(MethodDelegation.withDefaultConfiguration()
                                                                                    .to(new InstMethodsInter(interceptor, classLoader), delegateNamingResolver.resolve(instanceMethodsInterceptPoint)));
                    }
                }
            }
        }

        return newClassBuilder;
    }

    /**
     * Enhance a class to intercept class static methods.
     *
     * @param typeDescription target class description
     * @param newClassBuilder byte-buddy's builder to manipulate class bytecode.
     * @return new byte-buddy's builder for further manipulation.
     */
    @Override
    protected DynamicType.Builder<?> enhanceClass(TypeDescription typeDescription, DynamicType.Builder<?> newClassBuilder,
        ClassLoader classLoader) throws PluginException {
        StaticMethodsInterceptPoint[] staticMethodsInterceptPoints = getStaticMethodsInterceptPoints();
        String enhanceOriginClassName = typeDescription.getTypeName();
        if (staticMethodsInterceptPoints == null || staticMethodsInterceptPoints.length == 0) {
            return newClassBuilder;
        }
        DelegateNamingResolver delegateNamingResolver = new DelegateNamingResolver(typeDescription.getTypeName(), this);

        for (StaticMethodsInterceptPoint staticMethodsInterceptPoint : staticMethodsInterceptPoints) {
            String interceptor = staticMethodsInterceptPoint.getMethodsInterceptor();
            if (StringUtil.isEmpty(interceptor)) {
                throw new EnhanceException("no StaticMethodsAroundInterceptor define to enhance class " + enhanceOriginClassName);
            }

            if (staticMethodsInterceptPoint.isOverrideArgs()) {
                if (isBootstrapInstrumentation()) {
                    newClassBuilder = newClassBuilder.method(isStatic().and(staticMethodsInterceptPoint.getMethodsMatcher()))
                                                     .intercept(MethodDelegation.withDefaultConfiguration()
                                                                                .withBinders(Morph.Binder.install(OverrideCallable.class))
                                                                                .to(BootstrapInstrumentBoost.forInternalDelegateClass(interceptor)));
                } else {
                    newClassBuilder = newClassBuilder.method(isStatic().and(staticMethodsInterceptPoint.getMethodsMatcher()))
                                                     .intercept(MethodDelegation.withDefaultConfiguration()
                                                                                .withBinders(Morph.Binder.install(OverrideCallable.class))
                                                                                .to(new StaticMethodsInterWithOverrideArgs(interceptor), delegateNamingResolver.resolve(staticMethodsInterceptPoint)));
                }
            } else {
                if (isBootstrapInstrumentation()) {
                    newClassBuilder = newClassBuilder.method(isStatic().and(staticMethodsInterceptPoint.getMethodsMatcher()))
                                                     .intercept(MethodDelegation.withDefaultConfiguration()
                                                                                .to(BootstrapInstrumentBoost.forInternalDelegateClass(interceptor)));
                } else {
                    newClassBuilder = newClassBuilder.method(isStatic().and(staticMethodsInterceptPoint.getMethodsMatcher()))
                                                     .intercept(MethodDelegation.withDefaultConfiguration()
                                                                                .to(new StaticMethodsInter(interceptor), delegateNamingResolver.resolve(staticMethodsInterceptPoint)));
                }
            }

        }

        return newClassBuilder;
    }
}
public class TraceContextActivation extends ClassStaticMethodsEnhancePluginDefine {
  //三个拦截器负责生成traceId,spanId,SegementId,都是由ContextManager生成
    public static final String TRACE_ID_INTERCEPT_CLASS = "org.apache.skywalking.apm.toolkit.activation.trace.TraceIDInterceptor";
    public static final String SEGMENT_ID_INTERCEPT_CLASS = "org.apache.skywalking.apm.toolkit.activation.trace.SegmentIDInterceptor";
    public static final String SPAN_ID_INTERCEPT_CLASS = "org.apache.skywalking.apm.toolkit.activation.trace.SpanIDInterceptor";

  @Override
    public StaticMethodsInterceptPoint[] getStaticMethodsInterceptPoints() {
        return new StaticMethodsInterceptPoint[] {
            new StaticMethodsInterceptPoint() {
                @Override
                public ElementMatcher<MethodDescription> getMethodsMatcher() {
                    return named(ENHANCE_TRACE_ID_METHOD);
                }

                @Override
                public String getMethodsInterceptor() {
                    return TRACE_ID_INTERCEPT_CLASS;
                }

                @Override
                public boolean isOverrideArgs() {
                    return false;
                }
            },
            new StaticMethodsInterceptPoint() {
                @Override
                public ElementMatcher<MethodDescription> getMethodsMatcher() {
                    return named(ENHANCE_SEGMENT_ID_METHOD);
                }

                @Override
                public String getMethodsInterceptor() {
                    return SEGMENT_ID_INTERCEPT_CLASS;
                }

                @Override
                public boolean isOverrideArgs() {
                    return false;
                }
            },
            new StaticMethodsInterceptPoint() {
                @Override
                public ElementMatcher<MethodDescription> getMethodsMatcher() {
                    return named(ENHANCE_SPAN_ID_METHOD);
                }

                @Override
                public String getMethodsInterceptor() {
                    return SPAN_ID_INTERCEPT_CLASS;
                }

                @Override
                public boolean isOverrideArgs() {
                    return false;
                }
            },

            new StaticMethodsInterceptPoint() {
                @Override
                public ElementMatcher<MethodDescription> getMethodsMatcher() {
                    return named(ENHANCE_GET_CORRELATION_METHOD);
                }

                @Override
                public String getMethodsInterceptor() {
                    return INTERCEPT_GET_CORRELATION_CLASS;
                }

                @Override
                public boolean isOverrideArgs() {
                    return false;
                }
            },
            new StaticMethodsInterceptPoint() {
                @Override
                public ElementMatcher<MethodDescription> getMethodsMatcher() {
                    return named(ENHANCE_PUT_CORRELATION_METHOD);
                }

                @Override
                public String getMethodsInterceptor() {
                    return INTERCEPT_PUT_CORRELATION_CLASS;
                }

                @Override
                public boolean isOverrideArgs() {
                    return false;
                }
            }
        };
    }
}
public class ContextManager implements BootService {
  private static ThreadLocal<AbstractTracerContext> CONTEXT = new ThreadLocal<AbstractTracerContext>();
  private static AbstractTracerContext get() {
        return CONTEXT.get();
    }

    /**
     * @return the first global trace id when tracing. Otherwise, "N/A".
     */
    public static String getGlobalTraceId() {
        AbstractTracerContext context = CONTEXT.get();
        return Objects.nonNull(context) ? context.getReadablePrimaryTraceId() : EMPTY_TRACE_CONTEXT_ID;
    }

    /**
     * @return the current segment id when tracing. Otherwise, "N/A".
     */
    public static String getSegmentId() {
        AbstractTracerContext context = CONTEXT.get();
        return Objects.nonNull(context) ? context.getSegmentId() : EMPTY_TRACE_CONTEXT_ID;
    }

    /**
     * @return the current span id when tracing. Otherwise, the value is -1.
     */
    public static int getSpanId() {
        AbstractTracerContext context = CONTEXT.get();
        return Objects.nonNull(context) ? context.getSpanId() : -1;
    }
}

插件定义

/**
 * 插件的定义,继承xxxPluginDefine,通常命名为xxxInstrumentation
 */
public class DruidDataSourceInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
    private static final String ENHANCE_CLASS = "com.alibaba.druid.pool.DruidDataSource";
    private static final String ENHANCE_METHOD = "getConnection";
    private static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.druid.v1.PoolingGetConnectInterceptor";

    /**
     * 在哪个类进行字节码增强
     * */
    @Override
    protected ClassMatch enhanceClass() {
        return byName(ENHANCE_CLASS);
    }
    /**
     * 进行构造方法的拦截
     * */
    @Override
    public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
        return new ConstructorInterceptPoint[0];
    }

    /**
     * 进行实例方法的拦截
     * */
    @Override
    public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
        return new InstanceMethodsInterceptPoint[]{
                new InstanceMethodsInterceptPoint() {
                    //对getConnection无参方法记性增强
                    @Override
                    public ElementMatcher<MethodDescription> getMethodsMatcher() {
                        return named(ENHANCE_METHOD).and(takesNoArguments());
                    }
                    //增强逻辑在哪个具体的插件类中执行
                    @Override
                    public String getMethodsInterceptor() {
                        return INTERCEPTOR_CLASS;
                    }

                    @Override
                    public boolean isOverrideArgs() {
                        return false;
                    }
                },
                new InstanceMethodsInterceptPoint() {
                    //对getConnection有参方法记性增强
                    @Override
                    public ElementMatcher<MethodDescription> getMethodsMatcher() {
                        return named(ENHANCE_METHOD).and(takesArguments(String.class, String.class));
                    }
                    //增强逻辑在哪个具体的插件类中执行
                    @Override
                    public String getMethodsInterceptor() {
                        return INTERCEPTOR_CLASS;
                    }
                    //在增强时是否要对原方法的入参进行改变
                    @Override
                    public boolean isOverrideArgs() {
                        return false;
                    }
                }
        };
    }

    @Override
    public StaticMethodsInterceptPoint[] getStaticMethodsInterceptPoints() {
        return new StaticMethodsInterceptPoint[0];
    }

}

Skywalking链路追踪traceId

SkyWalking 中使用 DistributedTraceId 类来抽象 Trace ID,其中封装了一个 ID 类型的字段。DistributedTraceId 有两个实现类

  • NewDistirbutedTraceId 负责生成新 Trace ID,请求刚刚进入系统时,会创建 NewDistirbutedTraceId 对象,其构造方法内部会调用 GlobalIdGenerator.generate() 方法生成 ID 对象。

  • PropagatedTraceId 负责处理 Trace 传播过程中的 TraceId。PropagatedTraceId 的构造方法接收一个 String 类型参数(也就是在跨进程传播时序列化后的 Trace ID),解析之后得到 ID 对象。

  • 在 SkyWalking 中,TraceSegment 是一个介于 Trace 与 Span 之间的概念,它是一条 Trace 的一段,可以包含多个 Span。在微服务架构中,一个请求基本都会涉及跨进程(以及跨线程)的操作,例如, RPC 调用、通过 MQ 异步执行、HTTP 请求远端资源等,处理一个请求就需要涉及到多个服务的多个线程。TraceSegment 记录了一个请求在一个线程中的执行流程(即 Trace 信息)。将该请求关联的 TraceSegment 串联起来,就能得到该请求对应的完整 Trace。

  • TraceId的生成:ContextManager的createEntrySpan方法,调用getOrCreate方法,再调用ContextManagerExtendService的createTraceContext方法,里面创建了TracingContext(operationName, spanLimitWatcher);里面包含了traceId,SegmentId等
    Skwalking的TraceId的生成是通过GlobalIdGenerator的generate()方法来生成的,
    第一部分:具体是应用程序实例 ID
    第二部分:线程 ID
    第三部分:时间戳*10000+当前线程中的 seq,seq的值介于 0(包含)和 9999(包含)之间
    三部分通过.来分隔开

public class ContextManager implements BootService {
    public static AbstractSpan createEntrySpan(String operationName, ContextCarrier carrier) {
        AbstractSpan span;
        AbstractTracerContext context;
        operationName = StringUtil.cut(operationName, OPERATION_NAME_THRESHOLD);
        if (carrier != null && carrier.isValid()) {
            SamplingService samplingService = ServiceManager.INSTANCE.findService(SamplingService.class);
            samplingService.forceSampled();
            context = getOrCreate(operationName, true);
            span = context.createEntrySpan(operationName);
            context.extract(carrier);
        } else {
            context = getOrCreate(operationName, false);
            span = context.createEntrySpan(operationName);
        }
        return span;
    }

    private static AbstractTracerContext getOrCreate(String operationName, boolean forceSampling) {
        AbstractTracerContext context = CONTEXT.get();
        if (context == null) {
            if (StringUtil.isEmpty(operationName)) {
                if (LOGGER.isDebugEnable()) {
                    LOGGER.debug("No operation name, ignore this trace.");
                }
                context = new IgnoredTracerContext();
            } else {
                if (EXTEND_SERVICE == null) {
                    EXTEND_SERVICE = ServiceManager.INSTANCE.findService(ContextManagerExtendService.class);
                }
                context = EXTEND_SERVICE.createTraceContext(operationName, forceSampling);

            }
            CONTEXT.set(context);
        }
        return context;
    }

    public static AbstractSpan createLocalSpan(String operationName) {
        operationName = StringUtil.cut(operationName, OPERATION_NAME_THRESHOLD);
        AbstractTracerContext context = getOrCreate(operationName, false);
        return context.createLocalSpan(operationName);
    }

    public static AbstractSpan createExitSpan(String operationName, ContextCarrier carrier, String remotePeer) {
        if (carrier == null) {
            throw new IllegalArgumentException("ContextCarrier can't be null.");
        }
        operationName = StringUtil.cut(operationName, OPERATION_NAME_THRESHOLD);
        AbstractTracerContext context = getOrCreate(operationName, false);
        AbstractSpan span = context.createExitSpan(operationName, remotePeer);
        context.inject(carrier);
        return span;
    }

    public static AbstractSpan createExitSpan(String operationName, String remotePeer) {
        operationName = StringUtil.cut(operationName, OPERATION_NAME_THRESHOLD);
        AbstractTracerContext context = getOrCreate(operationName, false);
        return context.createExitSpan(operationName, remotePeer);
    }
}
public class TracingContext implements AbstractTracerContext {

  TracingContext(String firstOPName, SpanLimitWatcher spanLimitWatcher) {
        this.segment = new TraceSegment();
        this.spanIdGenerator = 0;
        isRunningInAsyncMode = false;
        createTime = System.currentTimeMillis();
        running = true;

        // profiling status
        if (PROFILE_TASK_EXECUTION_SERVICE == null) {
            PROFILE_TASK_EXECUTION_SERVICE = ServiceManager.INSTANCE.findService(ProfileTaskExecutionService.class);
        }
        this.profileStatus = PROFILE_TASK_EXECUTION_SERVICE.addProfiling(
            this, segment.getTraceSegmentId(), firstOPName);

        this.correlationContext = new CorrelationContext();
        this.extensionContext = new ExtensionContext();
        this.spanLimitWatcher = spanLimitWatcher;
    }

    @Override
    public AbstractSpan createEntrySpan(final String operationName) {
        if (isLimitMechanismWorking()) {
            NoopSpan span = new NoopSpan();
            return push(span);
        }
        AbstractSpan entrySpan;
        TracingContext owner = this;
        final AbstractSpan parentSpan = peek();
        final int parentSpanId = parentSpan == null ? -1 : parentSpan.getSpanId();
        if (parentSpan != null && parentSpan.isEntry()) {
            /*
             * Only add the profiling recheck on creating entry span,
             * as the operation name could be overrided.
             */
            profilingRecheck(parentSpan, operationName);
            parentSpan.setOperationName(operationName);
            entrySpan = parentSpan;
            return entrySpan.start();
        } else {
            entrySpan = new EntrySpan(
                spanIdGenerator++, parentSpanId,
                operationName, owner
            );
            entrySpan.start();
            return push(entrySpan);
        }
    }
}
@DefaultImplementor
public class ContextManagerExtendService implements BootService, GRPCChannelListener {
    public AbstractTracerContext createTraceContext(String operationName, boolean forceSampling) {
        AbstractTracerContext context;
        /*
         * Don't trace anything if the backend is not available.
         */
        if (!Config.Agent.KEEP_TRACING && GRPCChannelStatus.DISCONNECT.equals(status)) {
            return new IgnoredTracerContext();
        }

        int suffixIdx = operationName.lastIndexOf(".");
        if (suffixIdx > -1 && ignoreSuffixSet.contains(operationName.substring(suffixIdx))) {
            context = new IgnoredTracerContext();
        } else {
            SamplingService samplingService = ServiceManager.INSTANCE.findService(SamplingService.class);
            if (forceSampling || samplingService.trySampling(operationName)) {
                context = new TracingContext(operationName, spanLimitWatcher);
            } else {
                context = new IgnoredTracerContext();
            }
        }

        return context;
    }
}
//生成id的类
public final class GlobalIdGenerator {
    public static String generate() {
      return StringUtil.join(
          '.',
          PROCESS_ID,
          String.valueOf(Thread.currentThread().getId()),
          String.valueOf(THREAD_ID_SEQUENCE.get().nextSeq())
      );
    }
}
public class NewDistributedTraceId extends DistributedTraceId {
    public NewDistributedTraceId() {
        super(GlobalIdGenerator.generate());
    }
}
public class TraceSegment {
    public TraceSegment() {
        this.traceSegmentId = GlobalIdGenerator.generate();
        this.spans = new LinkedList<>();
        this.relatedGlobalTraceId = new NewDistributedTraceId();
        this.createTime = System.currentTimeMillis();
    }
}

SkyWalking的agent注册了这些Service

org.apache.skywalking.apm.agent.core.remote.TraceSegmentServiceClient
org.apache.skywalking.apm.agent.core.context.ContextManager
org.apache.skywalking.apm.agent.core.sampling.SamplingService
org.apache.skywalking.apm.agent.core.remote.GRPCChannelManager
org.apache.skywalking.apm.agent.core.jvm.JVMMetricsSender
org.apache.skywalking.apm.agent.core.jvm.JVMService
org.apache.skywalking.apm.agent.core.remote.ServiceManagementClient
org.apache.skywalking.apm.agent.core.context.ContextManagerExtendService
org.apache.skywalking.apm.agent.core.commands.CommandService
org.apache.skywalking.apm.agent.core.commands.CommandExecutorService
org.apache.skywalking.apm.agent.core.profile.ProfileTaskChannelService
org.apache.skywalking.apm.agent.core.profile.ProfileSnapshotSender
org.apache.skywalking.apm.agent.core.profile.ProfileTaskExecutionService
org.apache.skywalking.apm.agent.core.meter.MeterService
org.apache.skywalking.apm.agent.core.meter.MeterSender
org.apache.skywalking.apm.agent.core.context.status.StatusCheckService
org.apache.skywalking.apm.agent.core.remote.LogReportServiceClient
org.apache.skywalking.apm.agent.core.conf.dynamic.ConfigurationDiscoveryService
org.apache.skywalking.apm.agent.core.remote.EventReportServiceClient
org.apache.skywalking.apm.agent.core.ServiceInstanceGenerator

TraceSegmentServiceClient

@Override
public void prepare() {
    //在GRPCChannelManager中注册statusChanged方法
    ServiceManager.INSTANCE.findService(GRPCChannelManager.class).addChannelListener(this);
}

@Override
public void boot() {
    lastLogTime = System.currentTimeMillis();
    segmentUplinkedCounter = 0;
    segmentAbandonedCounter = 0;
    carrier = new DataCarrier<>(CHANNEL_SIZE, BUFFER_SIZE, BufferStrategy.IF_POSSIBLE);
    carrier.consume(this, 1);
}

@Override
public void onComplete() {
    //在ListenerManager中注册 afterFinished(TraceSegment traceSegment);
    TracingContext.ListenerManager.add(this);
}

@Override
public void statusChanged(GRPCChannelStatus status) {
    if (CONNECTED.equals(status)) {
        Channel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel();
        serviceStub = TraceSegmentReportServiceGrpc.newStub(channel);
    }
    this.status = status;
}

@Override
public void afterFinished(TraceSegment traceSegment) {
    if (traceSegment.isIgnore()) {
        return;
    }
    if (!carrier.produce(traceSegment)) {
        if (LOGGER.isDebugEnable()) {
            LOGGER.debug("One trace segment has been abandoned, cause by buffer is full.");
        }
    }
}

ContextManager的bootService都是空实现

SamplingService

@Override
public void prepare() {}

  private void resetSamplingFactor() {
      samplingFactorHolder = new AtomicInteger(0);
  }


@Override
public void boot() {
  
    service = Executors.newSingleThreadScheduledExecutor(
            new DefaultNamedThreadFactory("SamplingService"));
    //监听配置更新        
    samplingRateWatcher = new SamplingRateWatcher("agent.sample_n_per_3_secs", this);
    ServiceManager.INSTANCE.findService(ConfigurationDiscoveryService.class)
                            .registerAgentConfigChangeWatcher(samplingRateWatcher);
    //启动定时任务,每隔3秒调用resetSamplingFactor();
    handleSamplingRateChanged();
}

public void handleSamplingRateChanged() {
    if (samplingRateWatcher.getSamplingRate() > 0) {
        if (!on) {
            on = true;
            this.resetSamplingFactor();
            scheduledFuture = service.scheduleAtFixedRate(new RunnableWithExceptionProtection(
                this::resetSamplingFactor, t -> LOGGER.error("unexpected exception.", t)), 0, 3, TimeUnit.SECONDS);
            LOGGER.debug(
                "Agent sampling mechanism started. Sample {} traces in 3 seconds.",
                samplingRateWatcher.getSamplingRate()
            );
        }
    } else {
        if (on) {
            if (scheduledFuture != null) {
                scheduledFuture.cancel(true);
            }
            on = false;
        }
    }
}

@Override
public void onComplete() {}

GRPCChannelManager

@Override
public void prepare() {

}
//创建单线程定时线程池链接oap server
@Override
public void boot() {
    if (Config.Collector.BACKEND_SERVICE.trim().length() == 0) {
        LOGGER.error("Collector server addresses are not set.");
        LOGGER.error("Agent will not uplink any data.");
        return;
    }
    grpcServers = Arrays.asList(Config.Collector.BACKEND_SERVICE.split(","));
    connectCheckFuture = Executors.newSingleThreadScheduledExecutor(
        new DefaultNamedThreadFactory("GRPCChannelManager")
    ).scheduleAtFixedRate(
        new RunnableWithExceptionProtection(
            this,
            t -> LOGGER.error("unexpected exception.", t)
        ), 0, Config.Collector.GRPC_CHANNEL_CHECK_INTERVAL, TimeUnit.SECONDS
    );
}

@Override
public void onComplete() {}

@Override
public void run() {
    LOGGER.debug("Selected collector grpc service running, reconnect:{}.", reconnect);
    //IS_RESOLVE_DNS_PERIODICALLY 如果为 True,Skywalking 代理将定期解析 DNS 以更新接收者服务地址。
    if (IS_RESOLVE_DNS_PERIODICALLY && reconnect) {
        grpcServers = Arrays.stream(Config.Collector.BACKEND_SERVICE.split(","))
                .filter(StringUtil::isNotBlank)
                .map(eachBackendService -> eachBackendService.split(":"))
                .filter(domainPortPairs -> {
                    if (domainPortPairs.length < 2) {
                        LOGGER.debug("Service address [{}] format error. The expected format is IP:port", domainPortPairs[0]);
                        return false;
                    }
                    return true;
                })
                .flatMap(domainPortPairs -> {
                    try {
                        return Arrays.stream(InetAddress.getAllByName(domainPortPairs[0]))
                                .map(InetAddress::getHostAddress)
                                .map(ip -> String.format("%s:%s", ip, domainPortPairs[1]));
                    } catch (Throwable t) {
                        LOGGER.error(t, "Failed to resolve {} of backend service.", domainPortPairs[0]);
                    }
                    return Stream.empty();
                })
                .distinct()
                .collect(Collectors.toList());
    }

    if (reconnect) {
        if (grpcServers.size() > 0) {
            String server = "";
            try {
                int index = Math.abs(random.nextInt()) % grpcServers.size();
                if (index != selectedIdx) {
                    selectedIdx = index;

                    server = grpcServers.get(index);
                    String[] ipAndPort = server.split(":");

                    if (managedChannel != null) {
                        managedChannel.shutdownNow();
                    }

                    managedChannel = GRPCChannel.newBuilder(ipAndPort[0], Integer.parseInt(ipAndPort[1]))
                                                .addManagedChannelBuilder(new StandardChannelBuilder())
                                                .addManagedChannelBuilder(new TLSChannelBuilder())
                                                .addChannelDecorator(new AgentIDDecorator())
                                                .addChannelDecorator(new AuthenticationDecorator())
                                                .build();
                    reconnectCount = 0;
                    reconnect = false;
                    notify(GRPCChannelStatus.CONNECTED);
                } else if (managedChannel.isConnected(++reconnectCount > Config.Agent.FORCE_RECONNECTION_PERIOD)) {
                    // Reconnect to the same server is automatically done by GRPC,
                    // therefore we are responsible to check the connectivity and
                    // set the state and notify listeners
                    reconnectCount = 0;
                    reconnect = false;
                    notify(GRPCChannelStatus.CONNECTED);
                }

                return;
            } catch (Throwable t) {
                LOGGER.error(t, "Create channel to {} fail.", server);
            }
        }

        LOGGER.debug(
            "Selected collector grpc service is not available. Wait {} seconds to retry",
            Config.Collector.GRPC_CHANNEL_CHECK_INTERVAL
        );
    }
}
//通知所有监听器状态更新
private void notify(GRPCChannelStatus status) {
    for (GRPCChannelListener listener : listeners) {
        try {
            listener.statusChanged(status);
        } catch (Throwable t) {
            LOGGER.error(t, "Fail to notify {} about channel connected.", listener.getClass().getName());
        }
    }
}