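Distributed Tracing in Practice

Distributed Tracing with Jaeger

What Is Distributed Tracing

In a microservice architecture, a single user request may pass through many services before it completes. Distributed tracing follows that request across services, recording the call path and the time spent at each hop.

Problem Scenario

A user request is slow, but which service is responsible?
├─ API Gateway (10ms)
├─ User Service (50ms)
├─ Order Service (2000ms) ← bottleneck found!
├─ Payment Service (100ms)
└─ Notification Service (30ms)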
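
Finding the bottleneck in the breakdown above comes down to comparing per-service durations. A minimal sketch, assuming the timings have already been collected into a dictionary (`timings_ms` and `find_bottleneck` are hypothetical names, and the values are copied from the example):

```python
# Hypothetical per-service timings in milliseconds, copied from the example above.
timings_ms = {
    "API Gateway": 10,
    "User Service": 50,
    "Order Service": 2000,
    "Payment Service": 100,
    "Notification Service": 30,
}


def find_bottleneck(timings):
    """Return (service, duration, share_of_total) for the slowest service."""
    service = max(timings, key=timings.get)
    total = sum(timings.values())
    return service, timings[service], timings[service] / total


service, duration, share = find_bottleneck(timings_ms)
print(f"{service}: {duration}ms ({share:.0%} of total)")
```

A tracing backend such as Jaeger does this comparison visually on a timeline, so the numbers do not have to be gathered by hand.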

Core Concepts

1. Trace

The complete call chain of a single request:

Trace ID: abc123
├─ Span 1: API Gateway [100ms]
│  └─ Span 2: Auth Service [50ms]
├─ Span 3: Order Service [200ms]
│  ├─ Span 4: DB Query [150ms]
│  └─ Span 5: Cache Check [20ms]
└─ Span 6: Payment Service [80ms]
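
The tree above can be rebuilt from flat span records by following parent links. A minimal sketch, assuming each span is stored as a `(span_id, parent_id, name, duration_ms)` tuple; that layout and the `render_trace` helper are illustrative assumptions, not Jaeger's storage format:

```python
from collections import defaultdict

# Flat span records mirroring the example trace above.
# A parent_id of None marks a span directly under the trace root.
spans = [
    ("1", None, "API Gateway", 100),
    ("2", "1", "Auth Service", 50),
    ("3", None, "Order Service", 200),
    ("4", "3", "DB Query", 150),
    ("5", "3", "Cache Check", 20),
    ("6", None, "Payment Service", 80),
]


def render_trace(spans):
    """Group spans by parent and return the indented call tree as lines."""
    children = defaultdict(list)
    for span_id, parent_id, name, duration in spans:
        children[parent_id].append((span_id, name, duration))

    lines = []

    def walk(parent_id, depth):
        for span_id, name, duration in children[parent_id]:
            lines.append(f"{'  ' * depth}{name} [{duration}ms]")
            walk(span_id, depth + 1)

    walk(None, 0)
    return lines


tree = render_trace(spans)
print("\n".join(tree))
```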

2. Span

A single unit of work within a trace, typically one operation handled by one service:

Span:
  TraceID: abc123
  SpanID: span-001
  ParentSpanID: null
  Operation: GET /api/orders
  StartTime: 2024-01-09 10:00:00.000
  Duration: 200ms
  Tags:
    - http.method: GET
    - http.status_code: 200
  Logs:
    - timestamp: 10:00:00.050
      event: db.query.start
    - timestamp: 10:00:00.200
      event: db.query.end
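
The fields listed above map naturally onto a small record type. A minimal sketch using a dataclass; the `Span` class here is a hypothetical model for illustration and is not the data structure used by Jaeger or OpenTelemetry:

```python
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Optional


@dataclass
class Span:
    trace_id: str
    span_id: str
    parent_span_id: Optional[str]
    operation: str
    start_time: datetime
    duration: timedelta
    tags: dict = field(default_factory=dict)
    logs: list = field(default_factory=list)  # (timestamp, event) pairs

    @property
    def end_time(self):
        return self.start_time + self.duration

    def is_root(self):
        # A root span has no parent.
        return self.parent_span_id is None


start = datetime(2024, 1, 9, 10, 0, 0)
span = Span(
    trace_id="abc123",
    span_id="span-001",
    parent_span_id=None,
    operation="GET /api/orders",
    start_time=start,
    duration=timedelta(milliseconds=200),
    tags={"http.method": "GET", "http.status_code": 200},
    logs=[
        (start + timedelta(milliseconds=50), "db.query.start"),
        (start + timedelta(milliseconds=200), "db.query.end"),
    ],
)

# Time between the two logged events, i.e. how long the database query took.
db_time = span.logs[1][0] - span.logs[0][0]
print(span.is_root(), span.end_time.time(), db_time)
```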

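3. Key Metrics

  • Latency: total time taken to serve a request
  • Error rate: fraction of requests that fail
  • Throughput: requests handled per second
  • Dependencies: the call topology between services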
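
The first three metrics can be derived from the root spans of completed requests. A minimal sketch with made-up sample data; treating only 5xx responses as failures and using a fixed 10-second window are assumptions for illustration:

```python
# Hypothetical completed requests: (duration_ms, http_status).
requests_log = [(120, 200), (95, 200), (2100, 500), (130, 200), (110, 404)]
window_seconds = 10  # assumed length of the observation window


def summarize(requests_log, window_seconds):
    """Compute latency, error rate, and throughput for one time window."""
    durations = sorted(d for d, _ in requests_log)
    # Assumption: only server-side (5xx) responses count as failures.
    errors = sum(1 for _, status in requests_log if status >= 500)
    return {
        "avg_latency_ms": sum(durations) / len(durations),
        "max_latency_ms": durations[-1],
        "error_rate": errors / len(requests_log),
        "throughput_rps": len(requests_log) / window_seconds,
    }


metrics = summarize(requests_log, window_seconds)
print(metrics)
```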

Jaeger Architecture

┌─────────────┐
│ Application │
│  + Jaeger   │
│   Client    │
└──────┬──────┘
       │ UDP
       ▼
┌──────────────┐
│ Jaeger Agent │ (DaemonSet)
└──────┬───────┘
       │ TCP/HTTP
       ▼
┌──────────────┐
│ Jaeger       │
│ Collector    │
└──────┬───────┘
       │
       ▼
┌──────────────┐      ┌──────────────┐
│   Storage    │ ←──→ │ Jaeger Query │
│ (ES/Cassandra)│      │   + UI       │
└──────────────┘      └──────────────┘

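Components

  • Jaeger Client: tracing library embedded in the application
  • Jaeger Agent: local agent that receives spans from the client and forwards them to the Collector
  • Jaeger Collector: receives, validates, and stores spans
  • Storage: storage backend (Elasticsearch, Cassandra)
  • Jaeger Query: query service and UI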

Deploying Jaeger on Kubernetes

Option 1: Using the Jaeger Operator

1. Install the Operator

# Install cert-manager (a dependency)
kubectl apply -f https://github.com/cert-manager/cert-manager/releases/download/v1.13.0/cert-manager.yaml

# Install the Jaeger Operator
kubectl create namespace observability
kubectl create -f https://github.com/jaegertracing/jaeger-operator/releases/download/v1.51.0/jaeger-operator.yaml -n observability

2. Deploy Jaeger All-in-One (development)

apiVersion: jaegertracing.io/v1
kind: Jaeger
metadata:
  name: simplest
  namespace: observability
spec:
  strategy: allInOne
  allInOne:
    image: jaegertracing/all-in-one:latest
    options:
      log-level: debug
  storage:
    type: memory
  ingress:
    enabled: true
  ui:
    options:
      dependencies:
        menuEnabled: true

Apply the configuration:

kubectl apply -f jaeger-allinone.yaml

# Verify
kubectl get pods -n observability
kubectl get svc -n observability

3. Production Configuration (using Elasticsearch)

apiVersion: jaegertracing.io/v1
kind: Jaeger
metadata:
  name: jaeger-production
  namespace: observability
spec:
  strategy: production
  storage:
    type: elasticsearch
    options:
      es:
        server-urls: http://elasticsearch:9200
        index-prefix: jaeger
    esIndexCleaner:
      enabled: true
      numberOfDays: 7
      schedule: "55 23 * * *"
  collector:
    replicas: 3
    resources:
      requests:
        cpu: 500m
        memory: 1Gi
      limits:
        cpu: 1000m
        memory: 2Gi
  query:
    replicas: 2
    resources:
      requests:
        cpu: 200m
        memory: 512Mi

Option 2: Deploying with Helm

# Add the Helm repository
helm repo add jaegertracing https://jaegertracing.github.io/helm-charts
helm repo update

# Deploy (development)
helm install jaeger jaegertracing/jaeger \
  --namespace observability \
  --create-namespace \
  --set allInOne.enabled=true \
  --set storage.type=memory \
  --set agent.enabled=false

# Deploy (production)
helm install jaeger jaegertracing/jaeger \
  --namespace observability \
  --create-namespace \
  --set provisionDataStore.cassandra=true \
  --set storage.type=cassandra

Accessing the Jaeger UI

# Port forward
kubectl port-forward -n observability svc/simplest-query 16686:16686

# Open in a browser
open http://localhost:16686

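Integrating with Istio

1. Automatic Tracing in Istio

Istio's sidecar proxies automatically generate tracing data for services in the mesh:

# Enable tracing when installing Istio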
istioctl install --set profile=demo \
  --set meshConfig.defaultConfig.tracing.zipkin.address=jaeger-collector.observability:9411 \
  --set meshConfig.defaultConfig.tracing.sampling=100.0

2. Configuring Istio Tracing

apiVersion: install.istio.io/v1alpha1
kind: IstioOperator
spec:
  meshConfig:
    enableTracing: true
    defaultConfig:
      tracing:
        sampling: 100.0  # sample 100% of requests
        zipkin:
          address: jaeger-collector.observability:9411
    extensionProviders:
    - name: jaeger
      zipkin:
        service: jaeger-collector.observability
        port: 9411

Apply the configuration:

istioctl install -f istio-config.yaml
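
A sampling rate of 100.0 keeps every trace, which suits a demo but is usually too costly in production. The sketch below illustrates the general idea of percentage-based sampling keyed on the trace ID, so that every service reaches the same decision for the same trace. It is an illustration under stated assumptions, not a description of how Istio or Envoy implements sampling; `should_sample` and the bucket scheme are hypothetical:

```python
def should_sample(trace_id_hex: str, sampling_percent: float) -> bool:
    """Decide from the trace ID alone whether a trace should be kept."""
    if not 0.0 <= sampling_percent <= 100.0:
        raise ValueError("sampling_percent must be between 0 and 100")
    # Map the low 64 bits of the trace ID onto 10,000 buckets (0.01% resolution).
    bucket = int(trace_id_hex[-16:], 16) % 10_000
    return bucket < round(sampling_percent * 100)


trace_id = "4bf92f3577b34da6a3ce929d0e0e4736"
print(should_sample(trace_id, 100.0))  # every trace is kept at 100%
print(should_sample(trace_id, 0.0))    # no trace is kept at 0%
```

Because the decision depends only on the trace ID, a trace is either recorded by all services or by none, which avoids partial traces.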

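3. Propagating Tracing Headers in the Application

The sidecar cannot tell which outbound call belongs to which inbound request, so for end-to-end tracing the application must copy the tracing headers from each incoming request onto the requests it sends downstream:

# Python/Flask example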
from flask import Flask, request
import requests

app = Flask(__name__)

@app.route('/order')
def create_order():
    # Collect the tracing headers from the incoming request
    headers = {}
    for header in ['x-request-id', 'x-b3-traceid', 'x-b3-spanid', 
                   'x-b3-parentspanid', 'x-b3-sampled', 'x-b3-flags']:
        value = request.headers.get(header)
        if value:
            headers[header] = value
    
    # Forward the tracing headers to the downstream service
    response = requests.get('http://payment-service/pay', headers=headers)
    return response.json()

// Node.js/Express example
const express = require('express');
const axios = require('axios');

const app = express();

app.get('/order', async (req, res) => {
    // Extract the tracing headers
    const tracingHeaders = [
        'x-request-id',
        'x-b3-traceid',
        'x-b3-spanid',
        'x-b3-parentspanid',
        'x-b3-sampled',
        'x-b3-flags'
    ];
    
    const headers = {};
    tracingHeaders.forEach(header => {
        if (req.headers[header]) {
            headers[header] = req.headers[header];
        }
    });
    
    // Call the downstream service
    const response = await axios.get('http://payment-service/pay', { headers });
    res.json(response.data);
});
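
Both examples repeat the same header-copying loop in every handler. That logic can be factored into a framework-independent helper. A minimal sketch; `extract_tracing_headers` is a hypothetical name, and the header list is the one used in the examples above:

```python
# Header names taken from the Flask and Express examples above.
TRACING_HEADERS = (
    "x-request-id",
    "x-b3-traceid",
    "x-b3-spanid",
    "x-b3-parentspanid",
    "x-b3-sampled",
    "x-b3-flags",
)


def extract_tracing_headers(incoming):
    """Return only the tracing headers from an incoming header mapping.

    HTTP header names are case-insensitive, so names are compared in lowercase.
    Headers that are absent or empty are skipped.
    """
    lowered = {name.lower(): value for name, value in incoming.items()}
    return {h: lowered[h] for h in TRACING_HEADERS if lowered.get(h)}


incoming = {
    "X-B3-TraceId": "80f198ee56343ba864fe8b2a57d3eff7",
    "X-B3-SpanId": "e457b5a2e4d86bd1",
    "X-B3-Sampled": "1",
    "Content-Type": "application/json",
}
outgoing = extract_tracing_headers(incoming)
print(outgoing)
```

The returned dictionary can be passed as the `headers` argument of the outgoing HTTP call.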

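Instrumenting Application Code

Option 1: Using OpenTelemetry (recommended)

Python Application

# Install dependencies
# pip install opentelemetry-api opentelemetry-sdk opentelemetry-exporter-jaeger opentelemetry-instrumentation-flask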

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.instrumentation.flask import FlaskInstrumentor

# Configure tracing
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)

jaeger_exporter = JaegerExporter(
    agent_host_name="jaeger-agent",
    agent_port=6831,
)

trace.get_tracer_provider().add_span_processor(
    BatchSpanProcessor(jaeger_exporter)
)

# Automatic tracing for the Flask application
from flask import Flask
app = Flask(__name__)
FlaskInstrumentor().instrument_app(app)

@app.route('/api/order')
def create_order():
    # Custom span
    with tracer.start_as_current_span("process-order") as span:
        span.set_attribute("order.id", "12345")
        span.set_attribute("user.id", "user-001")
        
        # Business logic
        result = process_order()
        
        span.add_event("order.created", {"order_id": result.id})
        return {"status": "success"}

Go Application

// go get go.opentelemetry.io/otel
// go get go.opentelemetry.io/otel/exporters/jaeger

package main

import (
    "context"
    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/exporters/jaeger"
    "go.opentelemetry.io/otel/sdk/resource"
    sdktrace "go.opentelemetry.io/otel/sdk/trace"
    semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
)

func initTracer() (*sdktrace.TracerProvider, error) {
    // Create the Jaeger exporter
    exporter, err := jaeger.New(
        jaeger.WithAgentEndpoint(
            jaeger.WithAgentHost("jaeger-agent"),
            jaeger.WithAgentPort("6831"),
        ),
    )
    if err != nil {
        return nil, err
    }

    // Create the TracerProvider
    tp := sdktrace.NewTracerProvider(
        sdktrace.WithBatcher(exporter),
        sdktrace.WithResource(resource.NewWithAttributes(
            semconv.SchemaURL,
            semconv.ServiceNameKey.String("order-service"),
        )),
    )
    
    otel.SetTracerProvider(tp)
    return tp, nil
}

func createOrder(ctx context.Context) {
    tracer := otel.Tracer("order-service")
    
    ctx, span := tracer.Start(ctx, "create-order")
    defer span.End()
    
    span.SetAttributes(
        semconv.HTTPMethodKey.String("POST"),
        semconv.HTTPURLKey.String("/api/orders"),
    )
    
    // Business logic
    processOrder(ctx)
}

Java Application (Spring Boot)

<!-- pom.xml -->
<dependency>
    <groupId>io.opentelemetry</groupId>
    <artifactId>opentelemetry-api</artifactId>
    <version>1.32.0</version>
</dependency>
<dependency>
    <groupId>io.opentelemetry</groupId>
    <artifactId>opentelemetry-sdk</artifactId>
    <version>1.32.0</version>
</dependency>
<dependency>
    <groupId>io.opentelemetry</groupId>
    <artifactId>opentelemetry-exporter-jaeger</artifactId>
    <version>1.32.0</version>
</dependency>

// OrderController.java
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Scope;

@RestController
public class OrderController {
    
    private final Tracer tracer = GlobalOpenTelemetry.getTracer("order-service");
    
    @GetMapping("/api/order")
    public ResponseEntity<Order> createOrder() {
        Span span = tracer.spanBuilder("create-order").startSpan();
        
        try (Scope scope = span.makeCurrent()) {
            span.setAttribute("order.id", "12345"); // placeholder order ID
            span.addEvent("Processing order");
            
            // Business logic
            Order order = orderService.createOrder();
            
            span.setAttribute("order.status", "created");
            return ResponseEntity.ok(order);
        } finally {
            span.end();
        }
    }
}

Option 2: Using the Jaeger Client SDK

Python Jaeger Client

# pip install jaeger-client

from jaeger_client import Config

def init_tracer(service_name):
    config = Config(
        config={
            'sampler': {
                'type': 'const',
                'param': 1,
            },
            'local_agent': {
                'reporting_host': 'jaeger-agent',
                'reporting_port': 6831,
            },
            'logging': True,
        },
        service_name=service_name,
    )
    return config.initialize_tracer()

tracer = init_tracer('order-service')

@app.route('/order')
def create_order():
    with tracer.start_span('create-order') as span:
        span.set_tag('order.id', '12345')
        
        # Call another service
        with tracer.start_span('call-payment', child_of=span) as child_span:
            payment_result = call_payment_service()
            child_span.log_kv({'payment.status': payment_result.status})
        
        return {'status': 'success'}

Rust Application

// Cargo.toml
// [dependencies]
// opentelemetry = { version = "0.21", features = ["trace"] }
// opentelemetry-jaeger = "0.20"
// opentelemetry_sdk = { version = "0.21", features = ["rt-tokio"] }
// tracing = "0.1"
// tracing-opentelemetry = "0.22"
// tracing-subscriber = "0.3"

use opentelemetry::global;
use opentelemetry_sdk::propagation::TraceContextPropagator;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::Registry;

fn init_tracer() -> Result<(), Box<dyn std::error::Error>> {
    global::set_text_map_propagator(TraceContextPropagator::new());
    
    let tracer = opentelemetry_jaeger::new_agent_pipeline()
        .with_service_name("order-service")
        .with_endpoint("jaeger-agent:6831")
        .install_simple()?;
    
    let telemetry = tracing_opentelemetry::layer().with_tracer(tracer);
    let subscriber = Registry::default().with(telemetry);
    tracing::subscriber::set_global_default(subscriber)?;
    
    Ok(())
}

#[tracing::instrument]
async fn create_order(order_id: &str) -> Result<Order, OrderError> {
    // A span is created automatically by #[tracing::instrument]
    tracing::info!(order.id = order_id, "Processing order");
    
    // Query the database
    let order = fetch_order_from_db(order_id).await?;
    
    // Call another service
    let payment = process_payment(&order).await?;
    
    tracing::info!(payment.status = ?payment.status, "Payment processed");
    
    Ok(order)
}

#[tracing::instrument]
async fn fetch_order_from_db(order_id: &str) -> Result<Order, DbError> {
    // The database call is recorded inside this function's span
    let order = db.query("SELECT * FROM orders WHERE id = $1")
        .bind(order_id)
        .fetch_one()
        .await?;
    
    Ok(order)
}

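.NET/C# Application

// NuGet packages
// Install-Package OpenTelemetry
// Install-Package OpenTelemetry.Exporter.Jaeger
// Install-Package OpenTelemetry.Extensions.Hosting
// Install-Package OpenTelemetry.Instrumentation.AspNetCore
// Install-Package OpenTelemetry.Instrumentation.Http
// Install-Package OpenTelemetry.Instrumentation.SqlClient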

using OpenTelemetry;
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;

// Program.cs
var builder = WebApplication.CreateBuilder(args);

// Configure OpenTelemetry
builder.Services.AddOpenTelemetry()
    .WithTracing(tracerProviderBuilder =>
    {
        tracerProviderBuilder
            .SetResourceBuilder(ResourceBuilder.CreateDefault()
                .AddService("order-service")
                .AddTelemetrySdk())
            .AddSource("OrderService") // register the custom ActivitySource used below
            .AddAspNetCoreInstrumentation()
            .AddHttpClientInstrumentation()
            .AddSqlClientInstrumentation()
            .AddJaegerExporter(options =>
            {
                options.AgentHost = "jaeger-agent";
                options.AgentPort = 6831;
            });
    });

var app = builder.Build();

// OrderController.cs
using System.Diagnostics;

[ApiController]
[Route("api/[controller]")]
public class OrderController : ControllerBase
{
    private static readonly ActivitySource ActivitySource = new("OrderService");
    private readonly IOrderService _orderService;
    
    public OrderController(IOrderService orderService)
    {
        _orderService = orderService;
    }
    
    [HttpPost]
    public async Task<IActionResult> CreateOrder([FromBody] OrderRequest request)
    {
        // Create a custom span
        using var activity = ActivitySource.StartActivity("CreateOrder");
        activity?.SetTag("order.id", request.OrderId);
        activity?.SetTag("user.id", request.UserId);
        
        try
        {
            var order = await _orderService.CreateOrderAsync(request);
            
            activity?.AddEvent(new ActivityEvent("Order created"));
            activity?.SetTag("order.status", order.Status);
            
            return Ok(order);
        }
        catch (Exception ex)
        {
            activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
            activity?.RecordException(ex);
            throw;
        }
    }
}

// OrderService.cs
public class OrderService : IOrderService
{
    private static readonly ActivitySource ActivitySource = new("OrderService");
    private readonly HttpClient _httpClient;
    private readonly IDbConnection _dbConnection;
    
    public async Task<Order> CreateOrderAsync(OrderRequest request)
    {
        using var activity = ActivitySource.StartActivity("ProcessOrder");
        
        // Database query
        using (var dbActivity = ActivitySource.StartActivity("DatabaseQuery"))
        {
            dbActivity?.SetTag("db.system", "postgresql");
            dbActivity?.SetTag("db.statement", "INSERT INTO orders...");
            
            await _dbConnection.ExecuteAsync(
                "INSERT INTO orders (id, user_id) VALUES (@Id, @UserId)",
                new { Id = request.OrderId, UserId = request.UserId }
            );
        }
        
        // Call the payment service
        using (var paymentActivity = ActivitySource.StartActivity("CallPaymentService"))
        {
            paymentActivity?.SetTag("service.name", "payment-service");
            
            var response = await _httpClient.PostAsJsonAsync(
                "http://payment-service/api/payment",
                new { OrderId = request.OrderId, Amount = request.Amount }
            );
            
            paymentActivity?.SetTag("http.status_code", (int)response.StatusCode);
        }
        
        return new Order { Id = request.OrderId, Status = "Created" };
    }
}

PHP Application (Laravel)

<?php
// composer require open-telemetry/opentelemetry
// composer require open-telemetry/exporter-jaeger

use OpenTelemetry\SDK\Trace\TracerProvider;
use OpenTelemetry\SDK\Trace\SpanExporter\JaegerExporter;
use OpenTelemetry\SDK\Trace\SpanProcessor\SimpleSpanProcessor;
use OpenTelemetry\SDK\Resource\ResourceInfo;
use OpenTelemetry\SDK\Common\Attribute\Attributes;
use OpenTelemetry\API\Trace\Propagation\TraceContextPropagator;

// config/tracing.php
class TracingConfig
{
    public static function init()
    {
        $exporter = new JaegerExporter(
            'order-service',
            'http://jaeger-collector:14268/api/traces'
        );
        
        $tracerProvider = new TracerProvider(
            new SimpleSpanProcessor($exporter),
            null,
            ResourceInfo::create(Attributes::create([
                'service.name' => 'order-service',
                'service.version' => '1.0.0',
            ]))
        );
        
        return $tracerProvider->getTracer('order-service');
    }
}

// app/Http/Controllers/OrderController.php
use OpenTelemetry\API\Trace\SpanKind;
use OpenTelemetry\API\Trace\StatusCode;

class OrderController extends Controller
{
    private $tracer;
    
    public function __construct()
    {
        $this->tracer = TracingConfig::init();
    }
    
    public function createOrder(Request $request)
    {
        // Create a span
        $span = $this->tracer->spanBuilder('create-order')
            ->setSpanKind(SpanKind::KIND_SERVER)
            ->startSpan();
        
        $span->setAttribute('order.id', $request->input('order_id'));
        $span->setAttribute('user.id', $request->input('user_id'));
        
        try {
            // Database operation
            $dbSpan = $this->tracer->spanBuilder('db.query')
                ->setSpanKind(SpanKind::KIND_CLIENT)
                ->startSpan();
            
            $dbSpan->setAttribute('db.system', 'mysql');
            $dbSpan->setAttribute('db.statement', 'INSERT INTO orders...');
            
            $order = Order::create([
                'order_id' => $request->input('order_id'),
                'user_id' => $request->input('user_id'),
            ]);
            
            $dbSpan->end();
            
            // Call the payment service
            $paymentSpan = $this->tracer->spanBuilder('call-payment-service')
                ->setSpanKind(SpanKind::KIND_CLIENT)
                ->startSpan();
            
            $paymentSpan->setAttribute('service.name', 'payment-service');
            
            $response = Http::withHeaders([
                // Propagate the tracing header
                'traceparent' => $this->getTraceParent($span),
            ])->post('http://payment-service/api/payment', [
                'order_id' => $order->id,
                'amount' => $request->input('amount'),
            ]);
            
            $paymentSpan->setAttribute('http.status_code', $response->status());
            $paymentSpan->end();
            
            $span->addEvent('order.created', [
                'order.id' => $order->id,
                'order.status' => 'created',
            ]);
            
            $span->setStatus(StatusCode::STATUS_OK);
            
            return response()->json($order);
        } catch (\Exception $e) {
            $span->recordException($e);
            $span->setStatus(StatusCode::STATUS_ERROR, $e->getMessage());
            throw $e;
        } finally {
            $span->end();
        }
    }
}
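
The PHP example sends a `traceparent` header, which is the header defined by the W3C Trace Context specification. Its version `00` layout is `version-traceid-spanid-flags`, with a 32-hex-digit trace ID, a 16-hex-digit span ID, and a flags byte whose lowest bit means "sampled". A minimal sketch of building and parsing that value; `format_traceparent` and `parse_traceparent` are hypothetical helper names, and the parser accepts only the exact version `00` layout:

```python
import re


def format_traceparent(trace_id: int, span_id: int, sampled: bool) -> str:
    """Build a version-00 traceparent header value."""
    flags = 0x01 if sampled else 0x00
    return f"00-{trace_id:032x}-{span_id:016x}-{flags:02x}"


_TRACEPARENT = re.compile(r"^([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$")


def parse_traceparent(header: str):
    """Return the parsed fields, or None if the header is malformed."""
    match = _TRACEPARENT.match(header.strip().lower())
    if not match:
        return None
    version, trace_id, span_id, flags = match.groups()
    # Version ff and all-zero IDs are invalid according to the specification.
    if version == "ff" or int(trace_id, 16) == 0 or int(span_id, 16) == 0:
        return None
    return {
        "trace_id": trace_id,
        "span_id": span_id,
        "sampled": bool(int(flags, 16) & 0x01),
    }


header = format_traceparent(
    0x4BF92F3577B34DA6A3CE929D0E0E4736, 0x00F067AA0BA902B7, sampled=True
)
print(header)
print(parse_traceparent(header))
```

In application code the OpenTelemetry propagator normally produces this header, so hand-rolled formatting like this is mainly useful for debugging or tests.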

Option 3: Database Tracing

MySQL/PostgreSQL Tracing

# Python + SQLAlchemy + OpenTelemetry
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
from sqlalchemy import create_engine, text

# Automatically trace all SQL queries
engine = create_engine('postgresql://user:pass@localhost/db')
SQLAlchemyInstrumentor().instrument(engine=engine)

order_id = '12345'  # placeholder order ID

# A span is created automatically for each query
with engine.connect() as conn:
    result = conn.execute(text("SELECT * FROM orders WHERE id = :id"), {"id": order_id})

// Go + database/sql + OpenTelemetry
import (
    "context"
    "database/sql"

    "github.com/XSAM/otelsql"
    _ "github.com/lib/pq" // PostgreSQL driver (assumed)
    semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
)

var db *sql.DB // assumed to be assigned from initDB() at startup

func initDB() (*sql.DB, error) {
    // Wrap the driver with otelsql
    driverName, err := otelsql.Register("postgres",
        otelsql.WithAttributes(
            semconv.DBSystemPostgreSQL,
        ),
    )
    if err != nil {
        return nil, err
    }
    
    db, err := sql.Open(driverName, "postgres://user:pass@localhost/db")
    if err != nil {
        return nil, err
    }
    
    return db, nil
}

func queryOrder(ctx context.Context, orderID string) (*Order, error) {
    // All database operations are traced automatically
    var order Order
    err := db.QueryRowContext(ctx, 
        "SELECT * FROM orders WHERE id = $1", orderID,
    ).Scan(&order.ID, &order.UserID, &order.Status)
    
    return &order, err
}

Redis Cache Tracing

# Python + redis-py + OpenTelemetry
import json

from opentelemetry.instrumentation.redis import RedisInstrumentor
import redis

# Automatically trace Redis operations
RedisInstrumentor().instrument()

redis_client = redis.Redis(host='localhost', port=6379)

# A span is created automatically for each command
redis_client.get('order:12345')
redis_client.setex('order:12345', 3600, json.dumps(order_data))

// Go + go-redis + OpenTelemetry
import (
    "context"
    "encoding/json"

    "github.com/go-redis/redis/extra/redisotel/v8"
    "github.com/go-redis/redis/v8"
)

func initRedis() *redis.Client {
    rdb := redis.NewClient(&redis.Options{
        Addr: "localhost:6379",
    })
    
    // Enable tracing
    rdb.AddHook(redisotel.NewTracingHook())
    
    return rdb
}

func getOrderFromCache(ctx context.Context, orderID string) (*Order, error) {
    // Redis operations are traced automatically
    val, err := rdb.Get(ctx, "order:"+orderID).Result()
    if err != nil {
        return nil, err
    }
    
    var order Order
    if err := json.Unmarshal([]byte(val), &order); err != nil {
        return nil, err
    }
    return &order, nil
}

MongoDB Tracing

// Node.js + MongoDB + OpenTelemetry
const { MongoClient } = require('mongodb');
const { registerInstrumentations } = require('@opentelemetry/instrumentation');
const { MongoDBInstrumentation } = require('@opentelemetry/instrumentation-mongodb');

// Enable automatic MongoDB tracing
registerInstrumentations({
  instrumentations: [
    new MongoDBInstrumentation({
      enhancedDatabaseReporting: true,
    }),
  ],
});

const client = new MongoClient('mongodb://localhost:27017');
await client.connect();

const db = client.db('myapp');

// All operations are traced automatically
const order = await db.collection('orders').findOne({ _id: orderId });
await db.collection('orders').insertOne({ orderId, userId, status: 'created' });

Option 4: Message Queue Tracing

Kafka Tracing

# Python + kafka-python + OpenTelemetry
import json

from opentelemetry.instrumentation.kafka import KafkaInstrumentor
from kafka import KafkaProducer, KafkaConsumer

# Automatically trace Kafka
KafkaInstrumentor().instrument()

producer = KafkaProducer(bootstrap_servers='localhost:9092')

# Send a message (a span is created automatically)
producer.send('orders', value=json.dumps(order_data).encode())

# Consume messages (spans are created automatically)
consumer = KafkaConsumer('orders', bootstrap_servers='localhost:9092')
for message in consumer:
    order = json.loads(message.value)
    process_order(order)

// Go + Sarama + OpenTelemetry
import (
    "context"

    "github.com/Shopify/sarama"
    "go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama/otelsarama"
    "go.opentelemetry.io/otel"
)

// Producer
func sendOrderEvent(ctx context.Context, order *Order) error {
    config := sarama.NewConfig()
    config.Producer.Return.Successes = true // required by SyncProducer
    producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
    if err != nil {
        return err
    }
    defer producer.Close()
    
    // Wrap the producer with otelsarama
    producer = otelsarama.WrapSyncProducer(config, producer)
    
    msg := &sarama.ProducerMessage{
        Topic: "orders",
        Value: sarama.StringEncoder(order.ToJSON()),
    }
    
    // Inject the tracing context
    otel.GetTextMapPropagator().Inject(ctx, otelsarama.NewProducerMessageCarrier(msg))
    
    _, _, err = producer.SendMessage(msg)
    return err
}

// Consumer
func consumeOrders() {
    config := sarama.NewConfig()
    consumer, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "order-group", config)
    if err != nil {
        panic(err)
    }
    
    handler := otelsarama.WrapConsumerGroupHandler(&OrderHandler{})
    
    for {
        consumer.Consume(context.Background(), []string{"orders"}, handler)
    }
}

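type OrderHandler struct{}

// Setup and Cleanup are required by the sarama.ConsumerGroupHandler interface.
func (h *OrderHandler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (h *OrderHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }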

func (h *OrderHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for message := range claim.Messages() {
        // Extract the tracing context
        ctx := otel.GetTextMapPropagator().Extract(
            context.Background(),
            otelsarama.NewConsumerMessageCarrier(message),
        )
        
        tracer := otel.Tracer("order-consumer")
        ctx, span := tracer.Start(ctx, "process-order")
        
        // Process the order
        processOrder(ctx, message.Value)
        
        span.End()
        session.MarkMessage(message, "")
    }
    return nil
}

RabbitMQ Tracing

# Python + pika + OpenTelemetry
from opentelemetry import trace
import pika
import json

tracer = trace.get_tracer(__name__)

# Producer
def send_order_event(order):
    with tracer.start_as_current_span("rabbitmq.publish") as span:
        span.set_attribute("messaging.system", "rabbitmq")
        span.set_attribute("messaging.destination", "orders")
        
        connection = pika.BlockingConnection(
            pika.ConnectionParameters('localhost')
        )
        channel = connection.channel()
        channel.queue_declare(queue='orders')
        
        # Inject the tracing context into the message headers
        headers = {}
        from opentelemetry.propagate import inject
        inject(headers)
        
        properties = pika.BasicProperties(
            headers=headers,
            content_type='application/json'
        )
        
        channel.basic_publish(
            exchange='',
            routing_key='orders',
            body=json.dumps(order),
            properties=properties
        )
        
        connection.close()
        span.set_attribute("messaging.message_id", order['id'])

# Consumer
def callback(ch, method, properties, body):
    # Extract the tracing context
    from opentelemetry.propagate import extract
    ctx = extract(properties.headers or {})
    
    with tracer.start_as_current_span("rabbitmq.process", context=ctx) as span:
        span.set_attribute("messaging.system", "rabbitmq")
        span.set_attribute("messaging.operation", "process")
        
        order = json.loads(body)
        process_order(order)
        
        ch.basic_ack(delivery_tag=method.delivery_tag)

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='orders')
channel.basic_consume(queue='orders', on_message_callback=callback)
channel.start_consuming()

Option 5: Asynchronous Task Tracing

Celery Task Tracing

# Python + Celery + OpenTelemetry
from celery import Celery
from opentelemetry import trace
from opentelemetry.propagate import inject, extract

app = Celery('tasks', broker='redis://localhost:6379')
tracer = trace.get_tracer(__name__)

@app.task
def process_order(order_id, trace_context=None):
    # Restore the tracing context
    ctx = extract(trace_context or {})
    
    with tracer.start_as_current_span("celery.process_order", context=ctx) as span:
        span.set_attribute("order.id", order_id)
        span.set_attribute("task.name", "process_order")
        
        # Run the task
        order = fetch_order(order_id)
        validate_order(order)
        
        # Call another task
        headers = {}
        inject(headers)
        send_notification.apply_async(
            args=[order_id],
            kwargs={'trace_context': headers}
        )
        
        span.add_event("order.processed")
        return {"status": "success", "order_id": order_id}

@app.task
def send_notification(order_id, trace_context=None):
    ctx = extract(trace_context or {})
    
    with tracer.start_as_current_span("celery.send_notification", context=ctx) as span:
        span.set_attribute("order.id", order_id)
        # Send the notification
        send_email(order_id)
        span.add_event("notification.sent")

# Inject the tracing context when triggering the task
def create_order_api():
    with tracer.start_as_current_span("api.create_order") as span:
        order_id = "12345"
        span.set_attribute("order.id", order_id)
        
        # Inject the tracing context
        headers = {}
        inject(headers)
        
        # Process asynchronously
        process_order.apply_async(
            args=[order_id],
            kwargs={'trace_context': headers}
        )
        
        return {"status": "processing", "order_id": order_id}

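End-to-End Microservice Tracing Example

Scenario: E-commerce Order System

User request -> API Gateway -> Order Service -> Inventory Service
                                             -> Payment Service -> Notification Service
                                             -> Redis cache
                                             -> MySQL database

1. API Gateway (Go)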

package main

import (
    "io"
    "net/http"

    "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/attribute"
    "go.opentelemetry.io/otel/propagation"
)

func createOrderHandler(w http.ResponseWriter, r *http.Request) {
    ctx := r.Context()
    tracer := otel.Tracer("api-gateway")
    
    ctx, span := tracer.Start(ctx, "gateway.createOrder")
    defer span.End()
    
    span.SetAttributes(
        attribute.String("http.method", r.Method),
        attribute.String("http.url", r.URL.String()),
        attribute.String("user.id", r.Header.Get("X-User-ID")),
    )
    
    // Forward to the order service
    req, _ := http.NewRequestWithContext(ctx, "POST", 
        "http://order-service:8080/api/orders", r.Body)
    
    // Propagate the tracing context
    otel.GetTextMapPropagator().Inject(ctx, propagation.HeaderCarrier(req.Header))
    
    client := http.Client{Transport: otelhttp.NewTransport(http.DefaultTransport)}
    resp, err := client.Do(req)
    if err != nil {
        span.RecordError(err)
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }
    defer resp.Body.Close()
    
    span.SetAttributes(attribute.Int("http.status_code", resp.StatusCode))
    
    // Return the response
    w.WriteHeader(resp.StatusCode)
    io.Copy(w, resp.Body)
}

func main() {
    initTracer()
    
    http.Handle("/api/orders", 
        otelhttp.NewHandler(http.HandlerFunc(createOrderHandler), "createOrder"))
    
    http.ListenAndServe(":8080", nil)
}

2. Order Service (Python/Flask)

import json

from flask import Flask, request, jsonify
from opentelemetry import trace
from opentelemetry.propagate import extract
import requests
import redis
import psycopg2

app = Flask(__name__)
tracer = trace.get_tracer(__name__)
redis_client = redis.Redis(host='redis')

@app.route('/api/orders', methods=['POST'])
def create_order():
    # Extract the tracing context
    ctx = extract(request.headers)
    
    with tracer.start_as_current_span("order.create", context=ctx) as span:
        data = request.json
        order_id = data['order_id']
        user_id = data['user_id']
        items = data['items']
        
        span.set_attribute("order.id", order_id)
        span.set_attribute("user.id", user_id)
        span.set_attribute("items.count", len(items))
        
        # 1. Check the cache
        with tracer.start_as_current_span("cache.check") as cache_span:
            cached = redis_client.get(f"order:{order_id}")
            if cached:
                cache_span.set_attribute("cache.hit", True)
                return jsonify(json.loads(cached))
            cache_span.set_attribute("cache.hit", False)
        
        # 2. Check inventory
        with tracer.start_as_current_span("inventory.check") as inv_span:
            inv_span.set_attribute("service.name", "inventory-service")
            
            # Propagate the tracing context
            headers = {}
            from opentelemetry.propagate import inject
            inject(headers)
            
            inv_resp = requests.post(
                'http://inventory-service:8080/api/inventory/check',
                json={'items': items},
                headers=headers
            )
            
            inv_span.set_attribute("http.status_code", inv_resp.status_code)
            
            if inv_resp.status_code != 200:
                span.set_attribute("order.status", "inventory_failed")
                return jsonify({"error": "Insufficient inventory"}), 400
        
        # 3. Save the order to the database
        with tracer.start_as_current_span("db.insert") as db_span:
            db_span.set_attribute("db.system", "postgresql")
            db_span.set_attribute("db.statement", "INSERT INTO orders...")
            
            conn = psycopg2.connect("dbname=orders user=postgres")
            cur = conn.cursor()
            cur.execute(
                "INSERT INTO orders (id, user_id, status) VALUES (%s, %s, %s)",
                (order_id, user_id, 'pending')
            )
            conn.commit()
            cur.close()
            conn.close()
            
            db_span.add_event("order.inserted")
        
        # 4. Call the payment service
        with tracer.start_as_current_span("payment.process") as pay_span:
            pay_span.set_attribute("service.name", "payment-service")
            pay_span.set_attribute("payment.amount", data['amount'])
            
            headers = {}
            inject(headers)
            
            pay_resp = requests.post(
                'http://payment-service:8080/api/payment',
                json={'order_id': order_id, 'amount': data['amount']},
                headers=headers
            )
            
            pay_span.set_attribute("http.status_code", pay_resp.status_code)
            pay_span.set_attribute("payment.status", pay_resp.json()['status'])
        
        # 5. Cache the result
        order = {
            'order_id': order_id,
            'user_id': user_id,
            'status': 'completed',
            'payment_status': pay_resp.json()['status']
        }
        
        with tracer.start_as_current_span("cache.set") as cache_span:
            redis_client.setex(
                f"order:{order_id}",
                3600,
                json.dumps(order)
            )
        
        span.set_attribute("order.status", "completed")
        span.add_event("order.completed", {
            "order.id": order_id,
            "payment.status": order['payment_status']
        })
        
        return jsonify(order)

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8080)
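
The `extract` and `inject` calls in the order service read and write trace context in HTTP headers. With OpenTelemetry's default W3C Trace Context propagator, that context travels in a `traceparent` header. The sketch below is a simplified, standard-library-only illustration of the header layout, not the library's implementation; the helper names `parse_traceparent` and `format_traceparent` are hypothetical, and only the version `00` layout is handled.

```python
import re
from typing import NamedTuple, Optional

# W3C Trace Context layout: version-traceid-spanid-flags, all lowercase hex.
_TRACEPARENT_RE = re.compile(
    r"^([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$"
)

class TraceParent(NamedTuple):
    trace_id: str   # 32 hex chars, shared by every span in the trace
    span_id: str    # 16 hex chars, the caller's span (parent of the new span)
    sampled: bool   # lowest bit of the flags byte

def parse_traceparent(value: str) -> Optional[TraceParent]:
    """Return the parsed header, or None if it is malformed."""
    match = _TRACEPARENT_RE.match(value.strip())
    if match is None:
        return None
    version, trace_id, span_id, flags = match.groups()
    # Version ff and all-zero IDs are invalid.
    if version == "ff" or set(trace_id) == {"0"} or set(span_id) == {"0"}:
        return None
    return TraceParent(trace_id, span_id, bool(int(flags, 16) & 0x01))

def format_traceparent(trace_id: str, span_id: str, sampled: bool) -> str:
    """Build the header value a service would send downstream."""
    return f"00-{trace_id}-{span_id}-{'01' if sampled else '00'}"

incoming = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"
parent = parse_traceparent(incoming)
# The downstream call keeps the trace ID but carries this service's own span ID.
outgoing = format_traceparent(parent.trace_id, "b7ad6b7169203331", parent.sampled)
print(outgoing)
```

Every service reuses the incoming trace ID and replaces only the span ID, which is how Jaeger can later stitch the spans into one trace.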

3. Inventory Service (Node.js)

const express = require('express');
const { trace, context, propagation } = require('@opentelemetry/api');
const { MongoClient } = require('mongodb');

const app = express();
app.use(express.json());

const mongoClient = new MongoClient('mongodb://mongo:27017');
const tracer = trace.getTracer('inventory-service');

app.post('/api/inventory/check', async (req, res) => {
  // Extract the trace context from the request headers
  const ctx = propagation.extract(context.active(), req.headers);
  
  const span = tracer.startSpan('inventory.check', undefined, ctx);
  
  try {
    const { items } = req.body;
    span.setAttribute('items.count', items.length);
    
    // Query MongoDB (pass a context that carries `span` so this span becomes its child)
    const dbSpan = tracer.startSpan('mongodb.query', undefined, trace.setSpan(ctx, span));
    dbSpan.setAttribute('db.system', 'mongodb');
    dbSpan.setAttribute('db.collection', 'inventory');
    
    const db = mongoClient.db('inventory');
    const collection = db.collection('items');
    
    const results = [];
    for (const item of items) {
      const doc = await collection.findOne({ sku: item.sku });
      
      if (!doc || doc.quantity < item.quantity) {
        dbSpan.setAttribute('inventory.sufficient', false);
        dbSpan.end();
        span.setAttribute('check.result', 'insufficient');
        span.end();
        
        return res.status(400).json({
          error: 'Insufficient inventory',
          sku: item.sku
        });
      }
      
      results.push({ sku: item.sku, available: doc.quantity });
    }
    
    dbSpan.setAttribute('inventory.sufficient', true);
    dbSpan.end();
    
    // Reserve inventory
    const reserveSpan = tracer.startSpan('inventory.reserve', undefined, trace.setSpan(ctx, span));
    
    for (const item of items) {
      await collection.updateOne(
        { sku: item.sku },
        { $inc: { quantity: -item.quantity, reserved: item.quantity } }
      );
    }
    
    reserveSpan.addEvent('inventory.reserved', {
      items_count: items.length
    });
    reserveSpan.end();
    
    span.setAttribute('check.result', 'success');
    span.end();
    
    res.json({ status: 'available', items: results });
  } catch (error) {
    span.recordException(error);
    span.setStatus({ code: 2, message: error.message }); // 2 = SpanStatusCode.ERROR
    span.end();
    
    res.status(500).json({ error: error.message });
  }
});

app.listen(8080, () => {
  console.log('Inventory service listening on port 8080');
});

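4. Payment Service (Java/Spring Boot)

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.TextMapGetter;
import java.util.Map;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.client.RestTemplate;

@RestController
@RequestMapping("/api/payment")
public class PaymentController {
    
    private final Tracer tracer = GlobalOpenTelemetry.getTracer("payment-service");
    private final RestTemplate restTemplate;
    
    // Constructor injection; assumes a RestTemplate bean is defined elsewhere
    public PaymentController(RestTemplate restTemplate) {
        this.restTemplate = restTemplate;
    }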
    
    @PostMapping
    public PaymentResponse processPayment(
            @RequestBody PaymentRequest request,
            @RequestHeader Map<String, String> headers) {
        
        // Extract the trace context from the request headers
        Context extractedContext = GlobalOpenTelemetry.getPropagators()
            .getTextMapPropagator()
            .extract(Context.current(), headers, new HeadersGetter());
        
        Span span = tracer.spanBuilder("payment.process")
            .setParent(extractedContext)
            .startSpan();
        
        try (var scope = span.makeCurrent()) {
            span.setAttribute("order.id", request.getOrderId());
            span.setAttribute("payment.amount", request.getAmount());
            span.setAttribute("payment.method", request.getMethod());
            
            // 1. Validate the payment details
            Span validateSpan = tracer.spanBuilder("payment.validate")
                .startSpan();
            validateSpan.setAttribute("validation.type", "card");
            
            boolean isValid = validatePaymentInfo(request);
            validateSpan.setAttribute("validation.result", isValid);
            validateSpan.end();
            
            if (!isValid) {
                span.setAttribute("payment.status", "invalid");
                throw new PaymentException("Invalid payment info");
            }
            
            // 2. Call the third-party payment gateway
            Span gatewaySpan = tracer.spanBuilder("payment.gateway")
                .startSpan();
            gatewaySpan.setAttribute("gateway.provider", "stripe");
            
            String transactionId = callPaymentGateway(request);
            gatewaySpan.setAttribute("transaction.id", transactionId);
            gatewaySpan.addEvent("payment.charged");
            gatewaySpan.end();
            
            // 3. Save the payment record
            Span dbSpan = tracer.spanBuilder("db.insert")
                .startSpan();
            dbSpan.setAttribute("db.system", "mysql");
            dbSpan.setAttribute("db.table", "payments");
            
            savePaymentRecord(request.getOrderId(), transactionId);
            dbSpan.end();
            
            // 4. Send a notification
            Span notifySpan = tracer.spanBuilder("notification.send")
                .startSpan();
            
            // Inject the trace context into the outgoing HTTP request,
            // using notifySpan as the parent of the downstream call
            HttpHeaders notifyHeaders = new HttpHeaders();
            GlobalOpenTelemetry.getPropagators()
                .getTextMapPropagator()
                .inject(Context.current().with(notifySpan), notifyHeaders, HttpHeaders::set);
            
            HttpEntity<NotificationRequest> entity = new HttpEntity<>(
                new NotificationRequest(request.getOrderId(), "Payment successful"),
                notifyHeaders
            );
            
            restTemplate.postForObject(
                "http://notification-service:8080/api/notify",
                entity,
                NotificationResponse.class
            );
            
            notifySpan.setAttribute("notification.type", "payment_success");
            notifySpan.end();
            
            span.setAttribute("payment.status", "success");
            span.addEvent("payment.completed", Attributes.of(
                AttributeKey.stringKey("transaction.id"), transactionId
            ));
            
            return new PaymentResponse("success", transactionId);
            
        } catch (Exception e) {
            span.recordException(e);
            span.setAttribute("payment.status", "failed");
            throw e;
        } finally {
            span.end();
        }
    }
    
    private static class HeadersGetter implements TextMapGetter<Map<String, String>> {
        @Override
        public Iterable<String> keys(Map<String, String> carrier) {
            return carrier.keySet();
        }
        
        @Override
        public String get(Map<String, String> carrier, String key) {
            return carrier.get(key);
        }
    }
}

Deploying the Sample Application

1. Application Deployment

apiVersion: apps/v1
kind: Deployment
metadata:
  name: order-service
spec:
  replicas: 2
  selector:
    matchLabels:
      app: order-service
  template:
    metadata:
      labels:
        app: order-service
    spec:
      containers:
      - name: order-service
        image: myapp/order-service:v1.0
        env:
        - name: JAEGER_AGENT_HOST
          value: "jaeger-agent.observability.svc.cluster.local"
        - name: JAEGER_AGENT_PORT
          value: "6831"
        - name: JAEGER_SAMPLER_TYPE
          value: "const"
        - name: JAEGER_SAMPLER_PARAM
          value: "1"
        ports:
        - containerPort: 8080

2. Generating Test Traffic

apiVersion: batch/v1
kind: Job
metadata:
  name: load-test
spec:
  template:
    spec:
      containers:
      - name: curl
        image: curlimages/curl:latest
        command:
        - /bin/sh
        - -c
        - |
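          # The order endpoint is POST /api/orders; payload values are placeholders
          for i in $(seq 1 1000); do
            curl -s -X POST http://order-service:8080/api/orders \
              -H 'Content-Type: application/json' \
              -d "{\"order_id\": \"order-$i\", \"user_id\": \"user-1\", \"items\": [{\"sku\": \"sku-1\", \"quantity\": 1}], \"amount\": 10}"
            sleep 0.1
          done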
      restartPolicy: Never
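
The order endpoint shown earlier accepts POST requests carrying `order_id`, `user_id`, `items`, and `amount`. As a rough alternative to a shell loop, the following standard-library sketch builds one such request per iteration. The URL is the in-cluster service name used by the Job; the field values and the helper names `build_order` and `send_orders` are placeholders for illustration.

```python
import json
import time
import urllib.request

ORDER_URL = "http://order-service:8080/api/orders"  # in-cluster service name

def build_order(i: int) -> dict:
    """Build one request body; a unique order_id avoids hitting the order cache."""
    return {
        "order_id": f"load-{i}",
        "user_id": "user-1",                          # placeholder
        "items": [{"sku": "sku-1", "quantity": 1}],   # placeholder
        "amount": 10,                                 # placeholder
    }

def send_orders(count: int, delay_s: float = 0.1) -> None:
    """POST `count` orders sequentially, pausing between requests."""
    for i in range(1, count + 1):
        req = urllib.request.Request(
            ORDER_URL,
            data=json.dumps(build_order(i)).encode("utf-8"),
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        try:
            with urllib.request.urlopen(req, timeout=5) as resp:
                resp.read()
        except OSError as exc:  # URLError and HTTPError are OSError subclasses
            print(f"request {i} failed: {exc}")
        time.sleep(delay_s)

# Run from a pod inside the cluster, for example:
# send_orders(1000)
```

Varying `order_id` per request matters here: repeated IDs would be served from the Redis cache and the traces would never reach the inventory, database, or payment spans.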

Using the Jaeger UI

1. Searching Traces

Open http://localhost:16686

Search options

  • Service: select a service
  • Operation: select an operation (e.g. GET /api/orders)
  • Tags: filter by tag (e.g. http.status_code=500)
  • Lookback: time range (Last Hour, Last 24 Hours)
  • Min/Max Duration: filter by duration

2. Trace Details

Click a trace to view its details:

Timeline View:
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
API Gateway      [████████████] 500ms
  Auth Service   [██] 50ms
  Order Service  [█████████] 400ms
    DB Query     [████████] 350ms
    Cache        [█] 30ms
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

Key information

  • Duration: time spent in each span
  • Tags: HTTP method, status code, error details
  • Logs: event logs
  • References: parent-child relationships
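
When reading the timeline, keep in mind that a parent span's duration includes the time spent in its children. Subtracting the children gives each span's self time, which points more directly at the bottleneck. The sketch below applies this to the timeline figures above; it assumes child spans run one after another without overlapping, and the function name `self_times` is hypothetical.

```python
from collections import defaultdict

# (span name, parent name or None, duration in ms), taken from the timeline above
SPANS = [
    ("API Gateway", None, 500),
    ("Auth Service", "API Gateway", 50),
    ("Order Service", "API Gateway", 400),
    ("DB Query", "Order Service", 350),
    ("Cache", "Order Service", 30),
]

def self_times(spans):
    """Return {span name: duration minus the summed duration of its direct children}."""
    child_total = defaultdict(int)
    for _name, parent, duration in spans:
        if parent is not None:
            child_total[parent] += duration
    return {name: duration - child_total[name] for name, _parent, duration in spans}

times = self_times(SPANS)
bottleneck = max(times, key=times.get)
print(times)
print("bottleneck:", bottleneck)
```

Here the Order Service itself accounts for only 20ms of its 400ms; nearly all of the time is spent in the DB Query span.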

3. Service Dependency Graph

Open the System Architecture tab:

┌─────────────┐
│ API Gateway │
└──────┬──────┘
       │
   ┌───┴───┐
   │       │
   ▼       ▼
┌─────┐ ┌──────┐
│Auth │ │Order │
└─────┘ └───┬──┘
            │
        ┌───┴───┐
        │       │
        ▼       ▼
      ┌──┐   ┌─────┐
      │DB│   │Cache│
      └──┘   └─────┘

4. Comparing Traces

Select traces to compare and analyze where their performance differs:

Trace A (Fast):  API -> Auth (20ms) -> Order (50ms)
Trace B (Slow):  API -> Auth (20ms) -> Order (2000ms) ← slow
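
The same comparison can be done on exported durations. This is a minimal sketch using the two traces above; the dictionary layout (service name to milliseconds) and the function name `diff_traces` are assumptions for illustration.

```python
def diff_traces(fast: dict, slow: dict) -> list:
    """Return (service, fast_ms, slow_ms, delta_ms) rows, largest slowdown first."""
    rows = []
    for service in sorted(set(fast) | set(slow)):
        a = fast.get(service, 0)
        b = slow.get(service, 0)
        rows.append((service, a, b, b - a))
    return sorted(rows, key=lambda row: row[3], reverse=True)

trace_a = {"Auth": 20, "Order": 50}     # Trace A (fast)
trace_b = {"Auth": 20, "Order": 2000}   # Trace B (slow)

for service, a, b, delta in diff_traces(trace_a, trace_b):
    print(f"{service:<6} {a:>5}ms {b:>5}ms {delta:+}ms")
```

Sorting by the delta puts the service responsible for the regression (Order, +1950ms) at the top.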

Advanced Configuration

1. Sampling Strategy

# Probabilistic sampling with per-operation overrides
apiVersion: jaegertracing.io/v1
kind: Jaeger
metadata:
  name: jaeger
spec:
  strategy: production
  sampling:
    options:
      default_strategy:
        type: probabilistic
        param: 0.1  # 10% sampling rate
        operation_strategies:
          - operation: /health
            type: probabilistic
            param: 0  # do not sample health checks
          - operation: /api/critical
            type: probabilistic
            param: 1  # sample 100% of the critical API
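
To make the effect of these rates concrete, here is a simplified sketch of a probabilistic sampling decision with per-operation overrides. It is an illustration of the idea, not Jaeger's implementation: the 64-bit trace ID space, the names `sampling_rate` and `should_sample`, and the `/api/orders` operation used in the demo are assumptions.

```python
import random

DEFAULT_RATE = 0.1
OPERATION_RATES = {"/health": 0.0, "/api/critical": 1.0}

MAX_TRACE_ID = 2**64  # assumes 64-bit trace IDs for simplicity

def sampling_rate(operation: str) -> float:
    """Per-operation rate if configured, otherwise the default rate."""
    return OPERATION_RATES.get(operation, DEFAULT_RATE)

def should_sample(trace_id: int, operation: str) -> bool:
    """Keep the trace when its ID falls below rate * ID space.

    Deciding from the trace ID rather than a fresh random number lets every
    service that sees the same trace ID reach the same decision.
    """
    return trace_id < sampling_rate(operation) * MAX_TRACE_ID

rng = random.Random(42)  # fixed seed so the demo is repeatable
ids = [rng.getrandbits(64) for _ in range(10_000)]
kept = sum(should_sample(t, "/api/orders") for t in ids)
print(f"/api/orders kept {kept} of {len(ids)}")
```

With a rate of 0.1 roughly one trace in ten is kept, health checks are never kept, and the critical API is always kept.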

2. Storage Configuration (Elasticsearch)

apiVersion: jaegertracing.io/v1
kind: Jaeger
metadata:
  name: jaeger
spec:
  storage:
    type: elasticsearch
    options:
      es:
        server-urls: http://elasticsearch:9200
        index-prefix: jaeger
        username: elastic
        password: changeme
        tls:
          ca: /path/to/ca.crt
    esIndexCleaner:
      enabled: true
      numberOfDays: 7
      schedule: "55 23 * * *"

3. Resource Limits

apiVersion: jaegertracing.io/v1
kind: Jaeger
metadata:
  name: jaeger
spec:
  collector:
    replicas: 3
    resources:
      requests:
        cpu: 500m
        memory: 1Gi
      limits:
        cpu: 2000m
        memory: 4Gi
    autoscale: true
    maxReplicas: 10
  query:
    replicas: 2
    resources:
      requests:
        cpu: 200m
        memory: 512Mi

Integrating with Prometheus

1. Exposing Jaeger Metrics

apiVersion: v1
kind: Service
metadata:
  name: jaeger-collector-metrics
  labels:
    app: jaeger
spec:
  ports:
  - name: metrics
    port: 14269
    targetPort: 14269
  selector:
    app: jaeger-collector

2. ServiceMonitor (Prometheus Operator)

apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: jaeger-collector
spec:
  selector:
    matchLabels:
      app: jaeger
  endpoints:
  - port: metrics
    interval: 30s

3. Grafana Dashboard

# Import the Jaeger dashboards by ID
# Dashboard ID: 10001, 10003, 10004

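Best Practices

1. Sampling Strategy

Development:  100% sampling
Testing:      50% sampling
Production:
  - Normal requests: 1-10% sampling
  - Failed requests: 100% sampling
  - Slow requests:   100% sampling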
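
Keeping every failed or slow request means the decision has to be made after the trace has finished (tail-based sampling), because a sampler that decides at the start of a request cannot know how it will end. The sketch below shows the shape of such a decision. The 1000ms threshold, the 5% baseline, and the span dictionary layout are all assumptions chosen for illustration.

```python
import random

SLOW_THRESHOLD_MS = 1000   # assumed definition of a "slow" request
BASELINE_RATE = 0.05       # within the 1-10% range for normal requests

def keep_trace(spans, rng=random):
    """Decide whether to keep a finished trace.

    `spans` is a list of dicts with 'parent', 'duration_ms' and 'error' keys
    (a hypothetical layout for this sketch).
    """
    if any(span["error"] for span in spans):
        return True                      # failed requests: always keep
    root = next(span for span in spans if span["parent"] is None)
    if root["duration_ms"] >= SLOW_THRESHOLD_MS:
        return True                      # slow requests: always keep
    return rng.random() < BASELINE_RATE  # normal requests: keep a small share

failed = [{"parent": None, "duration_ms": 40, "error": True}]
slow = [{"parent": None, "duration_ms": 2000, "error": False}]
print(keep_trace(failed), keep_trace(slow))
```

The trade-off is that all spans of a trace must be buffered until the trace completes, which costs memory in whichever component makes the decision.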

2. Propagating Trace Headers

Make sure the application forwards all tracing-related headers:

x-request-id
x-b3-traceid
x-b3-spanid
x-b3-parentspanid
x-b3-sampled
x-b3-flags
x-ot-span-context
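
A small helper can copy these headers from the incoming request onto every outgoing call. This is a minimal sketch; the function name `forward_trace_headers` and the sample header values are hypothetical.

```python
TRACE_HEADERS = (
    "x-request-id",
    "x-b3-traceid",
    "x-b3-spanid",
    "x-b3-parentspanid",
    "x-b3-sampled",
    "x-b3-flags",
    "x-ot-span-context",
)

def forward_trace_headers(incoming: dict) -> dict:
    """Copy tracing headers from an incoming request, matching names case-insensitively."""
    lowered = {name.lower(): value for name, value in incoming.items()}
    return {name: lowered[name] for name in TRACE_HEADERS if name in lowered}

incoming = {
    "Host": "order-service",
    "X-Request-Id": "req-1",
    "X-B3-TraceId": "463ac35c9f6413ad48485a3953bb6124",
    "X-B3-SpanId": "a2fb4a1d1a96d312",
    "X-B3-Sampled": "1",
}
outgoing = forward_trace_headers(incoming)
print(outgoing)
```

The resulting dictionary can be passed as the `headers` argument of an outgoing HTTP call, for example `requests.post(url, json=payload, headers=outgoing)`.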

3. Meaningful Span Names

# ❌ Bad
span.operation_name = "function1"

# ✅ Good
span.operation_name = "create-order"
span.set_tag("order.id", order_id)
span.set_tag("user.id", user_id)

4. Recording Key Events

span.log_kv({
    'event': 'db.query.start',
    'statement': 'SELECT * FROM orders WHERE id = ?'
})

# Run the query

span.log_kv({
    'event': 'db.query.end',
    'rows_returned': 1
})

5. Marking Errors

import traceback

try:
    result = risky_operation()
except Exception as e:
    span.set_tag('error', True)
    span.log_kv({
        'event': 'error',
        'error.kind': type(e).__name__,
        'error.message': str(e),
        'stack': traceback.format_exc()
    })
    raise

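Troubleshooting

1. No Trace Data

# Check the Jaeger Agent
kubectl get pods -n observability -l app=jaeger-agent

# Check the application's environment variables
kubectl exec <pod-name> -- env | grep JAEGER

# View the Collector logs
kubectl logs -n observability -l app=jaeger-collector

# Test connectivity (6831 is a UDP port, so pass -u)
kubectl exec <pod-name> -- nc -zuv jaeger-agent.observability.svc.cluster.local 6831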

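2. Incomplete Traces

# Check that trace headers are passed along (curl -v writes headers to stderr)
curl -sv http://service/api 2>&1 | grep -i x-b3

# Verify the Istio configuration
kubectl get configmap istio -n istio-system -o yaml | grep tracing

# View the application logs
kubectl logs <pod-name> | grep -i trace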

3. Slow UI Queries

# Check Elasticsearch health
kubectl exec -it <es-pod> -- curl localhost:9200/_cluster/health

# Optimize the indices
# Reduce the retention period
# Increase the number of Query service replicas

Performance Considerations

1. Recommended Sampling Rates

QPS < 100:    100% sampling
QPS 100-1K:   10-50% sampling
QPS 1K-10K:   1-10% sampling
QPS > 10K:    0.1-1% sampling
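
The tiers above can be expressed as a lookup that also shows how many traces per second each tier produces. In this sketch a boundary value such as exactly 1K QPS is assigned to the higher-traffic tier, which is an assumption since the ranges overlap at their edges; the function names are hypothetical.

```python
def recommended_sampling_range(qps: float) -> tuple:
    """Return the (low, high) sampling probability suggested for a given QPS."""
    if qps < 100:
        return (1.0, 1.0)
    if qps < 1_000:
        return (0.10, 0.50)
    if qps < 10_000:
        return (0.01, 0.10)
    return (0.001, 0.01)

def sampled_traces_per_second(qps: float) -> tuple:
    """Sampled trace volume at the low and high end of the suggested range."""
    low, high = recommended_sampling_range(qps)
    return (qps * low, qps * high)

for qps in (50, 500, 5_000, 50_000):
    print(qps, recommended_sampling_range(qps), sampled_traces_per_second(qps))
```

Seen this way, the tiers keep the sampled volume at roughly tens to a few hundred traces per second regardless of total traffic.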

2. Resource Consumption

At 1,000 spans per second:
  - Agent: 50MB memory
  - Collector: 500MB memory
  - Storage: 10GB/day (after compression)
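
A quick back-of-the-envelope calculation turns these figures into a per-span size and a total storage footprint. The sketch reads 10GB as decimal gigabytes and assumes a 7-day retention period, matching the index cleaner setting shown earlier; adjust both for your environment.

```python
SPANS_PER_SECOND = 1_000
STORAGE_BYTES_PER_DAY = 10 * 10**9   # 10 GB/day, read as decimal gigabytes (assumption)
RETENTION_DAYS = 7                   # assumed retention period

SECONDS_PER_DAY = 24 * 60 * 60

spans_per_day = SPANS_PER_SECOND * SECONDS_PER_DAY
bytes_per_span = STORAGE_BYTES_PER_DAY / spans_per_day
retained_gb = STORAGE_BYTES_PER_DAY * RETENTION_DAYS / 10**9

print(f"spans per day:  {spans_per_day:,}")
print(f"bytes per span: {bytes_per_span:.0f}")
print(f"retained data:  {retained_gb:.0f} GB")
```

Under these assumptions the cluster stores about 86.4 million spans per day at a little over 100 bytes each, and holds around 70 GB at steady state.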

3. Optimization Tips

  • Use the Agent's local buffering
  • Send spans in batches
  • Set a reasonable span size limit
  • Clean up old data regularly
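
Batching is the tip with the most direct effect on network overhead. The sketch below shows the basic idea: buffer spans and hand them off when the batch is full or has waited too long. The class name `SpanBatcher` and its parameters are hypothetical, and the age check only runs when a new span arrives; a real exporter would typically flush from a background thread as well.

```python
import time

class SpanBatcher:
    """Buffer spans and hand them to `send` in batches."""

    def __init__(self, send, max_batch=100, max_wait_s=5.0, clock=time.monotonic):
        self._send = send
        self._max_batch = max_batch
        self._max_wait_s = max_wait_s
        self._clock = clock
        self._buffer = []
        self._first_added = None

    def add(self, span):
        if not self._buffer:
            self._first_added = self._clock()  # start timing this batch
        self._buffer.append(span)
        too_full = len(self._buffer) >= self._max_batch
        too_old = self._clock() - self._first_added >= self._max_wait_s
        if too_full or too_old:
            self.flush()

    def flush(self):
        """Send whatever is buffered, if anything."""
        if self._buffer:
            batch, self._buffer = self._buffer, []
            self._send(batch)

sent = []
batcher = SpanBatcher(send=sent.append, max_batch=3)
for i in range(7):
    batcher.add({"span_id": i})
batcher.flush()  # push out the final partial batch
print([len(batch) for batch in sent])
```

Seven spans with a batch size of three result in three sends instead of seven; the explicit `flush()` at shutdown keeps the last partial batch from being lost.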

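Summary

Jaeger core capabilities:

Capability            Description            Value
End-to-end tracing    Full request path      Locate problems quickly
Performance analysis  Per-service latency    Remove performance bottlenecks
Dependency analysis   Service call graph     Understand the system architecture
Root cause analysis   Error tracing          Diagnose failures quickly

Implementation steps:

  1. Deploy Jaeger (All-in-One or Production)
  2. Configure the Istio integration (automatic tracing)
  3. Integrate OpenTelemetry into the applications
  4. Configure the sampling strategy
  5. Set up monitoring and alerting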