Продолжить отслеживание пролета, начатого с помощью сыщика
Я разрабатываю приложение на основе микросервисов с тремя службами:
- Служба оркестратора
- Услуга А
- Услуга Б
Служба Orchestrator — это приложение Spring Boot, которое отслеживает запросы с помощью Sleuth/B3. Вот пример записи в журнале:
2023-09-19 16:52:49.401 DEBUG [orchestrator,6ddce8f8b325a9b9,6ddce8f8b325a9b9] 30743 --- [nio-8080-exec-8] org.hibernate.SQL
Три микросервиса взаимодействуют через Kafka Broker, поэтому в конце обработки запроса сервис Orchestrator помещает сообщение в тему Kafka, отправляя следующие заголовки (включая заголовки B3):
{
"X-B3-SpanId": "582942e743b9446f",
"X-B3-TraceId": "582942e743b9446f",
"id": "9"
}
Служба приема — это служба NestJS, которая использует OpenTelemetry с распространителем B3 для перехвата заголовков B3 из сообщения Kafka. Теперь я хотел бы продолжить отслеживание TraceId и SpanId, созданных с помощью Sleuth, но не могу понять, как это сделать. Вот как я настроил открытую телеметрию в сервисе NestJS:
трассер.js
import {
ConsoleSpanExporter,
SimpleSpanProcessor,
} from '@opentelemetry/sdk-trace-base';
import { api, NodeSDK } from '@opentelemetry/sdk-node';
import * as process from 'process';
import { getNodeAutoInstrumentations } from '@opentelemetry/auto-instrumentations-node';
import {Resource} from '@opentelemetry/resources';
import { SemanticResourceAttributes } from '@opentelemetry/semantic-conventions';
import { B3Propagator, B3InjectEncoding } from '@opentelemetry/propagator-b3';
const { NodeTracerProvider } = require('@opentelemetry/sdk-trace-node');
const { KafkaJsInstrumentation } = require('opentelemetry-instrumentation-kafkajs');
const traceExporter = new ConsoleSpanExporter();
api.propagation.setGlobalPropagator(new B3Propagator({ injectEncoding: B3InjectEncoding.MULTI_HEADER }));
export const otelSDK = new NodeSDK({
spanProcessor: new SimpleSpanProcessor(traceExporter),
instrumentations: [
getNodeAutoInstrumentations(),
new KafkaJsInstrumentation({
// see under for available configuration
})
],
resource: new Resource({
[SemanticResourceAttributes.SERVICE_NAME]: `ipfs-service`,
}),
});
// gracefully shut down the SDK on process exit
process.on('SIGTERM', () => {
otelSDK
.shutdown()
.then(
() => console.log('SDK shut down successfully'),
(err) => console.log('Error shutting down SDK', err),
)
.finally(() => process.exit(0));
});
main.ts
import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
import { otelSDK } from './tracer';
async function bootstrap() {
otelSDK.start();
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
transport: Transport.KAFKA,
options: {
client: {
clientId: 'ipfs-client',
brokers: ['localhost:29092'],
},
subscribe: {
fromBeginning: true
},
consumer: {
groupId: 'ipfs-consumer'
}
}
});
app.listen();
}
bootstrap();
и вот что регистрирует микросервис NestJS:
{
traceId: '48f139a5b81cd0bf60748321310047d2',
parentId: undefined,
traceState: undefined,
name: 'IPFS',
id: '353136b7af6bcc2d',
kind: 4,
timestamp: 1695137391490000,
duration: 2493.083,
attributes: {
'messaging.system': 'kafka',
'messaging.destination': 'IPFS',
'messaging.destination_kind': 'topic',
'messaging.operation': 'process'
},
status: { code: 0 },
events: [],
links: []
}
как вы можете видеть, идентификатор трассировки отличается от оригинала, созданного Сыщиком.
Заранее спасибо.