Отслеживание распространения идентификатора в Spring Boot 3 с помощью облачных потоков Spring и WebFlux

Я попытался использовать весенний облачный поток с связующим кафкой. Но когда я вызываю WebClient в цепочке, идентификатор трассировки теряется.

Мой поток'external service' -> 'functionStream-in' -> 'http call' -> functionStream-out' -> 'testStream-in' -> 'testStream-out' -> 'external service'

Но после http-вызова (или нет?) идентификатор трассировки не распространяется, и я не понимаю, почему. Если убрать http вызов, то все ок.

я пытался добавитьHooks.enableAutomaticContextPropagation();, но это не помогло. я пытался добавитьContextSnapshot.setThreadLocalsFromвокруг http вызова - то же самое.

Как я могу это решить?

Зависимости:

      dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-actuator'
    implementation 'org.springframework.boot:spring-boot-starter-webflux'
    implementation 'org.springframework.cloud:spring-cloud-stream'
    implementation 'org.springframework.cloud:spring-cloud-starter-stream-kafka'
    
    implementation 'io.micrometer:micrometer-tracing-bridge-brave'
    implementation 'io.zipkin.reporter2:zipkin-reporter-brave'
    
    implementation "io.projectreactor:reactor-core:3.5.3"
    implementation "io.micrometer:context-propagation:1.0.2"
    implementation "io.micrometer:micrometer-core:1.10.4"
    implementation "io.micrometer:micrometer-tracing:1.0.2"
}

приложение.yml:

      spring:
  cloud.stream:
    kafka.binder:
      enableObservation: true
      headers:
        - b3
    function.definition: functionStream;testStream
    default.producer.useNativeEncoding: true
    bindings:
      functionStream-in-0:
        destination: spring-in
        group: spring-test1
      functionStream-out-0:
        destination: test-in
      testStream-in-0:
        destination: test-in
        group: spring-test2
      testStream-out-0:
        destination: spring-out
  integration:
    management:
      observation-patterns: "*"
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      properties:
        spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

management:
  tracing:
    enabled: true
    sampling.probability: 1.0
    propagation.type: b3
logging.pattern.level: "%5p [%X{traceId:-},%X{spanId:-}]"

Код:

          @Bean
    WebClient webClient(final WebClient.Builder builder) {
        return builder.build();
    }

    @Bean
    Function<Flux<Message<String>>, Flux<Message<String>>> functionStream(final WebClient webClient, final ObservationRegistry registry) {
        return flux -> flux
                .<Message<String>>handle((msg, sink) -> {
                    log.info("functionStream-1");
                    sink.next(msg);
                })
                .flatMap(msg -> webClient.get()
                        .uri("http://localhost:8080/test")
                        .exchangeToMono(httpResponse -> httpResponse.bodyToMono(String.class)
                                .map(httpBody -> MessageBuilder.withPayload(httpBody)
                                        .copyHeaders(httpResponse.headers().asHttpHeaders())
                                        .build())
                                .<Message<String>>handle((m, sink) -> {
                                    log.info("functionStream-3");
                                    sink.next(m);
                                })
                        )
                )
                .handle((msg, sink) -> {
                    log.info("functionStream-2");
                    sink.next(msg);
                });
    }

    @Bean
    Function<Flux<Message<String>>, Flux<Message<String>>> testStream(final ObservationRegistry registry) {
        return flux -> flux
                .publishOn(Schedulers.boundedElastic())
                .<Message<String>>handle((msg, sink) -> {
                    log.info("testStream-1");
                    sink.next(msg);
                })
                .map(msg -> MessageBuilder
                        .withPayload(msg.getPayload())
                        .copyHeaders(msg.getHeaders())
                        .build());
    }

    @Bean
    RouterFunction<ServerResponse> router(final ObservationRegistry registry) {
        return route()
                .GET("/test", r -> ServerResponse.ok().body(Mono.deferContextual(contextView -> {
                    try (final var scope = ContextSnapshot.setThreadLocalsFrom(contextView, ObservationThreadLocalAccessor.KEY)) {
                        log.info("GET /test");
                    }
                    return Mono.just("answer");
                }), String.class))
                .build();
    }

