这是我的个人学习笔记,发出来给需要的人看到,欢迎大家一起讨论。 后面会逐渐将笔记中的文章搬过来。
henrychenv / my-notes Goto Github PK
View Code? Open in Web Editor NEW个人学习笔记
个人学习笔记
这部分本来想放在第一篇的,不过这只是我的个人笔记,而且重点在源码分析上,我相信看到我这篇的肯定也看了很多其他的资料了,这篇的顺序不会对源码分析的内容理解造成影响。
Java语言的卖点之一就是多线程编程,多线程编程免不了要面对并发问题,对于临界资源的访问就是其中之一,synchronized就是用于串行访问临界资源的,线程会synchronized一个对象,这个对象就是访问临界资源的锁,这个对象可以是Java中的任何一个对象。除了保证串行访问临界资源,synchronized配上Object的wait、notify和notifyAll操作还可以控制线程间的协作,功能很强大。
为什么要加上HotSpot?因为我只看了HotSpot,不确定其他虚拟机是否是一样的实现。
对象头是对象的头部,存储了MarkWord和类型指针。在32位系统中,MarkWord是4个字节,是一段相对自由的空间,不同的情况下可以存放不同的内容,比如可以用于存放hashCode以及和GC相关的信息,在偏向锁、轻量级锁以及重量级锁的实现中也会用到。
这张图片展示了MarkWord作为synchronized三种锁时的状态:
在对象刚分配时,如果对象类型是可偏向的,会变成左上角的匿名偏向状态,否则就会变成右上角的无锁状态。
当偏向某个线程时,就会变成左下角的偏向某个线程的状态,就算解锁了,也一直保持这个状态,所以从MarkWord是无法看出偏向锁是否已经解锁了的。如果发生了重偏向,会再次变为匿名偏向状态,可以重新偏向其他线程。
在左下角的偏向状态下,另一个线程synchronized了这个对象,
如果当前锁已经释放了,就会发生偏向锁撤销,变为无锁状态,然后再变成轻量级锁状态,在改变MarkWord中锁状态位的同时,MarkWord也会指向轻量级锁的LockRecord,无锁状态的MarkWord会存储到LockRecord的displaced_header中。
如果当前锁还在使用,当前锁就会升级为轻量级锁,这个会将偏向锁以及重入的偏向锁都该为轻量级锁,并将无锁的MarkWord存到最早的一个LockRecord的displaced_header中,其他LockRecord的displaced_header设置为空。这样当最早的一个LockRecord释放后,就可以用这个displaced_header还原MarkWord到无锁状态了。
这个时候的撤销和升级为轻量级锁的操作如果是针对当前线程的操作,不需要等到安全点,如果需要跨线程操作,就必须要等到安全点,相对低效。这也是常常被人提的偏向锁的最大缺点。
如果轻量级锁状态下存在竞争,那么会膨胀为重量级锁,这时MarkWord会变为重量级锁状态,每个对象会对应一个OjbectMonitor对象,无锁的MarkWord会存放到ObjectMonitor的header中。ObjectMonitor这个对象有很重的操作逻辑,wait、notify、notifyAll等操作都由它完成。
为什么偏向锁在变为轻量级锁或者重量级锁前需要偏向锁撤销呢?因为升级后hashCode和垃圾回收信息依然不能丢,这个结构只有无锁的MarkWord才有,所以需要先变为无锁的MarkWord,再将无锁的MarkWord放到轻量级锁或重量级锁中。基于相同的原因,当计算hashCode时,如果是偏向锁状态,也会发生偏向锁撤销。
LockRecord都和每次加锁请求对应,不仅是轻量级锁,偏向锁和重量级锁的每次加锁请求也会获取一个LockRecord,释放锁时会释放LockRecord。
处置之外,LockRecord和轻量级锁的重入以及无锁MarkWord的存储有关。对应重入的情况,只有最早的LockRecord的displaced_header中是无锁的MarkWord,其他的都是NULL,这样当碰到一个非NULL的displaced_header时,就可以知道,释放了这个,线程就完全退出这个锁了。
相较于类似于加锁请求的LockRecord而言,ObjectMonitor的功能就非常强大了。这个是重量级锁的重要数据结构,除了有加锁、解锁和重入功能,还有wait,notify,notifyAll的功能。
偏向锁适用于同一个线程反复进入的情况;
轻量级锁适用于多个线程交替进入但不会同时进入的情况;
重量级锁适用于多个线程同时进入的情况,当然了,只有一个线程能实际进入,其他的都得等待。
偏向锁可以通过 -XX:+UseBiasedLocking
开启, -XX:+UseBiasedLocking
关闭。就算开启了,也并不是在jvm启动后立即变为偏向状态的,而是有一定的延迟,因为jvm认为刚启动时竞争较大,所以会延迟启动偏向锁,延迟时间可以通过 -XX:BiasedLockingStartupDelay
控制。
详细的请看源码分析,这里只是提一下要点。
每次获取锁都需要从栈帧中获取
如果偏向锁线程ID为0,将MarkWord中的线程ID置为当前线程ID
如果线程ID是当前ID就允许重入
只会释放LockRecord,不会将MarkWord中的线程ID置为0。
所以才会有下面的锁的撤销,批量重偏向和批量撤销
如果便行锁被使用,将升级为轻量级锁,数据结构转换和轻量级锁获取锁一样。
否则MarkWord置为无锁状态
让没在使用的epoch过期,可以被重偏向
将klass中的偏向锁标志位置为不可偏向,每次获取锁时会检查klass是否可以偏向
进入撤销流程,可能会批量重偏向/批量撤销,单个撤销。
如果撤销/重偏向后,
• 是epoch过期或者是无锁状态,加轻量级锁,逻辑和轻量级锁获取锁一样。
• 是轻量级锁状态,走轻量级锁加锁流程
• 否则膨胀为重量级锁,走重量级锁加锁流程
每次获取锁都需要从栈帧中获取
会将无锁的MarkWord设置到LockRecord的displaced_header中,然后让对象的MarkWord指向LockRecord
重入的轻量级锁的LockRecord中displaced_header为NULL,也不会让对象的MarkWord指向这个LockRecord。
和第一次进入时不同
释放时,如果LockRecord中的displaced_header为NULL,说明是锁重入的释放,只释放LockRecord。
如果LockRecord的displaced_header不为NULL,将LockRecord中无锁的MarkWord恢复到对象的MarkWord中,然后释放LockRecord,表示轻量级锁完全释放
释放时,如果发现已经膨胀为重量级锁,需要先获取ObjectMonitor,再走重量级锁释放流程。
判断线程是否持有轻量级锁
LockRecord是在线程栈中的,所以直接判断轻量级锁的LockRecord是否在栈空间内
这个三个锁都一样,每次获取锁都需要从栈帧中获取
如果OjbectMonitor中的owner为NULL,将ObjectMonitor中的线程ID置为自己
如果ObjectMonitor中的owner为自己或者是自己的轻量级锁,视为重入。
如果owner为当前线程的轻量级锁,owner设置为当前线程,重入次数设置为1。
如果owner为当前线程,ObectMonitor重入次数+1
获取锁失败会根据策略进入EntryList或者cxq等待,然后挂起,等待被唤醒。
在进入队列前,会尝试多次获取锁。
如果当前重入次数不为0,重入次数-1
如果重入次数为0,将owner置为NULL,但是不会将对象的MarkWord恢复为无锁状态。
扫描所有ObjectMonitor,如果当前重量级锁没被使用,会将对象的MarkWord恢复为Object中无锁的MarkWord,然后释放ObjectMonitor
栈帧的连续空间上
堆中,先在gFreeList中,然后每个线程从中获取一部分,放到线程私有空间的omFreeList中
每次线程获取时,会从omFreeList中获取,获取成功会放到omUsedList中,获取失败会从gFreeList填充,然后再次获取,如果gFreeList也没有,会直接new一个ObjectMonitor返回
ObjectMonitor从omUsedList中移到omFreeList中
openjdk-wiki-Synchronization
CSDN-深入分析wait/notify为什么要在同步块内
CSDN-Synchronized关键字底层原理
Gighub-死磕Synchronized底层实现
CSDN-极速体验编译openjdk8(docker环境)
豆瓣读书-第232页 7.2.1 Interpreter模块
openjdk源码-jdk8u/hotspot
知乎-HotSpot VM重量级锁降级机制的实现原理
要求编译的JDK为11+(我使用的是Java11),但编译后的javaagent可用于8+的应用程序
./gradlew assemble
javaagent路径: javaagent/build/libs/opentelemetry-javaagent-.jar
opentelemetry-java-instrumentation中的trace实现分两种:
这里以spring-webmvc为例分别讲下怎样实现一个第三方库的instrumentation。
首先,在instrumentation下以包为名创建一个目录,名称为第三方库名,并带上此Instrumentation支持的三方库最低版本。
然后建三个目录:
instrumentation/spring/spring-webmvc-3.1 // 3.1表示支持的版本必须>=3.1
├── javaagent
├── library
└── wildfly-testing
然后将项目加到顶层目录的settings.gradle.kts中,srpingmvc的为:
include(":instrumentation:spring:spring-webmvc-3.1:javaagent")
include(":instrumentation:spring:spring-webmvc-3.1:library")
include(":instrumentation:spring:spring-webmvc-3.1:wildfly-testing")
opentelemetry-java-instrumentation对一个类库的实现instrumentation有自己的抽象,library和javaagent都遵循这套抽象。
library是通过类库已有拓展点,使用opentelemetry-java的功能进行拓展,可集成到javaagent或者springboot中。
javaagent方式在此之上还有一层抽象,而且javaagent中直接反射更方便些,有时候并不完全遵循这套抽象,但两者有很多相通的部分,熟悉library的方式有利于理解javaagent的方式。
所以下面主要参考springmvc的library实现看下这套规范,然后再拓展到javaagent的方式。
创建build.gradle.kts,并添加编译插件
plugins {
id("otel.library-instrumentation")
}
otel.library-instrumentation插件会引入很多默认配置、编译工具等。
springmvc的为
plugins {
id("otel.library-instrumentation")
}
dependencies {
compileOnly("org.springframework:spring-webmvc:3.1.0.RELEASE")
compileOnly("javax.servlet:javax.servlet-api:3.1.0")
}
Instrumentation对命名是有规定的,需要包含两个类:
springmvc的为SpringWebMvcTracing.java和SpringWebMvcTracingBuilder.java
对于Tracing类,必须包含个部分
比如springmvc的:
/** Entrypoint for tracing Spring Web MVC apps. */
public final class SpringWebMvcTracing {
/** Returns a new {@link SpringWebMvcTracing} configured with the given {@link OpenTelemetry}. */
public static SpringWebMvcTracing create(OpenTelemetry openTelemetry) {
return builder(openTelemetry).build();
}
/**
* Returns a new {@link SpringWebMvcTracingBuilder} configured with the given {@link
* OpenTelemetry}.
*/
public static SpringWebMvcTracingBuilder builder(OpenTelemetry openTelemetry) {
return new SpringWebMvcTracingBuilder(openTelemetry);
}
private final Instrumenter<HttpServletRequest, HttpServletResponse> instrumenter;
SpringWebMvcTracing(Instrumenter<HttpServletRequest, HttpServletResponse> instrumenter) {
this.instrumenter = instrumenter;
}
/** Returns a new {@link Filter} that generates telemetry for received HTTP requests. */
public Filter newServletFilter() {
return new WebMvcTracingFilter(instrumenter);
}
}
对于TracingBuilder,必须包含2个部分:
比如springmvc的:
/** A builder of {@link SpringWebMvcTracing}. */
public final class SpringWebMvcTracingBuilder {
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.spring-webmvc-3.1";
private final OpenTelemetry openTelemetry;
private final List<AttributesExtractor<HttpServletRequest, HttpServletResponse>>
additionalExtractors = new ArrayList<>();
private CapturedHttpHeaders capturedHttpHeaders = CapturedHttpHeaders.server(Config.get());
SpringWebMvcTracingBuilder(OpenTelemetry openTelemetry) {
this.openTelemetry = openTelemetry;
}
/**
* Adds an additional {@link AttributesExtractor} to invoke to set attributes to instrumented
* items.
*/
public SpringWebMvcTracingBuilder addAttributesExtractor(
AttributesExtractor<HttpServletRequest, HttpServletResponse> attributesExtractor) {
additionalExtractors.add(attributesExtractor);
return this;
}
/**
* Configure the instrumentation to capture chosen HTTP request and response headers as span
* attributes.
*
* @param capturedHttpHeaders An instance of {@link CapturedHttpHeaders} containing the configured
* HTTP request and response names.
*/
public SpringWebMvcTracingBuilder captureHttpHeaders(CapturedHttpHeaders capturedHttpHeaders) {
this.capturedHttpHeaders = capturedHttpHeaders;
return this;
}
/**
* Returns a new {@link SpringWebMvcTracing} with the settings of this {@link
* SpringWebMvcTracingBuilder}.
*/
public SpringWebMvcTracing build() {
SpringWebMvcHttpAttributesExtractor httpAttributesExtractor =
new SpringWebMvcHttpAttributesExtractor(capturedHttpHeaders);
Instrumenter<HttpServletRequest, HttpServletResponse> instrumenter =
Instrumenter.<HttpServletRequest, HttpServletResponse>builder(
openTelemetry,
INSTRUMENTATION_NAME,
HttpSpanNameExtractor.create(httpAttributesExtractor))
.setSpanStatusExtractor(HttpSpanStatusExtractor.create(httpAttributesExtractor))
.addAttributesExtractor(httpAttributesExtractor)
.addAttributesExtractor(new StatusCodeExtractor())
.addAttributesExtractor(new SpringWebMvcNetAttributesExtractor())
.addAttributesExtractors(additionalExtractors)
.addRequestMetrics(HttpServerMetrics.get())
.addContextCustomizer(ServerSpanNaming.get())
.newServerInstrumenter(JavaxHttpServletRequestGetter.INSTANCE);
return new SpringWebMvcTracing(instrumenter);
}
}
opentelemetry-java-instrumentation是通过instrumenter实现的Instrumentation逻辑,所以instrumentation逻辑就是构建和使用instrumenter的逻辑。
instrumenter是在TracingBuilder#build()构建的,这也是build方法的主要逻辑。
可以看下springmvc的TracingBuilder#build方法
public final class SpringWebMvcTracingBuilder {
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.spring-webmvc-3.1";
private final OpenTelemetry openTelemetry;
private final List<AttributesExtractor<HttpServletRequest, HttpServletResponse>>
additionalExtractors = new ArrayList<>();
private CapturedHttpHeaders capturedHttpHeaders = CapturedHttpHeaders.server(Config.get());
SpringWebMvcTracingBuilder(OpenTelemetry openTelemetry) {
this.openTelemetry = openTelemetry;
}
...
/**
* Returns a new {@link SpringWebMvcTracing} with the settings of this {@link
* SpringWebMvcTracingBuilder}.
*/
public SpringWebMvcTracing build() {
SpringWebMvcHttpAttributesExtractor httpAttributesExtractor =
new SpringWebMvcHttpAttributesExtractor(capturedHttpHeaders);
Instrumenter<HttpServletRequest, HttpServletResponse> instrumenter =
Instrumenter.<HttpServletRequest, HttpServletResponse>builder( // 创建InstrumenterBuilder,用于构建Instrumenter
openTelemetry,
INSTRUMENTATION_NAME,
HttpSpanNameExtractor.create(httpAttributesExtractor)) // SpanNameExtractor,用于生成span name
.setSpanStatusExtractor(HttpSpanStatusExtractor.create(httpAttributesExtractor))
.addAttributesExtractor(httpAttributesExtractor)
.addAttributesExtractor(new StatusCodeExtractor())
.addAttributesExtractor(new SpringWebMvcNetAttributesExtractor())
.addAttributesExtractors(additionalExtractors)
.addRequestMetrics(HttpServerMetrics.get())
.addContextCustomizer(ServerSpanNaming.get())
.newServerInstrumenter(JavaxHttpServletRequestGetter.INSTANCE);
return new SpringWebMvcTracing(instrumenter);
}
}
Instrumenter是使用InstrumenterBuilder构建的,InstrumenterBuilder由Instrumenter#build方法创建。
Instrumenter的组成部分就是对手动方式中加入的各种信息的组件化标准化,分为几个部分:
OpenTelemetry对象,实现了OpenTelemetry协议,提供了trace核心功能。
instrumentation library的名称,不是instrumented library的名称,是instrumented library的trace逻辑的类库。
需要全局唯一,方便问题排查。
springmvc的为HttpSpanNameExtractor,会调用其extract方法获取名称:
@Override
public String extract(REQUEST request) {
String route = extractRoute(request);
if (route != null) {
return route;
}
String method = attributesExtractor.method(request);
if (method != null) {
return "HTTP " + method;
}
return "HTTP request";
}
可以看到springmvc span name的名称会依次取route、method、硬编码的HTTP request。
但javaagent实现中并没有用这个,而是用了HandlerSpanNameExtractor:
public class HandlerSpanNameExtractor implements SpanNameExtractor<Object> {
@Override
public String extract(Object handler) {
Class<?> clazz;
String methodName;
if (handler instanceof HandlerMethod) {
// 调用http://localhost:8888/spanWithLinks时会进这里
// name span based on the class and method name defined in the handler
Method method = ((HandlerMethod) handler).getMethod();
clazz = method.getDeclaringClass();
methodName = method.getName();
} else if (handler instanceof HttpRequestHandler) {
// org.springframework.web.servlet.mvc.HttpRequestHandlerAdapter
clazz = handler.getClass();
methodName = "handleRequest";
} else if (handler instanceof Controller) {
// org.springframework.web.servlet.mvc.SimpleControllerHandlerAdapter
clazz = handler.getClass();
methodName = "handleRequest";
} else if (handler instanceof Servlet) {
// org.springframework.web.servlet.handler.SimpleServletHandlerAdapter
clazz = handler.getClass();
methodName = "service";
} else {
// perhaps org.springframework.web.servlet.mvc.annotation.AnnotationMethodHandlerAdapter
clazz = handler.getClass();
methodName = "<annotation>";
}
// 然后进这里,这里的逻辑就是clazz.simpleName + methodName
return SpanNames.fromMethod(clazz, methodName);
}
在jaeger中的体现为:
AttributesExtractor
用于在请求开始和结束时从请求或者响应中提取属性放到span中,一个instrmenter可以有多个,
有两个方法:
比如StatusCodeExtractor,从response中提取了http status code放入了span中
final class StatusCodeExtractor
implements AttributesExtractor<HttpServletRequest, HttpServletResponse> {
@Override
public void onStart(AttributesBuilder attributes, HttpServletRequest httpServletRequest) {}
@Override
public void onEnd(
AttributesBuilder attributes,
HttpServletRequest httpServletRequest,
@Nullable HttpServletResponse response,
@Nullable Throwable error) {
if (response != null) {
long statusCode;
if (!response.isCommitted() && error != null) {
statusCode = 500;
} else {
statusCode = response.getStatus();
}
set(attributes, SemanticAttributes.HTTP_STATUS_CODE, statusCode);
}
}
}
可以通过InstrumenterBuilder#addAttributesExtractor添加。
SpanStatusExtractor用于设置span的status,
比如springmvc的HttpSpanStatusExtractor
public final class HttpSpanStatusExtractor<REQUEST, RESPONSE>
implements SpanStatusExtractor<REQUEST, RESPONSE> {
...
@Override
public StatusCode extract(REQUEST request, @Nullable RESPONSE response, Throwable error) {
if (response != null) {
// 通过status code判断span状态是UNSET还是ERROR
Integer statusCode = attributesExtractor.statusCode(request, response);
if (statusCode != null) {
StatusCode statusCodeObj = statusConverter.statusFromHttpStatus(statusCode);
if (statusCodeObj == StatusCode.ERROR) {
return statusCodeObj;
}
}
}
// 通过是否有error判断span状态是UNSET还是ERROR
return SpanStatusExtractor.getDefault().extract(request, response, error);
}
}
可以通过InstrumenterBuilder#setSpanStatusExtractor设置。
简单实现可以参考
class MySpanLinksExtractor implements SpanLinksExtractor<Request> {
@Override
public void extract(SpanLinksBuilder spanLinks, Context parentContext, Request request) {
for (RelatedOperation op : request.getRelatedOperations()) {
spanLinks.addLink(op.getSpanContext());
}
}
}
通过InstrumenterBuilder#addSpanLinksExtractor添加。
简单实现参考
class MyErrorCauseExtractor implements ErrorCauseExtractor {
@Override
public Throwable extractCause(Throwable error) {
if (error instanceof MyLibWrapperException && error.getCause() != null) {
error = error.getCause();
}
return ErrorCauseExtractor.jdk().extractCause(error);
}
}
通过InstrumenterBuilder#setErrorCauseExtractor设置。
用于满足定制化的时间获取,比如从request和response中获取时间:
class MyTimeExtractor implements TimeExtractor<Request, Response> {
@Override
public Instant extractStartTime(Request request) {
return request.startTimestamp();
}
@Override
public Instant extractEndTime(Request request, @Nullable Response response, @Nullable Throwable error) {
if (response != null) {
return response.endTimestamp();
}
return request.clock().now();
}
}
通过InstrumenterBuilder#setTimeExtractor设置
在instrumenter#start前对context进行修改的拓展点,比如上面的ServerSpanNaming.get()的实现:
public final class ServerSpanNaming {
private static final ContextKey<ServerSpanNaming> CONTEXT_KEY =
ContextKey.named("opentelemetry-servlet-span-naming-key");
public static <REQUEST> ContextCustomizer<REQUEST> get() {
return (context, request, startAttributes) -> {
if (context.get(CONTEXT_KEY) != null) {
return context;
}
return context.with(CONTEXT_KEY, new ServerSpanNaming(Source.CONTAINER));
};
}
}
通过InstrumenterBuilder#addContextCustomizer添加。
InstrumenterBuilder提供了很多build方法来最终生成指定span kind的Intrumenter:
SpanKindExtractor简单例子:
class MySpanKindExtractor implements SpanKindExtractor<Request> {
@Override
public SpanKind extract(Request request) {
// 根据request决定span kind
return request.shouldSynchronouslyWaitForResponse() ? SpanKind.CLIENT : SpanKind.PRODUCER;
}
}
使用时分三步:
具体用法和以及细节可以参考springmvc的WebMvcTracingFilter
final class WebMvcTracingFilter extends OncePerRequestFilter implements Ordered {
private final Instrumenter<HttpServletRequest, HttpServletResponse> instrumenter;
WebMvcTracingFilter(Instrumenter<HttpServletRequest, HttpServletResponse> instrumenter) {
this.instrumenter = instrumenter;
}
@Override
public void doFilterInternal(
HttpServletRequest request, HttpServletResponse response, FilterChain filterChain)
throws ServletException, IOException {
Context parentContext = Context.current();
if (!instrumenter.shouldStart(parentContext, request)) {
// 如果当前请求没有被trace,跳过instrumentated operation
filterChain.doFilter(request, response);
return;
}
// 开始instrumented operation
Context context = instrumenter.start(parentContext, request);
// 这个scope可以保证后面的span都是当前span的child span,具体可以参考手动部分
try (Scope ignored = context.makeCurrent()) {
filterChain.doFilter(request, response);
// 正常结束instrumented operation
instrumenter.end(context, request, response, null);
} catch (Throwable t) {
// 有异常时传入异常,并结束instrumented operation
instrumenter.end(context, request, response, t);
throw t;
}
}
@Override
public void destroy() {}
@Override
public int getOrder() {
// Run after all HIGHEST_PRECEDENCE items
return Ordered.HIGHEST_PRECEDENCE + 1;
}
}
建议自己看下Instrumenter#start和Instrumenter#end代码,很简单,看完后会对instrumenter的结构和流程有更深刻的理解。
有了library,利用springboot集成就比较简单了。
这种方式不算是auto-instrumentation,只能算semi-auto-instrumentation,因为做不到对代码完全无侵入。
相比于javaagent的方式,调试难度大大降低,前期如果没有把握,可以先用这种方式集成到项目中。
springboot的方式分两大步:
opentelemetry-java-instrumentation项目中已有实现,可参考instrumentation/spring/spring-boot-autoconfigure,
但不要直接使用,存在性能问题,下面会细说怎么改,建议参考这部分的逻辑自己实现,因为这部分逻辑很少也很简单。
参考OpenTelemetryAutoConfiguration
@Configuration
@EnableConfigurationProperties(SamplerProperties.class)
public class OpenTelemetryAutoConfiguration {
@Bean
@ConditionalOnMissingBean
public OpenTelemetry openTelemetry(
SamplerProperties samplerProperties,
ObjectProvider<ContextPropagators> propagatorsProvider,
ObjectProvider<List<SpanExporter>> spanExportersProvider) {
SdkTracerProviderBuilder tracerProviderBuilder = SdkTracerProvider.builder();
spanExportersProvider.getIfAvailable(Collections::emptyList).stream()
.map(SimpleSpanProcessor::create)
.forEach(tracerProviderBuilder::addSpanProcessor);
SdkTracerProvider tracerProvider =
tracerProviderBuilder
.setSampler(Sampler.traceIdRatioBased(samplerProperties.getProbability()))
.build();
ContextPropagators propagators = propagatorsProvider.getIfAvailable(ContextPropagators::noop);
return OpenTelemetrySdk.builder()
.setTracerProvider(tracerProvider)
.setPropagators(propagators)
.build();
}
}
不能直接用的原因有3个:
可以简单改为:
@Configuration
public class OpenTelemetryConfiguration {
@Bean
public JaegerThriftSpanExporter otelJaegerThriftSpanExporter() {
return JaegerThriftSpanExporter.builder()
.setEndpoint("http://alert-store.bigo.sg:8880/trace/api/traces")
//.setEndpoint("http://169.136.115.37:14268/api/traces")
//.setEndpoint("http://localhost:14268/api/traces")
.build();
}
@Bean
public OpenTelemetry openTelemetry(
SamplerProperties samplerProperties,
ObjectProvider<ContextPropagators> propagatorsProvider,
ObjectProvider<List<SpanExporter>> spanExportersProvider) {
SdkTracerProviderBuilder tracerProviderBuilder = SdkTracerProvider.builder();
tracerProviderBuilder.setResource(Resource.create(
Attributes.builder()
.putAll(Resource.getDefault().getAttributes())
.put(ResourceAttributes.SERVICE_NAME, "ec-platform-trade-center-chl2")
.build()
));
spanExportersProvider.getIfAvailable(Collections::emptyList).stream()
.map(exporter -> BatchSpanProcessor.builder(exporter)
// TODO 设置队列大小, 超时等参数
.build())
.forEach(tracerProviderBuilder::addSpanProcessor);
SdkTracerProvider tracerProvider =
tracerProviderBuilder
.setSampler(Sampler.traceIdRatioBased(samplerProperties.getProbability()))
.build();
// propagator如有需要也可以拓展,这个一般是写死的
ContextPropagators propagators = propagatorsProvider.getIfAvailable(ContextPropagators::noop);
return OpenTelemetrySdk.builder()
.setTracerProvider(tracerProvider)
.setPropagators(propagators)
.build();
}
}
可能的依赖:
<dependency>
<groupId>io.opentelemetry.instrumentation</groupId>
<artifactId>opentelemetry-spring-boot-autoconfigure</artifactId>
<version>1.10.1-alpha</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
<version>1.10.1</version>
</dependency>
<!-- simple span exporter -->
<!-- outputs spans to console -->
<!-- provides opentelemetry-sdk artifact -->
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-exporters-logging</artifactId>
<version>0.9.1</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-exporter-jaeger-thrift</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-exporter-jaeger</artifactId>
<version>1.10.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/io.opentelemetry/opentelemetry-semconv -->
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-semconv</artifactId>
<version>1.10.1-alpha</version>
</dependency>
如果已经引入了javaagent会更简单,直接使用javaagent的openTelemetry:
@Configuration
public class OpenTelemetryConfiguration {
@Bean
public OpenTelemetry openTelemetry() {
// 这个的原理在上面说过了
return GlobalOpenTelemetry.get();
}
}
可以使用上面的SpringWebMvcTracing#newServletFilter将Filter集成进来,参考WebMvcFilterAutoConfiguration
@Configuration
@EnableConfigurationProperties(WebMvcProperties.class)
@ConditionalOnProperty(prefix = "otel.springboot.web", name = "enabled", matchIfMissing = true)
@ConditionalOnClass(OncePerRequestFilter.class)
public class WebMvcFilterAutoConfiguration {
@Bean
public Filter otelWebMvcTracingFilter(OpenTelemetry openTelemetry) {
// 这里的openTelemetry就是刚才创建的OpenTelemetry Bean,其他类库可能不是newServletFilter这个方法,但类似
return SpringWebMvcTracing.create(openTelemetry).newServletFilter();
}
}
没有仔细研究,就不写了。
推荐直接看官方文档。
这种方式可以集成已有的library,但更多的是用ByteBuddy修改字节码,直接插入trace逻辑。难度高,但对接入方最友好。
在javaagent初始化流程中,可以看到是通过SPI机制加载了InstrumentationModule,是instrumentation javaagent实现中当之无愧的C位。
由于是在javaagent环境下,并且还有muzzle检查的需要,在实现时需要遵循一些规范。
下面还是以springmvc的javaagent为例一起看下如何实现一个InstrumentationModule。
在javaagent目录下创建build.gradle.kts文件,并加上如下内容:
plugins {
// gradle插件
id("otel.javaagent-instrumentation")
}
dependencies {
// 如果依赖上面的library,需要加进来,但有些没依赖,比如springmvc的
implementation(project(":instrumentation:yarpc-1.0:library"))
// testing依赖
testImplementation(project(":instrumentation:yarpc-1.0:testing"))
}
plugins {
id("otel.javaagent-instrumentation")
}
muzzle {
pass {
group.set("org.springframework")
module.set("spring-webmvc")
versions.set("[3.1.0.RELEASE,]")
// these versions depend on org.springframework:spring-web which has a bad dependency on
// javax.faces:jsf-api:1.1 which was released as pom only
skip("1.2.1", "1.2.2", "1.2.3", "1.2.4")
// 3.2.1.RELEASE has transitive dependencies like spring-web as "provided" instead of "compile"
skip("3.2.1.RELEASE")
extraDependency("javax.servlet:javax.servlet-api:3.0.1")
assertInverse.set(true)
}
}
...
dependencies {
...
}
...
除了上述部分外,还加了muzzle的gradle插件做兼容性检查,建议我们在给新类库写instrumentation时也加上,具体可以参考muzzle docs。
注意:下面的所有方法中只有typeInstrumentations是必须实现的,其他方法都是可选的。
先看下springmvc的SpringWebMvcInstrumentationModule的例子:
@AutoService(InstrumentationModule.class)
public class SpringWebMvcInstrumentationModule extends InstrumentationModule {
public SpringWebMvcInstrumentationModule() {
super("spring-webmvc", "spring-webmvc-3.1");
}
@Override
public boolean isHelperClass(String className) {
return className.startsWith(
"org.springframework.web.servlet.OpenTelemetryHandlerMappingFilter");
}
@Override
public List<TypeInstrumentation> typeInstrumentations() {
return asList(new DispatcherServletInstrumentation(), new HandlerAdapterInstrumentation());
}
}
InstrumentationModule是通过SPI机制加载的,所以需要遵循SPI的规定去写:
上面直接使用了@AutoService注解实现,当然也可以直接在META-INF/services/ 创建对应文件。
在构造器中指明了Instrumentation的名称和支持的最低版,名称可以指定多个,必须全小写,中划线连接,用于在runtime下通过参数禁用它。
InstrumentationModule实现了Ordered接口,支持指定顺序,初始化时会按从小到大的顺序执行:
@Override
public int order() {
return 1;
}
springmvc的实现中没有这个需求,就没实现。
上面springmvc实现中实现了isHelperClass方法,这样在muzzle检查时会将org.springframework.web.servlet.OpenTelemetryHandlerMappingFilter当做helper class看待,相比于普通的检查,helper class的检查会更加严格,如果希望对自己的class进行检查,可以将其指定为helper class。
如果希望对某个类做更细的检查,可以加到helper class中,以防万一。
比如springmvc的SpringWebMvcInstrumentationModule
@AutoService(InstrumentationModule.class)
public class SpringWebMvcInstrumentationModule extends InstrumentationModule {
public SpringWebMvcInstrumentationModule() {
super("spring-webmvc", "spring-webmvc-3.1");
}
@Override
public boolean isHelperClass(String className) {
return className.startsWith(
"org.springframework.web.servlet.OpenTelemetryHandlerMappingFilter");
}
@Override
public List<TypeInstrumentation> typeInstrumentations() {
return asList(new DispatcherServletInstrumentation(), new HandlerAdapterInstrumentation());
}
}
只有更直接的方法可以指定helper class,就是通过getAdditionalHelperClassNames方法。
public List<String> getAdditionalHelperClassNames() {
return Arrays.asList(
"org.my.library.instrumentation.SomeHelper",
"org.my.library.instrumentation.AnotherHelper");
}
如果这些类存在继承关系,需要保证父类在前,子类在后。
这个方法很少被实现。
可以通过classLoaderMatcher判断当前Instrumentation是否应该被加载,因为字节码修改过程相比于判断过程是很慢的,这种简单判断可以快速过滤掉一些需要加载的Instrumentation。
比如SpringWebInstrumentationModule
@AutoService(InstrumentationModule.class)
public class SpringWebInstrumentationModule extends InstrumentationModule {
public SpringWebInstrumentationModule() {
super("spring-web", "spring-web-3.1");
}
@Override
public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() {
// class added in 3.1
return hasClassesNamed("org.springframework.web.method.HandlerMethod");
}
@Override
public List<TypeInstrumentation> typeInstrumentations() {
return singletonList(new WebApplicationContextInstrumentation());
}
}
只有当HandlerMethod存在时才加载Instrumentation,因为这个类是3.1后才出现的,会有兼容问题。
这个是真正的字节码匹配+修改逻辑。
在InstrumentationModule所有方法中,只有typeInstrumentations这个方法是必须实现,而且不允许返回空的,否则这个InstrumentationModule什么都不会做。
比如上面就定义了两个:
@Override
public List<TypeInstrumentation> typeInstrumentations() {
return asList(new DispatcherServletInstrumentation(), new HandlerAdapterInstrumentation());
}
TypeInstrumentation的主要逻辑是对指定类型的类做字节码需改,有三个方法:
以HandlerAdapterInstrumentation为例看下实现:
这个TypeInstrumentation是对org.springframework.web.servlet.HandlerAdapter#handle做了字节码修改,看下这个方法签名
public interface HandlerAdapter {
...
@Nullable
ModelAndView handle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception;
...
}
看下HandlerAdapterInstrumentation逻辑:
public class HandlerAdapterInstrumentation implements TypeInstrumentation {
@Override
public ElementMatcher<ClassLoader> classLoaderOptimization() {
// classloader必须包含org.springframework.web.servlet.HandlerAdapter,根据名称匹配类很快的
return hasClassesNamed("org.springframework.web.servlet.HandlerAdapter");
}
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
// 必须实现org.springframework.web.servlet.HandlerAdapter接口,这个相比于classLoaderOptimization要慢一些了,需要检查类的接口
return implementsInterface(named("org.springframework.web.servlet.HandlerAdapter"));
}
@Override
public void transform(TypeTransformer transformer) {
// 字节码修改主逻辑,也是先匹配再修改
transformer.applyAdviceToMethod(
// 判断是否有满足要求的方法,这里要写得足够细,以免误修改
// 这里匹配的是: org.springframework.web.servlet.HandlerAdapter#handle
isMethod()
.and(isPublic())
.and(nameStartsWith("handle"))
.and(takesArgument(0, named("javax.servlet.http.HttpServletRequest")))
.and(takesArguments(3)),
// 满足要求后的字节码修改逻辑,看下面的ControllerAdvice,
// 这里的ControllerAdvice为字符串而不是class对象的原因是:
// HandlerAdapterInstrumentation是由agent class loader加载的,如果ControllerAdvice直接写class对象,这个类会被加载到agent class loader,
// agent class loader对app是隔离的,而ControllerAdvice的逻辑是需要在app中使用的,
// 所以必须由app class laoder加载。字符串的形式才有可能在runtime时由app class loader加载
HandlerAdapterInstrumentation.class.getName() + "$ControllerAdvice");
}
@SuppressWarnings("unused")
public static class ControllerAdvice {
// 在方法入口处插入逻辑
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void nameResourceAndStartSpan(
// 获取第0个参数,即HttpServletRequest request
@Advice.Argument(0) HttpServletRequest request,
// 获取第2个参数,即Object handler
@Advice.Argument(2) Object handler,
// 定义两个本地变量在OnMethodExit时用
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope) {
// TODO (trask) should there be a way to customize Instrumenter.shouldStart()?
if (isGrailsHandler(handler)) {
// skip creating handler span for grails, grails instrumentation will take care of it
return;
}
Context parentContext = Java8BytecodeBridge.currentContext();
// don't start a new top-level span
if (!Java8BytecodeBridge.spanFromContext(parentContext).getSpanContext().isValid()) {
return;
}
// Name the parent span based on the matching pattern
ServerSpanNaming.updateServerSpanName(
parentContext, CONTROLLER, SpringWebMvcServerSpanNaming.SERVER_SPAN_NAME, request);
// 判断是非被trace
if (!handlerInstrumenter().shouldStart(parentContext, handler)) {
// 没被trace直接结束
return;
}
// 创建span,其实就是Instrumenter#start,就是上面library的同款,handlerInstrumenter()中初始化了Instrumenter
context = handlerInstrumenter().start(parentContext, handler);
// 创建当前context的scope
scope = context.makeCurrent();
}
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void stopSpan(
@Advice.Argument(2) Object handler,
@Advice.Thrown Throwable throwable,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope) {
if (scope == null) {
return;
}
scope.close();
// 结束span,调用了Instrumenter#end
handlerInstrumenter().end(context, handler, null, throwable);
}
}
}
其实javaagent的写法,只不过是用ByteBuddy提供的字节码侵入能力代替了类库提供的拓展点,拓展点能干的事情通过ByteBuddy侵入字节码一样能干。
比如用SpringBoot启动web项目时,在Spring IOC(应该是AnnotationConfigApplicationContext)中插入上面的OpenTelemetryHandlerMappingFilter,逻辑在WebApplicationContextInstrumentation中:
public class WebApplicationContextInstrumentation implements TypeInstrumentation {
...
@SuppressWarnings("unused")
public static class FilterInjectingAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(@Advice.Argument(0) ConfigurableListableBeanFactory beanFactory) {
if (beanFactory instanceof BeanDefinitionRegistry
&& !beanFactory.containsBean("otelAutoDispatcherFilter")) {
try {
// Firstly check whether DispatcherServlet is present. We need to load an instrumented
// class from spring-webmvc to trigger injection that makes
// OpenTelemetryHandlerMappingFilter available.
beanFactory
.getBeanClassLoader()
.loadClass("org.springframework.web.servlet.DispatcherServlet");
// Now attempt to load our injected instrumentation class.
Class<?> clazz =
beanFactory
.getBeanClassLoader()
.loadClass("org.springframework.web.servlet.OpenTelemetryHandlerMappingFilter");
GenericBeanDefinition beanDefinition = new GenericBeanDefinition();
beanDefinition.setScope(SCOPE_SINGLETON);
beanDefinition.setBeanClass(clazz);
beanDefinition.setBeanClassName(clazz.getName());
// 加了一个名为otelAutoDispatcherFilter的BeanDefinition
((BeanDefinitionRegistry) beanFactory)
.registerBeanDefinition("otelAutoDispatcherFilter", beanDefinition);
} catch (ClassNotFoundException ignored) {
// Ignore
}
}
}
}
}
在后面的DispatcherServletInstrumentation中去使用
public class DispatcherServletInstrumentation implements TypeInstrumentation {
...
@SuppressWarnings("unused")
public static class HandlerMappingAdvice {
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void afterRefresh(
@Advice.Argument(0) ApplicationContext springCtx,
@Advice.FieldValue("handlerMappings") List<HandlerMapping> handlerMappings) {
if (springCtx.containsBean("otelAutoDispatcherFilter")) {
OpenTelemetryHandlerMappingFilter filter =
(OpenTelemetryHandlerMappingFilter) springCtx.getBean("otelAutoDispatcherFilter");
if (handlerMappings != null && filter != null) {
filter.setHandlerMappings(handlerMappings);
}
}
}
}
...
}
没有仔细研究,推荐直接看官方文档。
更多细节参考writing-instrumentation和writing-instrumentation-module
opentelemetry-java-instrumentation还提供了一些手段检查app的symbols(class, method, field)和Instrument逻辑兼容性,不兼容则不会执行Instrument逻辑,所以不会影响app正常运行。
核心功能是由groovy实现的,并且不太会有拓展的需求,就不展开说了,感兴趣的可以参考Understanding Muzzle。
建议对需要用到的新类库用muzzle-check-gradle-plugin检查下是否兼容,并做好充分测试,灰度上线。
重量级锁是基于ObjectMonitor的,先看下它的数据结构:
紧接着上面轻量级锁失败,会进入膨胀流程。
不要误以为获得了这个对象就加送成功了,比如轻量级锁膨胀为重量级锁,不一定是轻量级锁对应线程触发了膨胀,也可能是竞争的线程,这时膨胀后的轻量级锁依然不会被当前线程获取到。
主逻辑代码如下:
// 膨胀为重量级锁
// 只返回重量级锁(ObjectMonitor), 并不负责锁占有的逻辑
ObjectMonitor * ATTR ObjectSynchronizer::inflate (Thread * Self, oop object) {
// Inflate mutates the heap ...
// Relaxing assertion for bug 6320749.
assert (Universe::verify_in_progress() ||
// 必须在安全点才能膨胀
!SafepointSynchronize::is_at_safepoint(), "invariant") ;
// 先提前说下ObjectMonitor的获取和释放
// 重量级锁的ObjectMonitor的存储使用了free list,
// 获取时调用omAlloc,
// 从omFreeList中获取, 如果omFreeList为空,则从JVM全局的gFreeList中分配一批ObjectMonitor到omFreeList中
// 获取成功的ObjectMonitor放到omInUseList中,
// 然后返回
// 释放时调用omRelease, 从omInUseList换回到omFreeList中
for (;;) {
// 这是个for循环, 是为了处理多线程同时走膨胀逻辑的情况
// 如果这样, CAS可能会失败, 需要获取最新状态并重新来
const markOop mark = object->mark() ;
assert (!mark->has_bias_pattern(), "invariant") ;
// The mark can be in one of the following states:
// MarkWord可以是以下几种状态
// * Inflated(膨胀完成, 已经是重量级锁) - just return 直接返回
// * Stack-locked(轻量级锁) - coerce it to inflated 走膨胀逻辑
// * INFLATING(膨胀中) - busy wait for conversion to complete 忙, 等待膨胀结束
// * Neutral(无锁) - aggressively inflate the object. 走膨胀逻辑
// * BIASED(偏向锁) - Illegal. We should never see this 讲道理这里不该出现这种情况
// 下面就是针对上面4中可能的情况走不同的处理逻辑
// CASE: inflated
if (mark->has_monitor()) {
// 已经膨胀完成,已经是重量级锁的情况
// 直接返回MarkWord中的ObjectMonitor
ObjectMonitor * inf = mark->monitor() ;
assert (inf->header()->is_neutral(), "invariant");
assert (inf->object() == object, "invariant") ;
assert (ObjectSynchronizer::verify_objmon_isinpool(inf), "monitor is invalid");
return inf ;
}
// CASE: inflation in progress - inflating over a stack-lock.
// Some other thread is converting from stack-locked to inflated.
// Only that thread can complete inflation -- other threads must wait.
// The INFLATING value is transient.
// Currently, we spin/yield/park and poll the markword, waiting for inflation to finish.
// We could always eliminate polling by parking the thread on some auxiliary list.
if (mark == markOopDesc::INFLATING()) {
// 膨胀中的情况
TEVENT (Inflate: spin while INFLATING) ;
// 自旋等待,知道膨胀**状态解除, 这个方法中会有一个for循环用于等待,
// 根据在for循环中的循环次数, 会有不同的等待策略, 可能会spin(自旋)/yield/park
ReadStableMark(object) ;
continue ;
}
// CASE: stack-locked
// Could be stack-locked either by this thread or by some other thread.
// 可以是当前线程的stack-lock, 也可以是其他线程的
//
// Note that we allocate the objectmonitor speculatively, _before_ attempting
// to install INFLATING into the mark word. We originally installed INFLATING,
// allocated the objectmonitor, and then finally STed the address of the
// objectmonitor into the mark. This was correct, but artificially lengthened
// the interval in which INFLATED appeared in the mark, thus increasing
// the odds of inflation contention.
// 翻译过来大致意思是:
// 注意, 我们在将MarkWord状态更新为INLATING之前分配了ObjectMonitor.
// 之前, 我们先将状态更新为INFLATING, 然后分配ObjectMonitor, 最后将ObjectMonitor的地址写到MarkWord中.
// 这么做没错, 但是会人为拉长MarkWord为INFLATING状态的时间, 增加了膨胀的可能性.
// 最后一句很难理解, 我觉得没增加膨胀的可能性, 拉长INFLATING状态的持续时间,
// 会增加其他线程的等待时长, 导致其他线程被挂起, 从而总的等待时长增加.
// 所以应该想办法减少INFLATING状态的持续时间
//
// We now use per-thread private objectmonitor free lists.
// These list are reprovisioned from the global free list outside the
// critical INFLATING...ST interval. A thread can transfer
// multiple objectmonitors en-mass from the global free list to its local free list.
// This reduces coherency traffic and lock contention on the global free list.
// Using such local free lists, it doesn't matter if the omAlloc() call appears
// before or after the CAS(INFLATING) operation.
// See the comments in omAlloc().
// 说下ObjectMonitor的实现
// OjbectMonitor的获取是基于free list实现的. free list有全局的和线程本地的两种.
// 全局的free list是事先分配好的, 独立于膨胀流程的.
// 每个线程一次会从全局的free list 获取多个块, 这种方式降低了为了一致性而加锁的成本,
// 只在本次每次从全局获取时需要加锁, 在本地线程获取ObjectMonitor时并不需要加锁
if (mark->has_locker()) {
// 轻量级锁的情况
ObjectMonitor * m = omAlloc (Self) ;
// Optimistically prepare the objectmonitor - anticipate successful CAS
// We do this before the CAS in order to minimize the length of time
// in which INFLATING appears in the mark.
// 第一件事是new了一个ObjectMonitor出来,会加到当前线程的omInUseList中
// 对膨胀的成功率真的很乐观啊
// 不过这样在其他线程抢先膨胀中的时候会节省时间
m->Recycle();
m->_Responsible = NULL ;
m->OwnerIsThread = 0 ;
m->_recursions = 0 ;
// 设置自旋获取重量级锁的次数,默认5000
m->_SpinDuration = ObjectMonitor::Knob_SpinLimit ; // Consider: maintain by type/class
// 设置MarkWord设置为INFLATING(0)状态
markOop cmp = (markOop) Atomic::cmpxchg_ptr (markOopDesc::INFLATING(), object->mark_addr(), mark) ;
if (cmp != mark) {
// 设置失败,说明有其他线程抢先膨胀了
// 将m从当前线程的omInUseList中移除, 其实是放弃自己初始化的ObjectMonitor
omRelease (Self, m, true) ;
// 再次尝试, 再次尝试时会重新判断状态
continue ; // Interference -- just retry
}
// We've successfully installed INFLATING (0) into the mark-word.
// This is the only case where 0 will appear in a mark-work.
// Only the singular thread that successfully swings the mark-word
// to 0 can perform (or more precisely, complete) inflation.
// 走到这里说明成功将MarkWord置为0
// 这也是唯一能把锁标志位置0的方式
// 只有成功设置的那一个线程才能执行锁膨胀.
//
// Why do we CAS a 0 into the mark-word instead of just CASing the
// mark-word from the stack-locked value directly to the new inflated state?
// Consider what happens when a thread unlocks a stack-locked object.
// It attempts to use CAS to swing the displaced header value from the
// on-stack basiclock back into the object header. Recall also that the
// header value (hashcode, etc) can reside in (a) the object header, or
// (b) a displaced header associated with the stack-lock, or (c) a displaced
// header in an objectMonitor. The inflate() routine must copy the header
// value from the basiclock on the owner's stack to the objectMonitor, all
// the while preserving the hashCode stability invariants. If the owner
// decides to release the lock while the value is 0, the unlock will fail
// and control will eventually pass from slow_exit() to inflate. The owner
// will then spin, waiting for the 0 value to disappear. Put another way,
// the 0 causes the owner to stall if the owner happens to try to
// drop the lock (restoring the header from the basiclock to the object)
// while inflation is in-progress. This protocol avoids races that might
// would otherwise permit hashCode values to change or "flicker" for an object.
// Critically, while object->mark is 0 mark->displaced_mark_helper() is stable.
// 0 serves as a "BUSY" inflate-in-progress indicator.
// 为什么要先将MarkWord置为INFLATING(0),
// 而不是将MarkWord直接从轻量级锁(stack-lock)直接变成重量级锁(inflated state)?
// 为了防止竞争导致hashCode改变或者抖动!
//
// 解释下为什么会这样:
// 回忆下对象头的值(hashCode等)可能在什么地方:
// 1. 对象MarkWord,
// 2. LockRecord(stack-lock)的displaced_header中
// 3. ObjectMonitor的displaced_header中
// 想象这样的场景:
// 线程1:
// 一个线程想要unlock轻量级锁(stack-lock), 此时MarkWord是指向LockRecord的
// 这时需要将栈帧中的轻量级锁(LockRecord)的displaced_header写回到object的MarkWord中
// 线程2:
// 轻量级锁正在膨胀, 这个时候需要将LockRecord中的header设置到ObjectMonitor中,
// 再将MarkWord指向ObjectMonitor
//
// 在这时, 如果需要计算hashCode, 因为的状态不稳定,
// 所以计算出来的hashCode可能会放在LockRecord的displaced_header中,
// 也可能在ObjectMonitor的displaced_header中
// 读的时候也一样,
// 导致hashCode被改变或者抖动, 这不是我们期望的情况.
//
// 所以加了一个中间状态INFLATING,
// 表示膨胀中, 请勿打扰, 其他操作需要暂时等待, 包括对hashCode的操作
// 这个时候如果要获取hashCode, 也得等待(暂时没空debug过去看, 猜的)
// fetch the displaced mark from the owner's stack.
// The owner can't die or unwind past the lock while our INFLATING
// object is in the mark. Furthermore the owner can't complete
// an unlock on the object, either.
// 从displaced_mark_helper中获取monitor_value,
// 其实是obj原始的MarkWord, 不包括锁标志位
// 这个过程中owner是不能挂的, 而且不能unlock
// dmw应该是DisplacedMarkWord的意思
markOop dmw = mark->displaced_mark_helper() ;
assert (dmw->is_neutral(), "invariant") ;
// Setup monitor fields to proper values -- prepare the monitor
// 将dmw设置进ObjectMonitor
m->set_header(dmw) ;
// Optimization: if the mark->locker stack address is associated
// with this thread we could simply set m->_owner = Self and
// m->OwnerIsThread = 1. Note that a thread can inflate an object
// that it has stack-locked -- as might happen in wait() -- directly
// with CAS. That is, we can avoid the xchg-NULL .... ST idiom.
// 设置ObjectMonitor的owner为LockRecord,而不是当前线程(Self)
// 因为触发锁膨胀的线程不一定是持有锁的线程, 可能是其他线程, 比如一个从wait中醒来的线程
// 所以这里不会将owner设置为当前线程
// 锁的占有可以在ObjectMonitor::enter中做
m->set_owner(mark->locker());
m->set_object(object);
// TODO-FIXME: assert BasicLock->dhw != 0.
// Must preserve store ordering. The monitor state must
// be stable at the time of publishing the monitor address.
guarantee (object->mark() == markOopDesc::INFLATING(), "invariant") ;
// MarkWord设置为重量级锁
object->release_set_mark(markOopDesc::encode(m));
// Hopefully the performance counters are allocated on distinct cache lines
// to avoid false sharing on MP systems ...
if (ObjectMonitor::_sync_Inflations != NULL) ObjectMonitor::_sync_Inflations->inc() ;
TEVENT(Inflate: overwrite stacklock) ;
if (TraceMonitorInflation) {
if (object->is_instance()) {
ResourceMark rm;
tty->print_cr("Inflating object " INTPTR_FORMAT " , mark " INTPTR_FORMAT " , type %s",
(void *) object, (intptr_t) object->mark(),
object->klass()->external_name());
}
}
return m ;
}
// CASE: neutral
// TODO-FIXME: for entry we currently inflate and then try to CAS _owner.
// If we know we're inflating for entry it's better to inflate by swinging a
// pre-locked objectMonitor pointer into the object header. A successful
// CAS inflates the object *and* confers ownership to the inflating thread.
// In the current implementation we use a 2-step mechanism where we CAS()
// to inflate and then CAS() again to try to swing _owner from NULL to Self.
// An inflateTry() method that we could call from fast_enter() and slow_enter()
// would be useful.
// 无锁的情况
assert (mark->is_neutral(), "invariant");
// 这里会先初始化一个ObjectMonitor对象,但没有设置owner,
// 原因和上面一样, 请求线程不一定是轻量级锁持有的线程,
// 在外层方法中将owner设置成当前线程
ObjectMonitor * m = omAlloc (Self) ;
// prepare m for installation - set monitor to initial state
m->Recycle();
m->set_header(mark);
// owner为NULL
m->set_owner(NULL);
m->set_object(object);
m->OwnerIsThread = 1 ;
m->_recursions = 0 ;
m->_Responsible = NULL ;
m->_SpinDuration = ObjectMonitor::Knob_SpinLimit ; // consider: keep metastats by type/class
// 修改MarkWord为重量级锁状态
if (Atomic::cmpxchg_ptr (markOopDesc::encode(m), object->mark_addr(), mark) != mark) {
// 设置成功说明MarkWord被另外一个线程修改了, 释放ObjectMonitor
m->set_object (NULL) ;
m->set_owner (NULL) ;
m->OwnerIsThread = 0 ;
m->Recycle() ;
omRelease (Self, m, true) ;
m = NULL ;
continue ;
// interference - the markword changed - just retry.
// The state-transitions are one-way, so there's no chance of
// live-lock -- "Inflated" is an absorbing state.
// interference - the markword changed - just retry.
// The state-transitions are one-way, so there's no chance of
// live-lock -- "Inflated" is an absorbing state.
}
// Hopefully the performance counters are allocated on distinct
// cache lines to avoid false sharing on MP systems ...
if (ObjectMonitor::_sync_Inflations != NULL) ObjectMonitor::_sync_Inflations->inc() ;
TEVENT(Inflate: overwrite neutral) ;
if (TraceMonitorInflation) {
if (object->is_instance()) {
ResourceMark rm;
tty->print_cr("Inflating object " INTPTR_FORMAT " , mark " INTPTR_FORMAT " , type %s",
(void *) object, (intptr_t) object->mark(),
object->klass()->external_name());
}
}
return m ;
}
}
很长,但是逻辑很简单,想偷懒的可以直接看总结:
主要是针对当前obj可能的4种MarkWord状态分别处理:
1.已经是重量级锁(010): 直接返回当前MarkWord指向的重量级锁
2.轻量级锁(00):
轻量级锁膨胀为重量级锁后,并没有被干掉。LockRecord还在,释放时还是需要释放掉
为什么重量级锁的owner不直接存轻量级锁对应的线程?
因为轻量级锁中没有字段存储对应的线程。
也没法加这个字段,因为MarkWord没有空间放这个字段,放在LockRecord中也不行,因为LockRecord是线程私有空间,不允许其他线程访问,如果其他线程不能访问,这个字段就没意义了。
轻量级锁没存对应线程怎么判断自己属于哪个线程?
轻量级锁的LockRecord是在线程的私有连续空间上的,直接判断其LockRecord的地址是否在这个范围上即可。这样的玩法也很神奇,省了个字段效率还特高。
既然这么好为什么重量级锁不这么玩?
因为重量级锁的实现和轻量级锁完全不一样,重量级锁的ObjectMonitor是通过FreeList分配的,没有连续空间,没发做这种判断。
3.膨胀中的状态(整个MarkWord是0)
自旋等待一段时间,根据自旋的次数会yield或者park,知道膨胀中状态解除,然后获取最新状态重新走这段逻辑。所以膨胀中的状态持续时间越短越好。
4.无锁状态
当中的一些优化逻辑看代码注释,**也很简单,不多说了。
这部分不看代码了,就着图说下逻辑。
这是重量级锁不同于偏向锁和轻量级锁的地方,不同于他俩的栈结构,ObjectMonitor的分配和释放是基于FreeList的,在线程私有空间中有两个链表,一个表示空闲的ObjectMonitor——omFreeList,另一个表示使用中的ObjectMonitor——omInUseList。
分配操作就是从omFreeList头部获取一个ObjectMonitor,加到omInUseList头部,操作成功就返回这个ObjectMonitor。
释放操作就是从omInUseList中将对应的ObjectMonitor删掉,然后加回到omFreeList中。
刚才的omFreeList和omInUseList是线程私有的,不存在并发问题,效率很高。
如果omFreeList空了,会从全局空间的gFreeList中重新补给,补给是会获取多个ObjectMonitor,放到omFreeList中,这个操作存在并发问题,会加锁。因为每次重新分配好了就会在很长的一段时间内都不需要加锁,总的来说效率还是很高的。
补给完成后会从omFreeList中获取。
这种批量获取一块空间到线程私有空间中,从而消除并发问题的做法在TLAB中也用过。
如果gFreeList也空了,会直接new一个ObjectMonitor对象返回,这是最低效的。
上面的逻辑会因为一些参数的不同而不同,具体逻辑可以看代码。
加锁的过程会关联两个对象
这部分逻辑和偏向锁以及轻量级锁一样,都会获取一个LockRecord,关联当前对象。也就是说,每次加锁请求都会关联一个LockRecord,无论是什么锁,第一次进入或重入。值得注意的是,重量级锁LockRecord的displaced_header是无锁状态MarkWord,这个状态在解锁时会用到。
if (!success) {
// 准备使用轻量级锁或者重量级锁
// 用当前的MarkWord构造一个无锁的MarkWord作为DisplacedHeader
markOop displaced = lockee->mark()->set_unlocked();
// 保存到LockRecord中, 用于解锁时MarkWord的还原
// 如果当前是轻量级锁, 那么这次是轻量级锁重入, 后面会将dispalced_header置为NULL
// 如果当前是重量级锁状态, 这个dispalced_header不会被置为NULL, 区别于轻量级锁重入
entry->lock()->set_displaced_header(displaced);
bool call_vm = UseHeavyMonitors;
// 如果参数强制指定使用重量级锁,
// 或者CAS将MarkWord从无锁状态设置为轻量级锁失败
if (call_vm || Atomic::cmpxchg_ptr(entry, lockee->mark_addr(), displaced) != displaced) {
...
if (!call_vm && THREAD->is_lock_owned((address) displaced->clear_lock_bits())) {
...
} else {
// 膨胀
CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
}
}
}
在slow_enter中膨胀(inflate)完立即调用了enter方法
void ObjectSynchronizer::slow_enter(Handle obj, BasicLock* lock, TRAPS) {
...
ObjectSynchronizer::inflate(THREAD, obj())->enter(THREAD);
}
inflate方法只是完成了膨胀,真正的加锁逻辑得看enter方法
void ATTR ObjectMonitor::enter(TRAPS) {
// The following code is ordered to check the most common cases first
// and to reduce RTS->RTO cache line upgrades on SPARC and IA32 processors.
Thread * const Self = THREAD ;
void * cur ;
// 没有被加锁(_owner==NULL), 设置_owner为当前线程
cur = Atomic::cmpxchg_ptr (Self, &_owner, NULL) ;
if (cur == NULL) {
// 设置成功
// Either ASSERT _recursions == 0 or explicitly set _recursions = 0.
assert (_recursions == 0 , "invariant") ;
assert (_owner == Self, "invariant") ;
// CONSIDER: set or assert OwnerIsThread == 1
return ;
}
if (cur == Self) {
// 重入的情况
// TODO-FIXME: check for integer overflow! BUGID 6557169.
_recursions ++ ;
return ;
}
if (Self->is_lock_owned ((address)cur)) {
// 当前线程持有该对象的轻量级锁,
// 在膨胀的代码的轻量级锁膨胀逻辑中说了, 膨胀肯定不是轻量级锁对应线程触发的, 肯定是其他线程触发的
// 这里应该是轻量级锁重入时发现自己的轻量级锁被膨胀了
// 由轻量级锁膨胀为重量级锁,且第一次进入enter方法(升级是会设置_owner为LockRecord),
// 那cur是指向LockRecord的指针
assert (_recursions == 0, "internal state error");
// 重入计数置为1,
// 为什么是1?
// 之前说了这里对应的是轻量级锁被膨胀为重量级锁后第一次重入的情况, 重试次数设置成1
// 确实是作为重量级锁的身份第一次重入, 但膨胀之前的轻量级锁可能已经重入过多次了, 轻量级锁的这些重入怎么算?
// 这时是不是应该和当初偏向锁升级为轻量级锁一样, 遍历轻量级锁, 统计下从入次数, 后面每次解锁时减一?
// 实际上没这么实现, ObjectMonitor又是不一样的思路,
// 对于重量级锁的LockRecord, 其displaced_header是不为NULL的,
// 对于轻量级锁的若干次重入, 解锁时判断displaced_header为NUll后直接跳过不操作ObjectMonitor,
// 直到碰到第一次轻量级锁加锁时的displaced_header时的不为NULL的header时, 再对ObjectMonitor操作即可
_recursions = 1 ;
// Commute owner from a thread-specific on-stack BasicLockObject address to
// a full-fledged "Thread *".
// _owner设置为当前线程,在inflate方法中会设置为LockRecord(轻量级锁膨胀到重量级锁的情况), 或者是NULL(无锁膨胀到重量级锁的情况)
_owner = Self ;
OwnerIsThread = 1 ;
return ;
}
// We've encountered genuine contention.
assert (Self->_Stalled == 0, "invariant") ;
Self->_Stalled = intptr_t(this) ;
// Try one round of spinning *before* enqueueing Self
// and before going through the awkward and expensive state
// transitions. The following spin is strictly optional ...
// Note that if we acquire the monitor from an initial spin
// we forgo posting JVMTI events and firing DTRACE probes.
// 先尝试自旋获取锁(获取锁的过程就是CAS设置owner为自己)
// 在进行昂贵的系统同步操作前再给自己一次获取锁的机会
if (Knob_SpinEarly && TrySpin (Self) > 0) {
// 如果获取到了, 直接返回
assert (_owner == Self , "invariant") ;
assert (_recursions == 0 , "invariant") ;
assert (((oop)(object()))->mark() == markOopDesc::encode(this), "invariant") ;
Self->_Stalled = 0 ;
return ;
}
assert (_owner != Self , "invariant") ;
assert (_succ != Self , "invariant") ;
assert (Self->is_Java_thread() , "invariant") ;
JavaThread * jt = (JavaThread *) Self ;
assert (!SafepointSynchronize::is_at_safepoint(), "invariant") ;
assert (jt->thread_state() != _thread_blocked , "invariant") ;
assert (this->object() != NULL , "invariant") ;
assert (_count >= 0, "invariant") ;
// Prevent deflation at STW-time. See deflate_idle_monitors() and is_busy().
// Ensure the object-monitor relationship remains stable while there's contention.
Atomic::inc_ptr(&_count);
EventJavaMonitorEnter event;
// 设置线程状态为BLOCKED
{ // Change java thread status to indicate blocked on monitor enter.
JavaThreadBlockedOnMonitorEnterState jtbmes(jt, this);
Self->set_current_pending_monitor(this);
DTRACE_MONITOR_PROBE(contended__enter, this, object(), jt);
if (JvmtiExport::should_post_monitor_contended_enter()) {
JvmtiExport::post_monitor_contended_enter(jt, this);
// The current thread does not yet own the monitor and does not
// yet appear on any queues that would get it made the successor.
// This means that the JVMTI_EVENT_MONITOR_CONTENDED_ENTER event
// handler cannot accidentally consume an unpark() meant for the
// ParkEvent associated with this ObjectMonitor.
}
OSThreadContendState osts(Self->osthread());
ThreadBlockInVM tbivm(jt);
// TODO-FIXME: change the following for(;;) loop to straight-line code.
for (;;) {
jt->set_suspend_equivalent();
// cleared by handle_special_suspend_equivalent_condition()
// or java_suspend_self()
// 调用系统同步操作
EnterI (THREAD) ;
... 后半部分在唤醒部分说
获取锁失败呢,就会进入BLOCKED状态,然后入队(EnterI)等待被唤醒,这段代码很有意思,入队前各种尝试抢占锁。
对于轻量级锁重入多次后膨胀为重量级锁的解锁过程,请看解锁部分解析,这里不一定有体会,因为这个实现本身就得加锁和解锁配合实现才行
void ATTR ObjectMonitor::EnterI (TRAPS) {
Thread * Self = THREAD ;
assert (Self->is_Java_thread(), "invariant") ;
assert (((JavaThread *) Self)->thread_state() == _thread_blocked , "invariant") ;
// Try the lock - TATAS
// CAS设置owner为自己, 设置成功返回1,
// 仍然不死心啊
if (TryLock (Self) > 0) {
assert (_succ != Self , "invariant") ;
assert (_owner == Self , "invariant") ;
assert (_Responsible != Self , "invariant") ;
return ;
}
DeferredInitialize () ;
// We try one round of spinning *before* enqueueing Self.
//
// If the _owner is ready but OFFPROC we could use a YieldTo()
// operation to donate the remainder of this thread's quantum
// to the owner. This has subtle but beneficial affinity
// effects.
// 来一轮自旋获取锁
if (TrySpin (Self) > 0) {
// 成功直接返回
assert (_owner == Self , "invariant") ;
assert (_succ != Self , "invariant") ;
assert (_Responsible != Self , "invariant") ;
return ;
}
// The Spin failed -- Enqueue and park the thread ...
// 终于放弃了, 乖乖入队等待了?
assert (_succ != Self , "invariant") ;
assert (_owner != Self , "invariant") ;
assert (_Responsible != Self , "invariant") ;
// Enqueue "Self" on ObjectMonitor's _cxq.
//
// Node acts as a proxy for Self.
// As an aside(此外), if were to ever rewrite the synchronization code mostly
// in Java, WaitNodes, ObjectMonitors, and Events would become 1st-class
// Java objects. This would avoid awkward lifecycle and liveness issues,
// as well as eliminate a subset of ABA issues.
// TODO: eliminate ObjectWaiter and enqueue either Threads or Events.
//
// 将当前线程封装成ObjectWaiter
ObjectWaiter node(Self) ;
Self->_ParkEvent->reset() ;
node._prev = (ObjectWaiter *) 0xBAD ;
node.TState = ObjectWaiter::TS_CXQ ;
// Push "Self" onto the front of the _cxq.
// Once on cxq/EntryList, Self stays on-queue until it acquires the lock.
// Note that spinning tends to reduce the rate at which threads
// enqueue and dequeue on EntryList|cxq.
// 将线程入队
// 一旦进入cxq/EntryList, 线程在获得锁之前将一直排队.
// 注意, 之前的自旋操作会降低线程出队的概率, 也就是说synchronized是非公平锁.
ObjectWaiter * nxt ;
// 不断尝试CAS入队
for (;;) {
// 头插法进入_cxq
node._next = nxt = _cxq ;
if (Atomic::cmpxchg_ptr (&node, &_cxq, nxt) == nxt) break ;
// Interference - the CAS failed because _cxq changed. Just retry.
// As an optional optimization we retry the lock.
// CAS失败, 再次尝试获取锁, 这样可以降低进入队列等待的概率
// 相当于逮到机会, 再尝试获取锁一下...
// 这个代码有点可爱了
if (TryLock (Self) > 0) {
assert (_succ != Self , "invariant") ;
assert (_owner == Self , "invariant") ;
assert (_Responsible != Self , "invariant") ;
return ;
}
}
// Check for cxq|EntryList edge transition to non-null. This indicates
// the onset of contention. While contention persists exiting threads
// will use a ST:MEMBAR:LD 1-1 exit protocol. When contention abates exit
// operations revert to the faster 1-0 mode. This enter operation may interleave
// (race) a concurrent 1-0 exit operation, resulting in stranding, so we
// arrange for one of the contending thread to use a timed park() operations
// to detect and recover from the race. (Stranding is form of progress failure
// where the monitor is unlocked but all the contending threads remain parked).
// That is, at least one of the contended threads will periodically poll _owner.
// One of the contending threads will become the designated "Responsible" thread.
// The Responsible thread uses a timed park instead of a normal indefinite park
// operation -- it periodically wakes and checks for and recovers from potential
// strandings admitted by 1-0 exit operations. We need at most one Responsible
// thread per-monitor at any given moment. Only threads on cxq|EntryList may
// be responsible for a monitor.
//
// Currently, one of the contended threads takes on the added role of "Responsible".
// A viable alternative would be to use a dedicated "stranding checker" thread
// that periodically iterated over all the threads (or active monitors) and unparked
// successors where there was risk of stranding. This would help eliminate the
// timer scalability issues we see on some platforms as we'd only have one thread
// -- the checker -- parked on a timer.
// TODO-henry 不知道这要干嘛
// SyncFlags默认是0, nxt==NULL&&_EntryList==NULL说明当前线程入队前_cxq和_EntryList都为空
if ((SyncFlags & 16) == 0 && nxt == NULL && _EntryList == NULL) {
// Try to assume the role of responsible thread for the monitor.
// CONSIDER: ST vs CAS vs { if (Responsible==null) Responsible=Self }
// 设置_Responsible为当前线程
Atomic::cmpxchg_ptr (Self, &_Responsible, NULL) ;
}
// The lock have been released while this thread was occupied queueing
// itself onto _cxq. To close the race and avoid "stranding" and
// progress-liveness failure we must resample-retry _owner before parking.
// Note the Dekker/Lamport duality: ST cxq; MEMBAR; LD Owner.
// In this case the ST-MEMBAR is accomplished with CAS().
//
// TODO: Defer all thread state transitions until park-time.
// Since state transitions are heavy and inefficient we'd like
// to defer the state transitions until absolutely necessary,
// and in doing so avoid some transitions ...
TEVENT (Inflated enter - Contention) ;
int nWakeups = 0 ;
int RecheckInterval = 1 ;
// 在这里会挂起-唤醒, 唯一能跳出循环的方式就是获取锁
for (;;) {
// 再次尝试获取锁...
// 还不死心...
if (TryLock (Self) > 0) break ;
assert (_owner != Self, "invariant") ;
if ((SyncFlags & 2) && _Responsible == NULL) {
Atomic::cmpxchg_ptr (Self, &_Responsible, NULL) ;
}
// 终于乖乖挂起了(park)
// park self
if (_Responsible == Self || (SyncFlags & 1)) {
TEVENT (Inflated enter - park TIMED) ;
Self->_ParkEvent->park ((jlong) RecheckInterval) ;
// Increase the RecheckInterval, but clamp the value.
RecheckInterval *= 8 ;
if (RecheckInterval > 1000) RecheckInterval = 1000 ;
} else {
TEVENT (Inflated enter - park UNTIMED) ;
Self->_ParkEvent->park() ;
}
... 后半部分是唤醒逻辑,后面会有解析
这段代码可谓是把非公平的特性表现的淋漓尽致,在入队前也是各种不死心,找到机会就抢占锁,提高自己获取锁的概率。相对的,一但入队等待,想出来获取锁也会很难。
代码中park之前的代码是入队的代码,park之后的代码是唤醒后获取锁的代码。
尝试获取锁的逻辑在TryLock中
int ObjectMonitor::TryLock (Thread * Self) {
for (;;) {
void * own = _owner ;
if (own != NULL) return 0 ;
if (Atomic::cmpxchg_ptr (Self, &_owner, NULL) == NULL) {
// Either guarantee _recursions == 0 or set _recursions = 0.
assert (_recursions == 0, "invariant") ;
assert (_owner == Self, "invariant") ;
// CONSIDER: set or assert that OwnerIsThread == 1
return 1 ;
}
// The lock had been free momentarily, but we lost the race to the lock.
// Interference -- the CAS failed.
// We can either return -1 or retry.
// Retry doesn't make as much sense because the lock was just acquired.
if (true) return -1 ;
}
}
逻辑简单到懒得注释,就是尝试将owner设为自己,返回设置成功or失败。
这部分代码和轻量级锁解锁代码高度重合
CASE(_monitorexit): {
// synchronized(obj)的obj
oop lockee = STACK_OBJECT(-1);
CHECK_NULL(lockee);
// derefing's lockee ought to provoke implicit null check
// find our monitor slot
BasicObjectLock* limit = istate->monitor_base();
// limit of expression stack
BasicObjectLock* most_recent = (BasicObjectLock*) istate->stack_base();
// 按enter时相同的顺序遍历LockRecord, 可以达到栈的效果
while (most_recent != limit ) {
if ((most_recent)->obj() == lockee) {
// 找到当前对象持有的LockRecord
BasicLock* lock = most_recent->lock();
// 从LockRecord中取出刚开始的header
markOop header = lock->displaced_header();
// 解除关联, 偏向锁和重入的轻量级锁只需要这一步就够了
most_recent->set_obj(NULL);
if (!lockee->mark()->has_bias_pattern()) {
// 当前不是偏向锁的情况, 可能是轻量级锁或者重量级锁
bool call_vm = UseHeavyMonitors;
// If it isn't recursive we either must swap old header or call the runtime
if (header != NULL || call_vm) {
// header != NULL说明不是轻量级锁的重入, call_vm==true说明是强制使用重量级锁
// 对于不是重入的轻量级锁, 从DisplacedMarkWord还原回去就行,
// 还原失败, 说明轻量级锁可能在某个地方膨胀后MarkWord被换成了重量级锁的
// 需要走InterpreterRuntime::monitorexit
// 对于call_vm==true, 也就是强制走重量级锁的情况, 直接InterpreterRuntime::monitorexit
if (call_vm || Atomic::cmpxchg_ptr(header, lockee->mark_addr(), lock) != lock) {
// restore object for the slow case
// 先把obj设置回去, 这是为后面在InterpreterRuntime::monitorexit中为重量级锁解锁做准备
most_recent->set_obj(lockee);
CALL_VM(InterpreterRuntime::monitorexit(THREAD, most_recent), handle_exception);
}
}
}
// 只需要找到一个就退出, 对于重入的情况肯定是后获取的锁先释放
UPDATE_PC_AND_TOS_AND_CONTINUE(1, -1);
}
// 下一个LockRecord
most_recent++;
}
// Need to throw illegal monitor state exception
CALL_VM(InterpreterRuntime::throw_illegal_monitor_state_exception(THREAD), handle_exception);
ShouldNotReachHere();
}
发现没有,即使最后是重量级锁,也需要找到LockRecord。重量级锁在释放时,是先操作的ObjectMonitor,再释放LockRecord(上面将释放的LockRecord重新设置回去了,因为LockRecord在线程私有的栈上,所以不会在释放后被其他线程占用),继续跟下去
IRT_ENTRY_NO_ASYNC(void, InterpreterRuntime::monitorexit(JavaThread* thread, BasicObjectLock* elem))
...
ObjectSynchronizer::slow_exit(h_obj(), elem->lock(), thread);
... 后半部分代码后面分析
IRT_END
别忘了,重量级锁LockRecord的displaced_header为无锁状态的MarkWord
void ObjectSynchronizer::slow_exit(oop object, BasicLock* lock, TRAPS) {
fast_exit (object, lock, THREAD) ;
}
void ObjectSynchronizer::fast_exit(oop object, BasicLock* lock, TRAPS) {
assert(!object->mark()->has_bias_pattern(), "should not see bias pattern here");
// if displaced header is null, the previous enter is recursive enter, no-op
markOop dhw = lock->displaced_header();
markOop mark ;
// 不仅是轻量级锁钟乳, 当前是 重量级锁也可能会进到这里,
// 如果膨胀为重量级锁前, 作为轻量级锁被重入多次, 重入的这些LockRecord会进到这里处理
if (dhw == NULL) {
// dhw为NULL,说明是轻量级重入的线程
// 因为重入时displaced_header会被设置为NULL
// 只做了些校验,没有实质性逻辑, 相当于啥也没做
// Recursive stack-lock.
// Diagnostics -- Could be: stack-locked, inflating, inflated.
mark = object->mark() ;
assert (!mark->is_neutral(), "invariant") ;
if (mark->has_locker() && mark != markOopDesc::INFLATING()) {
assert(THREAD->is_lock_owned((address)mark->locker()), "invariant") ;
}
if (mark->has_monitor()) {
ObjectMonitor * m = mark->monitor() ;
assert(((oop)(m->object()))->mark() == mark, "invariant") ;
assert(m->is_entered(THREAD), "invariant") ;
}
return ;
}
mark = object->mark() ;
// If the object is stack-locked by the current thread, try to
// swing the displaced header from the box back to the mark.
// 只有轻量级锁会进
if (mark == (markOop) lock) {
// 因为对于轻量级锁来说, 锁标志位是00, 所以mark==(markOop)lock表示MarkWord指向LockRecord
// 如果MarkWord就是LockRecord,将LockRecord的displaced_header写回MarkWord中
assert (dhw->is_neutral(), "invariant") ;
if ((markOop) Atomic::cmpxchg_ptr (dhw, object->mark_addr(), mark) == mark) {
// 写成功就直接结束
// 否则, 说明存在获取MarkWord后, MarkWord被改了, 存在竞争, 这时的轻量级锁应该升级为重量级锁了
// 需要获取膨胀后的重量级锁, 然后释放
TEVENT (fast_exit: release stacklock) ;
return;
}
}
// 上面没成功, 说明轻量级锁已经膨胀为重量级锁
// 需要获取到重量级锁, 然后释放
ObjectSynchronizer::inflate(THREAD, object)->exit (true, THREAD) ;
}
对于重量级锁,也会走到轻量级锁重入的逻辑中去,怎么回事呢?先把最后一行和锁释放相关代码看完再揭晓
void ATTR ObjectMonitor::exit(bool not_suspended, TRAPS) {
Thread * Self = THREAD ;
if (THREAD != _owner) {
// ObjectMonitor的owner不是当前线程
if (THREAD->is_lock_owned((address) _owner)) {
// 当前线程拥有轻量级锁,
// 说明轻量级锁膨胀为重量级锁后, 还没调用enter方法, owner依然指向LockRecord
// 通过is_lock_owned方法可以判断这个地址是否在LockRecord的栈上
// Transmute _owner from a BasicLock pointer to a Thread address.
// We don't need to hold _mutex for this transition.
// Non-null to Non-null is safe as long as all readers can
// tolerate either flavor.
assert (_recursions == 0, "invariant") ;
// owner设置为当前线程, 继续走后面的流程
_owner = THREAD ;
_recursions = 0 ;
OwnerIsThread = 1 ;
} else {
// 抛错
// NOTE: we need to handle unbalanced monitor enter/exit
// in native code by throwing an exception.
// TODO: Throw an IllegalMonitorStateException ?
TEVENT (Exit - Throw IMSX) ;
assert(false, "Non-balanced monitor enter/exit!");
if (false) {
THROW(vmSymbols::java_lang_IllegalMonitorStateException());
}
return;
}
}
// 对于重入的情况, 重入-1
if (_recursions != 0) {
_recursions--; // this is simple recursive enter
TEVENT (Inflated exit - recursive) ;
return ;
}
... 此时当前线程完全释放锁,准备将owner交给其他线程了
这张图对应代码如下:
synchronized(obj1) { // LockRecord1加锁 displaced_header不为NULL 轻量级锁
synchronized(obj1) { // LockRecord2加锁 displaced_header为NULL 轻量级锁重入
// 其他线程竞争obj1,膨胀为重量级锁,owner指向LockRecord1
synchronized(obj1) { // LockRecord3加锁 displaced_header不为NULL 重量级锁重入次数1 owner改为当前线程
synchronized(obj1) { // LockRecord4加锁 displaced_header不为NULL 重量级锁重入次数2
} // LockRecord4解锁 displaced_header不为NULL 重量级锁重入次数1
} // LockRecord3解锁 displaced_header为NULL 重量级锁重入次数0
} // LockRecord2解锁 displaced_header不为NULL 不会进入ObjectMonitor::exit,在fast_enter中就会返回
} // LockRecord1解锁 displaced_header不为为NULL 重量级锁重入次数0 唤醒其他线程
想象如下情况:
ObjectMonitor::exit
中对重入次数减1,重量级锁重入次数为1ObjectMonitor::exit
中对重入次数减1,重量级锁重入次数为0fast_exit
的if (dhw == NULL
处就会返回,不会进到ObjectMonitor::exit
中ObjectMonitor::exit
中,走完上面的代码,准备将锁交给其他线程了void ATTR ObjectMonitor::exit(bool not_suspended, TRAPS) {
... 前面是释放锁的代码
// 当前线程不在需要这个锁, 准备将所有权交给其他线程
// 这个线程可能是因为非公平特性抢占的
// 也可能是从等待队列唤醒的
for (;;) {
assert (THREAD == _owner, "invariant") ;
if (Knob_ExitPolicy == 0) {
// release semantics: prior loads and stores from within the critical section
// must not float (reorder) past the following store that drops the lock.
// On SPARC that requires MEMBAR #loadstore|#storestore.
// But of course in TSO #loadstore|#storestore is not required.
// I'd like to write one of the following:
// A. OrderAccess::release() ; _owner = NULL
// B. OrderAccess::loadstore(); OrderAccess::storestore(); _owner = NULL;
// Unfortunately OrderAccess::release() and OrderAccess::loadstore() both
// store into a _dummy variable. That store is not needed, but can result
// in massive wasteful coherency traffic on classic SMP systems.
// Instead, I use release_store(), which is implemented as just a simple
// ST on x64, x86 and SPARC.
// 非公平策略,
// 给刚进入还没有入队等待过的线程一次抢占的机会, 而不是唤醒线程
// 先释放锁(owner置为NULL), 这个时候有其他线程进入会获得锁(然后owner就不是NULL了)
OrderAccess::release_store_ptr (&_owner, NULL) ; // drop the lock
OrderAccess::storeload() ; // See if we need to wake a successor
if ((intptr_t(_EntryList)|intptr_t(_cxq)) == 0 || _succ != NULL) {
// 没有等待的线程(cxq和EntryList都为空)或者已经有假定继承人
// 说明此时锁有其他线程准备占有了, 这个时候应该是被刚进来的线程获取了锁, 而不是队列中parking的线程
// 返回
TEVENT (Inflated exit - simple egress) ;
return ;
}
TEVENT (Inflated exit - complex egress) ;
// Normally the exiting thread is responsible for ensuring succession,
// but if other successors are ready or other entering threads are spinning
// then this thread can simply store NULL into _owner and exit without
// waking a successor. The existence of spinners or ready successors
// guarantees proper succession (liveness). Responsibility passes to the
// ready or running successors. The exiting thread delegates the duty.
// More precisely, if a successor already exists this thread is absolved
// of the responsibility of waking (unparking) one.
//
// The _succ variable is critical to reducing futile wakeup frequency.
// _succ identifies the "heir presumptive" thread that has been made
// ready (unparked) but that has not yet run. We need only one such
// successor thread to guarantee progress.
// See http://www.usenix.org/events/jvm01/full_papers/dice/dice.pdf
// section 3.3 "Futile Wakeup Throttling" for details.
//
// Note that spinners in Enter() also set _succ non-null.
// In the current implementation spinners opportunistically set
// _succ so that exiting threads might avoid waking a successor.
// Another less appealing alternative would be for the exiting thread
// to drop the lock and then spin briefly to see if a spinner managed
// to acquire the lock. If so, the exiting thread could exit
// immediately without waking a successor, otherwise the exiting
// thread would need to dequeue and wake a successor.
// (Note that we'd need to make the post-drop spin short, but no
// shorter than the worst-case round-trip cache-line migration time.
// The dropped lock needs to become visible to the spinner, and then
// the acquisition of the lock by the spinner must become visible to
// the exiting thread).
//
// It appears that an heir-presumptive (successor) must be made ready.
// Only the current lock owner can manipulate the EntryList or
// drain _cxq, so we need to reacquire the lock. If we fail
// to reacquire the lock the responsibility for ensuring succession
// falls to the new owner.
// 再次占有锁, 如果失败, 说明锁被占用, 返回
if (Atomic::cmpxchg_ptr (THREAD, &_owner, NULL) != NULL) {
return ;
}
// 如果占有锁成功, 会从队列中找一个线程唤醒
TEVENT (Exit - Reacquired) ;
} else {
if ((intptr_t(_EntryList)|intptr_t(_cxq)) == 0 || _succ != NULL) {
// 如果EntryList和cxq都为空(说明没有准备获取锁的阻塞线程), 或者有假定的继承人
// 就放弃锁
OrderAccess::release_store_ptr (&_owner, NULL) ; // drop the lock
OrderAccess::storeload() ;
// Ratify the previously observed values.
if (_cxq == NULL || _succ != NULL) {
// 如果此时cxq没有等待线程或者存在假定继承人, 返回
TEVENT (Inflated exit - simple egress) ;
return ;
}
// inopportune interleaving -- the exiting thread (this thread)
// in the fast-exit path raced an entering thread in the slow-enter
// path.
// We have two choices:
// A. Try to reacquire the lock.
// If the CAS() fails return immediately, otherwise
// we either restart/rerun the exit operation, or simply
// fall-through into the code below which wakes a successor.
// B. If the elements forming the EntryList|cxq are TSM
// we could simply unpark() the lead thread and return
// without having set _succ.
if (Atomic::cmpxchg_ptr (THREAD, &_owner, NULL) != NULL) {
// 尝试获取锁, 如果锁被其他线程占了, 返回
TEVENT (Inflated exit - reacquired succeeded) ;
return ;
}
TEVENT (Inflated exit - reacquired failed) ;
} else {
TEVENT (Inflated exit - complex egress) ;
}
}
guarantee (_owner == THREAD, "invariant") ;
ObjectWaiter * w = NULL ;
int QMode = Knob_QMode ;
if (QMode == 2 && _cxq != NULL) {
// QMode == 2 : cxq has precedence over EntryList.
// Try to directly wake a successor from the cxq.
// If successful, the successor will need to unlink itself from cxq.
// 策略2:
// cxq优先级高于EntryList, 如果cxq队列不为空, 直接从cxq中取出一个waiter唤醒
// 如果唤醒成功, 在外面需要将成功的waiter从cxq中出队
w = _cxq ;
assert (w != NULL, "invariant") ;
assert (w->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
ExitEpilog (Self, w) ;
return ;
}
if (QMode == 3 && _cxq != NULL) {
// Aggressively drain cxq into EntryList at the first opportunity.
// This policy ensure that recently-run threads live at the head of EntryList.
// Drain _cxq into EntryList - bulk transfer.
// First, detach _cxq.
// The following loop is tantamount to: w = swap (&cxq, NULL)
// 策略3: 将cxq插入到EntryList尾部
// 取出cxq头节点, 同时将cxq队列清空
w = _cxq ;
for (;;) {
assert (w != NULL, "Invariant") ;
ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ;
if (u == w) break ;
w = u ;
}
assert (w != NULL , "invariant") ;
// 将取出的cxq队列变成双向链表
// 之前只有_next有值, 现在给_prev赋值
ObjectWaiter * q = NULL ;
ObjectWaiter * p ;
for (p = w ; p != NULL ; p = p->_next) {
// 状态由TS_CXQ改为TS_ENTER
guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
p->TState = ObjectWaiter::TS_ENTER ;
p->_prev = q ;
q = p ;
}
// Append the RATs to the EntryList
// TODO: organize EntryList as a CDLL so we can locate the tail in constant-time.
// 获得EntryList的尾节点
ObjectWaiter * Tail ;
for (Tail = _EntryList ; Tail != NULL && Tail->_next != NULL ; Tail = Tail->_next) ;
// 将cxq插入到EntryList尾部
if (Tail == NULL) {
_EntryList = w ;
} else {
Tail->_next = w ;
w->_prev = Tail ;
}
// Fall thru into code that tries to wake a successor from EntryList
}
if (QMode == 4 && _cxq != NULL) {
// 策略4: 将cxq插入到EntryList的头部
// Aggressively drain cxq into EntryList at the first opportunity.
// This policy ensure that recently-run threads live at the head of EntryList.
// Drain _cxq into EntryList - bulk transfer.
// First, detach _cxq.
// The following loop is tantamount to: w = swap (&cxq, NULL)
// 取出cxq头节点, 然后清空老队列
w = _cxq ;
for (;;) {
assert (w != NULL, "Invariant") ;
ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ;
if (u == w) break ;
w = u ;
}
assert (w != NULL , "invariant") ;
// 找出cxq的尾节点
ObjectWaiter * q = NULL ;
ObjectWaiter * p ;
for (p = w ; p != NULL ; p = p->_next) {
// 状态由TS_CXQ改为TS_ENTER
guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
p->TState = ObjectWaiter::TS_ENTER ;
p->_prev = q ;
q = p ;
}
// Prepend the RATs to the EntryList
// EntryList插入到cxq尾部
if (_EntryList != NULL) {
q->_next = _EntryList ;
_EntryList->_prev = q ;
}
_EntryList = w ;
// Fall thru into code that tries to wake a successor from EntryList
}
// 注意, 如果是策略3和策略4, 这里应该是cxq和EntryList都不为空
w = _EntryList ;
if (w != NULL) {
// 如果EntryList不为空
// I'd like to write: guarantee (w->_thread != Self).
// But in practice an exiting thread may find itself on the EntryList.
// Lets say thread T1 calls O.wait(). Wait() enqueues T1 on O's waitset and
// then calls exit(). Exit release the lock by setting O._owner to NULL.
// Lets say T1 then stalls. T2 acquires O and calls O.notify(). The
// notify() operation moves T1 from O's waitset to O's EntryList. T2 then
// release the lock "O". T2 resumes immediately after the ST of null into
// _owner, above. T2 notices that the EntryList is populated, so it
// reacquires the lock and then finds itself on the EntryList.
// Given all that, we have to tolerate the circumstance where "w" is
// associated with Self.
assert (w->TState == ObjectWaiter::TS_ENTER, "invariant") ;
// 唤醒EntryList队首元素
ExitEpilog (Self, w) ;
return ;
}
// 走到这里说明EntryList为空
// If we find that both _cxq and EntryList are null then just
// re-run the exit protocol from the top.
// 如果cxq也为空, 再循环一次
w = _cxq ;
if (w == NULL) continue ;
// 走到这里说明cxq不为空
// Drain _cxq into EntryList - bulk transfer.
// First, detach _cxq.
// The following loop is tantamount to: w = swap (&cxq, NULL)
// 获取cxq头节点,
// 然后cxq清空, 因为cxq中的数据会被移到EntryList中
for (;;) {
assert (w != NULL, "Invariant") ;
ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ;
if (u == w) break ;
w = u ;
}
TEVENT (Inflated exit - drain cxq into EntryList) ;
// 根据上面的代码逻辑, 走到这里说明, EntryList为空, cxq不为空的情况
assert (w != NULL , "invariant") ;
assert (_EntryList == NULL , "invariant") ;
// Convert the LIFO SLL anchored by _cxq into a DLL.
// The list reorganization step operates in O(LENGTH(w)) time.
// It's critical that this step operate quickly as
// "Self" still holds the outer-lock, restricting parallelism
// and effectively lengthening the critical section.
// Invariant: s chases t chases u.
// TODO-FIXME: consider changing EntryList from a DLL to a CDLL so
// we have faster access to the tail.
if (QMode == 1) {
// 策略1: 将cxq中的元素反转后插入EntryList中, 这会导致后入队的线程先获取到锁
// QMode == 1 : drain cxq to EntryList, reversing order
// We also reverse the order of the list.
// 倒置cxq, 构建一个新的双向链表
ObjectWaiter * s = NULL ;
ObjectWaiter * t = w ;
ObjectWaiter * u = NULL ;
while (t != NULL) {
guarantee (t->TState == ObjectWaiter::TS_CXQ, "invariant") ;
// 修改状态为TS_ENTER
t->TState = ObjectWaiter::TS_ENTER ;
u = t->_next ;
t->_prev = u ;
t->_next = s ;
s = t;
t = u ;
}
// EntryList变为倒置后的cxq
_EntryList = s ;
assert (s != NULL, "invariant") ;
} else {
// 将cxq中的元素移到EntryList中
// QMode == 0 or QMode == 2 这行注释不对, 只有策略0会走到这里, 策略2会直接返回
// QMode == 0 or QMode == 2
// 修改状态为TS_ENTER
_EntryList = w ;
ObjectWaiter * q = NULL ;
ObjectWaiter * p ;
for (p = w ; p != NULL ; p = p->_next) {
// 修改状态为TS_ENTER
guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
p->TState = ObjectWaiter::TS_ENTER ;
// SLL变DLL
p->_prev = q ;
q = p ;
}
}
// In 1-0 mode we need: ST EntryList; MEMBAR #storestore; ST _owner = NULL
// The MEMBAR is satisfied by the release_store() operation in ExitEpilog().
// See if we can abdicate to a spinner instead of waking a thread.
// A primary goal of the implementation is to reduce the
// context-switch rate.
// _succ不为NULL说明已经有假定的继承人, 所以不需要再唤醒新的线程, 以减少竞争
if (_succ != NULL) continue;
// 唤醒EntryList队首元素
w = _EntryList ;
if (w != NULL) {
guarantee (w->TState == ObjectWaiter::TS_ENTER, "invariant") ;
ExitEpilog (Self, w) ;
return ;
}
}
}
会先尝试释放一次,让刚进来的线程抢占以下,如果被抢占了,当前线程成功移交所有权。如果没被抢占,就会从cxq或者EntryList中找一个,不同的策略会决定cxq和EntryList的先后顺序,以及是先头部还是先尾部。默认策略是0,对应的是先EntryList头部,再cxq头部。
实际的唤醒操作在下面这个方法
void ObjectMonitor::ExitEpilog (Thread * Self, ObjectWaiter * Wakee) {
assert (_owner == Self, "invariant") ;
// Exit protocol:
// 1. ST _succ = wakee
// 2. membar #loadstore|#storestore;
// 2. ST _owner = NULL
// 3. unpark(wakee)
// 设置假定继承人为wakee线程
_succ = Knob_SuccEnabled ? Wakee->_thread : NULL ;
ParkEvent * Trigger = Wakee->_event ;
// Hygiene -- once we've set _owner = NULL we can't safely dereference Wakee again.
// The thread associated with Wakee may have grabbed the lock and "Wakee" may be
// out-of-scope (non-extant).
Wakee = NULL ;
// Drop the lock
// 内存屏障#loadstore|#storestore;
OrderAccess::release_store_ptr (&_owner, NULL) ;
OrderAccess::fence() ; // ST _owner vs LD in unpark()
if (SafepointSynchronize::do_call_back()) {
TEVENT (unpark before SAFEPOINT) ;
}
// 唤醒选定的线程
DTRACE_MONITOR_PROBE(contended__exit, this, object(), Self);
Trigger->unpark() ;
// Maintain stats and report events to JVMTI
if (ObjectMonitor::_sync_Parks != NULL) {
ObjectMonitor::_sync_Parks->inc() ;
}
}
代码很简单,不解释了。这里最后会恢复挂起的线程,恢复后不保证一定能运行用户代码,还要获取锁,因为synchronized是非公平的,不保证一定能获取到。unpark后继续看EnterI的代码
void ATTR ObjectMonitor::EnterI (TRAPS) {
... 省略入队代码
for (;;) {
... 省略挂起代码
// 被唤醒后继续获取锁, 如果成功直接退出
if (TryLock(Self) > 0) break ;
// The lock is still contested.
// Keep a tally of the # of futile wakeups.
// Note that the counter is not protected by a lock or updated by atomics.
// That is by design - we trade "lossy" counters which are exposed to
// races during updates for a lower probe effect.
// 走到这里说明被唤醒后获取锁失败, 记录下
TEVENT (Inflated enter - Futile wakeup) ;
if (ObjectMonitor::_sync_FutileWakeups != NULL) {
ObjectMonitor::_sync_FutileWakeups->inc() ;
}
++ nWakeups ;
// Assuming this is not a spurious wakeup we'll normally find _succ == Self.
// We can defer clearing _succ until after the spin completes
// TrySpin() must tolerate being called with _succ == Self.
// Try yet another round of adaptive spinning.
// 尝试自旋获取锁
if ((Knob_SpinAfterFutile & 1) && TrySpin (Self) > 0) break ;
// We can find that we were unpark()ed and redesignated _succ while
// we were spinning. That's harmless. If we iterate and call park(),
// park() will consume the event and return immediately and we'll
// just spin again. This pattern can repeat, leaving _succ to simply
// spin on a CPU. Enable Knob_ResetEvent to clear pending unparks().
// Alternately, we can sample fired() here, and if set, forgo spinning
// in the next iteration.
if ((Knob_ResetEvent & 1) && Self->_ParkEvent->fired()) {
Self->_ParkEvent->reset() ;
OrderAccess::fence() ;
}
// 如果还是失败, 放弃假定继承人
if (_succ == Self) _succ = NULL ;
// Invariant: after clearing _succ a thread *must* retry _owner before parking.
OrderAccess::fence() ;
}
// 走到这里应该获取到锁了
// Egress :
// Self has acquired the lock -- Unlink Self from the cxq or EntryList.
// Normally we'll find Self on the EntryList .
// From the perspective of the lock owner (this thread), the
// EntryList is stable and cxq is prepend-only.
// The head of cxq is volatile but the interior is stable.
// In addition, Self.TState is stable.
// 入队(cxq/EntryList)后, 获取到了锁, 接下来从cxq/EntryList中出队
// 正常情况下, 这个时候应该在EntryList中
assert (_owner == Self , "invariant") ;
assert (object() != NULL , "invariant") ;
// I'd like to write:
// guarantee (((oop)(object()))->mark() == markOopDesc::encode(this), "invariant") ;
// but as we're at a safepoint that's not safe.
// 出队(从_cxq或_EntryList中移除)
UnlinkAfterAcquire (Self, &node) ;
if (_succ == Self) _succ = NULL ;
... 省略内存屏障相关代码
return ;
}
unpark后会继续TryLock获取锁,如果成功,就从cxq/EntryList中移除,否则在for循环中再次park自己,等待下次unpark。
最后释放LockRecord,这个顺序很重要,因为释放流程不保证一定能释放成功,得在操作ObjectMonitor成功后,才能释放LockRecord
IRT_ENTRY_NO_ASYNC(void, InterpreterRuntime::monitorexit(JavaThread* thread, BasicObjectLock* elem))
...
elem->set_obj(NULL);
...
IRT_END
Object#wait方法是个native方法
public class Object {
...
public final native void wait(long timeout) throws InterruptedException;
...
}
找一下入口:
static JNINativeMethod methods[] = {
{"hashCode", "()I", (void *)&JVM_IHashCode},
{"wait", "(J)V", (void *)&JVM_MonitorWait},
{"notify", "()V", (void *)&JVM_MonitorNotify},
{"notifyAll", "()V", (void *)&JVM_MonitorNotifyAll},
{"clone", "()Ljava/lang/Object;", (void *)&JVM_Clone},
};
JVM_ENTRY(void, JVM_MonitorWait(JNIEnv* env, jobject handle, jlong ms))
...
ObjectSynchronizer::wait(obj, ms, CHECK);
JVM_END
主逻辑在
void ObjectSynchronizer::wait(Handle obj, jlong millis, TRAPS) {
if (UseBiasedLocking) {
// 撤销偏向锁
BiasedLocking::revoke_and_rebias(obj, false, THREAD);
assert(!obj->mark()->has_bias_pattern(), "biases should be revoked by now");
}
...
// 膨胀为重量级锁
ObjectMonitor* monitor = ObjectSynchronizer::inflate(THREAD, obj());
DTRACE_MONITOR_WAIT_PROBE(monitor, obj(), THREAD, millis);
// wait主逻辑
monitor->wait(millis, true, THREAD);
/* This dummy call is in place to get around dtrace bug 6254741. Once
that's fixed we can uncomment the following line and remove the call */
// DTRACE_MONITOR_PROBE(waited, monitor, obj(), THREAD);
dtrace_waited_probe(monitor, obj, THREAD);
}
无论如何先变成重量级锁,所以wait是重量级锁才有的操作,也看出来ObjectMonitor这个类是有很重的操作逻辑的。
void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {
... 省略检查逻辑和事件逻辑
assert (Self->_Stalled == 0, "invariant") ;
Self->_Stalled = intptr_t(this) ;
jt->set_current_waiting_monitor(this);
// create a node to be put into the queue
// Critically, after we reset() the event but prior to park(), we must check
// for a pending interrupt.
ObjectWaiter node(Self);
// 修改waiter状态为TS_WAIT
node.TState = ObjectWaiter::TS_WAIT ;
Self->_ParkEvent->reset() ;
// 内存屏障
OrderAccess::fence(); // ST into Event; membar ; LD interrupted-flag
// Enter the waiting queue, which is a circular doubly linked list in this case
// but it could be a priority queue or any data structure.
// _WaitSetLock protects the wait queue. Normally the wait queue is accessed only
// by the the owner of the monitor *except* in the case where park()
// returns because of a timeout of interrupt. Contention is exceptionally rare
// so we use a simple spin-lock instead of a heavier-weight blocking lock.
// 对进入WaitSet的操作加锁
Thread::SpinAcquire (&_WaitSetLock, "WaitSet - add") ;
// 进入WaitSet
AddWaiter (&node) ;
Thread::SpinRelease (&_WaitSetLock) ;
if ((SyncFlags & 4) == 0) {
_Responsible = NULL ;
}
intptr_t save = _recursions; // record the old recursion count
_waiters++; // increment the number of waiters
_recursions = 0; // set the recursion level to be 1
// 释放锁资源
exit (true, Self) ; // exit the monitor
guarantee (_owner != Self, "invariant") ;
// The thread is on the WaitSet list - now park() it.
// On MP systems it's conceivable that a brief spin before we park
// could be profitable.
//
// TODO-FIXME: change the following logic to a loop of the form
// while (!timeout && !interrupted && _notified == 0) park()
int ret = OS_OK ;
int WasNotified = 0 ;
{ // State transition wrappers
// 设置操作系统线程状态
OSThread* osthread = Self->osthread();
OSThreadWaitState osts(osthread, true);
{
ThreadBlockInVM tbivm(jt);
// Thread is in thread_blocked state and oop access is unsafe.
jt->set_suspend_equivalent();
// 挂起自己
if (interruptible && (Thread::is_interrupted(THREAD, false) || HAS_PENDING_EXCEPTION)) {
// Intentionally empty
} else
if (node._notified == 0) {
if (millis <= 0) {
Self->_ParkEvent->park () ;
} else {
ret = Self->_ParkEvent->park (millis) ;
}
}
...
}
wait操作前半段做了这件事:
后半段放在被唤醒时候说
notify的代码分为两个部分,一部分是notify本身的逻辑,另一部分是,wait()中unpark后的逻辑
和上面一样找到入口
JVM_ENTRY(void, JVM_MonitorNotify(JNIEnv* env, jobject handle))
JVMWrapper("JVM_MonitorNotify");
Handle obj(THREAD, JNIHandles::resolve_non_null(handle));
ObjectSynchronizer::notify(obj, CHECK);
JVM_END
void ObjectSynchronizer::notify(Handle obj, TRAPS) {
// 撤销偏向锁
// 撤销时, 如果依然活跃会升级为轻量级锁
if (UseBiasedLocking) {
BiasedLocking::revoke_and_rebias(obj, false, THREAD);
assert(!obj->mark()->has_bias_pattern(), "biases should be revoked by now");
}
markOop mark = obj->mark();
if (mark->has_locker() && THREAD->is_lock_owned((address)mark->locker())) {
// 能走到这里说明当前线程就是活跃的, 不用notify了
return;
}
// 否则膨胀后notify
ObjectSynchronizer::inflate(THREAD, obj())->notify(THREAD);
}
如果当前线程不是活跃的,膨胀为重量级锁,然后notify
void ObjectMonitor::notify(TRAPS) {
CHECK_OWNER();
if (_WaitSet == NULL) {
TEVENT (Empty-Notify) ;
return ;
}
DTRACE_MONITOR_PROBE(notify, this, object(), THREAD);
int Policy = Knob_MoveNotifyee ;
Thread::SpinAcquire (&_WaitSetLock, "WaitSet - notify") ;
// 从WaitSet队列中获取一个waiter
ObjectWaiter * iterator = DequeueWaiter() ;
if (iterator != NULL) {
TEVENT (Notify1 - Transfer) ;
guarantee (iterator->TState == ObjectWaiter::TS_WAIT, "invariant") ;
guarantee (iterator->_notified == 0, "invariant") ;
if (Policy != 4) {
iterator->TState = ObjectWaiter::TS_ENTER ;
}
iterator->_notified = 1 ;
Thread * Self = THREAD;
iterator->_notifier_tid = Self->osthread()->thread_id();
ObjectWaiter * List = _EntryList ;
if (List != NULL) {
assert (List->_prev == NULL, "invariant") ;
assert (List->TState == ObjectWaiter::TS_ENTER, "invariant") ;
assert (List != iterator, "invariant") ;
}
if (Policy == 0) { // prepend to EntryList
// 头插到EntryList
if (List == NULL) {
iterator->_next = iterator->_prev = NULL ;
_EntryList = iterator ;
} else {
List->_prev = iterator ;
iterator->_next = List ;
iterator->_prev = NULL ;
_EntryList = iterator ;
}
} else
if (Policy == 1) { // append to EntryList
// 尾插到EntryList
if (List == NULL) {
iterator->_next = iterator->_prev = NULL ;
_EntryList = iterator ;
} else {
// CONSIDER: finding the tail currently requires a linear-time walk of
// the EntryList. We can make tail access constant-time by converting to
// a CDLL instead of using our current DLL.
ObjectWaiter * Tail ;
for (Tail = List ; Tail->_next != NULL ; Tail = Tail->_next) ;
assert (Tail != NULL && Tail->_next == NULL, "invariant") ;
Tail->_next = iterator ;
iterator->_prev = Tail ;
iterator->_next = NULL ;
}
} else
if (Policy == 2) { // prepend to cxq
// prepend to cxq
// 头插到cxq, 默认
if (List == NULL) {
iterator->_next = iterator->_prev = NULL ;
_EntryList = iterator ;
} else {
iterator->TState = ObjectWaiter::TS_CXQ ;
for (;;) {
ObjectWaiter * Front = _cxq ;
iterator->_next = Front ;
if (Atomic::cmpxchg_ptr (iterator, &_cxq, Front) == Front) {
break ;
}
}
}
} else
if (Policy == 3) { // append to cxq
// 尾插到cxq
iterator->TState = ObjectWaiter::TS_CXQ ;
for (;;) {
ObjectWaiter * Tail ;
Tail = _cxq ;
if (Tail == NULL) {
iterator->_next = NULL ;
if (Atomic::cmpxchg_ptr (iterator, &_cxq, NULL) == NULL) {
break ;
}
} else {
while (Tail->_next != NULL) Tail = Tail->_next ;
Tail->_next = iterator ;
iterator->_prev = Tail ;
iterator->_next = NULL ;
break ;
}
}
} else {
// 取WaitSet第一个节点唤醒
ParkEvent * ev = iterator->_event ;
iterator->TState = ObjectWaiter::TS_RUN ;
OrderAccess::fence() ;
ev->unpark() ;
}
if (Policy < 4) {
iterator->wait_reenter_begin(this);
}
// _WaitSetLock protects the wait queue, not the EntryList. We could
// move the add-to-EntryList operation, above, outside the critical section
// protected by _WaitSetLock. In practice that's not useful. With the
// exception of wait() timeouts and interrupts the monitor owner
// is the only thread that grabs _WaitSetLock. There's almost no contention
// on _WaitSetLock so it's not profitable to reduce the length of the
// critical section.
}
Thread::SpinRelease (&_WaitSetLock) ;
if (iterator != NULL && ObjectMonitor::_sync_Notifications != NULL) {
ObjectMonitor::_sync_Notifications->inc() ;
}
}
取出WaitSet的头节点,然后根据操作,notify的策略有4个:
0: 将头节点插入到EntryList头部
1: 将头节点插入到EntryList尾部
2: 将头节点插入到cxq头部,默认选项
3: 将头节点插入到cxq尾部
兜底策略: 直接唤醒(unpark)头节点
可以发现,除了兜底策略,其他策略都是插入到cxq或者EntryList中,而不是直接唤醒,所以notify了不能保证立即唤醒线程。
要注意的一点是,这里跑的并不是用户的应用程序,或者说是临界区的代码,而是jvm的代码,要跑应用程序代码得先获取锁。
假设notify后,线程从cxq/EntryList或者直接被unpark了,继续看wait中unpark后的代码:
void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {
... 省略进入WaitSet和释放资源的代码
int ret = OS_OK ;
int WasNotified = 0 ;
{ // State transition wrappers
// 设置操作系统线程状态
OSThread* osthread = Self->osthread();
OSThreadWaitState osts(osthread, true);
{
... 省略挂起代码
// were we externally suspended while we were waiting?
if (ExitSuspendEquivalent (jt)) {
// TODO-FIXME: add -- if succ == Self then succ = null.
jt->java_suspend_self();
}
} // Exit thread safepoint: transition _thread_blocked -> _thread_in_vm
...
// 从WaitSet中出队
if (node.TState == ObjectWaiter::TS_WAIT) {
Thread::SpinAcquire (&_WaitSetLock, "WaitSet - unlink") ;
if (node.TState == ObjectWaiter::TS_WAIT) {
DequeueSpecificWaiter (&node) ; // unlink from WaitSet
assert(node._notified == 0, "invariant");
node.TState = ObjectWaiter::TS_RUN ;
}
Thread::SpinRelease (&_WaitSetLock) ;
}
// The thread is now either on off-list (TS_RUN),
// on the EntryList (TS_ENTER), or on the cxq (TS_CXQ).
// The Node's TState variable is stable from the perspective of this thread.
// No other threads will asynchronously modify TState.
guarantee (node.TState != ObjectWaiter::TS_WAIT, "invariant") ;
OrderAccess::loadload() ;
// 不允许notify后的自己称为假定的候选人,
// 因为后面需要走一遍enter的流程,
// 这段时间的空闲对应用程序来说是浪费, 所以要给别的线程机会去抢占锁资源
if (_succ == Self) _succ = NULL ;
WasNotified = node._notified ;
// 重新获取锁
// Reentry phase -- reacquire the monitor.
// re-enter contended monitor after object.wait().
// retain OBJECT_WAIT state until re-enter successfully completes
// Thread state is thread_in_vm and oop access is again safe,
// although the raw address of the object may have changed.
// (Don't cache naked oops over safepoints, of course).
// post monitor waited event. Note that this is past-tense, we are done waiting.
if (JvmtiExport::should_post_monitor_waited()) {
// 难道是之前刚在notify中被唤醒就又被park了?
JvmtiExport::post_monitor_waited(jt, this, ret == OS_TIMEOUT);
if (node._notified != 0 && _succ == Self) {
// In this part of the monitor wait-notify-reenter protocol it
// is possible (and normal) for another thread to do a fastpath
// monitor enter-exit while this thread is still trying to get
// to the reenter portion of the protocol.
//
// The ObjectMonitor was notified and the current thread is
// the successor which also means that an unpark() has already
// been done. The JVMTI_EVENT_MONITOR_WAITED event handler can
// consume the unpark() that was done when the successor was
// set because the same ParkEvent is shared between Java
// monitors and JVM/TI RawMonitors (for now).
//
// We redo the unpark() to ensure forward progress, i.e., we
// don't want all pending threads hanging (parked) with none
// entering the unlocked monitor.
node._event->unpark();
}
}
if (event.should_commit()) {
post_monitor_wait_event(&event, node._notifier_tid, millis, ret == OS_TIMEOUT);
}
OrderAccess::fence() ;
assert (Self->_Stalled != 0, "invariant") ;
Self->_Stalled = 0 ;
assert (_owner != Self, "invariant") ;
ObjectWaiter::TStates v = node.TState ;
// 重新获取锁
if (v == ObjectWaiter::TS_RUN) {
enter (Self) ;
} else {
guarantee (v == ObjectWaiter::TS_ENTER || v == ObjectWaiter::TS_CXQ, "invariant") ;
ReenterI (Self, &node) ;
node.wait_reenter_end(this);
}
// 获取锁成功
// Self has reacquired the lock.
// Lifecycle - the node representing Self must not appear on any queues.
// Node is about to go out-of-scope, but even if it were immortal we wouldn't
// want residual elements associated with this thread left on any lists.
guarantee (node.TState == ObjectWaiter::TS_RUN, "invariant") ;
assert (_owner == Self, "invariant") ;
assert (_succ != Self , "invariant") ;
} // OSThreadWaitState()
jt->set_current_waiting_monitor(NULL);
guarantee (_recursions == 0, "invariant") ;
_recursions = save; // restore the old recursion count
_waiters--; // decrement the number of waiters
// Verify a few postconditions
assert (_owner == Self , "invariant") ;
assert (_succ != Self , "invariant") ;
assert (((oop)(object()))->mark() == markOopDesc::encode(this), "invariant") ;
if (SyncFlags & 32) {
OrderAccess::fence() ;
}
// check if the notification happened
if (!WasNotified) {
// no, it could be timeout or Thread.interrupt() or both
// check for interrupt event, otherwise it is timeout
if (interruptible && Thread::is_interrupted(Self, true) && !HAS_PENDING_EXCEPTION) {
TEVENT (Wait - throw IEX from epilog) ;
THROW(vmSymbols::java_lang_InterruptedException());
}
}
// NOTE: Spurious wake up will be consider as timeout.
// Monitor notify has precedence over thread interrupt.
}
wait()中unpark后会从WaitSet中出队,然后重新enter,尝试获取锁,这一操作不一定成功。
由此可以看出,唤醒和执行用户程序是两回事,执行用户程序需要唤醒+获取到锁才行。
通过上面的代码可以看出,膨胀为重量级锁后就一直是重量级锁了,没发降级了?
从omFreeList中获取ObjectMonitor到omInUseList上就没见它在用完后还过,真不用还吗?
其实是又降级的,降级后会变成无锁状态,对应的OjbectMonitor也会从omInUseList回到omFreeList。
代码入口是 ObjectSynchronizer::deflate_idle_monitors
,原理很简单:
在安全点时,遍历所有在用的ObjectMonitor:栈私有的omInUseList和全局的gOmInUseList总的所有ObjectMonitor,选择未被使用的锁作为降级对象。
降级只在safepoint处发生,也就是STW时,这个也很好理解。
void SafepointSynchronize::begin() {
...
// Call stuff that needs to be run when a safepoint is just about to be completed
do_cleanup_tasks();
...
}
// Various cleaning tasks that should be done periodically at safepoints
void SafepointSynchronize::do_cleanup_tasks() {
{
TraceTime t1("deflating idle monitors", TraceSafepointCleanupTime);
ObjectSynchronizer::deflate_idle_monitors();
}
...
}
只分析MonitorInUseLists为true的情况,逻辑很简单,过下源码就知道了
void ObjectSynchronizer::deflate_idle_monitors() {
...
if (MonitorInUseLists) {
int inUse = 0;
// 遍历所有线程的omInUseList
for (JavaThread* cur = Threads::first(); cur != NULL; cur = cur->next()) {
nInCirculation+= cur->omInUseCount;
int deflatedcount = walk_monitor_list(cur->omInUseList_addr(), &FreeHead, &FreeTail);
cur->omInUseCount-= deflatedcount;
// verifyInUse(cur);
nScavenged += deflatedcount;
nInuse += cur->omInUseCount;
}
// For moribund threads, scan gOmInUseList
// 遍历gOmInUseList
if (gOmInUseList) {
nInCirculation += gOmInUseCount;
int deflatedcount = walk_monitor_list((ObjectMonitor **)&gOmInUseList, &FreeHead, &FreeTail);
gOmInUseCount-= deflatedcount;
nScavenged += deflatedcount;
nInuse += gOmInUseCount;
}
} ...
int ObjectSynchronizer::walk_monitor_list(ObjectMonitor** listheadp,
ObjectMonitor** FreeHeadp, ObjectMonitor** FreeTailp) {
ObjectMonitor* mid;
ObjectMonitor* next;
ObjectMonitor* curmidinuse = NULL; // 遍历过程中最新一个正在使用的节点
int deflatedcount = 0;
for (mid = *listheadp; mid != NULL; ) {
oop obj = (oop) mid->object();
bool deflated = false;
if (obj != NULL) {
deflated = deflate_monitor(mid, obj, FreeHeadp, FreeTailp);
}
if (deflated) {
// extract from per-thread in-use-list
// 如果降级了, 从xxInUseList中去除当前节点
if (mid == *listheadp) {
// 如果去除的是头部节点, 更改头部节点位置为降级节点的下一个节点
*listheadp = mid->FreeNext;
} else if (curmidinuse != NULL) {
// 如果不是头部节点, 并且存在正在使用的节点, 将降级节点的下一节点接到它后面
curmidinuse->FreeNext = mid->FreeNext; // maintain the current thread inuselist
}
next = mid->FreeNext;
mid->FreeNext = NULL; // This mid is current tail in the FreeHead list
mid = next;
deflatedcount++;
} else {
// 如果没降级, 则不去除当前节点, 继续遍历下一个
curmidinuse = mid;
mid = mid->FreeNext;
}
}
return deflatedcount;
}
// Deflate a single monitor if not in use
// Return true if deflated, false if in use
bool ObjectSynchronizer::deflate_monitor(ObjectMonitor* mid, oop obj,
ObjectMonitor** FreeHeadp, ObjectMonitor** FreeTailp) {
bool deflated;
// Normal case ... The monitor is associated with obj.
guarantee (obj->mark() == markOopDesc::encode(mid), "invariant") ;
guarantee (mid == obj->mark()->monitor(), "invariant");
guarantee (mid->header()->is_neutral(), "invariant");
// 如果owner, cxq, EntryList, WaitSet有一个不为空都认为busy
if (mid->is_busy()) {
if (ClearResponsibleAtSTW) mid->_Responsible = NULL ;
deflated = false;
} else {
// Deflate the monitor if it is no longer being used
// It's idle - scavenge and return to the global free list
// plain old deflation ...
TEVENT (deflate_idle_monitors - scavenge1) ;
if (TraceMonitorInflation) {
if (obj->is_instance()) {
ResourceMark rm;
tty->print_cr("Deflating object " INTPTR_FORMAT " , mark " INTPTR_FORMAT " , type %s",
(void *) obj, (intptr_t) obj->mark(), obj->klass()->external_name());
}
}
// Restore the header back to obj
// 还原MarkWord为无锁状态
obj->release_set_mark(mid->header());
// 清除owner和关联object
mid->clear();
assert (mid->object() == NULL, "invariant") ;
// Move the object to the working free list defined by FreeHead,FreeTail.
// 将当前节点移到xxFreeList中
if (*FreeHeadp == NULL) *FreeHeadp = mid;
if (*FreeTailp != NULL) {
ObjectMonitor * prevtail = *FreeTailp;
assert(prevtail->FreeNext == NULL, "cleaned up deflated?"); // TODO KK
prevtail->FreeNext = mid;
}
*FreeTailp = mid;
deflated = true;
}
return deflated;
}
重量级锁的实现和之前的偏向锁和轻量级锁都不同,而且也用到了LcokRecord,不过没有像轻量级锁那样重用它。用得比较多的还是ObjectMonitor,基于ObjectMonitor的数据结构,实现了Java中对象的重量级加锁、解锁、wait、notify、notifyAll和降级为无锁的操作。
上面说了,在入口处,无论当前是什么锁,都会从栈上获取一个LockRecord,然后将持有者(obj)设置为当前对象。所以偏向锁是LockRecord+MarkWord共同描述的:
支持偏向锁的对象的MarkWord的状态,在对象刚实例化时,就会是上面那样一个状态:线程ID+epoch+分代年龄+偏向锁标志+锁标志位,其中的线程ID是0,这样的偏向锁状态称为匿名偏向(anonymously biased)。
即使允许偏向锁,偏向锁也不会在jvm启动时立即生效,jvm认为在刚启动的时候竞争是激烈的,所以偏向锁的实际生效时间是由延迟的。可以通过BiasedLockingStartupDelay来设置,设置为0表示立即生效。
偏向锁第一次获取是在入口处,这段代码紧接着上面入口的代码。
...
// implies UseBiasedLocking
if (mark->has_bias_pattern()) {
// 如果MarkWord是偏向锁(后三位101)
uintptr_t thread_ident;
uintptr_t anticipated_bias_locking_value;
thread_ident = (uintptr_t)istate->thread();
// 偏向锁状态的MarkWord结构: 线程ID+epoch+age+偏向锁标志+锁标志位
// 锁标志位在无锁和偏向锁情况下都是01
anticipated_bias_locking_value =
// klass->prototype_header(), 包含epoch和偏向锁标志位, 拼上当前线程ID
(((uintptr_t)lockee->klass()->prototype_header() | thread_ident)
// 和对象的MarkWord异或, 不想等的位会是1
^ (uintptr_t)mark)
// 比较结果中age的几位置为0
& ~((uintptr_t) markOopDesc::age_mask_in_place);
// 上面抹掉了age, 锁标志位在无锁和偏向锁情况下都是01, 所以相等
// 如果不一样, 可能不同的地方有: 线程ID, epoch, 偏向锁标志位, 以下分情况处理
if (anticipated_bias_locking_value == 0) {
// 说明完全一样, 偏向的是当前的线程, 并且epoch和clazz中的epoch相同
// 什么都不做
// already biased towards this thread, nothing to do
if (PrintBiasedLockingStatistics) {
(* BiasedLocking::biased_lock_entry_count_addr())++;
}
success = true;
}
else if ((anticipated_bias_locking_value & markOopDesc::biased_lock_mask_in_place) != 0) {
...
}
else if ((anticipated_bias_locking_value & epoch_mask_in_place) !=0) {
// epoch不想等, 说明mark中的epoch过期(发生过批量重偏向),
// epoch过期可以看作没有偏向任何线程, 可以重偏向
// try rebias
// 尝试重偏向
// 构造一个偏向当前线程的MarkWord, 会设置最新的epoch
markOop new_header = (markOop) ( (intptr_t) lockee->klass()->prototype_header() | thread_ident);
if (hash != markOopDesc::no_hash) {
new_header = new_header->copy_set_hash(hash);
}
// 用新的MarkWord重新偏向
if (Atomic::cmpxchg_ptr((void*)new_header, lockee->mark_addr(), mark) == mark) {
if (PrintBiasedLockingStatistics)
(* BiasedLocking::rebiased_lock_entry_count_addr())++;
}
else {
// 重偏向失败, 说明已经被其他线程提前改了, 存在竞争, 进行膨胀
CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
}
success = true;
}
else {
// try to bias towards thread in case object is anonymously biased
// 如果这个时候最新的MarkWord是匿名偏向锁, 尝试指向自己, 这也是非公平锁的体现
// 根据原MarkWord一个匿名偏向锁状态的MarkWord,
// 保留了原mark的锁标志位+偏向锁标志+age+epoch
markOop header = (markOop) ((uintptr_t) mark & ((uintptr_t)markOopDesc::biased_lock_mask_in_place |
(uintptr_t)markOopDesc::age_mask_in_place |
epoch_mask_in_place));
// 设置hashcode
if (hash != markOopDesc::no_hash) {
header = header->copy_set_hash(hash);
}
// 偏向当前线程
markOop new_header = (markOop) ((uintptr_t) header | thread_ident);
// debugging hint
DEBUG_ONLY(entry->lock()->set_displaced_header((markOop) (uintptr_t) 0xdeaddead);)
// 设置MarkWord
if (Atomic::cmpxchg_ptr((void*)new_header, lockee->mark_addr(), header) == header) {
// 设置成功, 说明设置前的是匿名偏向锁, 通过CAS改成了偏向为自己的偏向锁
if (PrintBiasedLockingStatistics)
(* BiasedLocking::anonymously_biased_lock_entry_count_addr())++;
}
else {
// 重置失败, 说明当前已经偏向其他线程, 存在竞争, 进入膨胀流程
CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
}
success = true;
}
}
....
如果当前对象的MarkWord是偏向锁状态,那么会走偏向锁加锁流程,会检查线程ID、epoch、obj类型的偏向锁标志位,决定加锁是否成功,成功加锁的情况后以下几种:
上面MarkWord的更新操作都是CAS,存在失败的可能。
可以看到,如果只是加偏向锁,效率还是非常高的,就是从栈上获取已经分配好的对象,以及CAS操作。
如果当前obj类型已经不支持偏向锁了(说明发生过批量撤销或者一开始就不支持),或者CAS操作失败,说明存在竞争,偏向锁不再适用了,后面继续走后面更复杂的加锁流程。
...
CASE(_monitorexit): {
// synchronized(obj)的obj
oop lockee = STACK_OBJECT(-1);
CHECK_NULL(lockee);
// derefing's lockee ought to provoke implicit null check
// find our monitor slot
BasicObjectLock* limit = istate->monitor_base();
// limit of expression stack
BasicObjectLock* most_recent = (BasicObjectLock*) istate->stack_base();
// 按enter时相同的顺序遍历LockRecord, 可以达到栈的效果
while (most_recent != limit ) {
if ((most_recent)->obj() == lockee) {
// 找到当前对象持有的LockRecord
BasicLock* lock = most_recent->lock();
// 从LockRecord中取出刚开始的header
markOop header = lock->displaced_header();
// 解除关联, 偏向锁和重入的轻量级锁只需要这一步就够了
most_recent->set_obj(NULL);
...
// 只需要找到一个就退出, 对于重入的情况肯定是后获取的锁先释放
UPDATE_PC_AND_TOS_AND_CONTINUE(1, -1);
}
// 下一个LockRecord
most_recent++;
}
...
偏向锁的释放很简单,按monitorenter时相同的顺序遍历LockRecord,同一个对象后入的地址比先入的地址低,所以遍历到的第一个LockRecord就是当前monitorenter时获取的LockRecord。这个顺序对偏向锁来说不是很重要,反过来遍历也无所谓。但是这个地方的代码时偏向锁和轻量级锁公用的,对轻量级锁来说,顺序就很重要了,这个在轻量级锁解锁时会细说。这也是偏向锁和轻量级锁可重入特性的实现,没有额外的重入计数。
还有一点不知道大家发现没,没有将MarkWord中的线程ID置为0。从这点也能看出偏向锁的使用场景就是同一个线程操作的场景,就算这个线程退出了,其他线程进来了也没发获取到偏向锁。
撤销的意思就是从偏向锁状态变成无锁状态。
其实这里无锁这个描述并不准确,准确的说是不支持偏向锁了,不过大家都这么叫,我也这么叫吧,不纠结这个,大家清楚无锁是怎么回事就行。
这段代码和加锁代码有锁重合,但是关注点不同,这段代码的关注点在CAS失败和撤销。在下面这段代码中第一次尝试撤销偏向锁
...
if (entry != NULL) {
...
// implies UseBiasedLocking
if (mark->has_bias_pattern()) {
...
if (anticipated_bias_locking_value == 0) {
...
}
else if ((anticipated_bias_locking_value & markOopDesc::biased_lock_mask_in_place) != 0) {
// 偏向锁标志位不同, 而刚才的mark通过了偏向锁检查, 所以是klass中的偏向锁模式关闭
// 没法继续使用偏向锁了, 需要撤销当前的偏向锁, 然后升级为轻量级锁或者重量级锁
// try revoke bias
// 尝试撤销偏向锁
// 此时的lockee->klass()->prototype_header()应该是无锁状态的MarkWord
markOop header = lockee->klass()->prototype_header();
// 重置MarkWord, 如果有hash, 带上hash
if (hash != markOopDesc::no_hash) {
header = header->copy_set_hash(hash);
}
if (Atomic::cmpxchg_ptr(header, lockee->mark_addr(), mark) == mark) {
// 修改成功, 下面可能会直接升级为轻量级锁
if (PrintBiasedLockingStatistics)
(*BiasedLocking::revoked_lock_entry_count_addr())++;
}
// 后面的代码会继续走轻量级锁或者重量级锁的获取逻辑
}
else if ((anticipated_bias_locking_value & epoch_mask_in_place) !=0) {
...
// 用新的MarkWord重新偏向
if (Atomic::cmpxchg_ptr((void*)new_header, lockee->mark_addr(), mark) == mark) {
...
}
else {
// 重偏向失败, 说明已经被其他线程提前改了, 存在竞争, 进行膨胀
CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
}
success = true;
}
else {
...
if (Atomic::cmpxchg_ptr((void*)new_header, lockee->mark_addr(), header) == header) {
...
}
else {
// 重置失败, 说明当前已经偏向其他线程, 存在竞争, 进入膨胀流程
CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
}
success = true;
}
}
// traditional lightweight locking
// 处理非偏向锁的情况
if (!success) {
// 准备使用轻量级锁或者重量级锁
// 用当前的MarkWord构造一个无锁的MarkWord作为DisplacedHeader
markOop displaced = lockee->mark()->set_unlocked();
// 保存到LockRecord中, 用于解锁时MarkWord的还原
entry->lock()->set_displaced_header(displaced);
bool call_vm = UseHeavyMonitors;
// 如果参数强制指定使用重量级锁,
// 或者CAS将MarkWord从无锁状态设置为轻量级锁失败
if (call_vm || Atomic::cmpxchg_ptr(entry, lockee->mark_addr(), displaced) != displaced) {
// Is it simple recursive case?
// 判断是否轻量级锁重入
if (!call_vm && THREAD->is_lock_owned((address) displaced->clear_lock_bits())) {
// 轻量级锁重入时, 重入的锁的DisplacedMarkWord为NULL
entry->lock()->set_displaced_header(NULL);
} else {
// 膨胀
CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
}
}
}
...
如果加锁过程中CAS操作失败,或者obj类型中的偏向锁标志关闭了,都说明当前不再是偏向锁适用的场景了,需要从轻量级锁开始获取。
为什么加轻量级锁或重量级锁需要先撤销偏向锁?观察下MarkWord中偏向锁和无锁状态的结构
在无锁的结构中,有25bit用于存放hashCode,而偏向锁结构的MarkWord是没有地方存放hashCode的,那不就成了锁和hashCode不可兼得了?对偏向锁确实是这样,那么轻量级锁和重量级锁怎么解决这个问题的?大家可以发现对于轻量级锁和重量级锁的MarkWord而言,指向了一个对象,可以将无锁状态的MarkWord直接放到指向的对象中去,从而让锁和hashCode共存。所以在加轻量级锁和重量级锁之前,需要先将MarkWord恢复成无锁状态,然后再保存到指向的对象里面去。
从这段分析也能知道,计算hashCode也会造成偏向锁的撤销,后面的代码中会经常碰到这个场景。
回到代码继续分析,在这段代码中,obj类型的偏向锁标志关闭了,会第一次尝试撤销偏向锁,
...
else if ((anticipated_bias_locking_value & markOopDesc::biased_lock_mask_in_place) != 0) {
// 偏向锁标志位不同, 而刚才的mark通过了偏向锁检查, 所以是klass中的偏向锁模式关闭
// 没法继续使用偏向锁了, 需要撤销当前的偏向锁, 然后升级为轻量级锁或者重量级锁
// try revoke bias
// 尝试撤销偏向锁
// 此时的lockee->klass()->prototype_header()应该是无锁状态的MarkWord
markOop header = lockee->klass()->prototype_header();
// 重置MarkWord, 如果有hash, 带上hash
if (hash != markOopDesc::no_hash) {
header = header->copy_set_hash(hash);
}
if (Atomic::cmpxchg_ptr(header, lockee->mark_addr(), mark) == mark) {
// 修改成功, 下面可能会直接升级为轻量级锁
if (PrintBiasedLockingStatistics)
(*BiasedLocking::revoked_lock_entry_count_addr())++;
}
// 后面的代码会继续走轻量级锁或者重量级锁的获取逻辑
}
...
如果撤销成功,下面可能直接就加上轻量级锁了
...
if (!success) {
// 准备使用轻量级锁或者重量级锁
// 用当前的MarkWord构造一个无锁的MarkWord作为DisplacedHeader
markOop displaced = lockee->mark()->set_unlocked();
// 保存到LockRecord中, 用于解锁时MarkWord的还原
entry->lock()->set_displaced_header(displaced);
...
// 如果参数强制指定使用重量级锁,
// 或者CAS将MarkWord从无锁状态设置为轻量级锁失败
if (call_vm || Atomic::cmpxchg_ptr(entry, lockee->mark_addr(), displaced) != displaced) {
...
如果失败,或者强制使用重量级锁,都会进入到 CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception)
看下这个方法
...
IRT_ENTRY_NO_ASYNC(void, InterpreterRuntime::monitorenter(JavaThread* thread, BasicObjectLock* elem))
...
// 如果允许偏向锁
if (UseBiasedLocking) {
// 加偏向锁
// Retry fast entry if bias is revoked to avoid unnecessary inflation
ObjectSynchronizer::fast_enter(h_obj, elem->lock(), true, CHECK);
} else {
// 加轻量级锁
ObjectSynchronizer::slow_enter(h_obj, elem->lock(), CHECK);
}
...
IRT_END
...
代码很简单,如果允许偏向锁,会走下快速通道(fast_enter),在撤销的同时再尝试重偏向,这个当中的加锁还是很高效的;否则就走slow_enter,这个效率就不太高了。
撤销的逻辑在fast_enter中,咱们跟进去
void ObjectSynchronizer::fast_enter(Handle obj, BasicLock* lock, bool attempt_rebias, TRAPS) {
// 走到这里肯定是之前获取偏向锁失败了,
// 如果没有出现批量重偏向, 后面肯定得撤销偏向锁, 从轻量级锁开始加起了
if (UseBiasedLocking) {
// 是否在安全点
if (!SafepointSynchronize::is_at_safepoint()) {
// 撤销然后重偏向
// 重偏向只有一种情况下能成功, 发生了批量重偏向, 其他情况都是撤销或者升级为轻量级锁
// 如果当前对象偏向锁没有偏向, 会偏向当前线程, 然后退出
// 否则,
// 如果当前对象偏向的线程已经死亡,
// 会变成匿名偏向锁(attempt_rebias为true)/无锁状态(attempt_rebias为false)
// 如果当前对象偏向的线程依然存活, 会将偏向锁升级为轻量级锁, 但是不改变偏向
// 因为升级为轻量级锁后需要改变偏向说明存在竞争, 存在竞争就得膨为重量级锁
// 这个方法中的部分逻辑是可以不在安全点执行的, 因为这部分逻辑完全是在单线程范围内的, 不存在并发问题
// 对于存在并发问题的部分, 还是会等待到安全点再执行
BiasedLocking::Condition cond = BiasedLocking::revoke_and_rebias(obj, attempt_rebias, THREAD);
if (cond == BiasedLocking::BIAS_REVOKED_AND_REBIASED) {
// 如果重偏向成功, 流程结束
return;
}
} else {
assert(!attempt_rebias, "can not rebias toward VM thread");
// 这段代码是在安全点执行的, 逻辑和revoke_and_rebias中需要在安全点执行的代码一样
// 这个当中是不可能重偏向的只会撤销或者升级为轻量级锁
BiasedLocking::revoke_at_safepoint(obj);
}
// 这行代码很明显, 到这里偏向锁肯定被撤销了
assert(!obj->mark()->has_bias_pattern(), "biases should be revoked by now");
}
// 很显然到这里, 偏向锁肯定被撤销了
// 加轻量级锁
slow_enter (obj, lock, THREAD) ;
}
注释写得挺清楚的了。如果fast_enter中没有获取到偏向锁,会走到slow_enter,这个我们以后看。先继续关注撤销,继续看 BiasedLocking::revoke_and_rebias
,不过只看前半段和后半段的部分,其他部分和批量撤销批量重偏向有关,后面细说
BiasedLocking::Condition BiasedLocking::revoke_and_rebias(Handle obj, bool attempt_rebias, TRAPS) {
// 不需要在安全点被调用
assert(!SafepointSynchronize::is_at_safepoint(), "must not be called while at safepoint");
// step1 高效的撤消
// We can revoke the biases of anonymously-biased objects
// efficiently enough that we should not cause these revocations to
// update the heuristics because doing so may cause unwanted bulk
// revocations (which are expensive) to occur.
// 获取MarkWord
markOop mark = obj->mark();
if (mark->is_biased_anonymously() && !attempt_rebias) {
// step1.1 匿名偏向锁, 并且不需要重偏向, 那直接撤销就好
// 这里的匿名重偏向是在那里设置的?
// 匿名偏向在两个地方会设置:
// 1. 这个对象刚new出来, 如果其类型是允许偏向的, 会设置成匿名偏向状态
// 2. 经历了批量重偏向, 批量重偏向中会先置为匿名偏向, 然后再偏向当前线程, 是在safepoint做的
// 按理说这两种情况都不会走到这里, 所以我也不知道什么情况会走到这里了, 可能好似其他地方会用到吧
// We are probably trying to revoke the bias of this object due to
// an identity hash code computation. Try to revoke the bias
// without a safepoint. This is possible if we can successfully
// compare-and-exchange an unbiased header into the mark word of
// the object, meaning that no other thread has raced to acquire
// the bias of the object.
// 有匿名偏向锁(线程ID为0的偏向锁)并且attempt_rebias==false
//
// 计算hashcode会走到这里,
// 如果当前是匿名偏向锁, 说明线程ID和epoch没用, 可以用来存放hashCode
// 所以要将偏向锁标志位先置为0
//
// 撤销偏向锁, 变为无锁状态
markOop biased_value = mark;
// new一个无锁状态的MarkWord
markOop unbiased_prototype = markOopDesc::prototype()->set_age(mark->age());
// CAS替换
markOop res_mark = (markOop) Atomic::cmpxchg_ptr(unbiased_prototype, obj->mark_addr(), mark);
if (res_mark == biased_value) {
// 撤销成功
return BIAS_REVOKED;
}
} else if (mark->has_bias_pattern()) {
// step1.2 偏向锁的情况
Klass* k = obj->klass();
markOop prototype_header = k->prototype_header();
if (!prototype_header->has_bias_pattern()) {
// step1.2.1 obj的klass不支持偏向锁的情况, 说明经历了批量重撤销,
// 而且批量撤销时当前对象没有被改掉, 当前对象的偏向状态已经没用了, 那直接撤销就好
// This object has a stale bias from before the bulk revocation
// for this data type occurred. It's pointless to update the
// heuristics at this point so simply update the header with a
// CAS. If we fail this race, the object's bias has been revoked
// by another thread so we simply return and let the caller deal
// with it.
// clazz关闭了偏向锁的情况, 和外层情况类似, 外层也做过一次CAS, 应该是失败了进这里的, 因为成功了mark就不是偏向锁了
// 为什么mark中是偏向锁而class不是偏向锁了呢? 因为发生了批量撤销, 修改了class中偏向锁标志, 修改时这个对象应该没被使用, 所以没被改
// 所以这里直接CAS设置下header, 但不关心CAS的结果,
// 因为如果CAS失败, 肯定是其他线程在操作, 在其他线程中应该会被撤销或者升级
// 这里是不用考虑重偏向的, 因为都不支持偏向锁了, 还重偏向什么, 直接撤销, 后面会走slow_enter逻辑加锁
markOop biased_value = mark;
markOop res_mark = (markOop) Atomic::cmpxchg_ptr(prototype_header, obj->mark_addr(), mark);
assert(!(*(obj->mark_addr()))->has_bias_pattern(), "even if we raced, should still be revoked");
return BIAS_REVOKED;
} else if (prototype_header->bias_epoch() != mark->bias_epoch()) {
// step1.2.2 epoch过期的情况, 说明经历了批量重偏向, 而这个对象当时没被使用, epoch没更新
// 这说明之前发生过批量重偏向, 更新了epoch, 此时需要重偏向直接CAS就解决, 很高效
// The epoch of this biasing has expired indicating that the
// object is effectively unbiased. Depending on whether we need
// to rebias or revoke the bias of this object we can do it
// efficiently enough with a CAS that we shouldn't update the
// heuristics. This is normally done in the assembly code but we
// can reach this point due to various points in the runtime
// needing to revoke biases.
// 既然epoch过期, 说明MarkWord无效, 说明当前可以随便改
if (attempt_rebias) {
// step1.2.2.1 如果需要重偏向, 就重偏向当前线程
assert(THREAD->is_Java_thread(), "");
markOop biased_value = mark;
// new一个重偏向当前线程的MarkWord
markOop rebiased_prototype = markOopDesc::encode((JavaThread*) THREAD, mark->age(), prototype_header->bias_epoch());
markOop res_mark = (markOop) Atomic::cmpxchg_ptr(rebiased_prototype, obj->mark_addr(), mark);
if (res_mark == biased_value) {
// CAS设置成功, 返回偏向锁撤销并重偏向
return BIAS_REVOKED_AND_REBIASED;
}
} else {
// step1.2.2.2 否则直接撤销, 变为无锁状态
markOop biased_value = mark;
// new一个偏向锁标志位为0, 锁标志位为01, hashCode为空, 年龄为mark->age()的MarkWord
markOop unbiased_prototype = markOopDesc::prototype()->set_age(mark->age());
// CAS替换
markOop res_mark = (markOop) Atomic::cmpxchg_ptr(unbiased_prototype, obj->mark_addr(), mark);
if (res_mark == biased_value) {
// CAS设置成功, 返回撤销成功
return BIAS_REVOKED;
}
}
}
}
// 可以发现,
// 上面的情况都是明确不支持偏向锁,
// 或者可以修改偏向的状态(匿名偏向锁, epoch过期)
// 没有并发问题, 只需要一个CAS, 很高效
// 如果走到这里说明上面CAS可能失败了,
...
...
} else if (heuristics == HR_SINGLE_REVOKE) {
// 单个撤销
Klass *k = obj->klass();
markOop prototype_header = k->prototype_header();
if (mark->biased_locker() == THREAD &&
prototype_header->bias_epoch() == mark->bias_epoch()) {
// 只有当前线程在用这个偏向锁
// A thread is trying to revoke the bias of an object biased
// toward it, again likely due to an identity hash code
// computation. We can again avoid a safepoint in this case
// since we are only going to walk our own stack. There are no
// races with revocations occurring in other threads because we
// reach no safepoints in the revocation path.
// Also check the epoch because even if threads match, another thread
// can come in with a CAS to steal the bias of an object that has a
// stale epoch.
// 撤销当前线程的偏向锁, 判断时除了判断线程外, 还判断了epoch是否过期,
// 因为epoch过期, 说明发生过批量重偏向, MarkWord中的线程ID是个无效值, 等于没有
// 如果都是true就说明当前锁是一个在用的锁, 并且被当前线程持有
//
// 计算hashcode的时候会走到这里,
// 而且这个时候发现没法将hashCode存在MarkWord中了(只有在匿名偏向的情况下可以存到MarkWord中, 上面处理了这种情况)
// 需要先膨胀, 再找地方存hashCode
//
// 这里的撤销不用等到safepoint的, 因为只需要遍历当前线程的栈即可, 不需要等待所有线程都到达safepoint
ResourceMark rm;
if (TraceBiasedLocking) {
tty->print_cr("Revoking bias by walking my own stack:");
}
// revoke_bias中, 可能会升级为轻量级锁(obj依然在使用), 也可能撤消(无锁)
BiasedLocking::Condition cond = revoke_bias(obj(), false, false, (JavaThread*) THREAD);
((JavaThread*) THREAD)->set_cached_monitor_info(NULL);
assert(cond == BIAS_REVOKED, "why not?");
return cond;
} ...
...
}
逻辑看注释,还是挺细的。
前面那部分的逻辑和入口处加偏向锁的逻辑很像,效率很高。为什么入口处判断过了这还要判断,难道还会有不一样的结果吗?还真会,这个方法不在安全点执行,所以在这在这期间是可能发生批量重偏向和批量撤销的,在批量重偏向中会更新obj和obj类型的epoch,将obj的MarkWord变为匿名重偏向,批量撤销中会改变obj类型中向锁标志位。所以这里会再尝试一次之前的操作。
注意,在这段逻辑中,如果碰到了epoch过期的情况,也会直接加偏向锁然后结束。又多了一处加锁成功的地方,因为和之前的加锁串不起来,就放这里提一下了。
关于网传的偏向锁撤销时要等到安全点这个点多说几句。先说结论,偏向锁撤销不一定要等到安全点。比如上的BiasedLocking::revoke_and_rebias
就不是。只有在存在竞争的情况下,才需要等待其他线程到安全点,等安全点很低效,会尽量避免,对于没有竞争的场景,当然就先尝试执行下,如果成功就不用等待安全点了。
上面代码的最后是对当前线程做处理,不涉及到其他线程,所以也不用等到安全点的,看下在这里面调用的 revoke_bias
方法
// 撤销偏向锁
// 如果allow_rebias==false, 最后要么膨胀为轻量级锁, 要么变为无锁状态
// 如果allow_rebias==true, 貌似只有在批量重偏向中对当前对象重偏向可能是true, 最后要么膨胀为轻量级锁, 要么是匿名偏向状态(为后面重偏向做准备)
static BiasedLocking::Condition revoke_bias(oop obj, bool allow_rebias, bool is_bulk, JavaThread* requesting_thread) {
markOop mark = obj->mark();
if (!mark->has_bias_pattern()) {
// 不是偏向锁状态, 不用处理
if (TraceBiasedLocking) {
ResourceMark rm;
tty->print_cr(" (Skipping revocation of object of type %s because it's no longer biased)",
obj->klass()->external_name());
}
return BiasedLocking::NOT_BIASED;
}
uint age = mark->age();
// 匿名偏向锁(101, 且线程ID为0)
markOop biased_prototype = markOopDesc::biased_locking_prototype()->set_age(age);
// 无锁(001)
markOop unbiased_prototype = markOopDesc::prototype()->set_age(age);
if (TraceBiasedLocking && (Verbose || !is_bulk)) {
ResourceMark rm;
tty->print_cr("Revoking bias of object " INTPTR_FORMAT " , mark " INTPTR_FORMAT " , type %s , prototype header " INTPTR_FORMAT " , allow rebias %d , requesting thread " INTPTR_FORMAT,
p2i((void *)obj), (intptr_t) mark, obj->klass()->external_name(), (intptr_t) obj->klass()->prototype_header(), (allow_rebias ? 1 : 0), (intptr_t) requesting_thread);
}
// 偏向的线程
JavaThread* biased_thread = mark->biased_locker();
if (biased_thread == NULL) {
// 匿名偏向锁的情况,
// Object is anonymously biased. We can get here if, for
// example, we revoke the bias due to an identity hash code
// being computed for an object.
// 比如计算hashcode会走到这里
if (!allow_rebias) {
// 如果不允许重偏向, MarkWord变为无锁(001)
// 批量重偏向时, allow_rebias==true
obj->set_mark(unbiased_prototype);
}
if (TraceBiasedLocking && (Verbose || !is_bulk)) {
tty->print_cr(" Revoked bias of anonymously-biased object");
}
// 返回偏向锁撤销
return BiasedLocking::BIAS_REVOKED;
}
// Handle case where the thread toward which the object was biased has exited
// 处理偏向的线程已经退出的情况
// 判断线程是否存活, 以下两种情况认为存活:
// 1. 偏向线程就是当前请求的线程
// 2. 偏向线程存在于JVM活跃线程列表中, 需要遍历所有jvm线程
bool thread_is_alive = false;
if (requesting_thread == biased_thread) {
thread_is_alive = true;
} else {
for (JavaThread* cur_thread = Threads::first(); cur_thread != NULL; cur_thread = cur_thread->next()) {
if (cur_thread == biased_thread) {
thread_is_alive = true;
break;
}
}
}
if (!thread_is_alive) {
// 偏向的线程死亡的情况
if (allow_rebias) {
// 如果需要重偏向, 设置为匿名偏向状态, 外面可以自己设置为偏向xx线程的
obj->set_mark(biased_prototype);
} else {
// 否则, 设置为无锁状态
obj->set_mark(unbiased_prototype);
}
if (TraceBiasedLocking && (Verbose || !is_bulk)) {
tty->print_cr(" Revoked bias of object biased toward dead thread");
}
return BiasedLocking::BIAS_REVOKED;
}
// Thread owning bias is alive.
// Check to see whether it currently owns the lock and, if so,
// write down the needed displaced headers to the thread's stack.
// Otherwise, restore the object's header either to the unlocked
// or unbiased state.
// 偏向的线程依然存活的情况
// 获取偏向线程所有LockRecord(不仅仅是当前对象的, 是所有), 按时间倒序
GrowableArray<MonitorInfo*>* cached_monitor_info = get_or_compute_monitor_info(biased_thread);
BasicLock* highest_lock = NULL;
// 遍历所有LockRecord
for (int i = 0; i < cached_monitor_info->length(); i++) {
MonitorInfo* mon_info = cached_monitor_info->at(i);
if (mon_info->owner() == obj) {
// LockRecord的持有者是当前对象
// 说明能找到, 代码块肯定没退出
// 此时只撤销是不行的了, 因为这个锁还要用, 并且原先的MarkWord还需要保留,
// 需要升级为轻量级锁了
if (TraceBiasedLocking && Verbose) {
tty->print_cr(" mon_info->owner (" PTR_FORMAT ") == obj (" PTR_FORMAT ")",
p2i((void *) mon_info->owner()),
p2i((void *) obj));
}
// Assume recursive case and fix up highest lock later
// 升级为轻量级锁, 且displaced_header为NULL, 表示重入的轻量级锁
// 为什么是重入的轻量级锁?
// 因为所有轻量级锁中, 只有第一个进来的displaced_header不为NULL,
// 因为是按时间倒序的, 最先进来的LockRecord最后处理,
// 等到循环退出时, highest_lock就是最先的LockRecord, 到时候再将它的NULL改掉就可以了(这里剧透了)
markOop mark = markOopDesc::encode((BasicLock*) NULL);
highest_lock = mon_info->lock();
highest_lock->set_displaced_header(mark);
// 从这里也可以看出来, 只有还在使用的LockRecord才有可能膨胀为轻量级锁
} else {
if (TraceBiasedLocking && Verbose) {
tty->print_cr(" mon_info->owner (" PTR_FORMAT ") != obj (" PTR_FORMAT ")",
p2i((void *) mon_info->owner()),
p2i((void *) obj));
}
}
}
// 处理最先的LockRecord
if (highest_lock != NULL) {
// highest_lock存在, 说明代码块没有退出
// Fix up highest lock to contain displaced header and point
// object at it
// 设置第一个LockRecord的DisplacedMarkWord为无锁状态(01)
highest_lock->set_displaced_header(unbiased_prototype);
// Reset object header to point to displaced mark.
// Must release storing the lock address for platforms without TSO
// ordering (e.g. ppc).
// 让obj的mark指向这个LockRecord
obj->release_set_mark(markOopDesc::encode(highest_lock));
assert(!obj->mark()->has_bias_pattern(), "illegal mark state: stack lock used bias bit");
if (TraceBiasedLocking && (Verbose || !is_bulk)) {
tty->print_cr(" Revoked bias of currently-locked object");
}
// 到这里所有偏向锁都变成了轻量级锁
// 也就是说, 如果当前锁还在使用, 并且有另一个线程想要加锁, 这个锁就会升级为轻量级锁
} else {
// 走到这里说明highest_lock没找到, 也就是synchronized(obj)偏向的线程的代码块退出了
if (TraceBiasedLocking && (Verbose || !is_bulk)) {
tty->print_cr(" Revoked bias of currently-unlocked object");
}
if (allow_rebias) {
// 允许重偏向, MarkWord设置为匿名偏向状态
obj->set_mark(biased_prototype);
} else {
// Store the unlocked value into the object's header.
// 否则MarkWord设置为无锁状态
obj->set_mark(unbiased_prototype);
}
}
return BiasedLocking::BIAS_REVOKED;
}
在这段代码中,会判断当前obj是否持有锁,不持有的情况有两种:
这两种情况都表示obj的偏向锁没被使用,根据是否尝试重偏向参数决定MarkWord设置为匿名偏向锁还是无锁。
如果还在使用,会升级为轻量级锁。
上面的撤销流程已经提到了批量重偏向和批量撤销了,现在专门看下这个流程。
BiasedLocking::Condition BiasedLocking::revoke_and_rebias(Handle obj, bool attempt_rebias, TRAPS) {
// 不需要在安全点被调用
assert(!SafepointSynchronize::is_at_safepoint(), "must not be called while at safepoint");
...
// 如果走到这里说明上面CAS可能失败了,
//
// 或者偏向锁正偏向其他线程, 并且偏向的还不是当前线程,
// 但偏向的线程不一定存活, 也就是说这个锁不一定被使用
// 由此可见偏向锁的条件有多苛刻, 它只适用于同一个线程反复进入临界区的场景,
// 就算这个线程退出了, 偏向的线程ID都不会改, 其他线程都不能直接用
//
// 这两种情况都会认为偏向锁不再适用, 需要撤销或者升级
// 先思考下撤销和升级时需要做什么
// 首先, 得找到对象持有的所有LockRecord, 然后修改,
// 如果加锁的不是当前线程, 说明这是并发场景, 还得先等待加锁线程到安全点再操作
// 这两个都是有成本的, 特别是等待安全点
//
// 如果重偏向或者撤销的次数多了, 开销很大
// 于是就有了批量重偏向和批量撤销:
// 当在一段时间内的撤销次数达到一定数量
// 如果达到批量撤销的阈值(默认40),
// 干脆直接不让用偏向锁了, 直接把这个对象类型的对象的锁全撤销为无锁, 如果还在用就升级为轻量级锁
// 如果达到批量重偏向的阈值(默认20),
// 会遍历所有线程找到所有在用的LockRecord, 修改其epoch, 然后尝试重偏向当前线程,
// 其实批量重偏向过程中, 真正可能立即偏向的只有请求线程, 还只是可能, 并不是字面意思那样将大量锁偏向到了一个线程
// 大部分工作只是更新了存活对象的epoch, 变相地将没标记的置为失效了, 给重偏向提高可能
// 这样下次进入时, epoch失效的就可以当作偏向锁使用了.
// 对经常撤销的偏向锁来说, 批量撤销和批量重偏向可以减少等待safepoint的次数, 提高效率
// 批量重偏向和撤销逻辑
// 先判断下怎么撤销或者重定向, 主要是批量重偏向和批量撤销的判断
// update_heuristics会对撤销次数进行统计, 并给出后续的处理意见
// 为什么前面的没有统计?
// 因为前面偏向锁没用到(匿名偏向)或者失效(epoch过期),
// 如果没有竞争, 替换成本就一个CAS, 可以忽略不计
HeuristicsResult heuristics = update_heuristics(obj(), attempt_rebias);
if (heuristics == HR_NOT_BIASED) {
// 不是偏向锁, 不用撤销
return NOT_BIASED;
} else if (heuristics == HR_SINGLE_REVOKE) {
// 单个撤销
Klass *k = obj->klass();
markOop prototype_header = k->prototype_header();
if (mark->biased_locker() == THREAD &&
prototype_header->bias_epoch() == mark->bias_epoch()) {
// 只有当前线程在用这个偏向锁
// A thread is trying to revoke the bias of an object biased
// toward it, again likely due to an identity hash code
// computation. We can again avoid a safepoint in this case
// since we are only going to walk our own stack. There are no
// races with revocations occurring in other threads because we
// reach no safepoints in the revocation path.
// Also check the epoch because even if threads match, another thread
// can come in with a CAS to steal the bias of an object that has a
// stale epoch.
// 撤销当前线程的偏向锁, 判断时除了判断线程外, 还判断了epoch是否过期,
// 因为epoch过期, 说明发生过批量重偏向, MarkWord中的线程ID是个无效值, 等于没有
// 如果都是true就说明当前锁是一个在用的锁, 并且被当前线程持有
//
// 计算hashcode的时候会走到这里,
// 而且这个时候发现没法将hashCode存在MarkWord中了(只有在匿名偏向的情况下可以存到MarkWord中, 上面处理了这种情况)
// 需要先膨胀, 再找地方存hashCode
//
// 这里的撤销不用等到safepoint的, 因为只需要遍历当前线程的栈即可, 不需要等待所有线程都到达safepoint
ResourceMark rm;
if (TraceBiasedLocking) {
tty->print_cr("Revoking bias by walking my own stack:");
}
// revoke_bias中, 可能会升级为轻量级锁(obj依然在使用), 也可能撤消(无锁)
BiasedLocking::Condition cond = revoke_bias(obj(), false, false, (JavaThread*) THREAD);
((JavaThread*) THREAD)->set_cached_monitor_info(NULL);
assert(cond == BIAS_REVOKED, "why not?");
return cond;
} else {
// 走到这里说明当前偏向锁偏向的不是当前线程,
// 或者当前偏向锁的MarkWord过期了,
// 在存在并发时会这样,
//
// 所以, 这两种情况在当前线程中都不能直接对偏向锁做修改,
// 需要等到一个稳定的状态再处理才保险, 也就是安全点
//
// 以下是通过VM线程去执行的
// 等待到安全点, 然后调用revoke_bias
VM_RevokeBias revoke(&obj, (JavaThread*) THREAD);
VMThread::execute(&revoke);
return revoke.status_code();
}
}
assert((heuristics == HR_BULK_REVOKE) ||
(heuristics == HR_BULK_REBIAS), "?");
// 以下是通过VM线程执行的, 会等到安全点再执行
// 批量撤销和批量重偏向的逻辑: bulk_revoke_or_rebias_at_safepoint
VM_BulkRevokeBias bulk_revoke(&obj, (JavaThread*) THREAD,
(heuristics == HR_BULK_REBIAS),
attempt_rebias);
VMThread::execute(&bulk_revoke);
return bulk_revoke.status_code();
}
注释对批量重偏向和批量撤销的原因说得很细,不赘述了。
看下批量重偏向和批量撤销的的判断逻辑
// 更新撤销计数, 判断是否需要批量重定向或批量撤消
static HeuristicsResult update_heuristics(oop o, bool allow_rebias) {
markOop mark = o->mark();
if (!mark->has_bias_pattern()) {
// 如果不是偏向锁, 直接返回
return HR_NOT_BIASED;
}
// Heuristics to attempt to throttle the number of revocations.
// Stages:
// 1. Revoke the biases of all objects in the heap of this type,
// but allow rebiasing of those objects if unlocked.
// 2. Revoke the biases of all objects in the heap of this type
// and don't allow rebiasing of these objects. Disable
// allocation of objects of that type with the bias bit set.
// 锁对象的类, 批量撤销是以类为单位的
Klass* k = o->klass();
// 当前时间
jlong cur_time = os::javaTimeMillis();
// clazz上次批量撤销的时间
jlong last_bulk_revocation_time = k->last_biased_lock_bulk_revocation_time();
// clazz偏向锁撤销次数
int revocation_count = k->biased_lock_revocation_count();
// BiasedLockingBulkRebiasThreshold是批量重偏向阈值
// BiasedLockingBulkRevokeThreshold是批量撤销阈值
// BiasedLockingDecayTime 偏向锁统计的衰退时间
if ((revocation_count >= BiasedLockingBulkRebiasThreshold) &&
(revocation_count < BiasedLockingBulkRevokeThreshold) &&
(last_bulk_revocation_time != 0) &&
(cur_time - last_bulk_revocation_time >= BiasedLockingDecayTime)) {
// 上面两个值的统计不是一直累加, 而是以BiasedLockingDecayTime为周期的
// 如果发现撤销数量达到批量重定向阈值, 还没到批量撤销阈值, 但是已经超过BiasedLockingDecayTime了, 会将撤销次数清零
// This is the first revocation we've seen in a while of an
// object of this type since the last time we performed a bulk
// rebiasing operation. The application is allocating objects in
// bulk which are biased toward a thread and then handing them
// off to another thread. We can cope with this allocation
// pattern via the bulk rebiasing mechanism so we reset the
// klass's revocation count rather than allow it to increase
// monotonically. If we see the need to perform another bulk
// rebias operation later, we will, and if subsequently we see
// many more revocation operations in a short period of time we
// will completely disable biasing for this type.
k->set_biased_lock_revocation_count(0);
revocation_count = 0;
}
// Make revocation count saturate just beyond BiasedLockingBulkRevokeThreshold
// 撤销放次数+1
if (revocation_count <= BiasedLockingBulkRevokeThreshold) {
revocation_count = k->atomic_incr_biased_lock_revocation_count();
}
if (revocation_count == BiasedLockingBulkRevokeThreshold) {
// 达到批量撤销阈值(40), 结果为批量撤销
return HR_BULK_REVOKE;
}
if (revocation_count == BiasedLockingBulkRebiasThreshold) {
// 达到批量重定向阈值(20), 结果批量重偏向
return HR_BULK_REBIAS;
}
// 否则, 单个撤销就可以了
return HR_SINGLE_REVOKE;
}
继续看批量重偏向和批量撤销的主逻辑 bulk_revoke
方法
class VM_BulkRevokeBias : public VM_RevokeBias {
private:
bool _bulk_rebias;
bool _attempt_rebias_of_object;
public:
VM_BulkRevokeBias(Handle* obj, JavaThread* requesting_thread,
bool bulk_rebias,
bool attempt_rebias_of_object)
: VM_RevokeBias(obj, requesting_thread)
, _bulk_rebias(bulk_rebias)
, _attempt_rebias_of_object(attempt_rebias_of_object) {}
virtual VMOp_Type type() const { return VMOp_BulkRevokeBias; }
virtual bool doit_prologue() { return true; }
virtual void doit() {
_status_code = bulk_revoke_or_rebias_at_safepoint((*_obj)(), _bulk_rebias, _attempt_rebias_of_object, _requesting_thread);
clean_up_cached_monitor_info();
}
};
实际逻辑在
// 批量撤销或者重偏向逻辑
static BiasedLocking::Condition bulk_revoke_or_rebias_at_safepoint(oop o,
bool bulk_rebias,
bool attempt_rebias_of_object,
JavaThread* requesting_thread) {
// 必须在safepoint执行
assert(SafepointSynchronize::is_at_safepoint(), "must be done at safepoint");
if (TraceBiasedLocking) {
tty->print_cr("* Beginning bulk revocation (kind == %s) because of object "
INTPTR_FORMAT " , mark " INTPTR_FORMAT " , type %s",
(bulk_rebias ? "rebias" : "revoke"),
p2i((void *) o), (intptr_t) o->mark(), o->klass()->external_name());
}
// 设置偏向锁批量撤销时间为当前时间
jlong cur_time = os::javaTimeMillis();
o->klass()->set_last_biased_lock_bulk_revocation_time(cur_time);
Klass* k_o = o->klass();
Klass* klass = k_o;
if (bulk_rebias) {
// 批量重偏向
// Use the epoch in the klass of the object to implicitly revoke
// all biases of objects of this data type and force them to be
// reacquired. However, we also need to walk the stacks of all
// threads and update the headers of lightweight locked objects
// with biases to have the current epoch.
// 使用obj的klass中的epoch来隐式地撤销所有这个类型的锁, 然后强制他们重新获取.
// 这一过程中也会遍历所有线程的所有栈, 更新轻量级锁的LockRecord中的obj为当前epoch
// If the prototype header doesn't have the bias pattern, don't
// try to update the epoch -- assume another VM operation came in
// and reset the header to the unbiased state, which will
// implicitly cause all existing biases to be revoked
// 如果klass不支持偏向锁, 不要更新epoch.
// 因为这说明在另一个VM线程中将klass中的偏向锁标志置为0了, 置为0的同时已经批量撤销过了
if (klass->prototype_header()->has_bias_pattern()) {
int prev_epoch = klass->prototype_header()->bias_epoch();
// klass中的epoch自增, epoch只有2位哦, 总共就4个值...
klass->set_prototype_header(klass->prototype_header()->incr_bias_epoch());
int cur_epoch = klass->prototype_header()->bias_epoch();
// Now walk all threads' stacks and adjust epochs of any biased
// and locked objects of this data type we encounter
// 遍历所有线程的栈, 更新偏向锁的LockRecord的epoch
for (JavaThread* thr = Threads::first(); thr != NULL; thr = thr->next()) {
GrowableArray<MonitorInfo*>* cached_monitor_info = get_or_compute_monitor_info(thr);
for (int i = 0; i < cached_monitor_info->length(); i++) {
MonitorInfo* mon_info = cached_monitor_info->at(i);
oop owner = mon_info->owner();
markOop mark = owner->mark();
if ((owner->klass() == k_o) && mark->has_bias_pattern()) {
// 找到当前对象类型在栈中有偏向锁的对象
// 更新他们的epoch,
// 但不会重偏向, 因为这些锁正在使用中, 不能直接重偏向了
// 从这里也能看出, 只会处理正在使用的LockRecord,
// 如果不在使用, 所以每次monitorenter入口进来后都需要和klass的epoch比较, 看是否过期
// We might have encountered this object already in the case of recursive locking
assert(mark->bias_epoch() == prev_epoch || mark->bias_epoch() == cur_epoch, "error in bias epoch adjustment");
owner->set_mark(mark->set_bias_epoch(cur_epoch));
}
}
}
}
// At this point we're done. All we have to do is potentially
// adjust the header of the given object to revoke its bias.
// 当前obj的重偏向操作, 变为可重偏向的匿名重偏向状态
revoke_bias(o, attempt_rebias_of_object && klass->prototype_header()->has_bias_pattern(), true, requesting_thread);
} else {
// 批量撤销逻辑
if (TraceBiasedLocking) {
ResourceMark rm;
tty->print_cr("* Disabling biased locking for type %s", klass->external_name());
}
// Disable biased locking for this data type. Not only will this
// cause future instances to not be biased, but existing biased
// instances will notice that this implicitly caused their biases
// to be revoked.
// 设置klass的MarkWord为无锁的状态, 后面实例化的这个类型的对象的MarkWord都是无锁的
// 如果要加锁, 得从轻量级锁开始加
klass->set_prototype_header(markOopDesc::prototype());
// Now walk all threads' stacks and forcibly revoke the biases of
// any locked and biased objects of this data type we encounter.
for (JavaThread* thr = Threads::first(); thr != NULL; thr = thr->next()) {
GrowableArray<MonitorInfo*>* cached_monitor_info = get_or_compute_monitor_info(thr);
for (int i = 0; i < cached_monitor_info->length(); i++) {
MonitorInfo* mon_info = cached_monitor_info->at(i);
oop owner = mon_info->owner();
markOop mark = owner->mark();
if ((owner->klass() == k_o) && mark->has_bias_pattern()) {
// 找到该类型的偏向锁状态的对象
// 撤销偏向锁, 参数中第二个参数表示不允许重偏向
revoke_bias(owner, false, true, requesting_thread);
}
}
}
// Must force the bias of the passed object to be forcibly revoked
// as well to ensure guarantees to callers
// 撤销当前对象的偏向锁
revoke_bias(o, false, true, requesting_thread);
}
if (TraceBiasedLocking) {
tty->print_cr("* Ending bulk revocation");
}
BiasedLocking::Condition status_code = BiasedLocking::BIAS_REVOKED;
// 如果需要重偏向, 再设置下obj的MarkWord, 偏向请求线程
// bulk_rebias==false, attempt_rebias_of_object==true
if (attempt_rebias_of_object &&
o->mark()->has_bias_pattern() &&
klass->prototype_header()->has_bias_pattern()) {
// 如果希望尝试重偏向, 并且当前支持偏向锁
// 构造一个偏向当前线程的偏向锁
markOop new_mark = markOopDesc::encode(requesting_thread, o->mark()->age(),
klass->prototype_header()->bias_epoch());
o->set_mark(new_mark);
status_code = BiasedLocking::BIAS_REVOKED_AND_REBIASED;
if (TraceBiasedLocking) {
tty->print_cr(" Rebiased object toward thread " INTPTR_FORMAT, (intptr_t) requesting_thread);
}
}
assert(!o->mark()->has_bias_pattern() ||
(attempt_rebias_of_object && (o->mark()->biased_locker() == requesting_thread)),
"bug in bulk bias revocation");
return status_code;
}
注释很细了,不赘述了。
总结下:
批量重偏向和批量撤销都是优化。
批量重偏会升级obj类型的epoch,遍历正在使用的LockRecord,修改同类型对象的epoch。这样epoch没更新的说明当时没在使用,后面如果需要可以偏向其他线程,完成了对不在使用的偏向锁的批量再利用。由于批量重偏向的存在,从长时间来看,在不同时刻偏向锁是有偏向多个线程的可能的。
批量撤销则是在判断偏向锁不适用时,会关闭obj对应类型的偏向锁标志,然后依次遍历正在使用的LockRecord批量撤销或者升级为轻量级锁。
升级的场景就是存在竞争,偏向锁不再适用的时候。操作就是数据结构的转换,具体可以看下偏向锁和轻量级锁的数据结构。
先将偏向锁撤销为无锁,再将无锁的MarkWord存到轻量级锁的LockRecord的displaced_header中,然后再让obj的MarkWord指向轻量级锁的LockRecord。
总的来说,偏向锁只适用于同一个线程反复进入的场景,也就是完全没有竞争的场景。
工作中需要对应用生成trace并上报到jaeger,我主要负责Java应用部分,我基于opentelemetry-java-instrumentation进行了二次开发,以满足定制化需求。
本文是开发过程中的一些笔记,供自己日后查阅。
我是基于1.10.1版本开发的,以下所有笔记都是以此版本为基础。
Trace需要以OpenTelemetry协议对很多框架和类库进行支持,并上报到后端。
opentelemetry-java-instrumentation包含了对大部分框架和类库的支持,并提供了无侵入的javaagent,所以大部分的trace可以通过接入javaagent实现。
对于一些定制化的需求,需要对其进行拓展。
拓展时建议先考虑手动trace的方式,手动trace满足不了或者成本较高时,可以考虑auto-instrumentation的方式,即修改opentelemetry-java-instrumentation。
下面分别介绍这两种方式。
最近再次看并发,对synchronized越看越好奇。大家都知道synchronized实现上有偏向锁、轻量级锁以及重量级锁,大致原因和膨胀逻辑也来自于网络。
我也是这么偷懒的,从网上搜集了些资料,不得不说,大家对膨胀的原因说得都很生动,就是很雷同,不过也确实是那么回事。
但这些资料都经不住几个为什么。比如MarkWord中的hashCode在锁膨胀后存哪里去了?偏向锁和轻量级锁那么像干嘛不直接合二为一,搞三套锁烦不烦?而且这些资料大多说了其然没说其所以然,有些甚至逻辑上是自相矛盾的。所以看完除了感觉好复杂好牛逼没啥用,对自己后面设计同步帮助不大。
于是我自己找了下hotspot的代码,分析了下。
这篇会从源码的角度看synchronized,以下内容仅是个人理解。感谢在这个过程中看过的各种资料的作者。也不打算走大家的老路了,反正看到我这篇的也肯定看过那些生动的故事了。
先找synchronzied的入口,从字节码入手。
先看下面这段代码,包含了synchronized的所有用法,对于重入的情况,大家自己脑补下吧。
public class SynchronizedTest {
private static final SynchronizedTest obj = new SynchronizedTest();
public static synchronized void syncStaticMethod() {
}
public static void syncInStaticMethod() {
synchronized (SynchronizedTest.class) {
}
}
public synchronized void syncInstanceMethod() {
}
public void syncInInstanceMethod() {
synchronized (obj) {
}
}
public static void main(String[] args) {
}
}
关键字节码如下:
public class cn.chenhenry.java.SynchronizedTest
...
flags: ACC_PUBLIC, ACC_SUPER
Constant pool:
...
{
...
public static synchronized void syncStaticMethod();
...
flags: ACC_PUBLIC, ACC_STATIC, ACC_SYNCHRONIZED
...
public static void syncInStaticMethod();
...
Code:
stack=2, locals=2, args_size=0
0: ldc #2 // class cn/chenhenry/java/SynchronizedTest
2: dup
3: astore_0
4: monitorenter
5: aload_0
6: monitorexit
7: goto 15
10: astore_1
11: aload_0
12: monitorexit
13: aload_1
14: athrow
15: return
...
public synchronized void syncInstanceMethod();
...
flags: ACC_PUBLIC, ACC_SYNCHRONIZED
...
public void syncInInstanceMethod();
...
Code:
stack=2, locals=3, args_size=1
0: getstatic #3 // Field obj:Lcn/chenhenry/java/SynchronizedTest;
3: dup
4: astore_1
5: monitorenter
6: aload_1
7: monitorexit // 正常退出
8: goto 16
11: astore_2
12: aload_1
13: monitorexit // 异常退出
14: aload_2
15: athrow
16: return
...
...
...
}
....
可以看到在字节码层面,分两种:
对于代码块使用了monitorenter和monitorexit,而且会有两条monitorexit指令,一条是正常退出时走的,一条是异常退出时走的,用Java伪代码看就是和平时使用锁的姿势一样
monitorenter
try {
// do sth
} finally {
// monitorexit
}
对于方法加了ACC_SYNCHRONIZED的flag,据说也是用monitorenter和monitorexit实现的,这点我没有去找对应的代码,真的感兴趣大家可以留言,人数够多我去找下看看。
所以下面就以字节码指令monitorenter和monitorexit为入口,看下synchronized的原理
我看的是
HotSpot虚拟机,代码地址:http://hg.openjdk.java.net/jdk8u/jdk8u/hotspot/
分析wait和notify时看了jdk代码,地址是:
http://hg.openjdk.java.net/jdk8u/jdk8u/jdk/
不同版本会有不同,对jdk8而言,有些版本不支持偏向锁,不信你可以找其他版本和上面的monitorenter的入口对比看看。
入口其实有两个。虚拟机的解释器有两个,一个是字节码解释器,一个是模板解释器。模板解释器会将字节码翻译成汇编执行,既然我看源码的目的是了解原理,那么不费这个劲去看汇编了,直接看字节码解释器逻辑了。我猜他俩的实现差不多。
...
CASE(_monitorenter): {
// 获取对象
oop lockee = STACK_OBJECT(-1);
// derefing's lockee ought to provoke implicit null check
CHECK_NULL(lockee);
// find a free monitor or one already allocated for this object
// if we find a matching object then we need a new monitor
// since this is recursive enter
// 找到一个可用的LockRecord,
// 遍历栈中的LockRecord
// base of monitors on the native stack
BasicObjectLock* limit = istate->monitor_base();
// base of monitors on the native stack
BasicObjectLock* most_recent = (BasicObjectLock*) istate->stack_base();
BasicObjectLock* entry = NULL;
// 从栈底向栈顶遍历
while (most_recent != limit ) {
// obj为NULL代表可用,
if (most_recent->obj() == NULL) entry = most_recent;
else if (most_recent->obj() == lockee) break; // 说明锁重入了, 取刚才最近的一个为空的LockRecord
most_recent++;
}
// entry不为空, 说明存在可用的LockRecord
if (entry != NULL) {
// LockRecord指向当前对象
entry->set_obj(lockee);
int success = false;
uintptr_t epoch_mask_in_place = (uintptr_t)markOopDesc::epoch_mask_in_place;
// 获取MarkWord
markOop mark = lockee->mark();
intptr_t hash = (intptr_t) markOopDesc::no_hash;
...
UPDATE_PC_AND_TOS_AND_CONTINUE(1, -1);
} else {
// 说明LockRecord没获取到,
// 什么情况下获取不到, 跟本次无关, 先不看
istate->set_msg(more_monitors);
// 重新执行
UPDATE_PC_AND_RETURN(0); // Re-execute
}
}
...
上面的逻辑很简单:
LockRecord就是锁对象,来自于线程私有的栈上,每次进来都会从低地址向高地址遍历,获取一个可用的LockRecord。
对同一个对象的锁,重入时会保证分配顺序,先入获取的LockRecord的地址的地址比后入的地址高,这样对同一个对象的多个锁来说,就是一个栈的结构,保证后入先出。比如对obj2来说,第二次进入的的地址比第一次进入的地址低,释放时先释放第二次进入的锁,再释放第一次进入的锁。
不仅是轻量级锁,对于偏向锁,也会获取同款的LockRecord,只不过使用姿势不同。
synchronized中三种锁的实现都和MarkWord有关,MarkWord是对象头中的一部分,对象头中包含MarkWord和类型指针,jdk代码中对32位的头是这样描述的:
...
// 32 bits:
// --------
// hash:25 ------------>| age:4 biased_lock:1 lock:2 (normal object)
// JavaThread*:23 epoch:2 age:4 biased_lock:1 lock:2 (biased object)
// size:32 ------------------------------------------>| (CMS free block)
// PromotedObject*:29 ---------->| promo_bits:3 ----->| (CMS promoted object)
...
//
// [JavaThread* | epoch | age | 1 | 01] lock is biased toward given thread 偏向某个线程
// [0 | epoch | age | 1 | 01] lock is anonymously biased 匿名偏向
//
// - the two lock bits are used to describe three states: locked/unlocked and monitor.
//
// [ptr | 00] locked ptr points to real header on stack 轻量级锁
// [header | 0 | 01] unlocked regular object header 无锁
// [ptr | 10] monitor inflated lock (header is wapped out) 重量级锁
// [ptr | 11] marked used by markSweep to mark an object
...
通过对比源码可以发现,网上的理解和源码还是有出入的
LockRecord的使用得配合MarkWord一起,下图给出了三者的关系:
图中的MarkWord画的是32位的MarkWord,64位的情况类似。
和偏向锁类似,轻量级锁的实现也是基于LockRecord的,在HotSpot中叫stack-based lock,也就是基于栈的锁,很形象。基于栈的这个点在偏向锁中的体验尤为明细,栈结构和它的重入特性息息相关。
它的MarkWord是轻量级锁状态,MarkWord会指向LockRecord,LockRecord中会包含无锁的MarkWord,称为dispalced_header,用于轻量级锁释放时还原MarkWord为无锁状态。
图中有两个轻量级锁的LockRecord,一个是obj1的第一次加锁,另一个是obj1的重入。他俩的LockRecord的dispalced_header是不同的,具体细节在后面会说
入口处,如果obj的类型关闭了偏向锁标志,会在撤销偏向锁后,尝试加一次轻量级锁
...
CASE(_monitorenter): {
...
else if ((anticipated_bias_locking_value & markOopDesc::biased_lock_mask_in_place) != 0) {
// 偏向锁标志位不同, 而刚才的mark通过了偏向锁检查, 所以是klass中的偏向锁模式关闭
// 没法继续使用偏向锁了, 需要撤销当前的偏向锁, 然后升级为轻量级锁或者重量级锁
// try revoke bias
// 尝试撤销偏向锁
// 此时的lockee->klass()->prototype_header()应该是无锁状态的MarkWord
markOop header = lockee->klass()->prototype_header();
// 重置MarkWord, 如果有hash, 带上hash
if (hash != markOopDesc::no_hash) {
header = header->copy_set_hash(hash);
}
if (Atomic::cmpxchg_ptr(header, lockee->mark_addr(), mark) == mark) {
// 修改成功
// 这里修改为无锁的MarkWord可能是想提前修改, 后面少几次判断, 算是个优化吧
if (PrintBiasedLockingStatistics)
(*BiasedLocking::revoked_lock_entry_count_addr())++;
}
// 后面的代码会继续走轻量级锁或者重量级锁的获取逻辑
...
// traditional lightweight locking
// 处理非偏向锁的情况
if (!success) {
// 准备使用轻量级锁或者重量级锁
// 用当前的MarkWord构造一个无锁的MarkWord作为DisplacedHeader
markOop displaced = lockee->mark()->set_unlocked();
// 保存到LockRecord中, 用于解锁时MarkWord的还原
entry->lock()->set_displaced_header(displaced);
bool call_vm = UseHeavyMonitors;
// 如果参数强制指定使用重量级锁,
// 或者CAS将MarkWord从无锁状态设置为轻量级锁失败
if (call_vm || Atomic::cmpxchg_ptr(entry, lockee->mark_addr(), displaced) != displaced) {
// Is it simple recursive case?
// 判断是否轻量级锁重入
if (!call_vm && THREAD->is_lock_owned((address) displaced->clear_lock_bits())) {
// 轻量级锁重入时, 重入的锁的DisplacedMarkWord为NULL
entry->lock()->set_displaced_header(NULL);
} else {
// 膨胀
CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
}
}
}
...
在后面的 InterpreterRuntime::monitorenter
中不允许偏向锁以及 fast_enter
中加锁失败,也会进轻量级锁获取流程:
IRT_ENTRY_NO_ASYNC(void, InterpreterRuntime::monitorenter(JavaThread* thread, BasicObjectLock* elem))
#ifdef ASSERT
thread->last_frame().interpreter_frame_verify_monitor(elem);
#endif
...
// 如果允许偏向锁
if (UseBiasedLocking) {
...
} else {
// 加轻量级锁
ObjectSynchronizer::slow_enter(h_obj, elem->lock(), CHECK);
}
assert(Universe::heap()->is_in_reserved_or_null(elem->obj()),
"must be NULL or an object");
#ifdef ASSERT
thread->last_frame().interpreter_frame_verify_monitor(elem);
#endif
IRT_END
void ObjectSynchronizer::fast_enter(Handle obj, BasicLock* lock, bool attempt_rebias, TRAPS) {
...
if (UseBiasedLocking) {
...
}
// 很显然到这里, 偏向锁肯定被撤销了
// 加轻量级锁
slow_enter (obj, lock, THREAD) ;
}
在进入轻量级锁加锁流程前,如果之前是偏向锁,会先撤销偏向锁,这个逻辑在偏向锁中已经说了。
接下来说下加轻量级锁的主逻辑
// 加轻量级锁
void ObjectSynchronizer::slow_enter(Handle obj, BasicLock* lock, TRAPS) {
// 获取Mark Word
markOop mark = obj->mark();
assert(!mark->has_bias_pattern(), "should not see bias pattern here");
if (mark->is_neutral()) {
// 无锁(001)的情况
// Anticipate successful CAS -- the ST of the displaced mark must
// be visible <= the ST performed by the CAS.
// 尝试升级为轻量级锁
lock->set_displaced_header(mark);
if (mark == (markOop) Atomic::cmpxchg_ptr(lock, obj()->mark_addr(), mark)) {
// 升级成功, 结束
TEVENT (slow_enter: release stacklock) ;
return ;
}
// 如果失败, 说明存在竞争, 膨胀为重量级锁
// Fall through to inflate() ...
} else
if (mark->has_locker() && THREAD->is_lock_owned((address)mark->locker())) {
// 轻量级锁重入的情况
assert(lock != mark->locker(), "must not re-lock the same lock");
assert(lock != (BasicLock*)obj->mark(), "don't relock with same BasicLock");
// 重入的轻量级锁,LockRecord的displaced_header会被置为NULL
lock->set_displaced_header(NULL);
return;
}
// The object header will never be displaced to this lock,
// so it does not matter what the value is, except that it
// must be non-zero to avoid looking like a re-entrant lock,
// and must not look locked either.
// 以上都失败, 轻量级锁膨胀为重量级锁
//
// 标记MarkWord为unused_mark(0b011),
// 其实这里标记为11只是为了设置一个有别于轻量级锁的特殊值
// 如果没改, 后面没法判断是轻量级锁重入还是轻量级锁膨胀
lock->set_displaced_header(markOopDesc::unused_mark());
// 调用重量级锁方法
ObjectSynchronizer::inflate(THREAD, obj())->enter(THREAD);
}
逻辑很简单,能成功加偏向锁的情况有两种:
如果失败,会膨胀为重量级锁。由于这两部分代码和重量级锁的数据结构关系很大,所以放到重量级锁中分析。
在重入时有个细节需要注意,对于重入的轻量级锁displaced_header为NULL,也就是说,只有第一次进入时的轻量级锁的displaced_header为无锁的MarkWord,为什么?请看解锁流程。
这部分代码和偏向锁解锁重合度很高,这是因为偏向锁和轻量级锁是基于相同的数据结构实现的,
CASE(_monitorexit): {
...
// 按enter时相同的顺序遍历LockRecord, 可以达到栈的效果
while (most_recent != limit ) {
if ((most_recent)->obj() == lockee) {
// 找到当前对象持有的LockRecord
BasicLock* lock = most_recent->lock();
// 从LockRecord中取出刚开始的header
markOop header = lock->displaced_header();
// 解除关联, 偏向锁和重入的轻量级锁只需要这一步就够了
most_recent->set_obj(NULL);
if (!lockee->mark()->has_bias_pattern()) {
// 当前不是偏向锁的情况, 可能之前膨胀了
bool call_vm = UseHeavyMonitors;
// If it isn't recursive we either must swap old header or call the runtime
if (header != NULL || call_vm) {
// header != NULL说明不是轻量级锁的重入, call_vm==true说明是强制使用重量级锁
// 对于不是重入的轻量级锁, 从DisplacedMarkWord还原回去就行,
// 如果失败, 说明存在竞争, 可能在某个地方膨胀了,
// 需要走InterpreterRuntime::monitorexit
// 对于call_vm==true, 也就是强制走重量级锁的情况, 直接InterpreterRuntime::monitorexit
if (call_vm || Atomic::cmpxchg_ptr(header, lockee->mark_addr(), lock) != lock) {
...
}
}
}
// 只需要找到一个就退出, 对于重入的情况肯定是后获取的锁先释放
UPDATE_PC_AND_TOS_AND_CONTINUE(1, -1);
}
// 下一个LockRecord
most_recent++;
}
和偏向锁一样,都会将LockRecord中的obj置为NULL,表示这个LockRecord和obj没关系了,这个很好理解。
对于轻量级锁这样还没完,还会取出LockRecord的dispalced_header判断,如果不为NULL,就会将这个header恢复到MarkWord中。因为对于重入的场景,第一次进入时获取的LockRecord的displaced_header的是无锁的MarkWord,后面进入的都是NULL,所以不为NULL表示这是第一次进入时的LockRecord,需要恢复回去,表示所有的锁都解锁了。这里就严格依赖栈结构,基于栈实现了重入特性。
轻量级锁的场景比偏向锁宽松点,可以允许多个线程交替进入,但是不允许有竞争。从MarkWord状态看就是不断在无锁和轻量级锁之间切换。
···c
CASE(_monitorenter) : {
oop lockee = STACK_OBJECT(-1);
// derefing's lockee ought to provoke implicit null check
CHECK_NULL(lockee);
// find a free monitor or one already allocated for this object
// if we find a matching object then we need a new monitor
// since this is recursive enter
BasicObjectLock *limit = istate->monitor_base();
BasicObjectLock *most_recent = (BasicObjectLock *)istate->stack_base();
BasicObjectLock *entry = NULL;
while (most_recent != limit) {
if (most_recent->obj() == NULL)
entry = most_recent;
else if (most_recent->obj() == lockee)
break;
most_recent++;
}
if (entry != NULL) {
entry->set_obj(lockee);
int success = false;
uintptr_t epoch_mask_in_place =
(uintptr_t)markOopDesc::epoch_mask_in_place;
···
如上代码,首先会申请一个lock record,那么我想问一下 偏向所为什么需要Lock Record呢?
这种方式比较简单,javaagent的方式的最终实现也是手动的方式。
在使用这种方式前,假设都接入了javaagent,这样可以免去对OpenTelemetry的很多配置。
这种方式比较简单,没自己试过,具体可以参考Annotations | OpenTelemetry。
这种方式必须先使用javaagent。
这部分参考了Manual Instrumentation | OpenTelemetry
OpenTelemetry SDK为各个语言对OpenTelemetry API的实现,包含了对trace的所有配置,是整个trace实现的核心。
这种方式也建议和javaagent配合使用,这样可以直接使用javaagent中配置好的openTelemetry,获取方式非常简单
OpenTelemetry openTelemetry = GlobalOpenTelemetry.get();
如果没有引入javaagent,或者非要自己配置,请参考https://opentelemetry.io/docs/instrumentation/java/manual/#set-up。
个人不推荐这样做,这意味放弃社区支持,完全自己实现所有类库的Instrumentation。
如果引入了javaagent,获取方式很简单:
Tracer tracer = GlobalOpenTelemetry.getTracer("instrumentation-library-name", "1.0.0"); // 版本可以不传
如果从手动创建的openTelemetry获取:
Tracer tracer = openTelemetry.getTracer("instrumentation-library-name", "1.0.0");
注意,入参不是Instrumented的类库的名称,是Instrumentation的名称,体现在jaeger中为:
@GetMapping("/simpleSpan")
public String simpleSpan() {
Span span1 = tracer.spanBuilder("span1").startSpan();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
span1.end();
}
Span span2 = tracer.spanBuilder("span2").startSpan();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
span2.end();
}
return Span.current().getSpanContext().getTraceId();
}
@GetMapping("/nestedSpan")
public String nestedSpan() {
Span parent = tracer.spanBuilder("parent").startSpan();
try (Scope scope = parent.makeCurrent()) {
Span child = tracer.spanBuilder("child").startSpan();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
child.end();
}
} finally {
parent.end();
}
return Span.current().getSpanContext().getTraceId();
}
@GetMapping("/spanWithAttributes")
public String spanWithAttributes() {
Span span = tracer.spanBuilder("span1").startSpan();
try {
span.setAttribute("my.key", "my.value");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
span.end();
}
return Span.current().getSpanContext().getTraceId();
}
@GetMapping("/spanWithEvent")
public String spanWithEvent() {
Span span = tracer.spanBuilder("span1").startSpan();
try {
span.addEvent("my.simple.event");
span.addEvent("my.complex.event", Attributes.of(
AttributeKey.stringKey("key"), "value",
AttributeKey.longKey("result"), 0L
));
} finally {
span.end();
}
return Span.current().getSpanContext().getTraceId();
}
@GetMapping("/spanWithStatus")
public String spanWithStatus() {
Span span = tracer.spanBuilder("span1").startSpan();
try {
// 默认是UNSET
span.setStatus(StatusCode.OK);
} finally {
span.end();
}
return Span.current().getSpanContext().getTraceId();
}
@GetMapping("/spanWithException")
public String spanWithException() {
Span span = tracer.spanBuilder("span1").startSpan();
try {
throw new RuntimeException("my.exception");
} catch (Throwable e) {
span.setStatus(StatusCode.ERROR);
span.recordException(e);
} finally {
span.end();
}
return Span.current().getSpanContext().getTraceId();
}
可在多线程场景使用,表示两个span的关联关系,但不是那么强的父子关系,比如fork/join中。
@GetMapping("/spanWithLinks")
public String spanWithLinks() {
ConcurrentHashMap<String, String> resultsForNewThread = new ConcurrentHashMap<>(16);
ConcurrentHashMap<String, String> resultsForThreadPool = new ConcurrentHashMap<>(16);
Span parent = tracer.spanBuilder("parent").startSpan();
try {
for (int i = 0; i < 3; i++) {
new Thread(() -> {
Span threadSpan = tracer.spanBuilder("new thread")
.addLink(parent.getSpanContext())
.startSpan();
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
threadSpan.end();
}
resultsForNewThread.put(Thread.currentThread().getName(), threadSpan.getSpanContext().getTraceId());
}).start();
}
for (int i = 0; i < 3; i++) {
threadPool.submit(() -> {
Span threadSpan = tracer.spanBuilder("thread in thread pool")
.addLink(parent.getSpanContext())
.startSpan();
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
threadSpan.end();
}
resultsForThreadPool.put(Thread.currentThread().getName(), threadSpan.getSpanContext().getTraceId());
});
}
} finally {
parent.end();
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Map<String, Object> ret = new HashMap<>(4);
ret.put("parent", parent.getSpanContext().getTraceId());
ret.put("resultForNewThread", resultsForNewThread.values());
ret.put("resultForThreadPool", resultsForThreadPool.values());
return ret.toString();
}
结果为:
{resultForNewThread=[fbad0b6d99a8178d6544063b20b42669, 7ac1abcd94809c14611b0934011b79c2, 63bbe4d161c87d025543c0b2c8c8e7a5], parent=3d184af32208e23e75123a8f5c7a2ab2, resultForThreadPool=[3d184af32208e23e75123a8f5c7a2ab2, 3d184af32208e23e75123a8f5c7a2ab2, 3d184af32208e23e75123a8f5c7a2ab2]}
对于单独new出来的线程时看不到link的parent span的,在thread pool中的线程,能够看到这些线程的span和parent是在一起的,并且traceId相同,看来对线程池做了特别的改动。
CASE(_monitorenter) : {
oop lockee = STACK_OBJECT(-1);
// derefing's lockee ought to provoke implicit null check
CHECK_NULL(lockee);
// find a free monitor or one already allocated for this object
// if we find a matching object then we need a new monitor
// since this is recursive enter
BasicObjectLock *limit = istate->monitor_base();
BasicObjectLock *most_recent = (BasicObjectLock *)istate->stack_base();
BasicObjectLock *entry = NULL;
while (most_recent != limit) {
if (most_recent->obj() == NULL)
entry = most_recent;
else if (most_recent->obj() == lockee)
break;
most_recent++;
}
if (entry != NULL) {
entry->set_obj(lockee);
int success = false;
uintptr_t epoch_mask_in_place =
(uintptr_t)markOopDesc::epoch_mask_in_place;
如上代码,首先会申请一个lock record,那么我想问一下 偏向所为什么需要Lock Record呢?
由于一些定制化需求,需要对opentelemetry-java- instrumentation二次开发,这个项目比较复杂,在开发之前,有必要对其实现有所了解。
OpenTelemetry Java Instrumentation实现主要由两个项目组成:opentelemetry-java和opentelemetry-java-instrumentation。
在二次开发时并不需要对opentelemetry-java进行修改,只修改opentelemetry-java-instrumentation项目即可。
instrumented library: 被trace的第三方库,比如springmvc
instrumentation: trace第三方库的具体逻辑的载体,比如trace springmvc的逻辑的module
auto-Instrumentation: 不侵入业务代码的trace方式,是一种抽象的概念,对Java而言,其实现为javaagent的方式
注意:
上面将instrument翻译为trace仅为当前场景下的翻译,并不适用于其他地方,
instrument有翻译成探针的,但我还是觉得不够准确,所以没用这个翻译,直接用了英文。
主要模块说明:
由于javaagent和app会使用相同类的不同版本而导致程序出错,所以项目中需要隔离javaagent和应用程序的类。
具体做法是:
版本隔离时为了隔离javaagent和app中相同的类,互不影响,这些类可以是同一版本,也可以是不同版本。这样app在引入类时就无需考虑和javaagent中的类的兼容问题。
版本隔离的手段是shaded,很简单,就是
将指定的类,即在opentelemetry-api-shaded-for-instrumenting和opentelemetry-ext-annotations-shaded-for-instrumenting指定的包中的类,
在打包时将类拷贝一份到指定的shaded目录下,即解压后的io/opentelemetry/javaagent/shaded目录:
✗ tree -d -L 3 io/opentelemetry/javaagent/shaded
io/opentelemetry/javaagent/shaded
├── instrumentation
│ └── api
│ ├── annotation
│ ├── annotations
│ ├── appender
│ ├── cache
│ ├── config
│ ├── db
│ ├── field
│ ├── instrumenter
│ ├── internal
│ ├── log
│ ├── server
│ ├── tracer
│ └── util
└── io
└── opentelemetry
├── api
├── context
└── semconv
同时会修改这些类被引用地方的绝对路径,比如我在OpenTelemetryAgent中加了行根据绝对路径加载GlobalOpenTelemetry的代码,在java文件和class文件中的路径是不同的:
字节码文件反编译后的绝对路径中变为了shaded后的路径
这样AppClassLoader在默认情况下是不可能加载到的,如果非要加载也是能加载到的
对于Instrumentation类,是不希望在app中使用的,所以打包时做了两件事:
这样的类只会被AgentClassLoader找到:
在创建AgentClassLoader时传入了inst目录的路径:
在找类时也会将.class重命名成.classdata
有些类是需要共享的,比如GlobalOpenTelemetry是在opentelemetry-api包中的,但在app中也会通过GlobalOpenTelemetry.get获取openTelemetry。
为了实现共享,做了两件事情,以GlobalOpenTelemetry为例:
在使用时需要通过GlobalOpenTelemetry.get()获取到openTelemetry
并没有在这个app中对OpenTelemetry做任何配置,最后却获取到了javaagent中的OpenTelemetry。
如何实现呢?
其实是对GlobalOpenTelemetry也实现了个Instrumentation,对get方法字节码做了修改,返回的类实际上是对javaagent中openTelemetry做的包装,和javaagent的openTelemetry功能相同,但类不同。
看GlobalOpenTelemetry.get的字节码会更明显些:
public static OpenTelemetry get() {
OpenTelemetry openTelemetry;
Object object;
block16: {
OpenTelemetry openTelemetry2;
block15: {
Object var0 = null;
if (var0 == null) {
openTelemetry2 = null;
} else {
try {
ObfuscatedOpenTelemetry openTelemetry3 = globalOpenTelemetry;
/* 57*/ if (openTelemetry3 == null) {
/* 58*/ object = mutex;
synchronized (object) {
/* 59*/ openTelemetry3 = globalOpenTelemetry;
/* 60*/ if (openTelemetry3 == null) {
/* 62*/ OpenTelemetry autoConfigured = GlobalOpenTelemetry.maybeAutoConfigureAndSetGlobal();
/* 63*/ if (autoConfigured != null) {
/* 64*/ openTelemetry2 = autoConfigured;
break block15;
} else {
/* 67*/ GlobalOpenTelemetry.set(OpenTelemetry.noop());
/* 68*/ openTelemetry2 = OpenTelemetry.noop();
}
break block15;
}
}
}
/* 72*/ openTelemetry2 = openTelemetry3;
}
catch (Throwable throwable) {
openTelemetry = null;
break block16;
}
}
}
openTelemetry = openTelemetry2;
object = null;
}
// 这一段就是通过ByteBuddy加的
try {
openTelemetry = ApplicationOpenTelemetry.INSTANCE;
}
catch (Throwable throwable) {
try {
LoggerFactory.getLogger(ExceptionLogger.class).debug("Failed to handle exception in instrumentation for io.opentelemetry.api.GlobalOpenTelemetry on null", throwable);
}
catch (Throwable throwable2) {}
}
if (object != null) {
throw object;
}
return openTelemetry;
}
GlobalOpenTelemetry是需要被共享的类之一,
从上面的实现可以看到在修改的字节码中加载的openTelemetry也是从GlobalOpenTelemetry来的,
而此时GlobalOpenTelemetry正在AppClassLoader中被加载(因为DebugController是被AppClassLoader加载的),
是不是哪里不对?但为什么没报错?
因为另一个GlobalOpenTelemetry是shaded的版本,在javaagent包的shaded目录下,
为了在AppClassLoader中使用shaded的类,必须保证这个类在AppClassLoader的双亲中。
这里选择的就是bootstrap class loader,这样在AgentClassLoader和AppClassLoader中都可以使用这个类。
这一过程具体发生在javaagent初始化过程的最开始,即在OpenTelemetryAgent中,将整个javaagent的jar包放到bootstrap class loader中,
此时jar包中除了已被AppClassLoader加载的OpenTelemetryAgent外,其他类在后面都将被bootstrap class laoder加载。
上面的流程从class loader的角度看就是,在通过AppClassLoader加载GlobalOpenTelemetry时,使用到了shaded版本的GlobalOpenTelemetry。
所以这个共享并不是真正的共享了类,而是共享了功能,javaagent和app的类依然是不同的。
在上面的过程中已经提到了每个class loader会加载的类,现在将javaagent的jar包解压,详细说明下每个class loader加载的javaagent的类
或者叫system class loader
就是内置的bootstrap class loader,
入口类OpenTelemetryAgent一初始化会将javaagent的jar包放到bootstrap class loader中,
所以javaagent jar包中除OpenTelemetryAgent之外的以的类都会由bootstrap class loader加载,包括:
代码中叫extension class loader,不是常说的ext class loader,是javaagent自己创建的class laoder,对app隔离。
会加载javaagent独享的类,path为jar包中的inst目录,其中的类都以classdata为后缀,包含了所有三方库的Instrumentation。
具体包含:
流程中涉及到的拓展点都标出来了,拓展时优先考虑这些拓展点,若没有再考虑修改源码。
span上报逻辑的位置可能有点不好找,这里说下。
span是在span.end()时上报,如果是手动的一般直接调用这个方法,如果是自动的,会在Instrumenter#end的尾部调用。
然后span内部调用SpanProcessor#onEnd,线上应该都是BatchSpanProcessor,如果是BatchSpanProcessor,会放到队列中,
然后在worker线程中上报,在上报逻辑中会调用具体的spanExporter,
接下来就是spanExporter的逻辑了,如果是JaegerThriftSpanExporter,会走到JaegerThriftSpanExporter#exporter。
这是一种安全检查机制,检查javaagent的ByteBuddy需要使用的symbols和app class path上的类的symbols是否兼容。
大概原理是:
具体match过程可以在InstrumentationModuleInstaller#matchers中打个断点研究下。
更详细的可以参考muzzle docs。
A declarative, efficient, and flexible JavaScript library for building user interfaces.
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google ❤️ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.