Comments (14)
So I suppose this is the same exact case as with spring-cloud-stream.
Here is the documentation that we collaborated on: https://docs.spring.io/spring-cloud-stream/reference/observability.html#_reactive_functions
Perhaps this project could use a similar approach? One thing to keep in mind though - for spring-cloud-stream they decided not to have any Observation in scope for the reactive methods, while this case shows that there is one in scope but it's immediately terminated and not stored in the reactive Context. That might lead to unexpected results. Yet still, really worth documenting.
from spring-amqp.
Very nice @chemicL !
Updating code with:
return this.webClient
.post()
.uri(url)
.contentType(MediaType.APPLICATION_JSON)
.headers(
httpHeaders ->
headers.forEach((k, v) -> httpHeaders.add(k, v != null ? v.toString() : null)))
.bodyValue(requestBody)
.exchangeToMono(this::defaultResponseHandler)
.tap(Micrometer.observation(this.registry))
worked like a charm
from spring-amqp.
One more thought. As the listener method is invoked while an Observation
is in scope, you might find it useful to call .contextCapture()
after .tap(Micrometer.observation(this.registry))
.
The listener adapter (AbstractAdaptableMessageListener
) also subscribes while the Observation
is still in scope, therefore this way you'd be able to establish a parent-child relationship between the Observation
that is immediately finished after the Mono
is returned. However, you'd be able to correlate any logs that you perform before you actually return the Mono
. It would also mean that messages in the same batch share the trace-id of the batch.
@garyrussell please consider including these remarks in the documentation for reactive types. I'm happy to help as well.
from spring-amqp.
See my comment on the referenced issue: reactor/reactor-core#3648 (comment)
from spring-amqp.
Note that you need to enable observation in the listener container; it is disabled by default - see https://docs.spring.io/spring-amqp/docs/current/reference/html/#micrometer-observation
from spring-amqp.
Thanks for the response @garyrussell, the factories already enable the observation and context propagation is set to auto.
from spring-amqp.
Thanks for the update; I didn't look at the project (it's a holiday here until Monday).
from spring-amqp.
Wow, sorry! Have a great time and forget about me! :D
from spring-amqp.
Bear in mind that the Observation
created by the listener container only lives for the scope of the listener method; the observation is closed when the listener method returns. The container cannot handle async listener methods (it knows nothing about the actual listener method - that is all handled in the listener adapter).
I suspect you will need to handle it yourself in your listener.
from spring-amqp.
unfortunately, this is not working for me.. for the headers copy step
.headers(
httpHeaders ->
headers.forEach((k, v) -> httpHeaders.add(k, v != null ? v.toString() : null)))
What is the source of this headers.forEach, for reactor Kafka I see only traceheader propagated.
Is there a way to instrument this in the back ground, my use case is a Kafka consumer on receiving a message from source, executes a web client call. This call needs to be in the same trace..
from spring-amqp.
I don't think the Reactor Kafka is related somehow to this issue.
@chemicL ,
I'm not sure what we need to document since that contextCapture()
is a standard Reactor functionality if we would like to propagate the current thread context to the reactive stream we initiate from there: https://projectreactor.io/docs/core/release/reference/#_contextcapture_operator.
Closing this as Works as Designed
.
Will be happy to reopen if there is really a reasonable justification what we have to document.
from spring-amqp.
I'm not sure what we need to document since that
contextCapture()
is a standard Reactor functionality if we would like to propagate the current thread context to the reactive stream we initiate from there: https://projectreactor.io/docs/core/release/reference/#_contextcapture_operator.
The contextCapture()
operator on its own doesn't help much in this context, as was explained in the earlier comments (observation is immediately stopped). The situation is quite tricky for users in my opinion.
Will be happy to reopen if there is really a reasonable justification what we have to document.
I can just point back to what I previously said: #2560 (comment)
Just to reiterate, after having worked with @olegz on the spring-cloud-stream case which looks very much like this one here, we came up with what can be seen in their documentation. I believe it was justified there the same way it is for this project and either I am failing to find this in the existing documentation or there is potential to do the same brief explanation as in spring-cloud-stream's case.
I believe this issue could be renamed to something like "Document observability for listeners with asynchronous types" and reopened.
from spring-amqp.
We probably need to write some unit test to verify such a behavior, but at a glance it must work.
We do this:
observation.observe(() -> executeListenerAndHandleException(channel, data));
So, an Observation
is set into ThreadLocal
.
Then within that executeListenerAndHandleException()
we do this:
((Mono<? super Object>) returnValue).subscribe(success, failure, completeConsumer);
And if that returnValue
has a contextCapture()
it must get an effect taking the current observation from the ThreadLocal
.
So, that all should work OK even if we close an observation immediately after returning from the listener method call.
I see the code in question is like this:
.map(
body -> {
if (status.is2xxSuccessful()) {
log.info("HTTP[webhook] response '{}'", body);
In my experience only handle()
operator does a log correlation 🤷
from spring-amqp.
In my experience only
handle()
operator does a log correlation 🤷
@Jojoooo1 uses the new Spring Boot 3.2 config for automatic propagation to other operators.
I'll try to play with it a bit and perhaps prepare a more limited example as a unit test to validate the case.
from spring-amqp.
Related Issues (20)
- Get rid of `synchronized` in `RabbitAdmin`
- Get rid of `synchronized` in `RabbitTemplate` HOT 1
- Get rid of `synchronized` in the `BlockingQueueConsumer`
- Get rid of `synchronized` in `spring-rabbit-test` module
- Get rid of the rest of `synchronized` in the ListenerContainer infrastructure
- Improve Extensibility of RepublishMessageRecovererWithConfirms by supporting SpEL expressions HOT 1
- AmqpRejectAndDontRequeueException completely restarts Consumer HOT 2
- The `SimpleMessageListenerContainer` does not shutdown properly
- The `SimpleMessageListenerContainer` does not shutdown properly HOT 1
- Swallowed exception 'AmqpTimeoutException: No available channels' HOT 4
- Add batchReceiveTimeout in SimpleMessageListenerContainer HOT 3
- MessageProperties setDelay maximum value problem HOT 3
- ImmediateAcknowledgeAmqpException keeps the message in the queue HOT 10
- TraceId propagation to the new thread local HOT 4
- Wrong ClassLoader is used for message deserialization when devtools are active
- Wrong ClassLoader is used for message deserialization when devtools are active HOT 1
- Kotlin suspend functions return type is incorrect HOT 3
- Kotlin suspend functions return type is incorrect HOT 1
- Channel cache leak when no answers from broker for pending confirms
- Channel cache leak when no answers from broker for pending confirms HOT 1
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from spring-amqp.