С этим кодом у меня есть вывод:

      2023-02-16T17:06:22.111  INFO [63ee385de15f1061dea076eb06b0d1e0,39a60588a695a702] 220348 --- [container-0-C-1] com.example.demo.TestApplication         : functionStream-1
2023-02-16T17:06:22.166  WARN [63ee385de15f1061dea076eb06b0d1e0,39a60588a695a702] 220348 --- [container-0-C-1] i.m.o.c.ObservationThreadLocalAccessor   : Scope from ObservationThreadLocalAccessor [null] is not the same as the one from ObservationRegistry [io.micrometer.observation.SimpleObservation$SimpleScope@523fe6a9]. You must have created additional scopes and forgotten to close them. Will close both of them
2023-02-16T17:06:22.170  WARN [63ee385de15f1061dea076eb06b0d1e0,de5d233d531b10f7] 220348 --- [container-0-C-1] i.m.o.c.ObservationThreadLocalAccessor   : Scope from ObservationThreadLocalAccessor [null] is not the same as the one from ObservationRegistry [io.micrometer.observation.SimpleObservation$SimpleScope@545339d8]. You must have created additional scopes and forgotten to close them. Will close both of them
2023-02-16T17:06:22.187  WARN [63ee385de15f1061dea076eb06b0d1e0,de5d233d531b10f7] 220348 --- [container-0-C-1] i.m.o.c.ObservationThreadLocalAccessor   : Scope from ObservationThreadLocalAccessor [null] is not the same as the one from ObservationRegistry [io.micrometer.observation.SimpleObservation$SimpleScope@44400bcc]. You must have created additional scopes and forgotten to close them. Will close both of them
2023-02-16T17:06:22.361  INFO [63ee385de15f1061dea076eb06b0d1e0,908f48f8485a4277] 220348 --- [ctor-http-nio-4] com.example.demo.TestApplication         : GET /test
2023-02-16T17:06:22.407  INFO [,] 220348 --- [ctor-http-nio-3] com.example.demo.TestApplication         : functionStream-3
2023-02-16T17:06:22.409  INFO [,] 220348 --- [ctor-http-nio-3] com.example.demo.TestApplication         : functionStream-2
2023-02-16T17:06:22.448  INFO [63ee385eda64dcebdd1b0fd86a6c39ca,dd1b0fd86a6c39ca] 220348 --- [ctor-http-nio-3] o.a.k.clients.admin.AdminClientConfig    : AdminClientConfig values: 
2023-02-16T17:06:22.456  INFO [63ee385eda64dcebdd1b0fd86a6c39ca,dd1b0fd86a6c39ca] 220348 --- [ctor-http-nio-3] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 3.3.2
2023-02-16T17:06:22.457  INFO [63ee385eda64dcebdd1b0fd86a6c39ca,dd1b0fd86a6c39ca] 220348 --- [ctor-http-nio-3] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: b66af662e61082cb
2023-02-16T17:06:22.457  INFO [63ee385eda64dcebdd1b0fd86a6c39ca,dd1b0fd86a6c39ca] 220348 --- [ctor-http-nio-3] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1676556382456
2023-02-16T17:06:22.477  INFO [,] 220348 --- [| adminclient-6] o.a.kafka.common.utils.AppInfoParser     : App info kafka.admin.client for adminclient-6 unregistered
2023-02-16T17:06:22.481  INFO [,] 220348 --- [| adminclient-6] o.apache.kafka.common.metrics.Metrics    : Metrics scheduler closed
2023-02-16T17:06:22.481  INFO [,] 220348 --- [| adminclient-6] o.apache.kafka.common.metrics.Metrics    : Closing reporter org.apache.kafka.common.metrics.JmxReporter
2023-02-16T17:06:22.481  INFO [,] 220348 --- [| adminclient-6] o.apache.kafka.common.metrics.Metrics    : Metrics reporters closed
2023-02-16T17:06:22.512  INFO [63ee385eda64dcebdd1b0fd86a6c39ca,b5babc6bef4e30ca] 220348 --- [oundedElastic-1] com.example.demo.TestApplication         : testStream-1
2023-02-16T17:06:22.539  INFO [63ee385eda64dcebdd1b0fd86a6c39ca,30126c50752d5928] 220348 --- [oundedElastic-1] o.a.k.clients.admin.AdminClientConfig    : AdminClientConfig values: 
2023-02-16T17:06:22.543  INFO [63ee385eda64dcebdd1b0fd86a6c39ca,30126c50752d5928] 220348 --- [oundedElastic-1] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 3.3.2
2023-02-16T17:06:22.544  INFO [63ee385eda64dcebdd1b0fd86a6c39ca,30126c50752d5928] 220348 --- [oundedElastic-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: b66af662e61082cb
2023-02-16T17:06:22.544  INFO [63ee385eda64dcebdd1b0fd86a6c39ca,30126c50752d5928] 220348 --- [oundedElastic-1] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1676556382543

Без http-вызова у меня есть вывод:

      2023-02-16T17:03:09.518  INFO [63ee379d924e5645fc1d9e27b8135b48,9ad408700a3b5684] 204228 --- [container-0-C-1] com.example.demo.TestApplication         : functionStream-1
2023-02-16T17:03:09.518  INFO [63ee379d924e5645fc1d9e27b8135b48,9ad408700a3b5684] 204228 --- [container-0-C-1] com.example.demo.TestApplication         : functionStream-2
2023-02-16T17:03:09.615  INFO [63ee379d924e5645fc1d9e27b8135b48,3d4c6bd14a3ca4b6] 204228 --- [container-0-C-1] o.a.k.clients.admin.AdminClientConfig    : AdminClientConfig values: 
2023-02-16T17:03:09.629  INFO [63ee379d924e5645fc1d9e27b8135b48,3d4c6bd14a3ca4b6] 204228 --- [container-0-C-1] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 3.3.2
2023-02-16T17:03:09.629  INFO [63ee379d924e5645fc1d9e27b8135b48,3d4c6bd14a3ca4b6] 204228 --- [container-0-C-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: b66af662e61082cb
2023-02-16T17:03:09.629  INFO [63ee379d924e5645fc1d9e27b8135b48,3d4c6bd14a3ca4b6] 204228 --- [container-0-C-1] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1676556189628
2023-02-16T17:03:09.691  INFO [,] 204228 --- [| adminclient-6] o.a.kafka.common.utils.AppInfoParser     : App info kafka.admin.client for adminclient-6 unregistered
2023-02-16T17:03:09.693  INFO [,] 204228 --- [| adminclient-6] o.apache.kafka.common.metrics.Metrics    : Metrics scheduler closed
2023-02-16T17:03:09.693  INFO [,] 204228 --- [| adminclient-6] o.apache.kafka.common.metrics.Metrics    : Closing reporter org.apache.kafka.common.metrics.JmxReporter
2023-02-16T17:03:09.693  INFO [,] 204228 --- [| adminclient-6] o.apache.kafka.common.metrics.Metrics    : Metrics reporters closed
2023-02-16T17:03:09.859  INFO [63ee379d924e5645fc1d9e27b8135b48,b92a1a59ffd32d80] 204228 --- [oundedElastic-1] com.example.demo.TestApplication         : testStream-1
2023-02-16T17:03:09.868  INFO [63ee379d924e5645fc1d9e27b8135b48,db97f5eed98602f6] 204228 --- [oundedElastic-1] o.a.k.clients.admin.AdminClientConfig    : AdminClientConfig values: 
2023-02-16T17:03:09.874  INFO [63ee379d924e5645fc1d9e27b8135b48,db97f5eed98602f6] 204228 --- [oundedElastic-1] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 3.3.2
2023-02-16T17:03:09.874  INFO [63ee379d924e5645fc1d9e27b8135b48,db97f5eed98602f6] 204228 --- [oundedElastic-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: b66af662e61082cb
2023-02-16T17:03:09.874  INFO [63ee379d924e5645fc1d9e27b8135b48,db97f5eed98602f6] 204228 --- [oundedElastic-1] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1676556189874

0 ответов

Другие вопросы по тегам