Skywalking
Skywalking
Skywalking是专为微服务、云原生和基于容器的(Kubernetes)架构设计的分布式系统性能监控工具。
Skywalking关键特性
- 分布式跟踪:端到端分布式跟踪。服务拓扑分析、以服务为中心的可观察性和API仪表板。
- 针对您的技术栈的代理:Java、.Net Core、PHP、NodeJS、Golang、LUA、Rust、C++、客户端JavaScript和Python代理,持续积极开发和维护。
- 早期采用eBPF技术:Rover代理作为指标收集器和性能分析工具,采用eBPF技术来诊断CPU和网络性能问题。
- 扩展能力:从一个SkyWalking集群中可以收集和分析1000亿级别的遥测数据。
- 成熟的遥测生态系统支持:支持成熟的遥测生态系统的指标、跟踪和日志,例如Zipkin、OpenTelemetry、Prometheus、Zabbix、Fluentd等。
- 原生APM数据库:BanyanDB是一个2022年创建的可观察性数据库,旨在接收、分析和存储遥测/可观察性数据。
- 一致的指标聚合:SkyWalking本机的度量格式和广泛使用的度量格式(如OpenCensus、OTLP、Telegraf、Zabbix等)通过相同的脚本管道进行处理。
- 日志管理管道:通过脚本管道支持日志格式化、提取指标和各种采样策略,具有高性能。
- 报警和遥测管道:支持基于服务、基于部署和基于API的报警规则设置。支持将报警和所有遥测数据转发给第三方。
Skywalking搭建
- 安装Skywalking
mkdir -p /etc/sca/skywalking
cd /etc/sca/skywalking
cat > docker-compose.yaml <<-'EOF'
version: '3'
services:
mysql:
container_name: mysql
image: mysql:8
restart: always
environment:
- MYSQL_ROOT_PASSWORD=root
ports:
- "3306:3306"
nacos:
image: nacos/nacos-server:v2.2.1-slim
container_name: nacos
restart: always
environment:
- PREFER_HOST_MODE=hostname
- MODE=standalone
- NACOS_AUTH_IDENTITY_KEY=2222
- NACOS_AUTH_IDENTITY_VALUE=2xxx
- NACOS_AUTH_TOKEN=SecretKey012345678901234567890123456789012345678901234567890123456789
depends_on:
- mysql
ports:
- "7848:7848"
- "8848:8848"
- "9848:9848"
oap:
image: apache/skywalking-oap-server:9.4.0
container_name: oap
restart: always
ports:
- "11800:11800"
- "12800:12800"
oap-ui:
image: apache/skywalking-ui:9.4.0
container_name: oap-ui
restart: always
environment:
SW_OAP_ADDRESS: http://oap:12800
ports:
- "8080:8080"
depends_on:
- oap
EOF
docker-compose up -d
- 11800端口用于skywalking将应用的服务监控信息收集端口
- 12800端口用于skywalking对UI提供查询接口
- 8080端口是Skywalking WebUI接口提供可视化交互
- Agent下载地址:https://skywalking.apache.org/downloads/
- Java Agent说明文档:https://skywalking.apache.org/docs/skywalking-java/v8.15.0/readme/
- 安装Java Agent
- 下载后apache-skywalking-java-agent-8.15.0.tgz后解压缩到D:\java-agent
打开D:\java-agent\config\agent.config,将104行127.0.0.1:11800改成 宿主机IP:11800
collector.backend_service=${SW_AGENT_COLLECTOR_BACKEND_SERVICES:192.168.31.230:11800}
java jar \
-javaagent:D:\java-agent\skywalking-agent.jar \
-Dskywalking.agent.service_name=c-service \
-Dskywalking.logging.file_name=c-service-api.log \
xxx.jar
idea的配置
通过WebUI获取监控数据
http://192.168.31.230:8080/
Springboot整合+日志
- pom.xml,注意版本号与Skywalking的一致
<!-- 该引用用于logback获取tranceId,也就是tid -->
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-toolkit-logback-1.x</artifactId>
<version>9.2.0</version>
</dependency>
<!-- 该引用用于代码获取tranceId -->
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-toolkit-trace</artifactId>
<version>9.2.0</version>
</dependency>
- logback配置logback.xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration scan="true" scanPeriod=" 5 seconds">
<appender name="stdout" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder">
<layout class="org.apache.skywalking.apm.toolkit.log.logback.v1.x.mdc.TraceIdMDCPatternLogbackLayout">
<Pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%X{tid}] [%thread] %-5level %logger{36} -%msg%n</Pattern>
</layout>
</encoder>
</appender>
<appender name="grpc-log" class="org.apache.skywalking.apm.toolkit.log.logback.v1.x.log.GRPCLogClientAppender">
<encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder">
<layout class="org.apache.skywalking.apm.toolkit.log.logback.v1.x.mdc.TraceIdMDCPatternLogbackLayout">
<Pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%X{tid}] [%thread] %-5level %logger{36} -%msg%n</Pattern>
</layout>
</encoder>
</appender>
<appender name="fileAppender" class="ch.qos.logback.core.FileAppender">
<file>/tmp/skywalking-logs/logback/e2e-service-provider.log</file>
<encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder">
<layout class="org.apache.skywalking.apm.toolkit.log.logback.v1.x.TraceIdPatternLogbackLayout">
<Pattern>[%sw_ctx] [%level] %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %logger:%line - %msg%n</Pattern>
</layout>
</encoder>
</appender>
<!--需要将日志单独输出的包路径-->
<logger name="com.itlaoqi.aservice" additivity="false" level="INFO">
<appender-ref ref="stdout"/>
<appender-ref ref="grpc-log"/>
<appender-ref ref="fileAppender"/>
</logger>
<root level="INFO">
<appender-ref ref="grpc-log"/>
<appender-ref ref="stdout"/>
</root>
<logger name="fileLogger" level="INFO">
<appender-ref ref="fileAppender"/>
</logger>
</configuration>
- 配置agent配置文件,config/agent.config
agent.service_name=my-application
collector.backend_service=oap-server-host:11800
logging.level=DEBUG
plugin.logging=true
- 其他的跟上面差不多
- 应用里面使用@Sl4j打印日志即可
- 查看日志收集结果
Sky源码
- controller中traceId产生:SkyWalkingAgent的premain方法,访问每个插件的skywalking-plugin.def文件加载所有插件包括接口插件(org.apache.skywalking.apm.plugin.spring.mvc.v5.define.ControllerInstrumentation,指定拦截org.springframework.stereotype.Controller类,而它的父类AbstractControllerInstrumentation,已经拦截了RequestMapping,拦截类是RestMappingMethodInterceptor),同时调用installClassTransformer方法增强对应插件类名的类,里面调用了静态内部类Transformer的transform方法,调用所有插件的define方法,而所有插件继承了AbstractClassEnhancePluginDefine类,它的默认define方法调用子类ClassEnhancePluginDefine的enhanceClass和enhanceInstance分别增强了构造方法、实例方法和静态方法,都是调用子类插件的getConstructorsInterceptPoints(),getInstanceMethodsInterceptPoints(),getStaticMethodsInterceptPoints()3个方法返回拦截器的全类名实现调用拦截器接口实现类InstanceMethodsAroundInterceptor的beforeMethod目的,而traceId的产生由RestMappingMethodInterceptor的父类AbstractMethodInterceptor调用beforeMethod,然后里面再调用ContextManager的createEntrySpan方法创建Span,同时采集其他请求信息,服务间调用的时候通过HttpClientDoExecuteInterceptor的beforeMethod方法将trace信息添加到请求头
public class SkyWalkingAgent {
//主入口。使用byte-buddy transform 来增强所有在插件中定义的类。
public static void premain(String agentArgs, Instrumentation instrumentation) throws PluginException {
final PluginFinder pluginFinder;
try {
//加载配置
SnifferConfigInitializer.initializeCoreConfig(agentArgs);
} catch (Exception e) {
// try to resolve a new logger, and use the new logger to write the error log here
LogManager.getLogger(SkyWalkingAgent.class)
.error(e, "SkyWalking agent initialized failure. Shutting down.");
return;
} finally {
// refresh logger again after initialization finishes
LOGGER = LogManager.getLogger(SkyWalkingAgent.class);
}
if (!Config.Agent.ENABLE) {
LOGGER.warn("SkyWalking agent is disabled.");
return;
}
try {
//加载插件
pluginFinder = new PluginFinder(new PluginBootstrap().loadPlugins());
} catch (AgentPackageNotFoundException ape) {
LOGGER.error(ape, "Locate agent.jar failure. Shutting down.");
return;
} catch (Exception e) {
LOGGER.error(e, "SkyWalking agent initialized failure. Shutting down.");
return;
}
try {
//方法增强
installClassTransformer(instrumentation, pluginFinder);
} catch (Exception e) {
LOGGER.error(e, "Skywalking agent installed class transformer failure.");
}
try {
//启动服务
ServiceManager.INSTANCE.boot();
} catch (Exception e) {
LOGGER.error(e, "Skywalking agent boot failure.");
}
Runtime.getRuntime()
.addShutdownHook(new Thread(ServiceManager.INSTANCE::shutdown, "skywalking service shutdown thread"));
}
static void installClassTransformer(Instrumentation instrumentation, PluginFinder pluginFinder) throws Exception {
LOGGER.info("Skywalking agent begin to install transformer ...");
AgentBuilder agentBuilder = newAgentBuilder().ignore(
nameStartsWith("net.bytebuddy.")
.or(nameStartsWith("org.slf4j."))
.or(nameStartsWith("org.groovy."))
.or(nameContains("javassist"))
.or(nameContains(".asm."))
.or(nameContains(".reflectasm."))
.or(nameStartsWith("sun.reflect"))
.or(allSkyWalkingAgentExcludeToolkit())
.or(ElementMatchers.isSynthetic()));
JDK9ModuleExporter.EdgeClasses edgeClasses = new JDK9ModuleExporter.EdgeClasses();
try {
agentBuilder = BootstrapInstrumentBoost.inject(pluginFinder, instrumentation, agentBuilder, edgeClasses);
} catch (Exception e) {
throw new Exception("SkyWalking agent inject bootstrap instrumentation failure. Shutting down.", e);
}
try {
agentBuilder = JDK9ModuleExporter.openReadEdge(instrumentation, agentBuilder, edgeClasses);
} catch (Exception e) {
throw new Exception("SkyWalking agent open read edge in JDK 9+ failure. Shutting down.", e);
}
//核心。agentBuilder#transform,
agentBuilder.type(pluginFinder.buildMatch())
.transform(new Transformer(pluginFinder))
.with(AgentBuilder.RedefinitionStrategy.RETRANSFORMATION)
.with(new RedefinitionListener())
.with(new Listener())
.installOn(instrumentation);
PluginFinder.pluginInitCompleted();
LOGGER.info("Skywalking agent transformer has installed.");
}
private static class Transformer implements AgentBuilder.Transformer {
private PluginFinder pluginFinder;
Transformer(PluginFinder pluginFinder) {
this.pluginFinder = pluginFinder;
}
@Override
//调用所有插件的define
public DynamicType.Builder<?> transform(final DynamicType.Builder<?> builder,
final TypeDescription typeDescription,
final ClassLoader classLoader,
final JavaModule javaModule,
final ProtectionDomain protectionDomain) {
LoadedLibraryCollector.registerURLClassLoader(classLoader);
List<AbstractClassEnhancePluginDefine> pluginDefines = pluginFinder.find(typeDescription);
if (pluginDefines.size() > 0) {
DynamicType.Builder<?> newBuilder = builder;
EnhanceContext context = new EnhanceContext();
for (AbstractClassEnhancePluginDefine define : pluginDefines) {
DynamicType.Builder<?> possibleNewBuilder = define.define(
typeDescription, newBuilder, classLoader, context);
if (possibleNewBuilder != null) {
newBuilder = possibleNewBuilder;
}
}
if (context.isEnhanced()) {
LOGGER.debug("Finish the prepare stage for {}.", typeDescription.getName());
}
return newBuilder;
}
LOGGER.debug("Matched class {}, but ignore by finding mechanism.", typeDescription.getTypeName());
return builder;
}
}
}
public abstract class AbstractClassEnhancePluginDefine {
/**
* Main entrance of enhancing the class.
*
* @param typeDescription target class description.
* @param builder byte-buddy's builder to manipulate target class's bytecode.
* @param classLoader load the given transformClass
* @return the new builder, or <code>null</code> if not be enhanced.
* @throws PluginException when set builder failure.
*/
public DynamicType.Builder<?> define(TypeDescription typeDescription, DynamicType.Builder<?> builder,
ClassLoader classLoader, EnhanceContext context) throws PluginException {
String interceptorDefineClassName = this.getClass().getName();
String transformClassName = typeDescription.getTypeName();
if (StringUtil.isEmpty(transformClassName)) {
LOGGER.warn("classname of being intercepted is not defined by {}.", interceptorDefineClassName);
return null;
}
LOGGER.debug("prepare to enhance class {} by {}.", transformClassName, interceptorDefineClassName);
WitnessFinder finder = WitnessFinder.INSTANCE;
/**
* find witness classes for enhance class
*/
String[] witnessClasses = witnessClasses();
if (witnessClasses != null) {
for (String witnessClass : witnessClasses) {
if (!finder.exist(witnessClass, classLoader)) {
LOGGER.warn("enhance class {} by plugin {} is not activated. Witness class {} does not exist.", transformClassName, interceptorDefineClassName, witnessClass);
return null;
}
}
}
List<WitnessMethod> witnessMethods = witnessMethods();
if (!CollectionUtil.isEmpty(witnessMethods)) {
for (WitnessMethod witnessMethod : witnessMethods) {
if (!finder.exist(witnessMethod, classLoader)) {
LOGGER.warn("enhance class {} by plugin {} is not activated. Witness method {} does not exist.", transformClassName, interceptorDefineClassName, witnessMethod);
return null;
}
}
}
/**
* find origin class source code for interceptor
*/
DynamicType.Builder<?> newClassBuilder = this.enhance(typeDescription, builder, classLoader, context);
context.initializationStageCompleted();
LOGGER.debug("enhance class {} by {} completely.", transformClassName, interceptorDefineClassName);
return newClassBuilder;
}
/**
* Begin to define how to enhance class. After invoke this method, only means definition is finished.
*
* @param typeDescription target class description
* @param newClassBuilder byte-buddy's builder to manipulate class bytecode.
* @return new byte-buddy's builder for further manipulation.
*/
protected DynamicType.Builder<?> enhance(TypeDescription typeDescription, DynamicType.Builder<?> newClassBuilder,
ClassLoader classLoader, EnhanceContext context) throws PluginException {
newClassBuilder = this.enhanceClass(typeDescription, newClassBuilder, classLoader);
newClassBuilder = this.enhanceInstance(typeDescription, newClassBuilder, classLoader, context);
return newClassBuilder;
}
/**
* Enhance a class to intercept constructors and class instance methods.
*
* @param typeDescription target class description
* @param newClassBuilder byte-buddy's builder to manipulate class bytecode.
* @return new byte-buddy's builder for further manipulation.
*/
protected abstract DynamicType.Builder<?> enhanceInstance(TypeDescription typeDescription,
DynamicType.Builder<?> newClassBuilder, ClassLoader classLoader,
EnhanceContext context) throws PluginException;
/**
* Enhance a class to intercept class static methods.
*
* @param typeDescription target class description
* @param newClassBuilder byte-buddy's builder to manipulate class bytecode.
* @return new byte-buddy's builder for further manipulation.
*/
protected abstract DynamicType.Builder<?> enhanceClass(TypeDescription typeDescription, DynamicType.Builder<?> newClassBuilder,
ClassLoader classLoader) throws PluginException;
}
public abstract class ClassEnhancePluginDefine extends AbstractClassEnhancePluginDefine {
private static final ILog LOGGER = LogManager.getLogger(ClassEnhancePluginDefine.class);
/**
* Enhance a class to intercept constructors and class instance methods.
*
* @param typeDescription target class description
* @param newClassBuilder byte-buddy's builder to manipulate class bytecode.
* @return new byte-buddy's builder for further manipulation.
*/
@Override
protected DynamicType.Builder<?> enhanceInstance(TypeDescription typeDescription,
DynamicType.Builder<?> newClassBuilder, ClassLoader classLoader,
EnhanceContext context) throws PluginException {
ConstructorInterceptPoint[] constructorInterceptPoints = getConstructorsInterceptPoints();
InstanceMethodsInterceptPoint[] instanceMethodsInterceptPoints = getInstanceMethodsInterceptPoints();
String enhanceOriginClassName = typeDescription.getTypeName();
boolean existedConstructorInterceptPoint = false;
if (constructorInterceptPoints != null && constructorInterceptPoints.length > 0) {
existedConstructorInterceptPoint = true;
}
boolean existedMethodsInterceptPoints = false;
if (instanceMethodsInterceptPoints != null && instanceMethodsInterceptPoints.length > 0) {
existedMethodsInterceptPoints = true;
}
DelegateNamingResolver delegateNamingResolver = new DelegateNamingResolver(typeDescription.getTypeName(), this);
/**
* nothing need to be enhanced in class instance, maybe need enhance static methods.
*/
if (!existedConstructorInterceptPoint && !existedMethodsInterceptPoints) {
return newClassBuilder;
}
/**
* Manipulate class source code.<br/>
*
* new class need:<br/>
* 1.Add field, name {@link #CONTEXT_ATTR_NAME}.
* 2.Add a field accessor for this field.
*
* And make sure the source codes manipulation only occurs once.
*
*/
if (!typeDescription.isAssignableTo(EnhancedInstance.class)) {
if (!context.isObjectExtended()) {
newClassBuilder = newClassBuilder.defineField(
CONTEXT_ATTR_NAME, Object.class, ACC_PRIVATE | ACC_VOLATILE)
.implement(EnhancedInstance.class)
.intercept(FieldAccessor.ofField(CONTEXT_ATTR_NAME));
context.extendObjectCompleted();
}
}
/**
* 2. enhance constructors
*/
if (existedConstructorInterceptPoint) {
for (ConstructorInterceptPoint constructorInterceptPoint : constructorInterceptPoints) {
if (isBootstrapInstrumentation()) {
newClassBuilder = newClassBuilder.constructor(constructorInterceptPoint.getConstructorMatcher())
.intercept(SuperMethodCall.INSTANCE.andThen(MethodDelegation.withDefaultConfiguration()
.to(BootstrapInstrumentBoost
.forInternalDelegateClass(constructorInterceptPoint
.getConstructorInterceptor()))));
} else {
newClassBuilder = newClassBuilder.constructor(constructorInterceptPoint.getConstructorMatcher())
.intercept(SuperMethodCall.INSTANCE.andThen(MethodDelegation.withDefaultConfiguration()
.to(new ConstructorInter(constructorInterceptPoint
.getConstructorInterceptor(), classLoader), delegateNamingResolver.resolve(constructorInterceptPoint))));
}
}
}
/**
* 3. enhance instance methods
*/
if (existedMethodsInterceptPoints) {
for (InstanceMethodsInterceptPoint instanceMethodsInterceptPoint : instanceMethodsInterceptPoints) {
String interceptor = instanceMethodsInterceptPoint.getMethodsInterceptor();
if (StringUtil.isEmpty(interceptor)) {
throw new EnhanceException("no InstanceMethodsAroundInterceptor define to enhance class " + enhanceOriginClassName);
}
ElementMatcher.Junction<MethodDescription> junction = not(isStatic()).and(instanceMethodsInterceptPoint.getMethodsMatcher());
if (instanceMethodsInterceptPoint instanceof DeclaredInstanceMethodsInterceptPoint) {
junction = junction.and(ElementMatchers.<MethodDescription>isDeclaredBy(typeDescription));
}
if (instanceMethodsInterceptPoint.isOverrideArgs()) {
if (isBootstrapInstrumentation()) {
newClassBuilder = newClassBuilder.method(junction)
.intercept(MethodDelegation.withDefaultConfiguration()
.withBinders(Morph.Binder.install(OverrideCallable.class))
.to(BootstrapInstrumentBoost.forInternalDelegateClass(interceptor)));
} else {
newClassBuilder = newClassBuilder.method(junction)
.intercept(MethodDelegation.withDefaultConfiguration()
.withBinders(Morph.Binder.install(OverrideCallable.class))
.to(new InstMethodsInterWithOverrideArgs(interceptor, classLoader), delegateNamingResolver.resolve(instanceMethodsInterceptPoint)));
}
} else {
if (isBootstrapInstrumentation()) {
newClassBuilder = newClassBuilder.method(junction)
.intercept(MethodDelegation.withDefaultConfiguration()
.to(BootstrapInstrumentBoost.forInternalDelegateClass(interceptor)));
} else {
newClassBuilder = newClassBuilder.method(junction)
.intercept(MethodDelegation.withDefaultConfiguration()
.to(new InstMethodsInter(interceptor, classLoader), delegateNamingResolver.resolve(instanceMethodsInterceptPoint)));
}
}
}
}
return newClassBuilder;
}
/**
* Enhance a class to intercept class static methods.
*
* @param typeDescription target class description
* @param newClassBuilder byte-buddy's builder to manipulate class bytecode.
* @return new byte-buddy's builder for further manipulation.
*/
@Override
protected DynamicType.Builder<?> enhanceClass(TypeDescription typeDescription, DynamicType.Builder<?> newClassBuilder,
ClassLoader classLoader) throws PluginException {
StaticMethodsInterceptPoint[] staticMethodsInterceptPoints = getStaticMethodsInterceptPoints();
String enhanceOriginClassName = typeDescription.getTypeName();
if (staticMethodsInterceptPoints == null || staticMethodsInterceptPoints.length == 0) {
return newClassBuilder;
}
DelegateNamingResolver delegateNamingResolver = new DelegateNamingResolver(typeDescription.getTypeName(), this);
for (StaticMethodsInterceptPoint staticMethodsInterceptPoint : staticMethodsInterceptPoints) {
String interceptor = staticMethodsInterceptPoint.getMethodsInterceptor();
if (StringUtil.isEmpty(interceptor)) {
throw new EnhanceException("no StaticMethodsAroundInterceptor define to enhance class " + enhanceOriginClassName);
}
if (staticMethodsInterceptPoint.isOverrideArgs()) {
if (isBootstrapInstrumentation()) {
newClassBuilder = newClassBuilder.method(isStatic().and(staticMethodsInterceptPoint.getMethodsMatcher()))
.intercept(MethodDelegation.withDefaultConfiguration()
.withBinders(Morph.Binder.install(OverrideCallable.class))
.to(BootstrapInstrumentBoost.forInternalDelegateClass(interceptor)));
} else {
newClassBuilder = newClassBuilder.method(isStatic().and(staticMethodsInterceptPoint.getMethodsMatcher()))
.intercept(MethodDelegation.withDefaultConfiguration()
.withBinders(Morph.Binder.install(OverrideCallable.class))
.to(new StaticMethodsInterWithOverrideArgs(interceptor), delegateNamingResolver.resolve(staticMethodsInterceptPoint)));
}
} else {
if (isBootstrapInstrumentation()) {
newClassBuilder = newClassBuilder.method(isStatic().and(staticMethodsInterceptPoint.getMethodsMatcher()))
.intercept(MethodDelegation.withDefaultConfiguration()
.to(BootstrapInstrumentBoost.forInternalDelegateClass(interceptor)));
} else {
newClassBuilder = newClassBuilder.method(isStatic().and(staticMethodsInterceptPoint.getMethodsMatcher()))
.intercept(MethodDelegation.withDefaultConfiguration()
.to(new StaticMethodsInter(interceptor), delegateNamingResolver.resolve(staticMethodsInterceptPoint)));
}
}
}
return newClassBuilder;
}
}
public class TraceContextActivation extends ClassStaticMethodsEnhancePluginDefine {
//三个拦截器负责生成traceId,spanId,SegementId,都是由ContextManager生成
public static final String TRACE_ID_INTERCEPT_CLASS = "org.apache.skywalking.apm.toolkit.activation.trace.TraceIDInterceptor";
public static final String SEGMENT_ID_INTERCEPT_CLASS = "org.apache.skywalking.apm.toolkit.activation.trace.SegmentIDInterceptor";
public static final String SPAN_ID_INTERCEPT_CLASS = "org.apache.skywalking.apm.toolkit.activation.trace.SpanIDInterceptor";
@Override
public StaticMethodsInterceptPoint[] getStaticMethodsInterceptPoints() {
return new StaticMethodsInterceptPoint[] {
new StaticMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named(ENHANCE_TRACE_ID_METHOD);
}
@Override
public String getMethodsInterceptor() {
return TRACE_ID_INTERCEPT_CLASS;
}
@Override
public boolean isOverrideArgs() {
return false;
}
},
new StaticMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named(ENHANCE_SEGMENT_ID_METHOD);
}
@Override
public String getMethodsInterceptor() {
return SEGMENT_ID_INTERCEPT_CLASS;
}
@Override
public boolean isOverrideArgs() {
return false;
}
},
new StaticMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named(ENHANCE_SPAN_ID_METHOD);
}
@Override
public String getMethodsInterceptor() {
return SPAN_ID_INTERCEPT_CLASS;
}
@Override
public boolean isOverrideArgs() {
return false;
}
},
new StaticMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named(ENHANCE_GET_CORRELATION_METHOD);
}
@Override
public String getMethodsInterceptor() {
return INTERCEPT_GET_CORRELATION_CLASS;
}
@Override
public boolean isOverrideArgs() {
return false;
}
},
new StaticMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named(ENHANCE_PUT_CORRELATION_METHOD);
}
@Override
public String getMethodsInterceptor() {
return INTERCEPT_PUT_CORRELATION_CLASS;
}
@Override
public boolean isOverrideArgs() {
return false;
}
}
};
}
}
public class ContextManager implements BootService {
private static ThreadLocal<AbstractTracerContext> CONTEXT = new ThreadLocal<AbstractTracerContext>();
private static AbstractTracerContext get() {
return CONTEXT.get();
}
/**
* @return the first global trace id when tracing. Otherwise, "N/A".
*/
public static String getGlobalTraceId() {
AbstractTracerContext context = CONTEXT.get();
return Objects.nonNull(context) ? context.getReadablePrimaryTraceId() : EMPTY_TRACE_CONTEXT_ID;
}
/**
* @return the current segment id when tracing. Otherwise, "N/A".
*/
public static String getSegmentId() {
AbstractTracerContext context = CONTEXT.get();
return Objects.nonNull(context) ? context.getSegmentId() : EMPTY_TRACE_CONTEXT_ID;
}
/**
* @return the current span id when tracing. Otherwise, the value is -1.
*/
public static int getSpanId() {
AbstractTracerContext context = CONTEXT.get();
return Objects.nonNull(context) ? context.getSpanId() : -1;
}
}
插件定义
/**
* 插件的定义,继承xxxPluginDefine,通常命名为xxxInstrumentation
*/
public class DruidDataSourceInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
private static final String ENHANCE_CLASS = "com.alibaba.druid.pool.DruidDataSource";
private static final String ENHANCE_METHOD = "getConnection";
private static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.druid.v1.PoolingGetConnectInterceptor";
/**
* 在哪个类进行字节码增强
* */
@Override
protected ClassMatch enhanceClass() {
return byName(ENHANCE_CLASS);
}
/**
* 进行构造方法的拦截
* */
@Override
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[0];
}
/**
* 进行实例方法的拦截
* */
@Override
public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[]{
new InstanceMethodsInterceptPoint() {
//对getConnection无参方法记性增强
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named(ENHANCE_METHOD).and(takesNoArguments());
}
//增强逻辑在哪个具体的插件类中执行
@Override
public String getMethodsInterceptor() {
return INTERCEPTOR_CLASS;
}
@Override
public boolean isOverrideArgs() {
return false;
}
},
new InstanceMethodsInterceptPoint() {
//对getConnection有参方法记性增强
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named(ENHANCE_METHOD).and(takesArguments(String.class, String.class));
}
//增强逻辑在哪个具体的插件类中执行
@Override
public String getMethodsInterceptor() {
return INTERCEPTOR_CLASS;
}
//在增强时是否要对原方法的入参进行改变
@Override
public boolean isOverrideArgs() {
return false;
}
}
};
}
@Override
public StaticMethodsInterceptPoint[] getStaticMethodsInterceptPoints() {
return new StaticMethodsInterceptPoint[0];
}
}
Skywalking链路追踪traceId
SkyWalking 中使用 DistributedTraceId 类来抽象 Trace ID,其中封装了一个 ID 类型的字段。DistributedTraceId 有两个实现类
NewDistirbutedTraceId 负责生成新 Trace ID,请求刚刚进入系统时,会创建 NewDistirbutedTraceId 对象,其构造方法内部会调用 GlobalIdGenerator.generate() 方法生成 ID 对象。
PropagatedTraceId 负责处理 Trace 传播过程中的 TraceId。PropagatedTraceId 的构造方法接收一个 String 类型参数(也就是在跨进程传播时序列化后的 Trace ID),解析之后得到 ID 对象。
在 SkyWalking 中,TraceSegment 是一个介于 Trace 与 Span 之间的概念,它是一条 Trace 的一段,可以包含多个 Span。在微服务架构中,一个请求基本都会涉及跨进程(以及跨线程)的操作,例如, RPC 调用、通过 MQ 异步执行、HTTP 请求远端资源等,处理一个请求就需要涉及到多个服务的多个线程。TraceSegment 记录了一个请求在一个线程中的执行流程(即 Trace 信息)。将该请求关联的 TraceSegment 串联起来,就能得到该请求对应的完整 Trace。
TraceId的生成:ContextManager的createEntrySpan方法,调用getOrCreate方法,再调用ContextManagerExtendService的createTraceContext方法,里面创建了TracingContext(operationName, spanLimitWatcher);里面包含了traceId,SegmentId等
Skwalking的TraceId的生成是通过GlobalIdGenerator的generate()方法来生成的,
第一部分:具体是应用程序实例 ID
第二部分:线程 ID
第三部分:时间戳*10000+当前线程中的 seq,seq的值介于 0(包含)和 9999(包含)之间
三部分通过.来分隔开
public class ContextManager implements BootService {
public static AbstractSpan createEntrySpan(String operationName, ContextCarrier carrier) {
AbstractSpan span;
AbstractTracerContext context;
operationName = StringUtil.cut(operationName, OPERATION_NAME_THRESHOLD);
if (carrier != null && carrier.isValid()) {
SamplingService samplingService = ServiceManager.INSTANCE.findService(SamplingService.class);
samplingService.forceSampled();
context = getOrCreate(operationName, true);
span = context.createEntrySpan(operationName);
context.extract(carrier);
} else {
context = getOrCreate(operationName, false);
span = context.createEntrySpan(operationName);
}
return span;
}
private static AbstractTracerContext getOrCreate(String operationName, boolean forceSampling) {
AbstractTracerContext context = CONTEXT.get();
if (context == null) {
if (StringUtil.isEmpty(operationName)) {
if (LOGGER.isDebugEnable()) {
LOGGER.debug("No operation name, ignore this trace.");
}
context = new IgnoredTracerContext();
} else {
if (EXTEND_SERVICE == null) {
EXTEND_SERVICE = ServiceManager.INSTANCE.findService(ContextManagerExtendService.class);
}
context = EXTEND_SERVICE.createTraceContext(operationName, forceSampling);
}
CONTEXT.set(context);
}
return context;
}
public static AbstractSpan createLocalSpan(String operationName) {
operationName = StringUtil.cut(operationName, OPERATION_NAME_THRESHOLD);
AbstractTracerContext context = getOrCreate(operationName, false);
return context.createLocalSpan(operationName);
}
public static AbstractSpan createExitSpan(String operationName, ContextCarrier carrier, String remotePeer) {
if (carrier == null) {
throw new IllegalArgumentException("ContextCarrier can't be null.");
}
operationName = StringUtil.cut(operationName, OPERATION_NAME_THRESHOLD);
AbstractTracerContext context = getOrCreate(operationName, false);
AbstractSpan span = context.createExitSpan(operationName, remotePeer);
context.inject(carrier);
return span;
}
public static AbstractSpan createExitSpan(String operationName, String remotePeer) {
operationName = StringUtil.cut(operationName, OPERATION_NAME_THRESHOLD);
AbstractTracerContext context = getOrCreate(operationName, false);
return context.createExitSpan(operationName, remotePeer);
}
}
public class TracingContext implements AbstractTracerContext {
TracingContext(String firstOPName, SpanLimitWatcher spanLimitWatcher) {
this.segment = new TraceSegment();
this.spanIdGenerator = 0;
isRunningInAsyncMode = false;
createTime = System.currentTimeMillis();
running = true;
// profiling status
if (PROFILE_TASK_EXECUTION_SERVICE == null) {
PROFILE_TASK_EXECUTION_SERVICE = ServiceManager.INSTANCE.findService(ProfileTaskExecutionService.class);
}
this.profileStatus = PROFILE_TASK_EXECUTION_SERVICE.addProfiling(
this, segment.getTraceSegmentId(), firstOPName);
this.correlationContext = new CorrelationContext();
this.extensionContext = new ExtensionContext();
this.spanLimitWatcher = spanLimitWatcher;
}
@Override
public AbstractSpan createEntrySpan(final String operationName) {
if (isLimitMechanismWorking()) {
NoopSpan span = new NoopSpan();
return push(span);
}
AbstractSpan entrySpan;
TracingContext owner = this;
final AbstractSpan parentSpan = peek();
final int parentSpanId = parentSpan == null ? -1 : parentSpan.getSpanId();
if (parentSpan != null && parentSpan.isEntry()) {
/*
* Only add the profiling recheck on creating entry span,
* as the operation name could be overrided.
*/
profilingRecheck(parentSpan, operationName);
parentSpan.setOperationName(operationName);
entrySpan = parentSpan;
return entrySpan.start();
} else {
entrySpan = new EntrySpan(
spanIdGenerator++, parentSpanId,
operationName, owner
);
entrySpan.start();
return push(entrySpan);
}
}
}
@DefaultImplementor
public class ContextManagerExtendService implements BootService, GRPCChannelListener {
public AbstractTracerContext createTraceContext(String operationName, boolean forceSampling) {
AbstractTracerContext context;
/*
* Don't trace anything if the backend is not available.
*/
if (!Config.Agent.KEEP_TRACING && GRPCChannelStatus.DISCONNECT.equals(status)) {
return new IgnoredTracerContext();
}
int suffixIdx = operationName.lastIndexOf(".");
if (suffixIdx > -1 && ignoreSuffixSet.contains(operationName.substring(suffixIdx))) {
context = new IgnoredTracerContext();
} else {
SamplingService samplingService = ServiceManager.INSTANCE.findService(SamplingService.class);
if (forceSampling || samplingService.trySampling(operationName)) {
context = new TracingContext(operationName, spanLimitWatcher);
} else {
context = new IgnoredTracerContext();
}
}
return context;
}
}
//生成id的类
public final class GlobalIdGenerator {
public static String generate() {
return StringUtil.join(
'.',
PROCESS_ID,
String.valueOf(Thread.currentThread().getId()),
String.valueOf(THREAD_ID_SEQUENCE.get().nextSeq())
);
}
}
public class NewDistributedTraceId extends DistributedTraceId {
public NewDistributedTraceId() {
super(GlobalIdGenerator.generate());
}
}
public class TraceSegment {
public TraceSegment() {
this.traceSegmentId = GlobalIdGenerator.generate();
this.spans = new LinkedList<>();
this.relatedGlobalTraceId = new NewDistributedTraceId();
this.createTime = System.currentTimeMillis();
}
}
SkyWalking的agent注册了这些Service
org.apache.skywalking.apm.agent.core.remote.TraceSegmentServiceClient
org.apache.skywalking.apm.agent.core.context.ContextManager
org.apache.skywalking.apm.agent.core.sampling.SamplingService
org.apache.skywalking.apm.agent.core.remote.GRPCChannelManager
org.apache.skywalking.apm.agent.core.jvm.JVMMetricsSender
org.apache.skywalking.apm.agent.core.jvm.JVMService
org.apache.skywalking.apm.agent.core.remote.ServiceManagementClient
org.apache.skywalking.apm.agent.core.context.ContextManagerExtendService
org.apache.skywalking.apm.agent.core.commands.CommandService
org.apache.skywalking.apm.agent.core.commands.CommandExecutorService
org.apache.skywalking.apm.agent.core.profile.ProfileTaskChannelService
org.apache.skywalking.apm.agent.core.profile.ProfileSnapshotSender
org.apache.skywalking.apm.agent.core.profile.ProfileTaskExecutionService
org.apache.skywalking.apm.agent.core.meter.MeterService
org.apache.skywalking.apm.agent.core.meter.MeterSender
org.apache.skywalking.apm.agent.core.context.status.StatusCheckService
org.apache.skywalking.apm.agent.core.remote.LogReportServiceClient
org.apache.skywalking.apm.agent.core.conf.dynamic.ConfigurationDiscoveryService
org.apache.skywalking.apm.agent.core.remote.EventReportServiceClient
org.apache.skywalking.apm.agent.core.ServiceInstanceGenerator
TraceSegmentServiceClient
@Override
public void prepare() {
//在GRPCChannelManager中注册statusChanged方法
ServiceManager.INSTANCE.findService(GRPCChannelManager.class).addChannelListener(this);
}
@Override
public void boot() {
lastLogTime = System.currentTimeMillis();
segmentUplinkedCounter = 0;
segmentAbandonedCounter = 0;
carrier = new DataCarrier<>(CHANNEL_SIZE, BUFFER_SIZE, BufferStrategy.IF_POSSIBLE);
carrier.consume(this, 1);
}
@Override
public void onComplete() {
//在ListenerManager中注册 afterFinished(TraceSegment traceSegment);
TracingContext.ListenerManager.add(this);
}
@Override
public void statusChanged(GRPCChannelStatus status) {
if (CONNECTED.equals(status)) {
Channel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel();
serviceStub = TraceSegmentReportServiceGrpc.newStub(channel);
}
this.status = status;
}
@Override
public void afterFinished(TraceSegment traceSegment) {
if (traceSegment.isIgnore()) {
return;
}
if (!carrier.produce(traceSegment)) {
if (LOGGER.isDebugEnable()) {
LOGGER.debug("One trace segment has been abandoned, cause by buffer is full.");
}
}
}
ContextManager的bootService都是空实现
SamplingService
@Override
public void prepare() {}
private void resetSamplingFactor() {
samplingFactorHolder = new AtomicInteger(0);
}
@Override
public void boot() {
service = Executors.newSingleThreadScheduledExecutor(
new DefaultNamedThreadFactory("SamplingService"));
//监听配置更新
samplingRateWatcher = new SamplingRateWatcher("agent.sample_n_per_3_secs", this);
ServiceManager.INSTANCE.findService(ConfigurationDiscoveryService.class)
.registerAgentConfigChangeWatcher(samplingRateWatcher);
//启动定时任务,每隔3秒调用resetSamplingFactor();
handleSamplingRateChanged();
}
public void handleSamplingRateChanged() {
if (samplingRateWatcher.getSamplingRate() > 0) {
if (!on) {
on = true;
this.resetSamplingFactor();
scheduledFuture = service.scheduleAtFixedRate(new RunnableWithExceptionProtection(
this::resetSamplingFactor, t -> LOGGER.error("unexpected exception.", t)), 0, 3, TimeUnit.SECONDS);
LOGGER.debug(
"Agent sampling mechanism started. Sample {} traces in 3 seconds.",
samplingRateWatcher.getSamplingRate()
);
}
} else {
if (on) {
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
}
on = false;
}
}
}
@Override
public void onComplete() {}
GRPCChannelManager
@Override
public void prepare() {
}
//创建单线程定时线程池链接oap server
@Override
public void boot() {
if (Config.Collector.BACKEND_SERVICE.trim().length() == 0) {
LOGGER.error("Collector server addresses are not set.");
LOGGER.error("Agent will not uplink any data.");
return;
}
grpcServers = Arrays.asList(Config.Collector.BACKEND_SERVICE.split(","));
connectCheckFuture = Executors.newSingleThreadScheduledExecutor(
new DefaultNamedThreadFactory("GRPCChannelManager")
).scheduleAtFixedRate(
new RunnableWithExceptionProtection(
this,
t -> LOGGER.error("unexpected exception.", t)
), 0, Config.Collector.GRPC_CHANNEL_CHECK_INTERVAL, TimeUnit.SECONDS
);
}
@Override
public void onComplete() {}
@Override
public void run() {
LOGGER.debug("Selected collector grpc service running, reconnect:{}.", reconnect);
//IS_RESOLVE_DNS_PERIODICALLY 如果为 True,Skywalking 代理将定期解析 DNS 以更新接收者服务地址。
if (IS_RESOLVE_DNS_PERIODICALLY && reconnect) {
grpcServers = Arrays.stream(Config.Collector.BACKEND_SERVICE.split(","))
.filter(StringUtil::isNotBlank)
.map(eachBackendService -> eachBackendService.split(":"))
.filter(domainPortPairs -> {
if (domainPortPairs.length < 2) {
LOGGER.debug("Service address [{}] format error. The expected format is IP:port", domainPortPairs[0]);
return false;
}
return true;
})
.flatMap(domainPortPairs -> {
try {
return Arrays.stream(InetAddress.getAllByName(domainPortPairs[0]))
.map(InetAddress::getHostAddress)
.map(ip -> String.format("%s:%s", ip, domainPortPairs[1]));
} catch (Throwable t) {
LOGGER.error(t, "Failed to resolve {} of backend service.", domainPortPairs[0]);
}
return Stream.empty();
})
.distinct()
.collect(Collectors.toList());
}
if (reconnect) {
if (grpcServers.size() > 0) {
String server = "";
try {
int index = Math.abs(random.nextInt()) % grpcServers.size();
if (index != selectedIdx) {
selectedIdx = index;
server = grpcServers.get(index);
String[] ipAndPort = server.split(":");
if (managedChannel != null) {
managedChannel.shutdownNow();
}
managedChannel = GRPCChannel.newBuilder(ipAndPort[0], Integer.parseInt(ipAndPort[1]))
.addManagedChannelBuilder(new StandardChannelBuilder())
.addManagedChannelBuilder(new TLSChannelBuilder())
.addChannelDecorator(new AgentIDDecorator())
.addChannelDecorator(new AuthenticationDecorator())
.build();
reconnectCount = 0;
reconnect = false;
notify(GRPCChannelStatus.CONNECTED);
} else if (managedChannel.isConnected(++reconnectCount > Config.Agent.FORCE_RECONNECTION_PERIOD)) {
// Reconnect to the same server is automatically done by GRPC,
// therefore we are responsible to check the connectivity and
// set the state and notify listeners
reconnectCount = 0;
reconnect = false;
notify(GRPCChannelStatus.CONNECTED);
}
return;
} catch (Throwable t) {
LOGGER.error(t, "Create channel to {} fail.", server);
}
}
LOGGER.debug(
"Selected collector grpc service is not available. Wait {} seconds to retry",
Config.Collector.GRPC_CHANNEL_CHECK_INTERVAL
);
}
}
//通知所有监听器状态更新
private void notify(GRPCChannelStatus status) {
for (GRPCChannelListener listener : listeners) {
try {
listener.statusChanged(status);
} catch (Throwable t) {
LOGGER.error(t, "Fail to notify {} about channel connected.", listener.getClass().getName());
}
}
}