Продолжить отслеживание пролета, начатого с помощью сыщика

Я разрабатываю приложение на основе микросервисов с тремя службами:

  • Служба оркестратора
  • Услуга А
  • Услуга Б

Служба Orchestrator — это приложение Spring Boot, которое отслеживает запросы с помощью Sleuth/B3. Вот пример записи в журнале:

      2023-09-19 16:52:49.401 DEBUG [orchestrator,6ddce8f8b325a9b9,6ddce8f8b325a9b9] 30743 --- [nio-8080-exec-8] org.hibernate.SQL

Три микросервиса взаимодействуют через Kafka Broker, поэтому в конце обработки запроса сервис Orchestrator помещает сообщение в тему Kafka, отправляя следующие заголовки (включая заголовки B3):

      {
    "X-B3-SpanId": "582942e743b9446f",
    "X-B3-TraceId": "582942e743b9446f",
    "id": "9"
}

Служба приема — это служба NestJS, которая использует OpenTelemetry с распространителем B3 для перехвата заголовков B3 из сообщения Kafka. Теперь я хотел бы продолжить отслеживание TraceId и SpanId, созданных с помощью Sleuth, но не могу понять, как это сделать. Вот как я настроил открытую телеметрию в сервисе NestJS:

трассер.js

      import {
    ConsoleSpanExporter,
    SimpleSpanProcessor,
  } from '@opentelemetry/sdk-trace-base';
  import { api, NodeSDK } from '@opentelemetry/sdk-node';
  import * as process from 'process';
  import { getNodeAutoInstrumentations } from '@opentelemetry/auto-instrumentations-node';
  import {Resource} from '@opentelemetry/resources';
  import { SemanticResourceAttributes } from '@opentelemetry/semantic-conventions';
  import { B3Propagator, B3InjectEncoding } from '@opentelemetry/propagator-b3';
  const { NodeTracerProvider } = require('@opentelemetry/sdk-trace-node');
  const { KafkaJsInstrumentation } = require('opentelemetry-instrumentation-kafkajs');

  
  const traceExporter = new ConsoleSpanExporter();
  
  api.propagation.setGlobalPropagator(new B3Propagator({ injectEncoding: B3InjectEncoding.MULTI_HEADER }));

  export const otelSDK = new NodeSDK({
    spanProcessor: new SimpleSpanProcessor(traceExporter),
    instrumentations: [
        getNodeAutoInstrumentations(),
        new KafkaJsInstrumentation({
            // see under for available configuration
        })
    ],
    resource: new Resource({
        [SemanticResourceAttributes.SERVICE_NAME]: `ipfs-service`,
    }),
  });
  
  // gracefully shut down the SDK on process exit
  process.on('SIGTERM', () => {
    otelSDK
      .shutdown()
      .then(
        () => console.log('SDK shut down successfully'),
        (err) => console.log('Error shutting down SDK', err),
      )
      .finally(() => process.exit(0));
  });
  

main.ts

      import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
import { otelSDK } from './tracer';

async function bootstrap() {

  otelSDK.start();

  const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
    transport: Transport.KAFKA,
    options: {
      client: {
        clientId: 'ipfs-client',
        brokers: ['localhost:29092'],
      },
      subscribe: {
        fromBeginning: true
      },
      consumer: {
        groupId: 'ipfs-consumer'
      }
    }
  });
  app.listen();
}
bootstrap();

и вот что регистрирует микросервис NestJS:

      {
  traceId: '48f139a5b81cd0bf60748321310047d2',
  parentId: undefined,
  traceState: undefined,
  name: 'IPFS',
  id: '353136b7af6bcc2d',
  kind: 4,
  timestamp: 1695137391490000,
  duration: 2493.083,
  attributes: {
    'messaging.system': 'kafka',
    'messaging.destination': 'IPFS',
    'messaging.destination_kind': 'topic',
    'messaging.operation': 'process'
  },
  status: { code: 0 },
  events: [],
  links: []
}

как вы можете видеть, идентификатор трассировки отличается от оригинала, созданного Сыщиком.

Заранее спасибо.

0 ответов

Другие вопросы по тегам