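Arthas Source Code Study
1. Starting from arthas-spring-boot-starter
The official Arthas Spring Boot Starter launches Arthas through a Maven dependency: it attaches to its own process and then stays resident.
Using the starter as the entry point, this post looks at how Arthas performs attach, command execution, and related operations.
1.1 The starter's configuration brings up AttachArthasClassloader
The ArthasConfiguration class in the starter loads attach.ArthasAgent when the configuration switch is enabled and calls its init method.
The main logic inside init is to start AttachArthasClassloader: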
com.taobao.arthas.agent.attach.ArthasAgent#init
if (instrumentation == null) {
instrumentation = ByteBuddyAgent.install();
}
// Check arthasHome
if (arthasHome == null || arthasHome.trim().isEmpty()) {
// Unpack arthasHome from the bundled zip
URL coreJarUrl = this.getClass().getClassLoader().getResource("arthas-bin.zip");
if (coreJarUrl != null) {
File tempArthasDir = createTempDir();
ZipUtil.unpack(coreJarUrl.openStream(), tempArthasDir);
arthasHome = tempArthasDir.getAbsolutePath();
} else {
throw new IllegalArgumentException("can not getResources arthas-bin.zip from classloader: "
+ this.getClass().getClassLoader());
}
}
// find arthas-core.jar
File arthasCoreJarFile = new File(arthasHome, ARTHAS_CORE_JAR);
if (!arthasCoreJarFile.exists()) {
throw new IllegalStateException("can not find arthas-core.jar under arthasHome: " + arthasHome);
}
AttachArthasClassloader arthasClassLoader = new AttachArthasClassloader(
new URL[] { arthasCoreJarFile.toURI().toURL() });
/**
* <pre>
* ArthasBootstrap bootstrap = ArthasBootstrap.getInstance(inst);
* </pre>
*/
Class<?> bootstrapClass = arthasClassLoader.loadClass(ARTHAS_BOOTSTRAP);
Object bootstrap = bootstrapClass.getMethod(GET_INSTANCE, Instrumentation.class, Map.class).invoke(null,
instrumentation, configMap);
boolean isBind = (Boolean) bootstrapClass.getMethod(IS_BIND).invoke(bootstrap);
if (!isBind) {
String errorMsg = "Arthas server port binding failed! Please check $HOME/logs/arthas/arthas.log for more details.";
throw new RuntimeException(errorMsg);
}
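After arthas-bin.zip is unpacked, arthas-core.jar is loaded through AttachArthasClassloader. The corresponding constants show that the bootstrap class is com.taobao.arthas.core.server.ArthasBootstrap and the method is getInstance, which leads to the Bootstrap class in core.
getInstance calls the ArthasBootstrap constructor; only at this point does the arthas-core module actually start.
The most important step there is the bind method, which starts the Arthas server; the key logic is the ShellServer initialization.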
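The reflective hand-off described above (load a class by name from a given class loader, call a static factory, then call an instance method on the result) can be sketched as follows. This is only an illustration: DemoBootstrap, ReflectiveBootstrapSketch, and the "port" config key are hypothetical names, and the sketch uses the current class loader rather than a separate jar.

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a bootstrap class that would normally live in a separate jar.
class DemoBootstrap {
    private static DemoBootstrap instance;
    private final Map<String, String> config;

    private DemoBootstrap(Map<String, String> config) {
        this.config = config;
    }

    // Static singleton factory, looked up reflectively by the caller below.
    public static synchronized DemoBootstrap getInstance(Map<String, String> config) {
        if (instance == null) {
            instance = new DemoBootstrap(config);
        }
        return instance;
    }

    // Pretend the server is "bound" when a port was configured.
    public boolean isBind() {
        return config.containsKey("port");
    }
}

class ReflectiveBootstrapSketch {
    // Loads the bootstrap class through the given loader and drives it purely via reflection.
    static boolean start(ClassLoader loader, Map<String, String> config) throws Exception {
        // A real caller would only hold the class name as a string constant.
        String className = DemoBootstrap.class.getName();
        Class<?> bootstrapClass = loader.loadClass(className);
        Method getInstance = bootstrapClass.getMethod("getInstance", Map.class);
        Object bootstrap = getInstance.invoke(null, config); // static method: no receiver
        return (Boolean) bootstrapClass.getMethod("isBind").invoke(bootstrap);
    }

    public static void main(String[] args) throws Exception {
        Map<String, String> config = new HashMap<>();
        config.put("port", "3658"); // illustrative value only
        ClassLoader loader = ReflectiveBootstrapSketch.class.getClassLoader();
        System.out.println("bound: " + start(loader, config));
    }
}
```

Because the caller touches the bootstrap class only through reflection, it needs no compile-time dependency on the jar that contains it, which is what allows the core to sit behind its own class loader.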
1.2 ArthasBootstrap starts the ShellServer
com.taobao.arthas.core.server.ArthasBootstrap#bind
ShellServerOptions options = new ShellServerOptions()
.setInstrumentation(instrumentation)
.setPid(PidUtils.currentLongPid())
.setWelcomeMessage(ArthasBanner.welcome());
if (configure.getSessionTimeout() != null) {
options.setSessionTimeout(configure.getSessionTimeout() * 1000);
}
this.httpSessionManager = new HttpSessionManager();
this.securityAuthenticator = new SecurityAuthenticatorImpl(configure.getUsername(), configure.getPassword());
shellServer = new ShellServerImpl(options);
List<String> disabledCommands = new ArrayList<String>();
if (configure.getDisabledCommands() != null) {
String[] strings = StringUtils.tokenizeToStringArray(configure.getDisabledCommands(), ",");
if (strings != null) {
disabledCommands.addAll(Arrays.asList(strings));
}
}
BuiltinCommandPack builtinCommands = new BuiltinCommandPack(disabledCommands);
List<CommandResolver> resolvers = new ArrayList<CommandResolver>();
resolvers.add(builtinCommands);
// worker group
workerGroup = new NioEventLoopGroup(new DefaultThreadFactory("arthas-TermServer", true));
// TODO: discover user provided command resolver
if (configure.getTelnetPort() != null && configure.getTelnetPort() > 0) {
logger().info("try to bind telnet server, host: {}, port: {}.", configure.getIp(), configure.getTelnetPort());
shellServer.registerTermServer(new HttpTelnetTermServer(configure.getIp(), configure.getTelnetPort(),
options.getConnectionTimeout(), workerGroup, httpSessionManager));
} else {
logger().info("telnet port is {}, skip bind telnet server.", configure.getTelnetPort());
}
if (configure.getHttpPort() != null && configure.getHttpPort() > 0) {
logger().info("try to bind http server, host: {}, port: {}.", configure.getIp(), configure.getHttpPort());
shellServer.registerTermServer(new HttpTermServer(configure.getIp(), configure.getHttpPort(),
options.getConnectionTimeout(), workerGroup, httpSessionManager));
} else {
// listen local address in VM communication
if (configure.getTunnelServer() != null) {
shellServer.registerTermServer(new HttpTermServer(configure.getIp(), configure.getHttpPort(),
options.getConnectionTimeout(), workerGroup, httpSessionManager));
}
logger().info("http port is {}, skip bind http server.", configure.getHttpPort());
}
for (CommandResolver resolver : resolvers) {
shellServer.registerCommandResolver(resolver);
}
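After the ShellServer instance is initialized, shellServer.registerCommandResolver adds the built-in commands from BuiltinCommandPack, which include trace, watch, and others.
Next, the ShellServer is bound to HttpApiHandler through the SessionManager. At this point ShellServer, the core of Arthas, is fully initialized (registering the TermServer, initializing SpyAPI, and similar logic are skipped here).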
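The disabled-command handling in bind (split a comma-separated setting, then build the built-in command set without those names) can be illustrated with a small sketch. The command list and the class name CommandPackSketch are assumptions for illustration, not the contents of BuiltinCommandPack.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

class CommandPackSketch {
    // Hypothetical subset of built-in command names, for illustration only.
    static final List<String> BUILTIN = Arrays.asList("help", "thread", "trace", "watch", "stop");

    // Splits a comma-separated setting into trimmed, non-empty command names.
    static Set<String> parseDisabled(String csv) {
        Set<String> disabled = new LinkedHashSet<>();
        if (csv == null) {
            return disabled;
        }
        for (String token : csv.split(",")) {
            String name = token.trim();
            if (!name.isEmpty()) {
                disabled.add(name);
            }
        }
        return disabled;
    }

    // Returns the built-in commands that remain after removing the disabled ones.
    static List<String> enabledCommands(String disabledCsv) {
        Set<String> disabled = parseDisabled(disabledCsv);
        List<String> enabled = new ArrayList<>();
        for (String name : BUILTIN) {
            if (!disabled.contains(name)) {
                enabled.add(name);
            }
        }
        return enabled;
    }

    public static void main(String[] args) {
        System.out.println(enabledCommands("stop, trace"));
    }
}
```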
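2. How are command requests handled?
With HttpApiHandler in hand, we follow the trail to see how HttpApiHandler receives HTTP requests and dispatches commands to arthas-core for execution.
2.1 handle
The first part of the handle method confirms that this is the HTTP request entry point we are looking for; the fields of the deserialized ApiRequest match the HTTP API parameters in the documentation: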
com.taobao.arthas.core.shell.term.impl.http.api.HttpApiHandler#handle
HttpMethod method = request.method();
if (HttpMethod.POST.equals(method)) {
requestBody = getBody(request);
ApiRequest apiRequest = parseRequest(requestBody);
requestId = apiRequest.getRequestId();
result = processRequest(ctx, apiRequest);
} else {
result = createResponse(ApiState.REFUSED, "Unsupported http method: " + method.name());
}
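The processRequest method is where the command dispatch logic lives.
2.2 dispatchRequest
processRequest starts with some session authentication logic, which we set aside for now. To see how a command is dispatched to arthas-core, we follow the call into dispatchRequest: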
com.taobao.arthas.core.shell.term.impl.http.api.HttpApiHandler#dispatchRequest
private ApiResponse dispatchRequest(ApiAction action, ApiRequest apiRequest, Session session) throws ApiException {
switch (action) {
case EXEC:
return processExecRequest(apiRequest, session);
case ASYNC_EXEC:
return processAsyncExecRequest(apiRequest, session);
case INTERRUPT_JOB:
return processInterruptJob(apiRequest, session);
case PULL_RESULTS:
return processPullResultsRequest(apiRequest, session);
case SESSION_INFO:
return processSessionInfoRequest(apiRequest, session);
case JOIN_SESSION:
return processJoinSessionRequest(apiRequest, session);
case CLOSE_SESSION:
return processCloseSessionRequest(apiRequest, session);
case INIT_SESSION:
break;
}
return null;
}
It is a single switch that maps each action to its handler. We first follow the EXEC action into processExecRequest.
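The same enum-plus-switch dispatch pattern, including refusal of unknown actions, can be sketched in isolation. The Action values mirror three of the cases above, while DispatchSketch, the string parsing, and the returned messages are hypothetical.

```java
import java.util.Locale;

class DispatchSketch {
    // A reduced action set; the excerpt above has more cases.
    enum Action { EXEC, ASYNC_EXEC, PULL_RESULTS }

    // Parses the raw action string and routes to a handler; unknown actions are refused.
    static String dispatch(String rawAction, String command) {
        Action action;
        try {
            action = Action.valueOf(rawAction.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            return "REFUSED: unsupported action " + rawAction;
        }
        switch (action) {
            case EXEC:
                return "ran synchronously: " + command;
            case ASYNC_EXEC:
                return "scheduled: " + command;
            case PULL_RESULTS:
                return "pulled results";
            default:
                return "REFUSED: unhandled action " + action;
        }
    }

    public static void main(String[] args) {
        System.out.println(dispatch("exec", "version"));
        System.out.println(dispatch("bogus", "version"));
    }
}
```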
createJob → job.run()
com.taobao.arthas.core.shell.term.impl.http.api.HttpApiHandler#processExecRequest
Job foregroundJob = session.getForegroundJob();
if (foregroundJob != null) {
response.setState(ApiState.REFUSED)
.setMessage("Another job is running.");
logger.info("Another job is running, jobId: {}", foregroundJob.id());
return response;
}
packingResultDistributor = new PackingResultDistributorImpl(session);
// distribute result message both to origin session channel and request channel by CompositeResultDistributor
// ResultDistributor resultDistributor = new CompositeResultDistributorImpl(packingResultDistributor, session.getResultDistributor());
job = this.createJob(commandLine, session, packingResultDistributor);
session.setForegroundJob(job);
updateSessionInputStatus(session, InputStatus.ALLOW_INTERRUPT);
job.run();
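createJob creates a Process through createProcess and binds it to the jobId. Here Process is Arthas's own command-execution object, as the code below shows, not an operating system process.
createProcess mostly sets up the chain that handles the process's STDOUT, creates a Process object and returns it, and passes the caller's resultDistributor all the way down to where the process is created.
The ResultDistributor interface defines a getResults() method that returns a List<ResultModel> holding the command's results. ResultModel is the parent class of all command results and carries two basic properties, the type and the jobId.
Each command's result subclass extends this parent with its own data structure, and each command writes its results through CommandProcessImpl#appendResult.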
com.taobao.arthas.core.shell.system.impl.JobControllerImpl#createJob
@Override
public Job createJob(InternalCommandManager commandManager, List<CliToken> tokens, Session session, JobListener jobHandler, Term term, ResultDistributor resultDistributor) {
checkPermission(session, tokens.get(0));
int jobId = idGenerator.incrementAndGet();
StringBuilder line = new StringBuilder();
for (CliToken arg : tokens) {
line.append(arg.raw());
}
boolean runInBackground = runInBackground(tokens);
Process process = createProcess(session, tokens, commandManager, jobId, term, resultDistributor);
process.setJobId(jobId);
JobImpl job = new JobImpl(jobId, this, process, line.toString(), runInBackground, session, jobHandler);
jobs.put(jobId, job);
return job;
}
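The relationship between a result parent class and a distributor that packs results into a list, as described before the createJob listing, can be sketched as follows. This is a simplified model under assumed names (ResultPackingSketch, TextResult, PackingDistributor), not the Arthas classes themselves.

```java
import java.util.ArrayList;
import java.util.List;

class ResultPackingSketch {
    // Parent of all results: carries the two basic properties, type and jobId.
    static abstract class ResultModel {
        private int jobId;

        abstract String getType();

        int getJobId() {
            return jobId;
        }

        void setJobId(int jobId) {
            this.jobId = jobId;
        }
    }

    // Hypothetical result subclass that adds its own payload.
    static class TextResult extends ResultModel {
        final String text;

        TextResult(String text) {
            this.text = text;
        }

        @Override
        String getType() {
            return "text";
        }
    }

    // Collects results in memory so they can be returned in one response.
    static class PackingDistributor {
        private final List<ResultModel> results = new ArrayList<>();

        synchronized void appendResult(ResultModel result) {
            results.add(result);
        }

        // Returns a copy so callers cannot mutate the internal list.
        synchronized List<ResultModel> getResults() {
            return new ArrayList<>(results);
        }
    }

    public static void main(String[] args) {
        PackingDistributor distributor = new PackingDistributor();
        TextResult result = new TextResult("hello");
        result.setJobId(1);
        distributor.appendResult(result);
        System.out.println(distributor.getResults().size() + " result(s) packed");
    }
}
```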
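After job.run() is called, the task runs on the executorService thread pool in ArthasBootstrap, whose core pool size is 1.
The next part polls for the job result: waitForJob loops with while(true), checking every 100 ms, and once the job finishes, the List<ResultModel> held by the resultDistributor above is returned as the response.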
com.taobao.arthas.core.shell.term.impl.http.api.HttpApiHandler#processExecRequest
//packing results
body.put("jobId", job.id());
body.put("jobStatus", job.status());
body.put("timeExpired", timeExpired);
if (timeExpired) {
body.put("timeout", timeout);
}
body.put("results", packingResultDistributor.getResults());
response.setSessionId(session.getSessionId())
//.setConsumerId(consumerId)
.setBody(body);
return response;
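The polling wait that produces the timeExpired flag can be sketched with standard java.util.concurrent types. The sketch assumes a single-threaded executor and a 100 ms poll interval as described above; JobPollingSketch, waitForJob's signature, and runAndWait are hypothetical, and the real waitForJob may differ in detail.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class JobPollingSketch {
    // Polls every 100 ms until the job is done or the timeout elapses.
    // Returns true when the timeout expired before the job finished.
    static boolean waitForJob(Future<?> job, long timeoutMs) throws InterruptedException {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (!job.isDone()) {
            if (System.nanoTime() - deadline >= 0) {
                return true;
            }
            Thread.sleep(100);
        }
        return false;
    }

    // Submits a job that sleeps for workMs on a single-threaded executor, then waits for it.
    static boolean runAndWait(long workMs, long timeoutMs) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<?> job = executor.submit(() -> {
                try {
                    Thread.sleep(workMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            return waitForJob(job, timeoutMs);
        } finally {
            executor.shutdownNow(); // interrupts a job that is still running
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("timeExpired (short job): " + runAndWait(10, 2000));
        System.out.println("timeExpired (long job): " + runAndWait(2000, 150));
    }
}
```

With a single worker thread, a long-running command blocks any job queued behind it, which is one reason a timeout with an explicit timeExpired flag is useful to the HTTP caller.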
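3. Arthas command definitions
3.1 ThreadCommand as an example
Arthas commands all extend AnnotatedCommand and override the process method.
ThreadCommand's process method runs different methods depending on the command-line arguments; take processAllThreads as an example: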
com.taobao.arthas.core.command.monitor200.ThreadCommand#processAllThreads
List<ThreadVO> threads = ThreadUtil.getThreads();
// Count the threads in each state
Map<State, Integer> stateCountMap = new LinkedHashMap<State, Integer>();
for (State s : State.values()) {
stateCountMap.put(s, 0);
}
for (ThreadVO thread : threads) {
State threadState = thread.getState();
Integer count = stateCountMap.get(threadState);
stateCountMap.put(threadState, count + 1);
}
boolean includeInternalThreads = true;
Collection<ThreadVO> resultThreads = new ArrayList<ThreadVO>();
if (!StringUtils.isEmpty(this.state)) {
this.state = this.state.toUpperCase();
if (states.contains(this.state)) {
includeInternalThreads = false;
for (ThreadVO thread : threads) {
if (thread.getState() != null && state.equals(thread.getState().name())) {
resultThreads.add(thread);
}
}
} else {
return ExitStatus.failure(1, "Illegal argument, state should be one of " + states);
}
} else {
resultThreads = threads;
}
//thread stats
ThreadSampler threadSampler = new ThreadSampler();
threadSampler.setIncludeInternalThreads(includeInternalThreads);
threadSampler.sample(resultThreads);
threadSampler.pause(sampleInterval);
List<ThreadVO> threadStats = threadSampler.sample(resultThreads);
process.appendResult(new ThreadModel(threadStats, stateCountMap, all));
return ExitStatus.success();
The per-thread CPU time is obtained by sampling with ThreadSampler.
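The two-sample approach (sample, pause, sample again, then compare) can be illustrated with the JDK's ThreadMXBean. This is a minimal sketch that assumes per-thread CPU time is read through java.lang.management; CpuSamplingSketch and its methods are hypothetical and do not reproduce ThreadSampler itself.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.HashMap;
import java.util.Map;

class CpuSamplingSketch {
    // Records the cumulative CPU time (nanoseconds) of every live thread.
    static Map<Long, Long> snapshot(ThreadMXBean mx) {
        Map<Long, Long> cpu = new HashMap<>();
        for (long id : mx.getAllThreadIds()) {
            long nanos = mx.getThreadCpuTime(id); // -1 if the thread died or timing is disabled
            if (nanos >= 0) {
                cpu.put(id, nanos);
            }
        }
        return cpu;
    }

    // Takes two snapshots intervalMs apart and returns the CPU time each thread used in between.
    static Map<Long, Long> sampleDeltas(long intervalMs) throws InterruptedException {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        Map<Long, Long> deltas = new HashMap<>();
        if (!mx.isThreadCpuTimeSupported() || !mx.isThreadCpuTimeEnabled()) {
            return deltas; // nothing to measure on this JVM
        }
        Map<Long, Long> first = snapshot(mx);
        Thread.sleep(intervalMs);
        Map<Long, Long> second = snapshot(mx);
        for (Map.Entry<Long, Long> entry : second.entrySet()) {
            Long before = first.get(entry.getKey());
            if (before != null) { // skip threads that started after the first snapshot
                deltas.put(entry.getKey(), Math.max(0L, entry.getValue() - before));
            }
        }
        return deltas;
    }

    public static void main(String[] args) throws InterruptedException {
        Map<Long, Long> deltas = sampleDeltas(200);
        System.out.println("threads sampled: " + deltas.size());
    }
}
```

Dividing each delta by the sampling interval gives an approximate CPU usage share for that thread during the interval.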
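3.2 Profiler as an example
The profiler command also extends AnnotatedCommand; its process method calls execute to run async-profiler sampling.
Arthas implements a custom AsyncProfilerMXBean interface and wraps the async-profiler native library (a binary .so) as an MBean in order to operate async-profiler.
This wrapper has almost no logic of its own; it mainly wraps native methods that call into the .so. The main logic remains in the Command class.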
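While developing my own javaagent with attach, I found that Arthas does classloader isolation very well. If you write an agent and attach it directly to the target JVM without any special handling, the agent's logs end up in the target JVM's output. Without isolation, the libraries the agent brings in (log4j, HttpClient, gRPC, GSON, and so on) may conflict with libraries the target JVM has already loaded, which is very hard to control. How Arthas handles this is worth further study.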
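One common way to get this kind of isolation is a child-first class loader: it looks in its own jars before asking the parent, and delegates only JDK packages directly to the parent. The following is a sketch of that general technique, not a confirmed copy of what Arthas does; ChildFirstClassLoader and the delegated prefixes are assumptions.

```java
import java.net.URL;
import java.net.URLClassLoader;

// Child-first loader: agent classes and their libraries are resolved from the
// agent's own URLs before the parent is consulted.
class ChildFirstClassLoader extends URLClassLoader {
    ChildFirstClassLoader(URL[] urls, ClassLoader parent) {
        super(urls, parent);
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        synchronized (getClassLoadingLock(name)) {
            Class<?> loaded = findLoadedClass(name);
            if (loaded == null) {
                if (name.startsWith("java.") || name.startsWith("sun.")) {
                    // JDK classes must always come from the parent chain.
                    loaded = super.loadClass(name, false);
                } else {
                    try {
                        loaded = findClass(name); // look in our own URLs first
                    } catch (ClassNotFoundException e) {
                        loaded = super.loadClass(name, false); // fall back to the parent
                    }
                }
            }
            if (resolve) {
                resolveClass(loaded);
            }
            return loaded;
        }
    }

    public static void main(String[] args) throws Exception {
        ClassLoader parent = ClassLoader.getSystemClassLoader();
        try (ChildFirstClassLoader loader = new ChildFirstClassLoader(new URL[0], parent)) {
            System.out.println(loader.loadClass("java.util.ArrayList").getName());
        }
    }
}
```

With the agent's jars passed as the URL array, a library version bundled with the agent wins over one already visible through the parent, so the two copies do not clash inside the agent's own code.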
References:
https://tech.meituan.com/2019/10/10/jvm-cpu-profiler.html
https://github.com/alibaba/jvm-sandbox