Comments (14)

chemicL commented on April 28, 2024

So I suppose this is the same exact case as with spring-cloud-stream.

Here is the documentation that we collaborated on: https://docs.spring.io/spring-cloud-stream/reference/observability.html#_reactive_functions

Perhaps this project could use a similar approach? One thing to keep in mind though - for spring-cloud-stream they decided not to have any Observation in scope for the reactive methods, while this case shows that there is one in scope but it's immediately terminated and not stored in the reactive Context. That might lead to unexpected results. Yet still, really worth documenting.

from spring-amqp.

Jojoooo1 commented on April 28, 2024

Very nice, @chemicL!
Updating the code to:

    return this.webClient
        .post()
        .uri(url)
        .contentType(MediaType.APPLICATION_JSON)
        .headers(
            httpHeaders ->
                headers.forEach((k, v) -> httpHeaders.add(k, v != null ? v.toString() : null)))
        .bodyValue(requestBody)
        .exchangeToMono(this::defaultResponseHandler)
        .tap(Micrometer.observation(this.registry));

worked like a charm.

chemicL commented on April 28, 2024

One more thought. Since the listener method is invoked while an Observation is in scope, you might find it useful to call .contextCapture() after .tap(Micrometer.observation(this.registry)).
The listener adapter (AbstractAdaptableMessageListener) also subscribes while the Observation is still in scope, so this way you'd be able to establish a parent-child relationship between the listener's Observation, which is finished immediately after the Mono is returned, and the Observation created in the reactive chain. Although the parent finishes that early, you'd still be able to correlate any logs you write before you actually return the Mono. It would also mean that messages in the same batch share the trace-id of the batch.

@garyrussell please consider including these remarks in the documentation for reactive types. I'm happy to help as well.

garyrussell commented on April 28, 2024

See my comment on the referenced issue: reactor/reactor-core#3648 (comment)

garyrussell commented on April 28, 2024

Note that you need to enable observation in the listener container; it is disabled by default - see https://docs.spring.io/spring-amqp/docs/current/reference/html/#micrometer-observation

Jojoooo1 commented on April 28, 2024

Thanks for the response @garyrussell, the factories already enable the observation and context propagation is set to auto.

garyrussell commented on April 28, 2024

Thanks for the update; I didn't look at the project (it's a holiday here until Monday).

Jojoooo1 commented on April 28, 2024

Wow, sorry! Have a great time and forget about me! :D

garyrussell commented on April 28, 2024

Bear in mind that the Observation created by the listener container only lives for the scope of the listener method; the observation is closed when the listener method returns. The container cannot handle async listener methods (it knows nothing about the actual listener method - that is all handled in the listener adapter).

I suspect you will need to handle it yourself in your listener.
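
The lifetime issue described above can be illustrated without any Spring or Micrometer types. The following is only a minimal sketch under stated assumptions: a plain ThreadLocal stands in for the "current observation", and ListenerScopeSketch, CURRENT, and listener are hypothetical names, not part of Spring AMQP. It shows that work started by an async listener no longer sees the thread-bound value, because that value is bound to the container thread and removed as soon as the listener method returns.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ListenerScopeSketch {
    // Hypothetical stand-in for the thread-bound "current observation".
    static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

    // Hypothetical async listener: returns at once, the real work runs on another thread.
    static CompletableFuture<String> listener(ExecutorService worker) {
        return CompletableFuture.supplyAsync(() -> {
            String seen = CURRENT.get();
            return seen == null ? "none" : seen;
        }, worker);
    }

    // Mimics the container: open the scope, call the listener, close the scope on return.
    static String seenByAsyncWork() {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            CURRENT.set("listener-observation");
            CompletableFuture<String> result;
            try {
                result = listener(worker);
            } finally {
                CURRENT.remove(); // scope ends when the listener method returns
            }
            return result.join();
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(seenByAsyncWork()); // prints "none"
    }
}
```

The worker thread never had the value set, so the async work reports "none"; this is why the listener itself has to carry the context into the asynchronous part.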

sinfull1 commented on April 28, 2024

Unfortunately, this is not working for me, specifically the headers copy step:

    .headers(
        httpHeaders ->
            headers.forEach((k, v) -> httpHeaders.add(k, v != null ? v.toString() : null)))

What is the source of the headers in this headers.forEach? With Reactor Kafka, I see only the trace header propagated.

Is there a way to instrument this in the background? My use case is a Kafka consumer that, on receiving a message from the source, executes a web client call. This call needs to be in the same trace.

artembilan commented on April 28, 2024

@sinfull1 ,

I don't think Reactor Kafka is related to this issue.

@chemicL ,

I'm not sure what we need to document, since contextCapture() is standard Reactor functionality for propagating the current thread context to the reactive stream we initiate from there: https://projectreactor.io/docs/core/release/reference/#_contextcapture_operator.

Closing this as Works as Designed.

Will be happy to reopen if there is a reasonable justification for what we have to document.

chemicL commented on April 28, 2024

> I'm not sure what we need to document, since contextCapture() is standard Reactor functionality for propagating the current thread context to the reactive stream we initiate from there: https://projectreactor.io/docs/core/release/reference/#_contextcapture_operator.

The contextCapture() operator on its own doesn't help much in this context, as was explained in the earlier comments (observation is immediately stopped). The situation is quite tricky for users in my opinion.

> Will be happy to reopen if there is a reasonable justification for what we have to document.

I can just point back to what I previously said: #2560 (comment)

Just to reiterate, after having worked with @olegz on the spring-cloud-stream case which looks very much like this one here, we came up with what can be seen in their documentation. I believe it was justified there the same way it is for this project and either I am failing to find this in the existing documentation or there is potential to do the same brief explanation as in spring-cloud-stream's case.

I believe this issue could be renamed to something like "Document observability for listeners with asynchronous types" and reopened.

artembilan commented on April 28, 2024

We probably need to write a unit test to verify this behavior, but at a glance it should work.
We do this:

    observation.observe(() -> executeListenerAndHandleException(channel, data));

So, an Observation is set into a ThreadLocal.
Then, within executeListenerAndHandleException(), we do this:

    ((Mono<? super Object>) returnValue).subscribe(success, failure, completeConsumer);

If that returnValue has a contextCapture(), it should take effect and pick up the current observation from the ThreadLocal.
So that should all work OK, even if we close the observation immediately after returning from the listener method call.
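
The capture-at-subscribe idea can be sketched in plain Java. This is only an illustration under stated assumptions, not how Reactor or Micrometer implement it: CaptureAtSubscribeSketch, CURRENT, and capture are hypothetical names, and a ThreadLocal<String> stands in for the thread-bound observation. The value is snapshotted while the scope is still open and restored around the asynchronous work later.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

class CaptureAtSubscribeSketch {
    // Hypothetical stand-in for the thread-bound "current observation".
    static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

    // Snapshot the caller's value now; restore it around the task when it runs later.
    static <T> Supplier<T> capture(Supplier<T> task) {
        String captured = CURRENT.get();
        return () -> {
            String previous = CURRENT.get();
            CURRENT.set(captured);
            try {
                return task.get();
            } finally {
                if (previous == null) {
                    CURRENT.remove();
                } else {
                    CURRENT.set(previous);
                }
            }
        };
    }

    static String seenByAsyncWork(boolean captureContext) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            Supplier<String> work = () -> {
                String seen = CURRENT.get();
                return seen == null ? "none" : seen;
            };
            CURRENT.set("listener-observation");
            CompletableFuture<String> result;
            try {
                // The "subscribe" happens while the scope is still open.
                Supplier<String> task = captureContext ? capture(work) : work;
                result = CompletableFuture.supplyAsync(task, worker);
            } finally {
                CURRENT.remove(); // scope closes right after the listener returns
            }
            return result.join();
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(seenByAsyncWork(true));  // prints "listener-observation"
        System.out.println(seenByAsyncWork(false)); // prints "none"
    }
}
```

The sketch only shows that a reference captured at subscribe time stays visible to later work. Whether an observation that has already been stopped is still useful as a parent is a separate question.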
I see the code in question is like this:

    .map(
            body -> {
              if (status.is2xxSuccessful()) {
                log.info("HTTP[webhook] response '{}'", body);

In my experience, only the handle() operator does log correlation 🤷

chemicL commented on April 28, 2024

> In my experience, only the handle() operator does log correlation 🤷

@Jojoooo1 uses the new Spring Boot 3.2 config for automatic propagation to other operators.

I'll try to play with it a bit and perhaps prepare a more limited example as a unit test to validate the case.
