syf 发布的文章

TL;DR:热更新问题

要增强的目标类ThreadPoolExecutor,在增强前(bytebuddy.installOn → instrument.addTransformer / redefine / retransform)已经被classloader装载到jvm里

导致instrument对已加载的类增强有些问题,这个问题也许可以通过深入redefine或retransform的机制解决,快速且稳定的解决方案为在插件装载完成前不加载ThreadPoolExecutor

0.前置

instrument中的几个基本逻辑:

ClassFileTransformer接口

ClassFile实际指的是java字节码,class文件格式,这个对象在内存中,和文件没有关系

Note the term class file is used as defined in section 3.1 of The Java™ Virtual Machine Specification, to mean a sequence of bytes in class file format, whether or not they reside in a file.

jdk8中接口只有一个transform方法,类在加载、redefined或retransformed时会被调用来增强(the transformer's transform method is invoked when classes are loaded, redefined, or retransformed.)

redefineClasses方法

java 5+,已经加载的类重新进行转换处理,即会触发重新加载类定义,需要注意的是,新加载的类不能修改旧有的类声明,譬如不能增加属性、不能修改方法声明

retransformClasses方法

java 6+,与如上类似,但不是重新进行转换处理,而是直接把处理结果(bytecode)直接给JVM

“Agents use these methods to retransform previously loaded classes without needing to access their class files.”

redefine和retransform的区别:https://stackoverflow.com/questions/19009583/difference-between-redefine-and-retransform-in-javaagent

1.原始问题

自定义executors插件未生效,调试发现onInstall时抛UnsupportedOperationException,且未打印出来,这里异常栈顶是sun.instrument.InstrumentationImpl#retransformClasses,由SkyWalking调用ByteBuddy时指定了AgentBuilder.RedefinitionStrategy.RETRANSFORMATION策略加载

image2022-7-6_15-8-1.png

对比相同使用bytebuddy的自定义agent生效:https://gitlab.sunyongfei.com/platform-basic/java-agents/tree/threadpool-qy/thread-agent

其中onInstall前的RedefinitionStrategy不同:

SkyWalking采用AgentBuilder.RedefinitionStrategy.RETRANSFORMATION,自定义agent采用AgentBuilder.RedefinitionStrategy.REDEFINITION

net.bytebuddy.agent.builder.AgentBuilder.RedefinitionStrategy:

/**
 * <p>
 * A redefinition strategy regulates how already loaded classes are modified by a built agent.
 * </p>
 * <p>
 * <b>Important</b>: Most JVMs do not support changes of a class's structure after a class was already
 * loaded. Therefore, it is typically required that this class file transformer was built while enabling
 * {@link AgentBuilder#disableClassFormatChanges()}.
 * </p>
 */
enum RedefinitionStrategy {
 
    /**
     * Disables redefinition such that already loaded classes are not affected by the agent.
     */
    DISABLED(false, false) {
        @Override
        public void apply(Instrumentation instrumentation,
                          AgentBuilder.Listener listener,
                          CircularityLock circularityLock,
                          PoolStrategy poolStrategy,
                          LocationStrategy locationStrategy,
                          DiscoveryStrategy discoveryStrategy,
                          BatchAllocator redefinitionBatchAllocator,
                          Listener redefinitionListener,
                          LambdaInstrumentationStrategy lambdaInstrumentationStrategy,
                          DescriptionStrategy descriptionStrategy,
                          FallbackStrategy fallbackStrategy,
                          RawMatcher matcher) {
            /* do nothing */
        }
 
        @Override
        protected void check(Instrumentation instrumentation) {
            throw new IllegalStateException("Cannot apply redefinition on disabled strategy");
        }
 
        @Override
        protected Collector make() {
            throw new IllegalStateException("A disabled redefinition strategy cannot create a collector");
        }
    },
 
    /**
     * <p>
     * Applies a <b>redefinition</b> to all classes that are already loaded and that would have been transformed if
     * the built agent was registered before they were loaded. The created {@link ClassFileTransformer} is <b>not</b>
     * registered for applying retransformations.
     * </p>
     * <p>
     * Using this strategy, a redefinition is applied as a single transformation request. This means that a single illegal
     * redefinition of a class causes the entire redefinition attempt to fail.
     * </p>
     * <p>
     * <b>Note</b>: When applying a redefinition, it is normally required to use a {@link TypeStrategy} that applies
     * a redefinition instead of rebasing classes such as {@link TypeStrategy.Default#REDEFINE}. Also, consider
     * the constrains given by this type strategy.
     * </p>
     */
    REDEFINITION(true, false) {
        @Override
        protected void check(Instrumentation instrumentation) {
            if (!instrumentation.isRedefineClassesSupported()) {
                throw new IllegalStateException("Cannot apply redefinition on " + instrumentation);
            }
        }
 
        @Override
        protected Collector make() {
            return new Collector.ForRedefinition();
        }
    },
 
    /**
     * <p>
     * Applies a <b>retransformation</b> to all classes that are already loaded and that would have been transformed if
     * the built agent was registered before they were loaded. The created {@link ClassFileTransformer} is registered
     * for applying retransformations.
     * </p>
     * <p>
     * Using this strategy, a retransformation is applied as a single transformation request. This means that a single illegal
     * retransformation of a class causes the entire retransformation attempt to fail.
     * </p>
     * <p>
     * <b>Note</b>: When applying a retransformation, it is normally required to use a {@link TypeStrategy} that applies
     * a redefinition instead of rebasing classes such as {@link TypeStrategy.Default#REDEFINE}. Also, consider
     * the constrains given by this type strategy.
     * </p>
     */
    RETRANSFORMATION(true, true) {
        @Override
        protected void check(Instrumentation instrumentation) {
            if (!DISPATCHER.isRetransformClassesSupported(instrumentation)) {
                throw new IllegalStateException("Cannot apply retransformation on " + instrumentation);
            }
        }
 
        @Override
        protected Collector make() {
            return new Collector.ForRetransformation();
        }
    };

RedefinitionStrategy的存在是因为“Most JVMs do not support changes of a class's structure after a class was already loaded. Therefore, it is typically required that this class file transformer was built while enabling disableClassFormatChanges().”

即大部分JVM不支持在类被装载后修改,需要指定对这些已经被装载的类如何Redefine策略

open-jdk8 HotSpot VM instrumentation的支持:支持Redefine,不支持Retransform

image2022-7-6_14-34-49.png

最上面抛出异常的图调用方法:this.retransformClasses为反射获取的sun.instrument.InstrumentationImpl#retransformClasses方法:

image2022-7-6_14-34-49.png

打断点查看native方法的支持情况,原来这里才会懒加载,即上面instrumentation对retransform支持到这里才是正确的:

image2022-7-6_15-25-37.png

image2022-7-6_15-24-15.png

支持retransform,也就是说执行retransformClasses0(mNativeAgent, classes);原生方法抛了上述异常,此时ThreadPoolExecutor类应该是被加载了的:

验证:自定义agent也改为和SkyWalking一致的RETRAINSFORMATION策略,期望如果也变得不生效,则说明增强时ThreadPoolExecutor类已被装载,且是策略选择问题

结果:自定义agent依旧生效

说明要么自定义agent没有装载ThreadPoolExecutor类,这个策略自然也就对ThreadPoolExecutor类无效;要么可能压根就不是这个问题

验证自定义agent在增强时是否加载了目标类:断点打在apply:4812, AgentBuilder$RedefinitionStrategy

stack:

apply:4812, AgentBuilder$RedefinitionStrategy (net.bytebuddy.agent.builder)
doInstall:9463, AgentBuilder$Default (net.bytebuddy.agent.builder)
installOn:9384, AgentBuilder$Default (net.bytebuddy.agent.builder)
instrumentation:58, ThreadPoolAgent (com.sunyongfei.platform.basic.agent.threadpool)
premain:38, ThreadPoolAgent (com.sunyongfei.platform.basic.agent.threadpool)
invoke0:-1, NativeMethodAccessorImpl (sun.reflect)
invoke:62, NativeMethodAccessorImpl (sun.reflect)
invoke:43, DelegatingMethodAccessorImpl (sun.reflect)
invoke:498, Method (java.lang.reflect)
loadClassAndStartAgent:386, InstrumentationImpl (sun.instrument)
loadClassAndCallPremain:401, InstrumentationImpl (sun.instrument)

发现SkyWalking已经加载了ThreadPoolExecutor,自定义agent没有加载

2.问题定位

通过在ThreadPoolExecutor构造器方法上打断点,定位到加载ThreadPoolExecutor在日志组件中,获取单例FIleWriter时会通过ThreadPoolExecutor创建异步线程

org.apache.skywalking.apm.agent.core.logging.core.FileWriter#FileWriter

private FileWriter() {
    logBuffer = new ArrayBlockingQueue(1024);
    final ArrayList<String> outputLogs = new ArrayList<String>(200);
    Executors.newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("LogFileWriter"))
             .scheduleAtFixedRate(new RunnableWithExceptionProtection(new Runnable() {
                 @Override
                 public void run() {
                     try {
                         logBuffer.drainTo(outputLogs);
                         for (String log : outputLogs) {
                             writeToFile(log + Constants.LINE_SEPARATOR);
                         }
                         try {
                             fileOutputStream.flush();
                         } catch (IOException e) {
                             e.printStackTrace();
                         }
                     } finally {
                         outputLogs.clear();
                     }
                 }
             }, new RunnableWithExceptionProtection.CallbackWhenException() {
                 @Override
                 public void handle(Throwable t) {
                 }
             }), 0, 1, TimeUnit.SECONDS);
}

3.修复

org.apache.skywalking.apm.agent.core.logging.core.WriterFactory,增加FILE_WRITTER_INIT_FLAG开关,在所有插件和bytebuddy installlOn执行结束前不允许初始化FileWriter,日志只能输出到STDOUT
executors-plugin插件本身在插桩时不能打印日志,否则会死循环递归调用interpretor逻辑,引发stackoverflow等问题

image2022-7-7_13-45-30.png

gRPC & netty

背景

  • 开发MQ测试环境多版本引流功能,复用RocketMQ的remoting模块netty通信模块,client以近原生的方式请求元信息服务控制消息引流
  • 开发故障检测过程中使用gRPC通信能力,java-gRPC底层默认由netty实现
  • SkyWalking多版本链路传递功能,学习SkyWalking通信部分设计

gRPC简介

一个高性能、开源通用RPC框架

亮点

CNCF孵化中

gRPC使用protobuf作为

从protobuf开始

protubuf是gRPC中的核心概念之一,gRPC使用protobuf同时作为接口定义语言(IDL)和底层消息交换的序列化结构(实际上也可以替换为json等),protobuf中即可以定义rpc(端点),也可以定义数据结构,也支持目录包引用、oneof、enum等特性,protobuf的定义和常用的编程语言数据结构还是有些区别的,例如没有继承关系(protobuf3不再支持extend,实际上可以通过oneof关键字曲线救国)

以下面故障检测服务的其中一个protobuf定义为例,import可以从其他包下引入定义,option则是代码生成的一些选项,service下定义了一个rpc:StreamChannel,入参和出参都是stream类型,表示是双向流;enum和message的数据结构中都由Field var=number的格式组成,其中number的值会在序列化时转成二进制,用来表示字段code。protobuf序列化时number在1~15时占用1个字节,16~2047会占用2个字节,所以最佳实践是把前15的序号保留给最常使用的字段。

syntax = "proto3";

import "command/server/register.proto";
import "command/client/thread_snapshot.proto";
import "command/client/profile.proto";
import "command/server/hot_thread.proto";

package com.enmonster.platform.hts.grpc;

option java_multiple_files = true;
option java_package = "com.enmonster.platform.hts.grpc";
option java_outer_classname = "CommandDispatcherRPC";

service CommandDispatcher {
  // 双向流,server和client可互相通讯
  rpc StreamChannel (stream StreamDataPackage) returns (stream StreamDataPackage) {}
}

// 保证向前兼容,添加命令请勿修改已有命令的field_value
enum Command {
  REGISTER = 0;  // client注册到server
  RECORD_HOT_THREAD = 1;  // 记录热点线程
  // 以上为server端支持的命令,以下为client端支持的命令
  THREAD_SNAPSHOT = 11;  // 采集线程快照
  PROFILE = 12;  // 开始async-profiler采样
}

// 双向流的逻辑数据包,response_body为空表示是请求,不为空表示是响应
message StreamDataPackage {
  string job_id = 1;  // 命令所属的job id
  Command command = 2;  // 命令
  string client_ip = 3;  // 客户端ip, aka node ip
  oneof request {  // 命令请求payload
    RegisterRequest register_request = 10;
    ThreadSnapshotRequest thread_snapshot_request = 11;
    HotThreadRequest hot_thread_request = 12;
    ProfileRequest profile_request = 13;
  }
  BaseResponse response = 20;  // 命令响应payload,不为null表示是命令结果,否则是命令请求
}

message BaseResponse {
  bool ok = 1;  // 是否成功
  string message = 2;  // 消息
  // 命令结果body
  oneof body {
    RegisterResponse register_response = 3;
    ThreadSnapshotResponse thread_snapshot_response = 4;
    HotThreadResponse hot_thread_response = 5;
    ProfileResponse profile_response = 6;
    ProfileResultResponse profile_result_response = 7;
  }
}

大部分字段类型都和常用编程语言类型相似,包括支持map<string, string>表示map、repeated表示数组等,同时也支持复用包里的数据结构,例如google.protobuf.Timestamp

编译protobuf

protobuf的优势在于其是跨平台、跨语言的DSL,比高级编程语言更抽象一级,所以写起来非常简洁(有点像写java接口哈)。这就意味着protobuf+grpc编译后的产物是高级编程语言(java、python、go、c/cpp……),有点像前端的.vue编译成css、js,不过感觉抽象级别更高一些。

这里以java编译为例,引入protobuf-maven-plugin插件执行compile就可以得到编译后的java文件(也可以直接用protoc二进制编译器,实际上这个maven插件也是去执行protoc编译的)

填充自己的业务逻辑吧

编译后的代码中可以看见对应的java类中已经有了完整的gRPC底层通讯逻辑,包括定义的RPC端点,几种同步异步的stub可以直接调用,各种数据结构的builder等,使用时可以非常方便地继承生成的ImplBase类,填充对应handler的逻辑。

可以把protobuf定义单独打到一个maven模块,server和client去引相同的依赖包,以保证版本的一致性和复用。因为是BI_DI类型的rpc,client的逻辑和server就很类似了,初始由client去主动连接server,我这里因为需要上报client的信息做了心跳保活,实际上因为gRPC基于HTTP/2的特性,如果没有显式地设置deadline/timeout,流式的rpc是可以一直传输的,而不用使用HTTP/1.X去轮询或者定时hold长链接。

@see HTC的client代码

SkyWalking是怎么玩的

SkyWalking作为tracing组件,每时每刻都会上报大量的数据到其后端server,除了对SW本身的处理能力、吞吐性有要求外,稳固可靠的传输层/RPC框架也很重要,SkyWalking使用的就是gRPC。社区在8.X后也有了走kafka消息队列的方案,但是至少就我们公司而言,gRPC在链路传输和rpc性能方面已经表现得相当稳固了。

TraceSegmentServiceClient.java

Tracing.proto

RemoteServiceHandler.java

TraceSegmentReportServiceHandler.java

我看的也比较浅,如果感兴趣可以去看SW官方的这篇博客,SkyWalking创始人吴晟写的,介绍了一些通讯、路由、整个系统架构层面的一些内容,比较干货。

有趣的是,RocketMQ在最新发布的5.0版本中也在原来的纯netty通信基础上,选用了gRPC作为默认通讯及rpc方案,由此可见gRPC的高性能和可靠性也越来越被开源社区认可。

站在巨人的肩膀上——netty

既然上面说到gRPC这么强大,除了protobuf作为二进制序列化框架、rpc DSL,以及HTTP/2的底层协议,其他都是自己实现的吗?从grpc-java的角度看,底层还少不了一位重量级角色netty,作为网络层框架。(当然这么说也不是很严谨,实际上除了序列化协议可以由protobuf替换为json等,底层网络传输层也可以由ok-http等替换,这和语言也有关系)

netty简介

netty是一个异步的、事件驱动的网络应用框架,netty的性能众所周知非常强大,资料也非常多,我并没有写过原生的netty,最近在写虚拟环境MQ相关内容时发现RocketMQ的通讯模块对netty封装的也非常好,所以拿过来看下内部是怎么实现的,以及分享下如何基于rocket-remoting模块去拓展MQ client的逻辑,得到和原生consumer<->broker一致的rpc能力。

org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processRequestCommand

背景

对于一些底层业务系统,同步信息表填充基本信息的操作比较常见,过去的做法多为跨服务同步基础信息表到当前业务表并持久化,需要时join基础表填充字段。这里设计一个简单的CacheService,以内存缓存的方式实现以下功能:

  1. 从外部系统rpc调用刷新缓存并持久化到DB(全量刷新)
  2. 从MQ拿到增量变更数据,刷新缓存并更新DB(增量刷新)
  3. 应用重启从DB拉取全量基础信息维护到内存(初始化)
  4. 无法推MQ的外部系统支持定时任务统一同步(定时刷新)
  5. 需要填充基本字段时直接从内存中可取(读取)

接口

Cacheable,从外部系统接收的数据结构实现转换方法,处理成系统内需要的结构T:

/**
 * @author sunyongfei
 * @date 2022/3/28
 */
public interface Cacheable<T> {

    /**
     * 获取缓存key
     * @return
     */
    String getCacheKey();

    /**
     * 获取缓存值
     * @return
     */
    T getCacheValue();
}

CacheService,单个缓存维护逻辑,可能从不同的数据源同步,统一约束行为

/**
 * @author sunyongfei
 * @genertic T: destination target class
 * @date 2022/3/28
 */
public interface CacheService<T> {

    /**
     * @PostConstruct needed in actual impl bean,
     * if you want to add annotation in the interface, use abstract class instead.
     */
    void initCacheFromDB();

    /**
     * refresh cache
     */
    boolean refreshCache();

    /**
     * refresh single cache item
     */
    boolean refreshCache(T t);

    /**
     * get cache by key
     * @param cacheKey
     * @return
     */
    T getCacheItem(String cacheKey);
}

使用例:

SysInfoSimpleDTO implements XXX, Cacheable<SysInfoDetailDTO>: 接收外部系统的DTO

SysInfoServiceImpl implements XXX, CacheService: 自定义逻辑

SyncServiceImpl: 公共逻辑,如简单的定时任务同步

@Resource
private List<CacheService<?>> cacheServices;

@Scheduled(cron = "0 0/5 * * * ?")
public void refreshCache() {
    if (CollectionUtils.isNotEmpty(cacheServices)) {
        cacheServices.forEach(CacheService::refreshCache);
    }
}

其他逻辑略

Jorges Luis Borges, 博尔赫斯

I offer you lean streets, desperate sunsets, the moon of the jagged suburbs.
我给你贫穷的街道、绝望的日落、破败郊区的月亮。

I offer you the bitterness of a man who has looked long and long at the lonely moon.
我给你一个久久地望着孤月的人的悲哀。

I offer you my ancestors, my dead men, the ghosts that living men have honoured in marble: my father’s father killed in the frontier of Buenos Aires, two bullets through his lungs, bearded and dead, wrapped by his soldiers in the hide of a cow;
my mother’s grandfather -just twentyfour- heading a charge of three hundred men in Perú, now ghosts on vanished horses.
I offer you whatever insight my books may hold. whatever manliness or humour my life.
我给你我已死去的先辈,人们用大理石纪念他们的幽灵:在布宜偌斯艾利斯边境阵亡的我父亲的父亲,两颗子弹穿了他的胸膛。蓄着胡子的他死去了,士兵们用牛皮裹起他的尸体;我母亲的祖父——时年二十四岁——在秘鲁率领三百名士兵冲锋,如今都成了消失的马背上的幽灵。
我给你我写的书中所能包含的一切悟力、我生活中所能有的男子气概或幽默。

I offer you the loyalty of a man who has never been loyal.
我给你一个从未有过信仰人的忠诚。

I offer you that kernel of myself that I have saved somehow -the central heart that deals not in words, traffics not with dreams and is untouched by time, by joy, by adversities.
我给你我设法保全的我自己的核心——不营字造句,不和梦想交易,不被时间、欢乐和逆境触动的核心。

I offer you the memory of a yellow rose seen at sunset, years before you were born.
我给你,早在你出生前多年的一个傍晚看到的一朵黄玫瑰的记忆。

I offer you explanations of yourself, theories about yourself, authentic and surprising news of yourself.
我给你对自己的解释,关于你自己的理论,你自己的真实而惊人的消息。

I can give you my loneliness, my darkness, the hunger of my heart; I am trying to bribe you with uncertainty, with danger, with defeat.
我给你我的寂寞、我的黑暗、我心的饥渴;我试图用困惑、危险、失败来打动你。