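1. Starting from arthas-spring-boot-starter

The official Arthas Spring Boot Starter brings Arthas in as a Maven dependency: at startup it attaches to its own process and stays resident.

Using the starter package as the entry point, let's look at how Arthas performs the attach, executes commands, and so on.

1.1 The starter's configuration brings up AttachArthasClassloader

The ArthasConfiguration class in the starter loads attach.ArthasAgent when the configuration is enabled and calls its init method.

The main job of init is to start AttachArthasClassloader: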

com.taobao.arthas.agent.attach.ArthasAgent#init

if (instrumentation == null) {
    instrumentation = ByteBuddyAgent.install();
}
 
// check arthasHome
if (arthasHome == null || arthasHome.trim().isEmpty()) {
    // unpack arthasHome from the bundled zip
    URL coreJarUrl = this.getClass().getClassLoader().getResource("arthas-bin.zip");
    if (coreJarUrl != null) {
        File tempArthasDir = createTempDir();
        ZipUtil.unpack(coreJarUrl.openStream(), tempArthasDir);
        arthasHome = tempArthasDir.getAbsolutePath();
    } else {
        throw new IllegalArgumentException("can not getResources arthas-bin.zip from classloader: "
                + this.getClass().getClassLoader());
    }
}
 
// find arthas-core.jar
File arthasCoreJarFile = new File(arthasHome, ARTHAS_CORE_JAR);
if (!arthasCoreJarFile.exists()) {
    throw new IllegalStateException("can not find arthas-core.jar under arthasHome: " + arthasHome);
}
AttachArthasClassloader arthasClassLoader = new AttachArthasClassloader(
        new URL[] { arthasCoreJarFile.toURI().toURL() });
 
/**
 * <pre>
 * ArthasBootstrap bootstrap = ArthasBootstrap.getInstance(inst);
 * </pre>
 */
Class<?> bootstrapClass = arthasClassLoader.loadClass(ARTHAS_BOOTSTRAP);
Object bootstrap = bootstrapClass.getMethod(GET_INSTANCE, Instrumentation.class, Map.class).invoke(null,
        instrumentation, configMap);
boolean isBind = (Boolean) bootstrapClass.getMethod(IS_BIND).invoke(bootstrap);
if (!isBind) {
    String errorMsg = "Arthas server port binding failed! Please check $HOME/logs/arthas/arthas.log for more details.";
    throw new RuntimeException(errorMsg);
}
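The unpack step in init can be illustrated with the JDK alone. This is a minimal sketch, not the ZipUtil that Arthas uses: the class name UnpackSketch, the in-memory sample zip, and the temp directory prefix are all made up for the illustration, and the path check against entries escaping the target directory is an addition of this sketch.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;

class UnpackSketch {
    /** Unpacks a zip stream into targetDir, rejecting entries that would escape it. */
    static void unpack(InputStream zipStream, Path targetDir) throws IOException {
        Path root = targetDir.toAbsolutePath().normalize();
        try (ZipInputStream zin = new ZipInputStream(zipStream)) {
            ZipEntry entry;
            while ((entry = zin.getNextEntry()) != null) {
                Path out = root.resolve(entry.getName()).normalize();
                if (!out.startsWith(root)) {
                    throw new IOException("zip entry escapes target dir: " + entry.getName());
                }
                if (entry.isDirectory()) {
                    Files.createDirectories(out);
                } else {
                    Files.createDirectories(out.getParent());
                    // Reads until the end of the current entry, not the whole archive.
                    Files.copy(zin, out, StandardCopyOption.REPLACE_EXISTING);
                }
            }
        }
    }

    /** Builds a one-entry zip in memory so the demo needs no external file. */
    static byte[] sampleZip() throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ZipOutputStream zout = new ZipOutputStream(bytes)) {
            zout.putNextEntry(new ZipEntry("arthas-core.jar"));
            zout.write("placeholder".getBytes(StandardCharsets.UTF_8));
            zout.closeEntry();
        }
        return bytes.toByteArray();
    }

    /** Unpacks the sample zip into a fresh temp dir and returns that dir (the "arthasHome" of the sketch). */
    static Path unpackSampleToTemp() {
        try {
            Path home = Files.createTempDirectory("arthas-home-sketch");
            unpack(new ByteArrayInputStream(sampleZip()), home);
            return home;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path home = unpackSampleToTemp();
        System.out.println("core jar present: " + Files.exists(home.resolve("arthas-core.jar")));
    }
}
```

As in init, the existence check on arthas-core.jar only happens after the unpack, so a missing or incomplete archive surfaces as a clear error before any class loading starts.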

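After arthas-bin.zip is unpacked, arthas-core.jar is loaded through AttachArthasClassloader. The constants show that the bootstrap class (ARTHAS_BOOTSTRAP) is com.taobao.arthas.core.server.ArthasBootstrap and the method (GET_INSTANCE) is getInstance, so the next stop is the Bootstrap class in the core module.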
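To make the "separate class loader plus reflection" pattern concrete, here is a minimal sketch. It is not the source of AttachArthasClassloader: the class IsolatedLoaderSketch and its helper methods are hypothetical, and the lookup order (own URLs first, then the parent) is an assumption about how such an isolating loader is typically written. The demo loads a JDK class instead of ArthasBootstrap so that it runs without any jar.

```java
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;

class IsolatedLoaderSketch extends URLClassLoader {
    IsolatedLoaderSketch(URL[] urls) {
        // The parent is the loader above the application loader,
        // so classes on the host application's classpath are not visible here.
        super(urls, ClassLoader.getSystemClassLoader().getParent());
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        synchronized (getClassLoadingLock(name)) {
            Class<?> loaded = findLoadedClass(name);
            if (loaded != null) {
                return loaded;
            }
            // JDK classes always come from the parent chain.
            if (name.startsWith("java.") || name.startsWith("sun.")) {
                return super.loadClass(name, resolve);
            }
            try {
                // Look in this loader's own URLs first (child-first lookup).
                Class<?> own = findClass(name);
                if (resolve) {
                    resolveClass(own);
                }
                return own;
            } catch (ClassNotFoundException e) {
                return super.loadClass(name, resolve);
            }
        }
    }

    /** Loads a class by name and calls a static method on it, the way init calls getInstance. */
    static int parseViaReflection(ClassLoader loader, String text) {
        try {
            Class<?> integerClass = loader.loadClass("java.lang.Integer");
            Method parseInt = integerClass.getMethod("parseInt", String.class);
            return (Integer) parseInt.invoke(null, text);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Reports whether the loader can resolve the given class name. */
    static boolean canSee(ClassLoader loader, String className) {
        try {
            loader.loadClass(className);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        IsolatedLoaderSketch loader = new IsolatedLoaderSketch(new URL[0]);
        System.out.println(parseViaReflection(loader, "42"));
        // An application class is invisible to the isolated loader.
        System.out.println(canSee(loader, IsolatedLoaderSketch.class.getName()));
    }
}
```

Because the caller never references the loaded class by type, only by name and through reflection, the classes behind the isolated loader and their dependencies stay out of the host application's class space.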

getInstance calls the ArthasBootstrap constructor, and only at this point does the arthas-core module really come up:

https://github.com/alibaba/arthas/blob/arthas-all-3.5.5/core/src/main/java/com/taobao/arthas/core/server/ArthasBootstrap.java#L350

The most important step there is the bind method, which starts the arthas server. The key logic is the initialization of ShellServer:

1.2 ArthasBootstrap starts ShellServer

com.taobao.arthas.core.server.ArthasBootstrap#bind

ShellServerOptions options = new ShellServerOptions()
                .setInstrumentation(instrumentation)
                .setPid(PidUtils.currentLongPid())
                .setWelcomeMessage(ArthasBanner.welcome());
if (configure.getSessionTimeout() != null) {
    options.setSessionTimeout(configure.getSessionTimeout() * 1000);
}
 
this.httpSessionManager = new HttpSessionManager();
this.securityAuthenticator = new SecurityAuthenticatorImpl(configure.getUsername(), configure.getPassword());
 
shellServer = new ShellServerImpl(options);
 
List<String> disabledCommands = new ArrayList<String>();
if (configure.getDisabledCommands() != null) {
    String[] strings = StringUtils.tokenizeToStringArray(configure.getDisabledCommands(), ",");
    if (strings != null) {
        disabledCommands.addAll(Arrays.asList(strings));
    }
}
BuiltinCommandPack builtinCommands = new BuiltinCommandPack(disabledCommands);
List<CommandResolver> resolvers = new ArrayList<CommandResolver>();
resolvers.add(builtinCommands);
 
//worker group
workerGroup = new NioEventLoopGroup(new DefaultThreadFactory("arthas-TermServer", true));
 
// TODO: discover user provided command resolver
if (configure.getTelnetPort() != null && configure.getTelnetPort() > 0) {
    logger().info("try to bind telnet server, host: {}, port: {}.", configure.getIp(), configure.getTelnetPort());
    shellServer.registerTermServer(new HttpTelnetTermServer(configure.getIp(), configure.getTelnetPort(),
            options.getConnectionTimeout(), workerGroup, httpSessionManager));
} else {
    logger().info("telnet port is {}, skip bind telnet server.", configure.getTelnetPort());
}
if (configure.getHttpPort() != null && configure.getHttpPort() > 0) {
    logger().info("try to bind http server, host: {}, port: {}.", configure.getIp(), configure.getHttpPort());
    shellServer.registerTermServer(new HttpTermServer(configure.getIp(), configure.getHttpPort(),
            options.getConnectionTimeout(), workerGroup, httpSessionManager));
} else {
    // listen local address in VM communication
    if (configure.getTunnelServer() != null) {
        shellServer.registerTermServer(new HttpTermServer(configure.getIp(), configure.getHttpPort(),
                options.getConnectionTimeout(), workerGroup, httpSessionManager));
    }
    logger().info("http port is {}, skip bind http server.", configure.getHttpPort());
}
 
for (CommandResolver resolver : resolvers) {
    shellServer.registerCommandResolver(resolver);
}

Once the ShellServer instance is initialized, shellServer.registerCommandResolver adds the built-in commands from BuiltinCommandPack. The built-in commands include trace, watch, and the others listed here:

https://github.com/alibaba/arthas/blob/arthas-all-3.5.5/core/src/main/java/com/taobao/arthas/core/command/BuiltinCommandPack.java#L73
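The registration flow in bind (parse the disabled list, build the built-in pack, register the resolver) can be sketched in a few lines. Everything here is a simplified stand-in: SketchCommand, SketchResolver, builtinPack, and the three sample commands are hypothetical names, and the real CommandResolver and Command types carry much more than a name and a function.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class CommandRegistrySketch {
    /** Hypothetical stand-in for a command. */
    interface SketchCommand {
        String run(String args);
    }

    /** Hypothetical stand-in for CommandResolver: a source of named commands. */
    interface SketchResolver {
        Map<String, SketchCommand> commands();
    }

    /** Splits a comma-separated config value such as "stop, dump" into command names. */
    static List<String> parseDisabled(String csv) {
        List<String> result = new ArrayList<>();
        if (csv == null) {
            return result;
        }
        for (String token : csv.split(",")) {
            String trimmed = token.trim();
            if (!trimmed.isEmpty()) {
                result.add(trimmed);
            }
        }
        return result;
    }

    /** Builds the built-in command set and drops every disabled command. */
    static SketchResolver builtinPack(List<String> disabled) {
        Map<String, SketchCommand> all = new LinkedHashMap<>();
        all.put("version", args -> "sketch-version");
        all.put("echo", args -> args);
        all.put("stop", args -> "stopping");
        all.keySet().removeAll(disabled);
        return () -> all;
    }

    private final Map<String, SketchCommand> registered = new LinkedHashMap<>();

    void registerCommandResolver(SketchResolver resolver) {
        registered.putAll(resolver.commands());
    }

    String execute(String name, String args) {
        SketchCommand command = registered.get(name);
        return command == null ? "command not found: " + name : command.run(args);
    }

    public static void main(String[] args) {
        CommandRegistrySketch server = new CommandRegistrySketch();
        server.registerCommandResolver(builtinPack(parseDisabled("stop, dump")));
        System.out.println(server.execute("echo", "hello"));
        System.out.println(server.execute("stop", ""));
    }
}
```

Filtering at pack-construction time means a disabled command is never registered at all, so no later code path has to re-check the disabled list.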

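Next, the ShellServer is bound to HttpApiHandler through SessionManager. At this point ShellServer, the core of Arthas, is fully initialized (this walkthrough skips registering the TermServers, initializing SpyAPI, and similar logic).

2. How are command requests handled?

With HttpApiHandler in hand, we can follow the trail and see how HttpApiHandler receives HTTP requests and dispatches commands to arthas-core for execution.

2.1 handle

The first block of the handle method confirms that this is the HTTP entry point we are looking for. The fields of the deserialized ApiRequest match the http api parameters in the documentation: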

com.taobao.arthas.core.shell.term.impl.http.api.HttpApiHandler#handle

HttpMethod method = request.method();
if (HttpMethod.POST.equals(method)) {
    requestBody = getBody(request);
    ApiRequest apiRequest = parseRequest(requestBody);
    requestId = apiRequest.getRequestId();
    result = processRequest(ctx, apiRequest);
} else {
    result = createResponse(ApiState.REFUSED, "Unsupported http method: " + method.name());
}

The processRequest method is where the command dispatch logic we are looking for lives.
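For reference, a client-side request to this entry point might look like the sketch below. The endpoint http://localhost:8563/api and the action, command, and requestId fields follow the Arthas http api documentation. The class HttpApiClientSketch, its hand-rolled JSON escaping, and the request id value are assumptions made for the illustration. The POST is only sent when an endpoint is passed on the command line, so the sketch also runs without a live Arthas instance.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

class HttpApiClientSketch {
    /** Escapes a string for use inside a JSON string literal. */
    static String escape(String s) {
        StringBuilder sb = new StringBuilder();
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"': sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.toString();
    }

    /** Builds the body of a synchronous exec request. */
    static String buildExecBody(String command, String requestId) {
        return "{\"action\":\"exec\",\"requestId\":\"" + escape(requestId)
                + "\",\"command\":\"" + escape(command) + "\"}";
    }

    /** Sends the body with POST, the only method that handle accepts. */
    static String post(String endpoint, String jsonBody) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(endpoint).openConnection();
        conn.setRequestMethod("POST");
        conn.setDoOutput(true);
        conn.setRequestProperty("Content-Type", "application/json");
        try (OutputStream out = conn.getOutputStream()) {
            out.write(jsonBody.getBytes(StandardCharsets.UTF_8));
        }
        try (InputStream in = conn.getInputStream()) {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            byte[] chunk = new byte[4096];
            int n;
            while ((n = in.read(chunk)) != -1) {
                buffer.write(chunk, 0, n);
            }
            return new String(buffer.toByteArray(), StandardCharsets.UTF_8);
        } finally {
            conn.disconnect();
        }
    }

    public static void main(String[] args) throws IOException {
        String body = buildExecBody("version", "req-1");
        System.out.println(body);
        if (args.length > 0) {
            // e.g. java HttpApiClientSketch http://localhost:8563/api
            System.out.println(post(args[0], body));
        }
    }
}
```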

2.2 dispatchRequest

processRequest starts with some session and authentication logic, which we set aside for now. To see how the command is dispatched to arthas-core, follow the call into dispatchRequest:

com.taobao.arthas.core.shell.term.impl.http.api.HttpApiHandler#dispatchRequest

private ApiResponse dispatchRequest(ApiAction action, ApiRequest apiRequest, Session session) throws ApiException {
    switch (action) {
        case EXEC:
            return processExecRequest(apiRequest, session);
        case ASYNC_EXEC:
            return processAsyncExecRequest(apiRequest, session);
        case INTERRUPT_JOB:
            return processInterruptJob(apiRequest, session);
        case PULL_RESULTS:
            return processPullResultsRequest(apiRequest, session);
        case SESSION_INFO:
            return processSessionInfoRequest(apiRequest, session);
        case JOIN_SESSION:
            return processJoinSessionRequest(apiRequest, session);
        case CLOSE_SESSION:
            return processCloseSessionRequest(apiRequest, session);
        case INIT_SESSION:
            break;
    }
    return null;
}

It is a single switch that maps each action to its handler. Start with the EXEC action and follow it into processExecRequest:

createJob → job.run()

com.taobao.arthas.core.shell.term.impl.http.api.HttpApiHandler#processExecRequest

Job foregroundJob = session.getForegroundJob();
if (foregroundJob != null) {
    response.setState(ApiState.REFUSED)
            .setMessage("Another job is running.");
    logger.info("Another job is running, jobId: {}", foregroundJob.id());
    return response;
}
 
packingResultDistributor = new PackingResultDistributorImpl(session);
//distribute result message both to origin session channel and request channel by CompositeResultDistributor
//ResultDistributor resultDistributor = new CompositeResultDistributorImpl(packingResultDistributor, session.getResultDistributor());
job = this.createJob(commandLine, session, packingResultDistributor);
session.setForegroundJob(job);
updateSessionInputStatus(session, InputStatus.ALLOW_INTERRUPT);
 
job.run();

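createJob creates a process through createProcess and binds it to the jobId.

createProcess mostly deals with the process's STDOUT handler chain, creates and returns a Process object, and passes the caller's resultDistributor down to the point where the process is created.

The distributor used here (packingResultDistributor in the code above) provides getResults(), which returns the command's results as a List<ResultModel>. ResultModel is the parent class of all command results and carries two basic properties: the type and the jobId.

Each command's result subclass extends this parent with its own data structure, and commands write their results through CommandProcessImpl#appendResult.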
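The relationship between the result parent class, its subclasses, and the distributor can be sketched as follows. The names SketchResultModel, MessageResult, StatusResult, PackingDistributor, and SketchProcess are hypothetical, and stamping the jobId inside appendResult is an assumption about where that assignment happens, not a quote of CommandProcessImpl.

```java
import java.util.ArrayList;
import java.util.List;

class ResultModelSketch {
    /** Parent of all results: a type discriminator plus the owning job's id. */
    abstract static class SketchResultModel {
        private int jobId;

        abstract String getType();

        int getJobId() {
            return jobId;
        }

        void setJobId(int jobId) {
            this.jobId = jobId;
        }
    }

    /** A result subclass that adds its own payload. */
    static class MessageResult extends SketchResultModel {
        final String message;

        MessageResult(String message) {
            this.message = message;
        }

        @Override
        String getType() {
            return "message";
        }
    }

    /** Another subclass with a different payload. */
    static class StatusResult extends SketchResultModel {
        final int statusCode;

        StatusResult(int statusCode) {
            this.statusCode = statusCode;
        }

        @Override
        String getType() {
            return "status";
        }
    }

    /** Collects results in memory so the HTTP handler can return them in one response. */
    static class PackingDistributor {
        private final List<SketchResultModel> results = new ArrayList<>();

        synchronized void appendResult(SketchResultModel result) {
            results.add(result);
        }

        synchronized List<SketchResultModel> getResults() {
            return new ArrayList<>(results);
        }
    }

    /** The handle a command writes to; it stamps the job id and forwards to the distributor. */
    static class SketchProcess {
        private final int jobId;
        private final PackingDistributor distributor;

        SketchProcess(int jobId, PackingDistributor distributor) {
            this.jobId = jobId;
            this.distributor = distributor;
        }

        void appendResult(SketchResultModel result) {
            result.setJobId(jobId);
            distributor.appendResult(result);
        }
    }

    /** Simulates one command run and returns what the distributor collected. */
    static List<SketchResultModel> runDemoJob(int jobId) {
        PackingDistributor distributor = new PackingDistributor();
        SketchProcess process = new SketchProcess(jobId, distributor);
        process.appendResult(new MessageResult("hello"));
        process.appendResult(new StatusResult(0));
        return distributor.getResults();
    }

    public static void main(String[] args) {
        for (SketchResultModel result : runDemoJob(7)) {
            System.out.println(result.getType() + " from job " + result.getJobId());
        }
    }
}
```

The type field is what lets a client deserialize a mixed list of results: it reads the type first and then picks the matching structure.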

com.taobao.arthas.core.shell.system.impl.JobControllerImpl#createJob

@Override
public Job createJob(InternalCommandManager commandManager, List<CliToken> tokens, Session session, JobListener jobHandler, Term term, ResultDistributor resultDistributor) {
    checkPermission(session, tokens.get(0));
    int jobId = idGenerator.incrementAndGet();
    StringBuilder line = new StringBuilder();
    for (CliToken arg : tokens) {
        line.append(arg.raw());
    }
    boolean runInBackground = runInBackground(tokens);
    Process process = createProcess(session, tokens, commandManager, jobId, term, resultDistributor);
    process.setJobId(jobId);
    JobImpl job = new JobImpl(jobId, this, process, line.toString(), runInBackground, session, jobHandler);
    jobs.put(jobId, job);
    return job;
}

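After job.run() is called, the task runs on the executorService thread pool held by ArthasBootstrap, whose core pool size is 1.

The next part polls for the job's result: waitForJob loops in while(true) and checks the job every 100 ms. When the wait ends, the List<ResultModel> collected by the resultDistributor above is packed into the response: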

com.taobao.arthas.core.shell.term.impl.http.api.HttpApiHandler#processExecRequest

//packing results
body.put("jobId", job.id());
body.put("jobStatus", job.status());
body.put("timeExpired", timeExpired);
if (timeExpired) {
    body.put("timeout", timeout);
}
body.put("results", packingResultDistributor.getResults());
 
response.setSessionId(session.getSessionId())
        //.setConsumerId(consumerId)
        .setBody(body);
return response;
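The run-then-poll pattern described above can be sketched with a single-thread executor and a 100 ms polling loop. This is an illustration only: JobPollingSketch, the AtomicBoolean standing in for the job status, and the use of Executors.newSingleThreadExecutor are assumptions, not the Arthas implementation.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

class JobPollingSketch {
    /** Polls every 100 ms; returns true if the job finished before the timeout elapsed. */
    static boolean waitForJob(AtomicBoolean finished, long timeoutMillis) {
        long start = System.currentTimeMillis();
        while (true) {
            if (finished.get()) {
                return true;
            }
            if (System.currentTimeMillis() - start > timeoutMillis) {
                return false;
            }
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    /** Runs a fake job of workMillis on a one-thread pool and waits up to timeoutMillis for it. */
    static boolean runAndWait(long workMillis, long timeoutMillis) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicBoolean finished = new AtomicBoolean(false);
        try {
            executor.execute(() -> {
                try {
                    Thread.sleep(workMillis); // stands in for the command's real work
                    finished.set(true);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            return waitForJob(finished, timeoutMillis);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        boolean finishedInTime = runAndWait(50, 2000);
        // The response's timeExpired flag would be the negation of this value.
        System.out.println("finished in time: " + finishedInTime);
    }
}
```

With one worker thread, a second job submitted while the first is still running has to queue behind it, which fits the "Another job is running" refusal at the top of processExecRequest.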

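3. How Arthas commands are defined

3.1 ThreadCommand as an example

https://github.com/alibaba/arthas/blob/arthas-all-3.5.5/core/src/main/java/com/taobao/arthas/core/command/monitor200/ThreadCommand.java

Arthas commands all extend AnnotatedCommand and override the process method.

ThreadCommand's process method takes a different path depending on the command-line arguments. Take processAllThreads as an example: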

com.taobao.arthas.core.command.monitor200.ThreadCommand#processAllThreads

List<ThreadVO> threads = ThreadUtil.getThreads();
 
// count the threads in each state
Map<State, Integer> stateCountMap = new LinkedHashMap<State, Integer>();
for (State s : State.values()) {
    stateCountMap.put(s, 0);
}
 
for (ThreadVO thread : threads) {
    State threadState = thread.getState();
    Integer count = stateCountMap.get(threadState);
    stateCountMap.put(threadState, count + 1);
}
 
boolean includeInternalThreads = true;
Collection<ThreadVO> resultThreads = new ArrayList<ThreadVO>();
if (!StringUtils.isEmpty(this.state)) {
    this.state = this.state.toUpperCase();
    if (states.contains(this.state)) {
        includeInternalThreads = false;
        for (ThreadVO thread : threads) {
            if (thread.getState() != null && state.equals(thread.getState().name())) {
                resultThreads.add(thread);
            }
        }
    } else {
        return ExitStatus.failure(1, "Illegal argument, state should be one of " + states);
    }
} else {
    resultThreads = threads;
}
 
//thread stats
ThreadSampler threadSampler = new ThreadSampler();
threadSampler.setIncludeInternalThreads(includeInternalThreads);
threadSampler.sample(resultThreads);
threadSampler.pause(sampleInterval);
List<ThreadVO> threadStats = threadSampler.sample(resultThreads);
 
process.appendResult(new ThreadModel(threadStats, stateCountMap, all));
return ExitStatus.success();

As the code shows, per-thread CPU time is obtained by sampling twice with ThreadSampler:

https://github.com/alibaba/arthas/blob/arthas-all-3.5.5/core/src/main/java/com/taobao/arthas/core/command/monitor200/ThreadSampler.java
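The two-sample idea (sample, pause, sample again, take the difference) can be reproduced with the JDK's ThreadMXBean. The sketch below is not ThreadSampler itself: CpuSamplerSketch and its methods are hypothetical, and it reports only a CPU percentage per thread id, whereas the real class fills in ThreadVO objects.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.HashMap;
import java.util.Map;

class CpuSamplerSketch {
    /** Takes one snapshot: thread id -> cumulative CPU time in nanoseconds. */
    static Map<Long, Long> snapshot(ThreadMXBean bean) {
        Map<Long, Long> times = new HashMap<>();
        for (long id : bean.getAllThreadIds()) {
            long cpu = bean.getThreadCpuTime(id); // -1 if the thread has died or measurement is off
            if (cpu >= 0) {
                times.put(id, cpu);
            }
        }
        return times;
    }

    /** Samples twice, intervalMillis apart, and returns thread id -> percent of one core used in between. */
    static Map<Long, Double> sample(long intervalMillis) {
        ThreadMXBean bean = ManagementFactory.getThreadMXBean();
        Map<Long, Double> usage = new HashMap<>();
        if (!bean.isThreadCpuTimeSupported() || !bean.isThreadCpuTimeEnabled()) {
            return usage; // nothing to measure on this JVM
        }
        Map<Long, Long> before = snapshot(bean);
        long startNanos = System.nanoTime();
        try {
            Thread.sleep(intervalMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        Map<Long, Long> after = snapshot(bean);
        long elapsedNanos = Math.max(1, System.nanoTime() - startNanos);
        for (Map.Entry<Long, Long> entry : after.entrySet()) {
            Long previous = before.get(entry.getKey());
            if (previous == null) {
                continue; // thread started during the interval, no baseline
            }
            long delta = Math.max(0, entry.getValue() - previous);
            usage.put(entry.getKey(), 100.0 * delta / elapsedNanos);
        }
        return usage;
    }

    public static void main(String[] args) {
        for (Map.Entry<Long, Double> entry : sample(200).entrySet()) {
            System.out.printf("thread %d: %.2f%%%n", entry.getKey(), entry.getValue());
        }
    }
}
```

A single reading of cumulative CPU time says nothing about current load, which is why two samples separated by an interval are needed.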

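3.2 Profiler as an example

The profiler command also extends AnnotatedCommand; its process method calls execute to run async-profiler sampling.

Arthas implements a custom AsyncProfilerMXBean interface that wraps async-profiler's native library (a binary .so) as an MBean, and drives async-profiler through it:

https://github.com/alibaba/arthas/blob/arthas-all-3.5.5/core/src/main/java/one/profiler/AsyncProfiler.java

This wrapper has almost no logic of its own; it mostly wraps the native methods that call into the .so. The main logic still lives in the Command class.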


  2. syf syf

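    While developing my own javaagent attach, I found that Arthas does classloader isolation very well. If you write an agent and attach it straight to a target JVM without any isolation, the agent's logs end up in the target JVM's log output, and the libraries the agent brings in (log4j, HttpClient, gRPC, GSON, and so on) may conflict with libraries the target JVM has already loaded, which is very hard to control. How Arthas handles this deserves a closer look.

    References: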

    https://tech.meituan.com/2019/10/10/jvm-cpu-profiler.html
    https://github.com/alibaba/jvm-sandbox
