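Distributed Tracing in Practice
Jaeger Distributed Tracing
What Is Distributed Tracing
In a microservice architecture, a single user request may pass through many services before it completes. Distributed tracing follows that request across service boundaries and records the path it takes and how long each step takes.
Problem scenario:
A user request is slow, but which service is slow?
├─ API Gateway (10ms)
├─ User Service (50ms)
├─ Order Service (2000ms) ← bottleneck found!
├─ Payment Service (100ms)
└─ Notification Service (30ms)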
Core Concepts
1. Trace
The complete call chain of a single request:
Trace ID: abc123
├─ Span 1: API Gateway [100ms]
│ └─ Span 2: Auth Service [50ms]
├─ Span 3: Order Service [200ms]
│ ├─ Span 4: DB Query [150ms]
│ └─ Span 5: Cache Check [20ms]
└─ Span 6: Payment Service [80ms]
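Internally, a tree like this is stored as a flat list of spans linked by parent IDs. The Python sketch below uses a simplified, hypothetical span model (ID, optional parent ID, name, duration; not Jaeger's storage format) to show how the hierarchy yields each span's self time: its duration minus the time spent in its direct children. It assumes children do not overlap. For the trace above, the hotspot turns out to be DB Query rather than Order Service itself:

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Optional


@dataclass
class Span:
    span_id: int
    parent_id: Optional[int]  # None for top-level spans
    name: str
    duration_ms: float


def self_times(spans: list[Span]) -> dict[str, float]:
    """Each span's duration minus the time spent in its direct children.

    Assumes children run one after another (no overlap), so their
    durations can simply be summed.
    """
    child_total: dict[int, float] = defaultdict(float)
    for span in spans:
        if span.parent_id is not None:
            child_total[span.parent_id] += span.duration_ms
    return {s.name: s.duration_ms - child_total[s.span_id] for s in spans}


# Trace abc123 from the tree above
trace_abc123 = [
    Span(1, None, "API Gateway", 100),
    Span(2, 1, "Auth Service", 50),
    Span(3, None, "Order Service", 200),
    Span(4, 3, "DB Query", 150),
    Span(5, 3, "Cache Check", 20),
    Span(6, None, "Payment Service", 80),
]

times = self_times(trace_abc123)
hotspot = max(times, key=times.get)
print(times)
print("hotspot:", hotspot)
```

Order Service spends only 30 ms of its 200 ms in its own code; the rest is its DB query and cache check.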
2. Span
A single unit of work, such as one service handling a request or one database query. A span whose ParentSpanID is null is the root of its trace:
Span:
  TraceID: abc123
  SpanID: span-001
  ParentSpanID: null
  Operation: GET /api/orders
  StartTime: 2024-01-09 10:00:00.000
  Duration: 200ms
  Tags:
    - http.method: GET
    - http.status_code: 200
  Logs:
    - timestamp: 10:00:00.050
      event: db.query.start
    - timestamp: 10:00:00.200
      event: db.query.end
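The span above maps naturally onto a small record type. This sketch uses Python dataclasses whose field names simply mirror the listing (they are illustrative, not a Jaeger or OpenTelemetry class) and derives two values a trace UI typically shows: the end time, and each log event's offset from the span start. The query events sit 50 ms and 200 ms into the span, so the database query accounts for 150 ms of the 200 ms total:

```python
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Optional


@dataclass
class LogEntry:
    timestamp: datetime
    event: str


@dataclass
class SpanRecord:
    trace_id: str
    span_id: str
    parent_span_id: Optional[str]  # None marks the root span of the trace
    operation: str
    start_time: datetime
    duration: timedelta
    tags: dict[str, object] = field(default_factory=dict)
    logs: list[LogEntry] = field(default_factory=list)

    @property
    def end_time(self) -> datetime:
        return self.start_time + self.duration

    def log_offsets_ms(self) -> dict[str, float]:
        """Milliseconds between the span start and each log event."""
        one_ms = timedelta(milliseconds=1)
        return {log.event: (log.timestamp - self.start_time) / one_ms for log in self.logs}


start = datetime(2024, 1, 9, 10, 0, 0)
span = SpanRecord(
    trace_id="abc123",
    span_id="span-001",
    parent_span_id=None,
    operation="GET /api/orders",
    start_time=start,
    duration=timedelta(milliseconds=200),
    tags={"http.method": "GET", "http.status_code": 200},
    logs=[
        LogEntry(start + timedelta(milliseconds=50), "db.query.start"),
        LogEntry(start + timedelta(milliseconds=200), "db.query.end"),
    ],
)

offsets = span.log_offsets_ms()
print(offsets)
print("db query took", offsets["db.query.end"] - offsets["db.query.start"], "ms")
```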
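3. Key Metrics
- Latency: total time taken by a request
- Error rate: proportion of requests that fail
- Throughput: requests handled per second
- Dependencies: the call topology between services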
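To make the first three metrics concrete, here is a small Python sketch that computes them from a batch of request records. The record shape, the nearest-rank percentile method and the sample numbers are illustrative assumptions; Jaeger and monitoring backends use their own aggregation. Latency is reported as percentiles rather than an average because an average hides a few very slow requests, such as the 2000 ms one below:

```python
import math
from dataclasses import dataclass


@dataclass
class RequestRecord:
    duration_ms: float
    ok: bool


def percentile(values: list[float], p: float) -> float:
    """Nearest-rank percentile for 0 < p <= 100."""
    ordered = sorted(values)
    rank = math.ceil(p * len(ordered) / 100)
    return ordered[rank - 1]


def summarize(records: list[RequestRecord], window_s: float) -> dict[str, float]:
    """Latency percentiles, error rate and throughput over a window of window_s seconds."""
    durations = [r.duration_ms for r in records]
    failures = sum(1 for r in records if not r.ok)
    return {
        "p50_ms": percentile(durations, 50),
        "p95_ms": percentile(durations, 95),
        "error_rate": failures / len(records),
        "throughput_rps": len(records) / window_s,
    }


# Ten hypothetical requests observed over a 2-second window
records = [RequestRecord(d, ok) for d, ok in [
    (12, True), (15, True), (11, True), (300, False), (14, True),
    (13, True), (16, True), (12, True), (18, True), (2000, False),
]]
stats = summarize(records, window_s=2.0)
print(stats)
```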
Jaeger Architecture
┌─────────────┐
│ Application │
│ + Jaeger │
│ Client │
└──────┬──────┘
│ UDP
▼
┌──────────────┐
│ Jaeger Agent │ (DaemonSet)
└──────┬───────┘
│ TCP/HTTP
▼
┌──────────────┐
│ Jaeger │
│ Collector │
└──────┬───────┘
│
▼
┌──────────────┐ ┌──────────────┐
│ Storage │ ←──→ │ Jaeger Query │
│ (ES/Cassandra)│ │ + UI │
└──────────────┘ └──────────────┘
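Components:
- Jaeger Client: the tracing library embedded in the application
- Jaeger Agent: a per-node agent that receives spans from clients over UDP and forwards them to the Collector
- Jaeger Collector: receives and validates spans, then writes them to storage
- Storage: the storage backend (Elasticsearch, Cassandra)
- Jaeger Query: the query service and web UI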
Deploying Jaeger on Kubernetes
Option 1: Using the Jaeger Operator
1. Install the Operator
# Install cert-manager (required by the Operator)
kubectl apply -f https://github.com/cert-manager/cert-manager/releases/download/v1.13.0/cert-manager.yaml
# Install the Jaeger Operator
kubectl create namespace observability
kubectl create -f https://github.com/jaegertracing/jaeger-operator/releases/download/v1.51.0/jaeger-operator.yaml -n observability
2. Deploy Jaeger All-in-One (development)
apiVersion: jaegertracing.io/v1
kind: Jaeger
metadata:
  name: simplest
  namespace: observability
spec:
  strategy: allInOne
  allInOne:
    image: jaegertracing/all-in-one:latest
    options:
      log-level: debug
  storage:
    type: memory  # spans are lost when the pod restarts
  ingress:
    enabled: true
  ui:
    options:
      dependencies:
        menuEnabled: true
Apply the manifest (saved as jaeger-allinone.yaml):
kubectl apply -f jaeger-allinone.yaml
# Verify
kubectl get pods -n observability
kubectl get svc -n observability
3. Production configuration (Elasticsearch storage)
apiVersion: jaegertracing.io/v1
kind: Jaeger
metadata:
  name: jaeger-production
  namespace: observability
spec:
  strategy: production
  storage:
    type: elasticsearch
    options:
      es:
        server-urls: http://elasticsearch:9200
        index-prefix: jaeger
    esIndexCleaner:
      enabled: true
      numberOfDays: 7          # keep 7 days of trace data
      schedule: "55 23 * * *"  # run daily at 23:55
  collector:
    replicas: 3
    resources:
      requests:
        cpu: 500m
        memory: 1Gi
      limits:
        cpu: 1000m
        memory: 2Gi
  query:
    replicas: 2
    resources:
      requests:
        cpu: 200m
        memory: 512Mi
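`esIndexCleaner` runs as a scheduled job that deletes Jaeger's daily indices once they fall outside the retention window. The Python sketch below mimics that selection under two stated assumptions: index names of the form `<prefix>-jaeger-span-YYYY-MM-DD` or `<prefix>-jaeger-service-YYYY-MM-DD`, and "older than N days" meaning a date strictly before today minus N. Check both against the index cleaner documentation for your Jaeger version:

```python
import re
from datetime import date, timedelta


def indices_to_delete(names: list[str], today: date, keep_days: int, prefix: str) -> list[str]:
    """Return the daily Jaeger indices dated more than keep_days before today."""
    # Assumed naming scheme: <prefix>-jaeger-<span|service>-YYYY-MM-DD
    pattern = re.compile(rf"^{re.escape(prefix)}-jaeger-(?:span|service)-(\d{{4}}-\d{{2}}-\d{{2}})$")
    cutoff = today - timedelta(days=keep_days)
    expired = []
    for name in names:
        match = pattern.match(name)
        if match and date.fromisoformat(match.group(1)) < cutoff:
            expired.append(name)
    return expired


existing = [
    "jaeger-jaeger-span-2024-01-01",
    "jaeger-jaeger-span-2024-01-02",
    "jaeger-jaeger-span-2024-01-09",
    "jaeger-jaeger-service-2023-12-31",
    "unrelated-index",
]
print(indices_to_delete(existing, today=date(2024, 1, 9), keep_days=7, prefix="jaeger"))
```

Indices that do not match the pattern are never selected, which is the safe default for a deletion job.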
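Option 2: Deploying with Helm
# Add the Helm repository
helm repo add jaegertracing https://jaegertracing.github.io/helm-charts
helm repo update
# Deploy (development)
# The chart provisions Cassandra and separate collector/query deployments by default;
# turn those off when running the all-in-one image
helm install jaeger jaegertracing/jaeger \
  --namespace observability \
  --create-namespace \
  --set provisionDataStore.cassandra=false \
  --set allInOne.enabled=true \
  --set storage.type=memory \
  --set agent.enabled=false \
  --set collector.enabled=false \
  --set query.enabled=false
# Deploy (production)
helm install jaeger jaegertracing/jaeger \
  --namespace observability \
  --create-namespace \
  --set provisionDataStore.cassandra=true \
  --set storage.type=cassandra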
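Accessing the Jaeger UI
# Port-forward the query service (the Operator names it <instance>-query)
kubectl port-forward -n observability svc/simplest-query 16686:16686
# Open the UI (macOS; use xdg-open on Linux)
open http://localhost:16686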
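Integrating with Istio
1. Automatic tracing in Istio
The Envoy sidecars that Istio injects generate spans for every request passing through the mesh, without code changes:
# Enable tracing when installing Istio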
istioctl install --set profile=demo \
--set meshConfig.defaultConfig.tracing.zipkin.address=jaeger-collector.observability:9411 \
--set meshConfig.defaultConfig.tracing.sampling=100.0
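2. Configure Istio tracing
apiVersion: install.istio.io/v1alpha1
kind: IstioOperator
spec:
  meshConfig:
    enableTracing: true
    defaultConfig:
      tracing:
        sampling: 100.0  # sample 100% of requests (fine for testing, too much for production)
        zipkin:
          address: jaeger-collector.observability:9411
    # Extension providers take effect only when referenced by a Telemetry resource
    extensionProviders:
      - name: jaeger
        zipkin:
          service: jaeger-collector.observability.svc.cluster.local
          port: 9411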
Apply the configuration (saved as istio-config.yaml):
istioctl install -f istio-config.yaml
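`sampling: 100.0` is a percentage. Whatever the rate, every service in a trace must reach the same keep/drop decision, or traces arrive with holes. In practice the decision is made once at the edge and carried downstream in the propagated headers (for B3, `x-b3-sampled`). One common way to make that first decision deterministic is to derive it from the trace ID, the same idea OpenTelemetry's `TraceIdRatioBased` sampler uses. The Python sketch below illustrates the idea; it is not Envoy's sampling code:

```python
import random

LOWER_64_BITS = (1 << 64) - 1


def should_sample(trace_id_hex: str, percentage: float) -> bool:
    """Deterministic head sampling: the same trace ID always gets the same decision."""
    if not 0.0 <= percentage <= 100.0:
        raise ValueError("percentage must be within [0, 100]")
    bound = round(percentage / 100 * (1 << 64))
    # Compare the low 64 bits of the 128-bit trace ID against the bound
    return (int(trace_id_hex, 16) & LOWER_64_BITS) < bound


# Roughly a quarter of random trace IDs are kept at 25%
rng = random.Random(42)
ids = [f"{rng.getrandbits(128):032x}" for _ in range(10_000)]
kept = sum(should_sample(t, 25.0) for t in ids)
print(kept / len(ids))
```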
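3. Propagate tracing headers in the application
Envoy creates spans on its own, but it cannot tell which outgoing request was caused by which incoming one. For end-to-end traces, the application must copy the tracing headers from each incoming request onto the requests it sends downstream:
# Python/Flask example
from flask import Flask, request
import requests

app = Flask(__name__)

# Headers Istio/Envoy uses to stitch spans together (B3 format plus x-request-id)
TRACING_HEADERS = [
    'x-request-id', 'x-b3-traceid', 'x-b3-spanid',
    'x-b3-parentspanid', 'x-b3-sampled', 'x-b3-flags',
]

@app.route('/order')
def create_order():
    # Collect the tracing headers from the incoming request
    headers = {}
    for header in TRACING_HEADERS:
        value = request.headers.get(header)
        if value:
            headers[header] = value
    # Forward them on the downstream call
    response = requests.get('http://payment-service/pay', headers=headers)
    return response.json()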
// Node.js/Express example
const express = require('express');
const axios = require('axios');
const app = express();
app.get('/order', async (req, res) => {
// Extract the tracing headers
const tracingHeaders = [
'x-request-id',
'x-b3-traceid',
'x-b3-spanid',
'x-b3-parentspanid',
'x-b3-sampled',
'x-b3-flags'
];
const headers = {};
tracingHeaders.forEach(header => {
if (req.headers[header]) {
headers[header] = req.headers[header];
}
});
// Call the downstream service, forwarding the tracing headers
const response = await axios.get('http://payment-service/pay', { headers });
res.json(response.data);
});
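Both examples copy the same fixed list, so keeping it in one helper avoids copies drifting apart. This Python sketch (the helper name is illustrative) also lower-cases header names first: HTTP header names are case-insensitive, but a plain `dict` lookup is not. Flask's `request.headers` already handles case, so the normalization matters mainly when headers arrive as an ordinary dictionary:

```python
from typing import Mapping

# The same headers as in the Flask and Express examples
TRACING_HEADERS = (
    "x-request-id",
    "x-b3-traceid",
    "x-b3-spanid",
    "x-b3-parentspanid",
    "x-b3-sampled",
    "x-b3-flags",
)


def tracing_headers_from(incoming: Mapping[str, str]) -> dict[str, str]:
    """Pick the tracing headers out of incoming request headers, ignoring name case."""
    lowered = {name.lower(): value for name, value in incoming.items()}
    return {name: lowered[name] for name in TRACING_HEADERS if lowered.get(name)}


incoming = {
    "X-Request-Id": "req-1",
    "X-B3-TraceId": "abc123",
    "X-B3-Sampled": "1",
    "Content-Type": "application/json",
}
print(tracing_headers_from(incoming))
```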
Instrumenting Application Code
Option 1: Using OpenTelemetry (recommended)
Python application
# Install dependencies
# pip install opentelemetry-api opentelemetry-sdk opentelemetry-exporter-jaeger opentelemetry-instrumentation-flask
# Note: opentelemetry-exporter-jaeger is deprecated upstream; Jaeger 1.35+ accepts OTLP natively.
from flask import Flask
from opentelemetry import trace
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.instrumentation.flask import FlaskInstrumentor

# Configure tracing; the service name is what Jaeger lists in its service dropdown
provider = TracerProvider(resource=Resource.create({SERVICE_NAME: "order-service"}))
provider.add_span_processor(
    BatchSpanProcessor(JaegerExporter(agent_host_name="jaeger-agent", agent_port=6831))
)
trace.set_tracer_provider(provider)
tracer = trace.get_tracer(__name__)

# Automatic Flask tracing: one server span per incoming request
app = Flask(__name__)
FlaskInstrumentor().instrument_app(app)

@app.route('/api/order')
def create_order():
    # Custom span, nested under the request span created by FlaskInstrumentor
    with tracer.start_as_current_span("process-order") as span:
        span.set_attribute("order.id", "12345")
        span.set_attribute("user.id", "user-001")
        # Business logic (process_order is the application's own function)
        result = process_order()
        span.add_event("order.created", {"order_id": result.id})
    return {"status": "success"}
Go application
// go get go.opentelemetry.io/otel
// go get go.opentelemetry.io/otel/exporters/jaeger
// (this exporter is deprecated upstream in favor of OTLP, which Jaeger accepts natively)
package main
import (
"context"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/jaeger"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
)
func initTracer() (*sdktrace.TracerProvider, error) {
// Create the Jaeger exporter (sends spans to the agent over UDP)
exporter, err := jaeger.New(
jaeger.WithAgentEndpoint(
jaeger.WithAgentHost("jaeger-agent"),
jaeger.WithAgentPort("6831"),
),
)
if err != nil {
return nil, err
}
// Create the TracerProvider; WithBatcher exports spans asynchronously in batches
tp := sdktrace.NewTracerProvider(
sdktrace.WithBatcher(exporter),
sdktrace.WithResource(resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceNameKey.String("order-service"),
)),
)
otel.SetTracerProvider(tp)
return tp, nil
}
func createOrder(ctx context.Context) {
tracer := otel.Tracer("order-service")
ctx, span := tracer.Start(ctx, "create-order")
defer span.End()
span.SetAttributes(
semconv.HTTPMethodKey.String("POST"),
semconv.HTTPURLKey.String("/api/orders"),
)
// Business logic; pass ctx so spans created downstream become children of create-order
processOrder(ctx)
}
Java application (Spring Boot)
<!-- pom.xml -->
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
<version>1.32.0</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk</artifactId>
<version>1.32.0</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-exporter-jaeger</artifactId>
<version>1.32.0</version>
</dependency>
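import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Scope;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class OrderController {
    // GlobalOpenTelemetry returns a working tracer only after an SDK has been registered
    // at startup (e.g. OpenTelemetrySdk.builder()...buildAndRegisterGlobal())
    private final Tracer tracer = GlobalOpenTelemetry.getTracer("order-service");
    private final OrderService orderService; // the application's own service class

    public OrderController(OrderService orderService) {
        this.orderService = orderService;
    }

    @PostMapping("/api/order")
    public ResponseEntity<Order> createOrder() {
        Span span = tracer.spanBuilder("create-order").startSpan();
        // makeCurrent() makes spans started inside this block children of create-order
        try (Scope scope = span.makeCurrent()) {
            span.addEvent("Processing order");
            // Business logic
            Order order = orderService.createOrder();
            span.setAttribute("order.id", order.getId());
            span.setAttribute("order.status", "created");
            return ResponseEntity.ok(order);
        } catch (RuntimeException e) {
            span.recordException(e);
            span.setStatus(StatusCode.ERROR);
            throw e;
        } finally {
            span.end();
        }
    }
}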
Option 2: Using the Jaeger client SDK
Python Jaeger client
# pip install jaeger-client
# Note: the Jaeger client libraries have been retired in favor of OpenTelemetry; prefer Option 1 for new code.
from flask import Flask
from jaeger_client import Config

app = Flask(__name__)

def init_tracer(service_name):
    config = Config(
        config={
            'sampler': {
                'type': 'const',  # constant sampler
                'param': 1,       # 1 = sample every trace
            },
            'local_agent': {
                'reporting_host': 'jaeger-agent',
                'reporting_port': 6831,
            },
            'logging': True,
        },
        service_name=service_name,
    )
    return config.initialize_tracer()

tracer = init_tracer('order-service')

@app.route('/order')
def create_order():
    with tracer.start_span('create-order') as span:
        span.set_tag('order.id', '12345')
        # Call another service inside an explicit child span
        with tracer.start_span('call-payment', child_of=span) as child_span:
            payment_result = call_payment_service()  # the application's own function
            child_span.log_kv({'payment.status': payment_result.status})
    return {'status': 'success'}
Rust application
// Cargo.toml
// [dependencies]
// opentelemetry = { version = "0.21", features = ["trace"] }
// opentelemetry-jaeger = "0.20"
// opentelemetry_sdk = { version = "0.21", features = ["rt-tokio"] }
// tracing = "0.1"
// tracing-opentelemetry = "0.22"
// tracing-subscriber = "0.3"
use opentelemetry::global;
// Since opentelemetry 0.21 the SDK types live in the separate opentelemetry_sdk crate
use opentelemetry_sdk::propagation::TraceContextPropagator;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::Registry;
fn init_tracer() -> Result<(), Box<dyn std::error::Error>> {
global::set_text_map_propagator(TraceContextPropagator::new());
let tracer = opentelemetry_jaeger::new_agent_pipeline()
.with_service_name("order-service")
.with_endpoint("jaeger-agent:6831")
.install_simple()?;
let telemetry = tracing_opentelemetry::layer().with_tracer(tracer);
let subscriber = Registry::default().with(telemetry);
tracing::subscriber::set_global_default(subscriber)?;
Ok(())
}
#[tracing::instrument]
async fn create_order(order_id: &str) -> Result<Order, OrderError> {
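// #[tracing::instrument] opens a span for each call; the info! events are recorded on it
tracing::info!(order.id = order_id, "Processing order");
// Query the database (fetch_order_from_db gets its own child span)
let order = fetch_order_from_db(order_id).await?;
// Call another service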
let payment = process_payment(&order).await?;
tracing::info!(payment.status = ?payment.status, "Payment processed");
Ok(order)
}
#[tracing::instrument]
async fn fetch_order_from_db(order_id: &str) -> Result<Order, DbError> {
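// The #[instrument] span measures this whole function; individual queries get their own
// spans only if the database client is itself instrumented. `db` is a placeholder for the
// application's database handle.
let order = db.query("SELECT * FROM orders WHERE id = $1")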
.bind(order_id)
.fetch_one()
.await?;
Ok(order)
}
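.NET/C# application
// NuGet packages
// Install-Package OpenTelemetry
// Install-Package OpenTelemetry.Exporter.Jaeger
// Install-Package OpenTelemetry.Extensions.Hosting
// Install-Package OpenTelemetry.Instrumentation.AspNetCore
// Install-Package OpenTelemetry.Instrumentation.Http
// Install-Package OpenTelemetry.Instrumentation.SqlClient -Prerelease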
using OpenTelemetry;
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;
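// Program.cs
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddControllers();
// (register IOrderService, HttpClient and IDbConnection here as well)
// Configure OpenTelemetry
builder.Services.AddOpenTelemetry()
.WithTracing(tracerProviderBuilder =>
{
tracerProviderBuilder
.SetResourceBuilder(ResourceBuilder.CreateDefault()
.AddService("order-service")
.AddTelemetrySdk())
// Listen to the custom ActivitySource used below; without this,
// StartActivity(...) returns null and no custom span is recorded
.AddSource("OrderService")
.AddAspNetCoreInstrumentation()
.AddHttpClientInstrumentation()
.AddSqlClientInstrumentation()
.AddJaegerExporter(options =>
{
options.AgentHost = "jaeger-agent";
options.AgentPort = 6831;
});
});
var app = builder.Build();
app.MapControllers();
app.Run();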
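// OrderController.cs
using System.Diagnostics;
using Microsoft.AspNetCore.Mvc;
using OpenTelemetry.Trace; // provides the Activity.RecordException extension method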
[ApiController]
[Route("api/[controller]")]
public class OrderController : ControllerBase
{
private static readonly ActivitySource ActivitySource = new("OrderService");
private readonly IOrderService _orderService;
public OrderController(IOrderService orderService)
{
_orderService = orderService;
}
[HttpPost]
public async Task<IActionResult> CreateOrder([FromBody] OrderRequest request)
{
// Create a custom span (an Activity in .NET)
using var activity = ActivitySource.StartActivity("CreateOrder");
activity?.SetTag("order.id", request.OrderId);
activity?.SetTag("user.id", request.UserId);
try
{
var order = await _orderService.CreateOrderAsync(request);
activity?.AddEvent(new ActivityEvent("Order created"));
activity?.SetTag("order.status", order.Status);
return Ok(order);
}
catch (Exception ex)
{
activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
activity?.RecordException(ex);
throw;
}
}
}
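// OrderService.cs
using System.Data;
using System.Diagnostics;
using System.Net.Http.Json; // PostAsJsonAsync
using Dapper;               // ExecuteAsync on IDbConnection
public class OrderService : IOrderService
{
private static readonly ActivitySource ActivitySource = new("OrderService");
private readonly HttpClient _httpClient;
private readonly IDbConnection _dbConnection;
public OrderService(HttpClient httpClient, IDbConnection dbConnection)
{
_httpClient = httpClient;
_dbConnection = dbConnection;
}
public async Task<Order> CreateOrderAsync(OrderRequest request)
{
using var activity = ActivitySource.StartActivity("ProcessOrder");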
// Database query
using (var dbActivity = ActivitySource.StartActivity("DatabaseQuery"))
{
dbActivity?.SetTag("db.system", "postgresql");
dbActivity?.SetTag("db.statement", "INSERT INTO orders...");
await _dbConnection.ExecuteAsync(
"INSERT INTO orders (id, user_id) VALUES (@Id, @UserId)",
new { Id = request.OrderId, UserId = request.UserId }
);
}
// Call the payment service; HttpClient adds a traceparent header for the current Activity
using (var paymentActivity = ActivitySource.StartActivity("CallPaymentService"))
{
paymentActivity?.SetTag("service.name", "payment-service");
var response = await _httpClient.PostAsJsonAsync(
"http://payment-service/api/payment",
new { OrderId = request.OrderId, Amount = request.Amount }
);
paymentActivity?.SetTag("http.status_code", (int)response.StatusCode);
}
return new Order { Id = request.OrderId, Status = "Created" };
}
}
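PHP application (Laravel)
<?php
// composer require open-telemetry/opentelemetry
// composer require open-telemetry/exporter-jaeger
// (package, class and namespace names for the Jaeger exporter differ between SDK versions,
// and newer releases favor OTLP; check them against the version you install)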
use OpenTelemetry\SDK\Trace\TracerProvider;
use OpenTelemetry\SDK\Trace\SpanExporter\JaegerExporter;
use OpenTelemetry\SDK\Trace\SpanProcessor\SimpleSpanProcessor;
use OpenTelemetry\SDK\Resource\ResourceInfo;
use OpenTelemetry\SDK\Common\Attribute\Attributes;
use OpenTelemetry\API\Trace\Propagation\TraceContextPropagator;
// config/tracing.php
class TracingConfig
{
public static function init()
{
$exporter = new JaegerExporter(
'order-service',
'http://jaeger-collector:14268/api/traces'
);
$tracerProvider = new TracerProvider(
new SimpleSpanProcessor($exporter),
null,
ResourceInfo::create(Attributes::create([
'service.name' => 'order-service',
'service.version' => '1.0.0',
]))
);
return $tracerProvider->getTracer('order-service');
}
}
// app/Http/Controllers/OrderController.php
use App\Models\Order;
use Illuminate\Http\Request;
use Illuminate\Support\Facades\Http;
use OpenTelemetry\API\Trace\SpanKind;
use OpenTelemetry\API\Trace\StatusCode;
use OpenTelemetry\Context\Context;
class OrderController extends Controller
{
private $tracer;
public function __construct()
{
$this->tracer = TracingConfig::init();
}
public function createOrder(Request $request)
{
// Create the server span and activate it so spans started below become its children
$span = $this->tracer->spanBuilder('create-order')
->setSpanKind(SpanKind::KIND_SERVER)
->startSpan();
$scope = $span->activate();
$span->setAttribute('order.id', $request->input('order_id'));
$span->setAttribute('user.id', $request->input('user_id'));
try {
// Database operation
$dbSpan = $this->tracer->spanBuilder('db.query')
->setSpanKind(SpanKind::KIND_CLIENT)
->startSpan();
$dbSpan->setAttribute('db.system', 'mysql');
$dbSpan->setAttribute('db.statement', 'INSERT INTO orders...');
$order = Order::create([
'order_id' => $request->input('order_id'),
'user_id' => $request->input('user_id'),
]);
$dbSpan->end();
// Call the payment service
$paymentSpan = $this->tracer->spanBuilder('call-payment-service')
->setSpanKind(SpanKind::KIND_CLIENT)
->startSpan();
$paymentSpan->setAttribute('service.name', 'payment-service');
// Write the payment span's context into W3C trace headers (traceparent, tracestate)
$carrier = [];
TraceContextPropagator::getInstance()->inject(
$carrier,
null,
$paymentSpan->storeInContext(Context::getCurrent())
);
$response = Http::withHeaders($carrier)->post('http://payment-service/api/payment', [
'order_id' => $order->id,
'amount' => $request->input('amount'),
]);
$paymentSpan->setAttribute('http.status_code', $response->status());
$paymentSpan->end();
$span->addEvent('order.created', [
'order.id' => $order->id,
'order.status' => 'created',
]);
$span->setStatus(StatusCode::STATUS_OK);
return response()->json($order);
} catch (\Exception $e) {
$span->recordException($e);
$span->setStatus(StatusCode::STATUS_ERROR, $e->getMessage());
throw $e;
} finally {
$scope->detach();
$span->end();
}
}
}
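The PHP example propagates the trace context in a `traceparent` header. That header follows the W3C Trace Context format: version `00`, a 32-hex-digit trace ID, a 16-hex-digit parent span ID and 2 hex digits of flags, where bit 0 means "sampled". A minimal Python sketch of formatting and parsing it follows; the function names are illustrative, and only version `00` is accepted here, although the specification asks parsers to tolerate higher versions:

```python
import re
import secrets
from typing import NamedTuple, Optional

# version 00: trace-id (32 hex) - parent-id (16 hex) - trace-flags (2 hex)
_TRACEPARENT = re.compile(r"^00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$")


class TraceParent(NamedTuple):
    trace_id: str
    span_id: str
    sampled: bool


def format_traceparent(tp: TraceParent) -> str:
    flags = "01" if tp.sampled else "00"
    return f"00-{tp.trace_id}-{tp.span_id}-{flags}"


def parse_traceparent(header: str) -> Optional[TraceParent]:
    """Return None for malformed headers or the all-zero IDs the spec forbids."""
    match = _TRACEPARENT.match(header.strip())
    if match is None:
        return None
    trace_id, span_id, flags = match.groups()
    if trace_id == "0" * 32 or span_id == "0" * 16:
        return None
    return TraceParent(trace_id, span_id, bool(int(flags, 16) & 0x01))


def child_of(parent: TraceParent) -> TraceParent:
    """Same trace, fresh random span ID, inherited sampling decision."""
    return TraceParent(parent.trace_id, secrets.token_hex(8), parent.sampled)


incoming = parse_traceparent("00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01")
outgoing = child_of(incoming)
print(incoming)
print(format_traceparent(outgoing))
```

The outgoing header keeps the trace ID and sampling flag and replaces only the parent span ID, which is how a downstream service knows where it hangs in the tree.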
Option 3: Database tracing
MySQL/PostgreSQL tracing
# Python + SQLAlchemy + OpenTelemetry
# pip install opentelemetry-instrumentation-sqlalchemy
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
from sqlalchemy import create_engine, text

# Trace every SQL statement issued through this engine
engine = create_engine('postgresql://user:pass@localhost/db')
SQLAlchemyInstrumentor().instrument(engine=engine)

# Each query now produces its own span (order_id comes from the caller)
with engine.connect() as conn:
    result = conn.execute(text("SELECT * FROM orders WHERE id = :id"), {"id": order_id})
// Go + database/sql + OpenTelemetry
// (uses the community github.com/XSAM/otelsql package and the lib/pq driver)
import (
	"context"
	"database/sql"

	"github.com/XSAM/otelsql"
	_ "github.com/lib/pq" // registers the "postgres" driver
	semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
)

// Set once at startup, e.g. db, err = initDB()
var db *sql.DB

func initDB() (*sql.DB, error) {
	// Wrap the postgres driver so every query emits a span
	driverName, err := otelsql.Register("postgres",
		otelsql.WithAttributes(semconv.DBSystemPostgreSQL),
	)
	if err != nil {
		return nil, err
	}
	conn, err := sql.Open(driverName, "postgres://user:pass@localhost/db")
	if err != nil {
		return nil, err
	}
	return conn, nil
}

func queryOrder(ctx context.Context, orderID string) (*Order, error) {
	// Pass ctx so the query span becomes a child of the caller's span
	var order Order
	err := db.QueryRowContext(ctx,
		"SELECT id, user_id, status FROM orders WHERE id = $1", orderID,
	).Scan(&order.ID, &order.UserID, &order.Status)
	return &order, err
}
Redis cache tracing
# Python + redis-py + OpenTelemetry
# pip install opentelemetry-instrumentation-redis
import json

import redis
from opentelemetry.instrumentation.redis import RedisInstrumentor

# Trace all Redis commands automatically
RedisInstrumentor().instrument()
redis_client = redis.Redis(host='localhost', port=6379)

# Each command now produces a span (order_data is the application's own payload)
redis_client.get('order:12345')
redis_client.setex('order:12345', 3600, json.dumps(order_data))
// Go + go-redis v8 + OpenTelemetry
// (uses go-redis's own redisotel extension for v8)
import (
	"context"
	"encoding/json"

	"github.com/go-redis/redis/extra/redisotel/v8"
	"github.com/go-redis/redis/v8"
)

// Set once at startup: rdb = initRedis()
var rdb *redis.Client

func initRedis() *redis.Client {
	client := redis.NewClient(&redis.Options{
		Addr: "localhost:6379",
	})
	// Enable tracing: every command becomes a span
	client.AddHook(redisotel.NewTracingHook())
	return client
}

func getOrderFromCache(ctx context.Context, orderID string) (*Order, error) {
	// Pass ctx so the Redis span joins the current trace
	val, err := rdb.Get(ctx, "order:"+orderID).Result()
	if err != nil {
		return nil, err
	}
	var order Order
	if err := json.Unmarshal([]byte(val), &order); err != nil {
		return nil, err
	}
	return &order, nil
}
MongoDB tracing
// Node.js + MongoDB + OpenTelemetry
// Register the instrumentation before requiring 'mongodb' so the driver can be patched;
// a tracer provider must also be registered for spans to be exported.
const { registerInstrumentations } = require('@opentelemetry/instrumentation');
const { MongoDBInstrumentation } = require('@opentelemetry/instrumentation-mongodb');

// Enable automatic MongoDB tracing
registerInstrumentations({
  instrumentations: [
    new MongoDBInstrumentation({
      enhancedDatabaseReporting: true, // attach query details to span attributes
    }),
  ],
});

const { MongoClient } = require('mongodb');

async function main(orderId, userId) {
  const client = new MongoClient('mongodb://localhost:27017');
  await client.connect();
  const db = client.db('myapp');
  // Every operation is traced automatically
  const order = await db.collection('orders').findOne({ _id: orderId });
  await db.collection('orders').insertOne({ orderId, userId, status: 'created' });
  await client.close();
  return order;
}
Option 4: Message queue tracing
Kafka tracing
# Python + kafka-python + OpenTelemetry
# pip install opentelemetry-instrumentation-kafka-python
import json

from kafka import KafkaProducer, KafkaConsumer
from opentelemetry.instrumentation.kafka import KafkaInstrumentor

# Trace Kafka automatically; the trace context travels in the message headers
KafkaInstrumentor().instrument()

producer = KafkaProducer(bootstrap_servers='localhost:9092')
# Sending a message creates a producer span (order_data is the application's payload)
producer.send('orders', value=json.dumps(order_data).encode())

# Consuming a message creates a consumer span
consumer = KafkaConsumer('orders', bootstrap_servers='localhost:9092')
for message in consumer:
    order = json.loads(message.value)
    process_order(order)
// Go + Sarama + OpenTelemetry
import (
	"context"
	"log"

	"github.com/Shopify/sarama"
	"go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama/otelsarama"
	"go.opentelemetry.io/otel"
)

// Producer
// (a producer per message keeps the example short; reuse one in real code)
func sendOrderEvent(ctx context.Context, order *Order) error {
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true // required by SyncProducer
	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
	if err != nil {
		return err
	}
	// Wrap with otelsarama first, then close the wrapper when done
	producer = otelsarama.WrapSyncProducer(config, producer)
	defer producer.Close()
	msg := &sarama.ProducerMessage{
		Topic: "orders",
		Value: sarama.StringEncoder(order.ToJSON()),
	}
	// Inject the trace context into the message headers
	otel.GetTextMapPropagator().Inject(ctx, otelsarama.NewProducerMessageCarrier(msg))
	_, _, err = producer.SendMessage(msg)
	return err
}

// Consumer
func consumeOrders() {
	config := sarama.NewConfig()
	group, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "order-group", config)
	if err != nil {
		log.Fatal(err)
	}
	handler := otelsarama.WrapConsumerGroupHandler(&OrderHandler{})
	for {
		// Consume returns on every rebalance, so call it in a loop
		if err := group.Consume(context.Background(), []string{"orders"}, handler); err != nil {
			log.Printf("consume error: %v", err)
		}
	}
}

type OrderHandler struct{}

// Setup and Cleanup complete the sarama.ConsumerGroupHandler interface
func (h *OrderHandler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (h *OrderHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

func (h *OrderHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	tracer := otel.Tracer("order-consumer")
	for message := range claim.Messages() {
		// Extract the trace context carried in the message headers
		ctx := otel.GetTextMapPropagator().Extract(
			context.Background(),
			otelsarama.NewConsumerMessageCarrier(message),
		)
		ctx, span := tracer.Start(ctx, "process-order")
		// Process the order (processOrder is the application's own function)
		processOrder(ctx, message.Value)
		span.End()
		session.MarkMessage(message, "")
	}
	return nil
}
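OpenTelemetry's Python `inject`/`extract` functions work on a mapping of string keys to string values by default, while kafka-python represents message headers as a list of `(str, bytes)` tuples. When propagating by hand instead of through `KafkaInstrumentor`, a small adapter bridges the two. A minimal sketch; the helper names are hypothetical, and UTF-8 encoding plus "last value wins" for repeated keys are assumptions:

```python
from typing import Iterable, Optional


def to_kafka_headers(carrier: dict[str, str]) -> list[tuple[str, bytes]]:
    """Turn an injected carrier dict into kafka-python style headers."""
    return [(key, value.encode("utf-8")) for key, value in carrier.items()]


def from_kafka_headers(headers: Optional[Iterable[tuple[str, Optional[bytes]]]]) -> dict[str, str]:
    """Turn received Kafka headers back into a carrier dict for extract()."""
    carrier: dict[str, str] = {}
    for key, value in headers or []:
        if value is not None:
            carrier[key] = value.decode("utf-8")  # later duplicates overwrite earlier ones
    return carrier


# Producer side: a carrier as filled by opentelemetry.propagate.inject(carrier)
carrier = {"traceparent": "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"}
headers = to_kafka_headers(carrier)
# producer.send("orders", value=payload, headers=headers)

# Consumer side: message.headers has the same list-of-tuples shape
restored = from_kafka_headers(headers)
print(restored == carrier)
```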
RabbitMQ tracing
# Python + pika + OpenTelemetry (manual propagation)
import json

import pika
from opentelemetry import trace
from opentelemetry.propagate import extract, inject

tracer = trace.get_tracer(__name__)

# Producer
def send_order_event(order):
    with tracer.start_as_current_span("rabbitmq.publish") as span:
        span.set_attribute("messaging.system", "rabbitmq")
        span.set_attribute("messaging.destination", "orders")
        connection = pika.BlockingConnection(
            pika.ConnectionParameters('localhost')
        )
        channel = connection.channel()
        channel.queue_declare(queue='orders')
        # Inject the current (publish) span's context into the message headers
        headers = {}
        inject(headers)
        properties = pika.BasicProperties(
            headers=headers,
            content_type='application/json',
        )
        channel.basic_publish(
            exchange='',
            routing_key='orders',
            body=json.dumps(order),
            properties=properties,
        )
        connection.close()
        span.set_attribute("messaging.message_id", order['id'])

# Consumer
def callback(ch, method, properties, body):
    # Extract the trace context from the message headers
    ctx = extract(properties.headers or {})
    with tracer.start_as_current_span("rabbitmq.process", context=ctx) as span:
        span.set_attribute("messaging.system", "rabbitmq")
        span.set_attribute("messaging.operation", "process")
        order = json.loads(body)
        process_order(order)
        ch.basic_ack(delivery_tag=method.delivery_tag)

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='orders')
channel.basic_consume(queue='orders', on_message_callback=callback)
channel.start_consuming()
Option 5: Asynchronous task tracing
Celery task tracing
# Python + Celery + OpenTelemetry
# Manual propagation through a task argument; the opentelemetry-instrumentation-celery
# package can propagate the context automatically instead.
from celery import Celery
from opentelemetry import trace
from opentelemetry.propagate import extract, inject

app = Celery('tasks', broker='redis://localhost:6379')
tracer = trace.get_tracer(__name__)

@app.task
def process_order(order_id, trace_context=None):
    # Restore the trace context passed in by the caller
    ctx = extract(trace_context or {})
    with tracer.start_as_current_span("celery.process_order", context=ctx) as span:
        span.set_attribute("order.id", order_id)
        span.set_attribute("task.name", "process_order")
        # Run the task (fetch_order / validate_order are the application's own functions)
        order = fetch_order(order_id)
        validate_order(order)
        # Hand the current context on to the next task
        headers = {}
        inject(headers)
        send_notification.apply_async(
            args=[order_id],
            kwargs={'trace_context': headers},
        )
        span.add_event("order.processed")
    return {"status": "success", "order_id": order_id}

@app.task
def send_notification(order_id, trace_context=None):
    ctx = extract(trace_context or {})
    with tracer.start_as_current_span("celery.send_notification", context=ctx) as span:
        span.set_attribute("order.id", order_id)
        # Send the notification
        send_email(order_id)
        span.add_event("notification.sent")

# Inject the trace context when enqueuing the task
def create_order_api():
    with tracer.start_as_current_span("api.create_order") as span:
        order_id = "12345"
        span.set_attribute("order.id", order_id)
        # Serialize the current context into a plain dict
        headers = {}
        inject(headers)
        # Process asynchronously
        process_order.apply_async(
            args=[order_id],
            kwargs={'trace_context': headers},
        )
    return {"status": "processing", "order_id": order_id}
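End-to-End Microservice Tracing Example
Scenario: an e-commerce order system
User request -> API Gateway -> Order Service -> Inventory Service (MongoDB)
                                             -> Payment Service -> Notification Service
                                             -> Redis cache
                                             -> PostgreSQL database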
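1. API Gateway (Go)
package main

import (
	"context"
	"io"
	"log"
	"net/http"

	"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/codes"
	"go.opentelemetry.io/otel/propagation"
)

func createOrderHandler(w http.ResponseWriter, r *http.Request) {
	// otelhttp.NewHandler (registered in main) has already extracted the incoming context
	ctx := r.Context()
	tracer := otel.Tracer("api-gateway")
	ctx, span := tracer.Start(ctx, "gateway.createOrder")
	defer span.End()
	span.SetAttributes(
		attribute.String("http.method", r.Method),
		attribute.String("http.url", r.URL.String()),
		attribute.String("user.id", r.Header.Get("X-User-ID")),
	)
	// Forward to the order service
	req, err := http.NewRequestWithContext(ctx, http.MethodPost,
		"http://order-service:8080/api/orders", r.Body)
	if err != nil {
		span.RecordError(err)
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	req.Header.Set("Content-Type", r.Header.Get("Content-Type"))
	// Propagate the trace context: otelhttp.NewTransport starts a client span and
	// injects it into req.Header through the global propagator
	client := http.Client{Transport: otelhttp.NewTransport(http.DefaultTransport)}
	resp, err := client.Do(req)
	if err != nil {
		span.RecordError(err)
		span.SetStatus(codes.Error, err.Error())
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	defer resp.Body.Close()
	span.SetAttributes(attribute.Int("http.status_code", resp.StatusCode))
	// Relay the response
	w.WriteHeader(resp.StatusCode)
	io.Copy(w, resp.Body)
}

func main() {
	// initTracer as in the Go section above (with the service name set to "api-gateway")
	tp, err := initTracer()
	if err != nil {
		log.Fatal(err)
	}
	defer tp.Shutdown(context.Background())
	// The default global propagator is a no-op; register W3C Trace Context and Baggage
	otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(
		propagation.TraceContext{}, propagation.Baggage{},
	))
	http.Handle("/api/orders",
		otelhttp.NewHandler(http.HandlerFunc(createOrderHandler), "createOrder"))
	log.Fatal(http.ListenAndServe(":8080", nil))
}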
2. Order Service (Python/Flask)
import json

import psycopg2
import redis
import requests
from flask import Flask, jsonify, request
from opentelemetry import trace
from opentelemetry.propagate import extract, inject

# Assumes a TracerProvider is configured as in the Python section above
app = Flask(__name__)
tracer = trace.get_tracer(__name__)
redis_client = redis.Redis(host='redis')

@app.route('/api/orders', methods=['POST'])
def create_order():
    # Extract the trace context sent by the API Gateway
    ctx = extract(request.headers)
    with tracer.start_as_current_span("order.create", context=ctx) as span:
        data = request.json
        order_id = data['order_id']
        user_id = data['user_id']
        items = data['items']
        span.set_attribute("order.id", order_id)
        span.set_attribute("user.id", user_id)
        span.set_attribute("items.count", len(items))

        # 1. Check the cache
        with tracer.start_as_current_span("cache.check") as cache_span:
            cached = redis_client.get(f"order:{order_id}")
            cache_span.set_attribute("cache.hit", cached is not None)
            if cached:
                return jsonify(json.loads(cached))

        # 2. Check inventory
        with tracer.start_as_current_span("inventory.check") as inv_span:
            inv_span.set_attribute("service.name", "inventory-service")
            # Propagate the trace context (inject reads the current span, inventory.check)
            headers = {}
            inject(headers)
            inv_resp = requests.post(
                'http://inventory-service:8080/api/inventory/check',
                json={'items': items},
                headers=headers,
            )
            inv_span.set_attribute("http.status_code", inv_resp.status_code)
            if inv_resp.status_code != 200:
                span.set_attribute("order.status", "inventory_failed")
                return jsonify({"error": "Insufficient inventory"}), 400

        # 3. Save the order to the database
        with tracer.start_as_current_span("db.insert") as db_span:
            db_span.set_attribute("db.system", "postgresql")
            db_span.set_attribute("db.statement", "INSERT INTO orders...")
            conn = psycopg2.connect("dbname=orders user=postgres")
            try:
                # "with conn" commits the transaction on success (it does not close conn)
                with conn, conn.cursor() as cur:
                    cur.execute(
                        "INSERT INTO orders (id, user_id, status) VALUES (%s, %s, %s)",
                        (order_id, user_id, 'pending'),
                    )
            finally:
                conn.close()
            db_span.add_event("order.inserted")

        # 4. Call the payment service
        with tracer.start_as_current_span("payment.process") as pay_span:
            pay_span.set_attribute("service.name", "payment-service")
            pay_span.set_attribute("payment.amount", data['amount'])
            headers = {}
            inject(headers)
            pay_resp = requests.post(
                'http://payment-service:8080/api/payment',
                json={'order_id': order_id, 'amount': data['amount']},
                headers=headers,
            )
            payment_status = pay_resp.json()['status']
            pay_span.set_attribute("http.status_code", pay_resp.status_code)
            pay_span.set_attribute("payment.status", payment_status)

        # 5. Cache the result
        order = {
            'order_id': order_id,
            'user_id': user_id,
            'status': 'completed',
            'payment_status': payment_status,
        }
        with tracer.start_as_current_span("cache.set"):
            redis_client.setex(f"order:{order_id}", 3600, json.dumps(order))

        span.set_attribute("order.status", "completed")
        span.add_event("order.completed", {
            "order.id": order_id,
            "payment.status": payment_status,
        })
        return jsonify(order)

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8080)
3. Inventory Service (Node.js)
const express = require('express');
const { trace, context, propagation } = require('@opentelemetry/api');
const { MongoClient } = require('mongodb');
const app = express();
app.use(express.json());
const mongoClient = new MongoClient('mongodb://mongo:27017');
const tracer = trace.getTracer('inventory-service');
app.post('/api/inventory/check', async (req, res) => {
// Extract the trace context sent by the order service
const ctx = propagation.extract(context.active(), req.headers);
const span = tracer.startSpan('inventory.check', undefined, ctx);
try {
const { items } = req.body;
span.setAttribute('items.count', items.length);
// Query MongoDB; startSpan has no `parent` option, so the parent is passed as a context
const dbSpan = tracer.startSpan('mongodb.query', undefined, trace.setSpan(ctx, span));
dbSpan.setAttribute('db.system', 'mongodb');
dbSpan.setAttribute('db.collection', 'inventory');
const db = mongoClient.db('inventory');
const collection = db.collection('items');
const results = [];
for (const item of items) {
const doc = await collection.findOne({ sku: item.sku });
if (!doc || doc.quantity < item.quantity) {
dbSpan.setAttribute('inventory.sufficient', false);
dbSpan.end();
span.setAttribute('check.result', 'insufficient');
span.end();
return res.status(400).json({
error: 'Insufficient inventory',
sku: item.sku
});
}
results.push({ sku: item.sku, available: doc.quantity });
}
dbSpan.setAttribute('inventory.sufficient', true);
dbSpan.end();
// Reserve the stock (again parented through the context argument)
const reserveSpan = tracer.startSpan('inventory.reserve', undefined, trace.setSpan(ctx, span));
for (const item of items) {
await collection.updateOne(
{ sku: item.sku },
{ $inc: { quantity: -item.quantity, reserved: item.quantity } }
);
}
reserveSpan.addEvent('inventory.reserved', {
items_count: items.length
});
reserveSpan.end();
span.setAttribute('check.result', 'success');
span.end();
res.json({ status: 'available', items: results });
} catch (error) {
span.recordException(error);
span.setStatus({ code: 2, message: error.message }); // 2 is SpanStatusCode.ERROR
span.end();
res.status(500).json({ error: error.message });
}
});
app.listen(8080, () => {
console.log('Inventory service listening on port 8080');
});
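4. Payment Service (Java/Spring Boot)
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.TextMapGetter;
import java.util.Map;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.client.RestTemplate;
@RestController
@RequestMapping("/api/payment")
public class PaymentController {
private final Tracer tracer = GlobalOpenTelemetry.getTracer("payment-service");
private final RestTemplate restTemplate;
// Constructor injection; the application must declare a RestTemplate bean
public PaymentController(RestTemplate restTemplate) {
this.restTemplate = restTemplate;
}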
@PostMapping
public PaymentResponse processPayment(
@RequestBody PaymentRequest request,
@RequestHeader Map<String, String> headers) {
// Extract the trace context sent by the order service
Context extractedContext = GlobalOpenTelemetry.getPropagators()
.getTextMapPropagator()
.extract(Context.current(), headers, new HeadersGetter());
Span span = tracer.spanBuilder("payment.process")
.setParent(extractedContext)
.startSpan();
try (var scope = span.makeCurrent()) {
span.setAttribute("order.id", request.getOrderId());
span.setAttribute("payment.amount", request.getAmount());
span.setAttribute("payment.method", request.getMethod());
// 1. Validate the payment details
Span validateSpan = tracer.spanBuilder("payment.validate")
.startSpan();
validateSpan.setAttribute("validation.type", "card");
boolean isValid = validatePaymentInfo(request);
validateSpan.setAttribute("validation.result", isValid);
validateSpan.end();
if (!isValid) {
span.setAttribute("payment.status", "invalid");
throw new PaymentException("Invalid payment info");
}
// 2. Call the third-party payment gateway
Span gatewaySpan = tracer.spanBuilder("payment.gateway")
.startSpan();
gatewaySpan.setAttribute("gateway.provider", "stripe");
String transactionId = callPaymentGateway(request);
gatewaySpan.setAttribute("transaction.id", transactionId);
gatewaySpan.addEvent("payment.charged");
gatewaySpan.end();
// 3. Save the payment record
Span dbSpan = tracer.spanBuilder("db.insert")
.startSpan();
dbSpan.setAttribute("db.system", "mysql");
dbSpan.setAttribute("db.table", "payments");
savePaymentRecord(request.getOrderId(), transactionId);
dbSpan.end();
// 4. 发送通知
Span notifySpan = tracer.spanBuilder("notification.send")
.startSpan();
// 注入追踪上下文到 HTTP 请求
HttpHeaders notifyHeaders = new HttpHeaders();
GlobalOpenTelemetry.getPropagators()
.getTextMapPropagator()
.inject(Context.current(), notifyHeaders, HttpHeaders::set);
HttpEntity<NotificationRequest> entity = new HttpEntity<>(
new NotificationRequest(request.getOrderId(), "Payment successful"),
notifyHeaders
);
restTemplate.postForObject(
"http://notification-service:8080/api/notify",
entity,
NotificationResponse.class
);
notifySpan.setAttribute("notification.type", "payment_success");
notifySpan.end();
span.setAttribute("payment.status", "success");
span.addEvent("payment.completed", Attributes.of(
AttributeKey.stringKey("transaction.id"), transactionId
));
return new PaymentResponse("success", transactionId);
} catch (Exception e) {
span.recordException(e);
span.setAttribute("payment.status", "failed");
throw e;
} finally {
span.end();
}
}
private static class HeadersGetter implements TextMapGetter<Map<String, String>> {
@Override
public Iterable<String> keys(Map<String, String> carrier) {
return carrier.keySet();
}
@Override
public String get(Map<String, String> carrier, String key) {
return carrier.get(key);
}
}
}
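The extract and inject calls above go through whatever propagator the OpenTelemetry SDK is configured with. If no SDK is registered, GlobalOpenTelemetry returns no-op propagators and nothing is propagated. With the SDK's default settings (for example via the Java agent or autoconfiguration), the propagator is W3C Trace Context plus Baggage, so the trace travels in a `traceparent` header. The Python sketch below shows that header's layout and how a child header keeps the trace ID while swapping in a new span ID. `parse_traceparent` and `child_traceparent` are hypothetical helpers for illustration, not SDK functions, and they only accept the version-00 format.

```python
import re
import secrets

# version-traceid-parentid-traceflags, lowercase hex (W3C Trace Context, version 00)
TRACEPARENT_RE = re.compile(r"^([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$")


def parse_traceparent(header):
    """Return (trace_id, parent_span_id, sampled), or None for an invalid header."""
    match = TRACEPARENT_RE.match(header.strip())
    if match is None:
        return None
    version, trace_id, span_id, flags = match.groups()
    # Version ff and all-zero IDs are invalid
    if version == "ff" or trace_id == "0" * 32 or span_id == "0" * 16:
        return None
    sampled = bool(int(flags, 16) & 0x01)  # bit 0 of trace-flags means "sampled"
    return trace_id, span_id, sampled


def child_traceparent(incoming):
    """Header for an outgoing call: same trace ID and sampled flag, new span ID."""
    parsed = parse_traceparent(incoming)
    if parsed is None:
        return None
    trace_id, _parent_span_id, sampled = parsed
    new_span_id = secrets.token_hex(8)  # 8 random bytes -> 16 hex characters
    return f"00-{trace_id}-{new_span_id}-{'01' if sampled else '00'}"


incoming = "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01"
print(parse_traceparent(incoming))  # ('0af7651916cd43dd8448eb211c80319c', 'b7ad6b7169203331', True)
print(child_traceparent(incoming))
```

This is why the notification span is made current before injecting. Whatever span is current when `inject` runs becomes the parent ID that the downstream service sees.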
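Deploying the Example Applications
1. Application Deployment
Note: the JAEGER_* variables below configure the legacy Jaeger client libraries. Services instrumented with the OpenTelemetry SDK, such as the Java example above, read OTEL_* variables instead (for example OTEL_SERVICE_NAME and OTEL_EXPORTER_OTLP_ENDPOINT).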
apiVersion: apps/v1
kind: Deployment
metadata:
  name: order-service
spec:
  replicas: 2
  selector:
    matchLabels:
      app: order-service
  template:
    metadata:
      labels:
        app: order-service
    spec:
      containers:
        - name: order-service
          image: myapp/order-service:v1.0
          env:
            - name: JAEGER_AGENT_HOST
              value: "jaeger-agent.observability.svc.cluster.local"
            - name: JAEGER_AGENT_PORT
              value: "6831"
            - name: JAEGER_SAMPLER_TYPE
              value: "const"
            - name: JAEGER_SAMPLER_PARAM
              value: "1"
          ports:
            - containerPort: 8080
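The Deployment sets JAEGER_SAMPLER_TYPE=const and JAEGER_SAMPLER_PARAM=1, which tells a Jaeger client to sample every trace. The Python sketch below shows how a client might turn those two variables into a per-trace decision. `make_sampler` is hypothetical and covers only the const and probabilistic types. Its trace-ID comparison only approximates what the real Jaeger clients do.

```python
MAX_RANDOM = (1 << 63) - 1  # compare against the low 63 bits of the trace ID


def make_sampler(env):
    """Return a trace_id -> bool decision function from JAEGER_SAMPLER_* settings."""
    sampler_type = env.get("JAEGER_SAMPLER_TYPE", "const")
    param = float(env.get("JAEGER_SAMPLER_PARAM", "1"))
    if sampler_type == "const":
        decision = param != 0  # "1" samples everything, "0" samples nothing
        return lambda trace_id: decision
    if sampler_type == "probabilistic":
        # Deterministic in the trace ID: the same ID always gets the same answer
        boundary = int(MAX_RANDOM * param)
        return lambda trace_id: (trace_id & MAX_RANDOM) < boundary
    raise ValueError(f"sketch does not handle sampler type {sampler_type!r}")


deployment_env = {"JAEGER_SAMPLER_TYPE": "const", "JAEGER_SAMPLER_PARAM": "1"}
sample = make_sampler(deployment_env)
print(sample(0xABC))  # True: const with param 1 keeps every trace
```

A const sampler at 1 is convenient for a demo like this one, but it sends every request to the collector. The sampling sections below cover lower rates.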
2. Generate test traffic
apiVersion: batch/v1
kind: Job
metadata:
  name: load-test
spec:
  template:
    spec:
      containers:
        - name: curl
          image: curlimages/curl:latest
          command:
            - /bin/sh
            - -c
            - |
              for i in $(seq 1 1000); do
                curl http://order-service:8080/api/order
                sleep 0.1
              done
      restartPolicy: Never
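Using the Jaeger UI
1. Searching for traces
Open http://localhost:16686 (for example after kubectl port-forward -n observability svc/jaeger-query 16686:16686; the Service name depends on the name of your Jaeger instance):
Search options:
- Service: the service to search
- Operation: the operation (e.g. GET /api/order)
- Tags: filter by tag (e.g. http.status_code=500)
- Lookback: time range (Last Hour, Last 24 Hours)
- Min/Max Duration: filter by duration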
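The search form maps onto query parameters of the HTTP endpoint the UI itself calls, /api/traces on the query service. That endpoint is internal and unversioned. Treat the parameter names in this Python sketch as assumptions to verify against your Jaeger version: service, start/end in microseconds, operation, JSON-encoded tags, minDuration, maxDuration and limit. `build_trace_search_url` is a hypothetical helper.

```python
import json
import time
from urllib.parse import urlencode


def build_trace_search_url(base_url, service, now_us, lookback_s=3600, operation=None,
                           tags=None, min_duration=None, max_duration=None, limit=20):
    """Assemble a trace-search URL from the same fields as the UI search form."""
    params = {
        "service": service,
        "start": now_us - lookback_s * 1_000_000,  # microseconds since the epoch
        "end": now_us,
        "limit": limit,
    }
    if operation:
        params["operation"] = operation
    if tags:
        params["tags"] = json.dumps(tags)  # tag filters are sent as a JSON object
    if min_duration:
        params["minDuration"] = min_duration  # e.g. "100ms" or "1.5s"
    if max_duration:
        params["maxDuration"] = max_duration
    return f"{base_url.rstrip('/')}/api/traces?{urlencode(params)}"


# Last hour of failing "GET /api/order" requests slower than 100 ms
url = build_trace_search_url(
    "http://localhost:16686", "order-service", now_us=int(time.time() * 1_000_000),
    operation="GET /api/order", tags={"http.status_code": "500"}, min_duration="100ms",
)
print(url)
```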
2. Trace details
Click a trace to view it:
Timeline View:
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
API Gateway      [████████████] 500ms
  Auth Service   [██] 50ms
  Order Service  [█████████] 400ms
    DB Query     [████████] 350ms
    Cache        [█] 30ms
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Key information:
- Duration: how long each span took
- Tags: HTTP method, status code, error details
- Logs: span events
- References: parent-child relationships between spans
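Durations in the timeline nest, so a parent's bar includes its children's time. To find where time is actually spent, this minimal Python sketch subtracts each span's direct children from its own duration to get its "self time". It makes three assumptions:
- Auth Service and Order Service are children of API Gateway.
- DB Query and Cache are children of Order Service.
- Sibling spans do not overlap.

`self_times` and the span dictionaries are hypothetical.

```python
from collections import defaultdict


def self_times(spans):
    """Own time of each span: its duration minus its direct children's durations.

    Assumes children run one after another inside the parent (no overlap);
    overlapping children would make this subtraction undercount.
    """
    child_total = defaultdict(int)
    for span in spans:
        if span["parent"] is not None:
            child_total[span["parent"]] += span["duration_ms"]
    return {span["name"]: span["duration_ms"] - child_total[span["id"]] for span in spans}


# The example timeline, with assumed parent-child links
timeline = [
    {"id": "gw", "parent": None, "name": "API Gateway", "duration_ms": 500},
    {"id": "auth", "parent": "gw", "name": "Auth Service", "duration_ms": 50},
    {"id": "order", "parent": "gw", "name": "Order Service", "duration_ms": 400},
    {"id": "db", "parent": "order", "name": "DB Query", "duration_ms": 350},
    {"id": "cache", "parent": "order", "name": "Cache", "duration_ms": 30},
]
print(self_times(timeline))
# {'API Gateway': 50, 'Auth Service': 50, 'Order Service': 20, 'DB Query': 350, 'Cache': 30}
```

Read this way, Order Service's own code accounts for only 20 ms of its 400 ms. The DB query is the real bottleneck.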
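3. Service dependency graph
Open the System Architecture tab (with Elasticsearch or Cassandra storage, the graph is filled in by the separate spark-dependencies job):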
  ┌─────────────┐
  │ API Gateway │
  └──────┬──────┘
         │
    ┌────┴────┐
    │         │
    ▼         ▼
┌──────┐  ┌───────┐
│ Auth │  │ Order │
└──────┘  └───┬───┘
              │
         ┌────┴────┐
         │         │
         ▼         ▼
       ┌────┐  ┌───────┐
       │ DB │  │ Cache │
       └────┘  └───────┘
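Conceptually, the dependency graph aggregates parent-child span links across many traces. Whenever a span's parent belongs to a different service, that counts as one call from the parent's service to the child's. The Python sketch below shows that idea with hypothetical span dictionaries. It is not Jaeger's actual dependency job.

```python
from collections import Counter


def service_dependencies(spans):
    """Count caller -> callee service edges from parent-child span links."""
    by_id = {span["span_id"]: span for span in spans}
    edges = Counter()
    for span in spans:
        parent = by_id.get(span["parent_id"])
        # Only calls that cross a service boundary become graph edges
        if parent is not None and parent["service"] != span["service"]:
            edges[(parent["service"], span["service"])] += 1
    return edges


spans = [
    {"span_id": "1", "parent_id": None, "service": "API Gateway"},
    {"span_id": "2", "parent_id": "1", "service": "Auth"},
    {"span_id": "3", "parent_id": "1", "service": "Order"},
    {"span_id": "4", "parent_id": "3", "service": "Order"},  # internal span, no edge
    {"span_id": "5", "parent_id": "4", "service": "DB"},
    {"span_id": "6", "parent_id": "4", "service": "Cache"},
]
for (caller, callee), calls in sorted(service_dependencies(spans).items()):
    print(f"{caller} -> {callee}: {calls}")
```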
4. Comparing traces
Select several traces and compare them to analyze performance differences:
Trace A (Fast): API -> Auth (20ms) -> Order (50ms)
Trace B (Slow): API -> Auth (20ms) -> Order (2000ms) ← slow
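The comparison boils down to a per-service diff. This Python sketch uses the numbers from the two example traces. `compare_traces` is a hypothetical helper, and it assumes each trace has already been reduced to one duration per service.

```python
def compare_traces(fast, slow):
    """Per-service latency difference (slow minus fast), largest regression first."""
    services = fast.keys() | slow.keys()
    deltas = {service: slow.get(service, 0) - fast.get(service, 0) for service in services}
    return sorted(deltas.items(), key=lambda item: item[1], reverse=True)


trace_a = {"Auth": 20, "Order": 50}    # Trace A (fast)
trace_b = {"Auth": 20, "Order": 2000}  # Trace B (slow)
print(compare_traces(trace_a, trace_b)[0])  # ('Order', 1950)
```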
Advanced Configuration
1. Sampling strategies
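# Per-operation sampling strategies (file-based, not adaptive sampling).
# Only clients that use remote sampling fetch these from Jaeger;
# a const sampler like the one in the Deployment above ignores them.
apiVersion: jaegertracing.io/v1
kind: Jaeger
metadata:
  name: jaeger
spec:
  strategy: production
  sampling:
    options:
      default_strategy:
        type: probabilistic
        param: 0.1  # 10% sampling rate
        operation_strategies:
          - operation: /health
            type: probabilistic
            param: 0.0  # do not sample health checks
          - operation: /api/critical
            type: probabilistic
            param: 1.0  # sample the critical API at 100%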
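The Python sketch below shows how a span's operation is matched to a strategy, most specific match first. The dictionary follows the layout of Jaeger's file-based sampling strategies as we understand it: a default_strategy with nested operation_strategies, and optional service_strategies. Check it against the Jaeger sampling documentation for your version. `resolve_strategy` is hypothetical, and its precedence rules are a simplification.

```python
def resolve_strategy(strategies, service, operation):
    """Return (type, param) for a span, most specific match first (simplified)."""
    for svc in strategies.get("service_strategies", []):
        if svc["service"] == service:
            for op in svc.get("operation_strategies", []):
                if op["operation"] == operation:
                    return op["type"], op["param"]
            return svc["type"], svc["param"]
    default = strategies["default_strategy"]
    for op in default.get("operation_strategies", []):
        if op["operation"] == operation:
            return op["type"], op["param"]
    return default["type"], default["param"]


strategies = {
    "default_strategy": {
        "type": "probabilistic",
        "param": 0.1,
        "operation_strategies": [
            {"operation": "/health", "type": "probabilistic", "param": 0.0},
            {"operation": "/api/critical", "type": "probabilistic", "param": 1.0},
        ],
    }
}
print(resolve_strategy(strategies, "order-service", "/health"))  # ('probabilistic', 0.0)
```

Operation names must match span names exactly. Many HTTP instrumentations name spans like "GET /health", so check the names shown in the UI before writing strategies.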
2. Storage configuration (Elasticsearch)
apiVersion: jaegertracing.io/v1
kind: Jaeger
metadata:
  name: jaeger
spec:
  storage:
    type: elasticsearch
    options:
      es:
        server-urls: http://elasticsearch:9200
        index-prefix: jaeger
        username: elastic
        password: changeme  # example only; prefer storage.secretName for credentials
        tls:
          ca: /path/to/ca.crt  # the CA file must be mounted into the Jaeger pods
    esIndexCleaner:
      enabled: true
      numberOfDays: 7
      schedule: "55 23 * * *"
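With index-prefix: jaeger and numberOfDays: 7, the cleaner CronJob removes daily indices that fall outside the retention window. This Python sketch assumes Jaeger's daily index names take the form <prefix>-jaeger-span-YYYY-MM-DD and <prefix>-jaeger-service-YYYY-MM-DD. The real cleaner's exact boundary (off-by-one) handling may differ. `indices_to_delete` is hypothetical.

```python
import re
from datetime import date, timedelta


def indices_to_delete(index_names, today, number_of_days, prefix="jaeger"):
    """Daily Jaeger indices older than the retention window (naming assumed)."""
    pattern = re.compile(rf"^{re.escape(prefix)}-jaeger-(span|service)-(\d{{4}}-\d{{2}}-\d{{2}})$")
    cutoff = today - timedelta(days=number_of_days)
    stale = []
    for name in index_names:
        match = pattern.match(name)
        if match and date.fromisoformat(match.group(2)) < cutoff:
            stale.append(name)
    return stale


indices = [
    "jaeger-jaeger-span-2024-01-01",
    "jaeger-jaeger-service-2024-01-01",
    "jaeger-jaeger-span-2024-01-02",
    "jaeger-jaeger-span-2024-01-09",
    "other-index-2023-12-01",
]
print(indices_to_delete(indices, date(2024, 1, 9), 7))
# ['jaeger-jaeger-span-2024-01-01', 'jaeger-jaeger-service-2024-01-01']
```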
3. Resource limits
apiVersion: jaegertracing.io/v1
kind: Jaeger
metadata:
  name: jaeger
spec:
  collector:
    replicas: 3
    resources:
      requests:
        cpu: 500m
        memory: 1Gi
      limits:
        cpu: 2000m
        memory: 4Gi
    autoscale: true
    maxReplicas: 10
  query:
    replicas: 2
    resources:
      requests:
        cpu: 200m
        memory: 512Mi
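With autoscale: true the operator manages a HorizontalPodAutoscaler for the collector, capped here by maxReplicas: 10. The core scaling rule documented for the Kubernetes HPA is desired = ceil(current replicas * current metric / target metric). CPU utilization is measured against the pod's CPU request. Below is a Python sketch of that rule. The 75% target and the min_replicas value in the example are hypothetical, and the real controller's tolerance and stabilization windows are left out.

```python
import math


def desired_replicas(current_replicas, current_cpu_pct, target_cpu_pct,
                     min_replicas, max_replicas):
    """Core HPA formula: ceil(current * current_metric / target), clamped to bounds.

    CPU percentages are relative to the pod's CPU request (500m for the collector
    above), so they can exceed 100% when the limit is higher than the request.
    """
    desired = math.ceil(current_replicas * current_cpu_pct / target_cpu_pct)
    return max(min_replicas, min(max_replicas, desired))


# 3 collectors averaging 150% of their CPU request, against a hypothetical 75% target
print(desired_replicas(3, 150, 75, min_replicas=3, max_replicas=10))  # 6
```

This is one reason the requests values matter as much as the limits. A request set too low makes utilization look high and triggers scale-out early.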
Integrating with Prometheus
1. Expose Jaeger metrics
apiVersion: v1
kind: Service
metadata:
  name: jaeger-collector-metrics
  labels:
    app: jaeger
spec:
  ports:
    - name: metrics
      port: 14269  # collector admin port, serves /metrics
      targetPort: 14269
  selector:
    app: jaeger-collector  # must match the labels on your collector pods
2. ServiceMonitor (Prometheus Operator)
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: jaeger-collector
spec:
  selector:
    matchLabels:
      app: jaeger
  endpoints:
    - port: metrics
      interval: 30s
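Before wiring up Grafana, it can help to sanity-check what Prometheus will scrape. Below is a minimal Python sketch that sums one metric from Prometheus text-format output and turns two scrapes taken 30 s apart into a rate. `sum_metric` is hypothetical and only handles well-formed sample lines. The metric name jaeger_collector_spans_received_total and its svc label are assumptions about Jaeger's collector metrics, so compare them with the real output of the :14269/metrics endpoint.

```python
def sum_metric(exposition, metric):
    """Sum every sample of `metric` in Prometheus text-format output (simplified)."""
    total = 0.0
    for raw in exposition.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and HELP/TYPE comments
        if "{" in line:
            name, rest = line.split("{", 1)
            rest = rest.rsplit("}", 1)[1]  # drop the label set
        else:
            name, rest = line.split(None, 1)
        if name == metric:
            total += float(rest.split()[0])  # the value; an optional timestamp may follow
    return total


# Illustrative scrape (metric and label names assumed)
scrape = """\
# TYPE jaeger_collector_spans_received_total counter
jaeger_collector_spans_received_total{svc="order-service"} 1200
jaeger_collector_spans_received_total{svc="payment-service"} 300
process_cpu_seconds_total 12.5
"""
print(sum_metric(scrape, "jaeger_collector_spans_received_total"))  # 1500.0

# With interval: 30s, two consecutive scrapes give a per-second rate
earlier, later = 1500.0, 2100.0
print((later - earlier) / 30)  # 20.0
```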
3. Grafana Dashboard
# Import the official Jaeger dashboards
# Dashboard IDs: 10001, 10003, 10004
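Best Practices
1. Sampling strategy
Development: 100% sampling
Testing: 50% sampling
Production:
- Normal requests: 1-10% sampling
- Error requests: 100% sampling
- Slow requests: 100% sampling
Keeping every error and slow request requires tail-based sampling (for example the OpenTelemetry Collector's tail_sampling processor). A head-based sampler decides when the trace starts, before its outcome is known.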
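The production policy above can only be applied once a trace has finished. Below is a minimal Python sketch of that after-the-fact decision. `keep_trace`, the 1,000 ms slow threshold and the 5% base rate are hypothetical values, not defaults of any product.

```python
import hashlib


def keep_trace(trace_id, has_error, duration_ms, slow_threshold_ms=1000, base_rate=0.05):
    """Decide after the trace has finished: keep errors and slow traces, sample the rest."""
    if has_error or duration_ms >= slow_threshold_ms:
        return True
    # Hash the trace ID so every collector replica reaches the same decision
    digest = hashlib.sha256(trace_id.encode("utf-8")).digest()
    bucket = (int.from_bytes(digest[:8], "big") >> 11) / 2**53  # uniform in [0, 1)
    return bucket < base_rate


print(keep_trace("abc123", has_error=True, duration_ms=40))    # True
print(keep_trace("abc123", has_error=False, duration_ms=2000))  # True
```

The cost of this approach is buffering: all spans of a trace must be held until the decision is made. That is why tail sampling usually runs in a collector tier rather than in the application.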
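2. Propagating trace headers
Make sure the application forwards all tracing-related headers. The B3 and x-ot-span-context headers are used by Istio/Envoy tracers; traceparent and tracestate are the W3C Trace Context headers used by OpenTelemetry's default propagator:
x-request-id
x-b3-traceid
x-b3-spanid
x-b3-parentspanid
x-b3-sampled
x-b3-flags
x-ot-span-context
traceparent
tracestate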
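Istio's sidecars create spans on their own, but they cannot tell which outbound call belongs to which inbound request inside the application. The application therefore has to copy these headers, unchanged, from the incoming request onto every outgoing one. Below is a Python sketch of that copying step. `propagation_headers` is hypothetical. Its list also includes the W3C traceparent and tracestate headers used by OpenTelemetry's default propagator.

```python
# B3/OpenTracing headers used by Istio/Envoy, plus W3C Trace Context
TRACE_HEADERS = (
    "x-request-id", "x-b3-traceid", "x-b3-spanid", "x-b3-parentspanid",
    "x-b3-sampled", "x-b3-flags", "x-ot-span-context",
    "traceparent", "tracestate",
)


def propagation_headers(incoming_headers):
    """Copy the tracing headers of an inbound request for use on outbound calls."""
    lowered = {name.lower(): value for name, value in incoming_headers.items()}
    return {name: lowered[name] for name in TRACE_HEADERS if name in lowered}


inbound = {"X-Request-Id": "r-42", "X-B3-TraceId": "abc123", "Content-Type": "application/json"}
print(propagation_headers(inbound))  # {'x-request-id': 'r-42', 'x-b3-traceid': 'abc123'}
```

Lower-casing the names matters because HTTP header names are case-insensitive, and different frameworks report them with different capitalization.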
3. Use meaningful span names
# ❌ Bad
span.set_operation_name("function1")
# ✅ Good
span.set_operation_name("create-order")
span.set_tag("order.id", order_id)
span.set_tag("user.id", user_id)
4. Record key events
span.log_kv({
    'event': 'db.query.start',
    'statement': 'SELECT * FROM orders WHERE id = ?'
})
# Run the query
span.log_kv({
    'event': 'db.query.end',
    'rows_returned': 1
})
5. Mark errors
import traceback

try:
    result = risky_operation()
except Exception as e:
    span.set_tag('error', True)
    span.log_kv({
        'event': 'error',
        'error.kind': type(e).__name__,
        'message': str(e),
        'stack': traceback.format_exc(),
    })
    raise
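Troubleshooting
1. No trace data
# Check the Jaeger Agent pods (adjust the label selector to your deployment's labels)
kubectl get pods -n observability -l app=jaeger-agent
# Check the application's environment variables
kubectl exec <pod-name> -- env | grep JAEGER
# Check the Collector logs
kubectl logs -n observability -l app=jaeger-collector
# Test connectivity: the agent listens on UDP 6831, so nc needs -u
# (a UDP "success" only means no ICMP error came back)
kubectl exec <pod-name> -- nc -zvu jaeger-agent.observability.svc.cluster.local 6831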
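2. Incomplete traces
# Check that tracing headers are propagated (curl -v writes headers to stderr)
curl -sv http://service/api 2>&1 | grep -i x-b3
# Verify the Istio tracing configuration
kubectl get configmap istio -n istio-system -o yaml | grep tracing
# Check the application logs
kubectl logs <pod-name> | grep -i trace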
3. Slow UI queries
# Check Elasticsearch cluster health
kubectl exec -it <es-pod> -- curl localhost:9200/_cluster/health
# Optimize the indices
# Shorten the retention period
# Increase the number of Query replicas
Performance Considerations
1. Recommended sampling rates
QPS < 100: 100% sampling
QPS 100-1K: 10-50% sampling
QPS 1K-10K: 1-10% sampling
QPS > 10K: 0.1-1% sampling
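The table can be turned into a rough capacity check: multiply a service's QPS by its band to see how many traces per second would be stored. Below is a Python sketch. The table does not say which band an exact boundary such as 1K belongs to, so the boundary handling here is an arbitrary choice. `sampling_band` and `traces_per_second` are hypothetical helpers.

```python
def sampling_band(qps):
    """(low, high) sampling rate for a given QPS, following the table above."""
    if qps < 100:
        return 1.0, 1.0
    if qps < 1_000:
        return 0.1, 0.5
    if qps <= 10_000:
        return 0.01, 0.1
    return 0.001, 0.01


def traces_per_second(qps):
    """Range of sampled traces per second implied by the band."""
    low, high = sampling_band(qps)
    return qps * low, qps * high


print(sampling_band(5_000))      # (0.01, 0.1)
print(traces_per_second(5_000))
```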
2. Resource consumption
At 1,000 spans per second (rough figures):
- Agent: ~50 MB memory
- Collector: ~500 MB memory
- Storage: ~10 GB/day (compressed)
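The storage figure is easier to adapt once the arithmetic is spelled out. 1,000 spans/s is 86.4 million spans per day, so about 10 GB/day means roughly 116 bytes per stored span. The Python sketch below assumes that per-span size and one replica copy per index; the replica count is an assumption about your Elasticsearch settings. With the 7-day numberOfDays retention from the storage example, that comes to about 140 GB on disk.

```python
SECONDS_PER_DAY = 86_400


def storage_gb_per_day(spans_per_second, bytes_per_span):
    """Daily storage in GB (10**9 bytes) for a steady span rate."""
    return spans_per_second * SECONDS_PER_DAY * bytes_per_span / 1e9


def retained_gb(gb_per_day, retention_days, replicas=1):
    """Total on disk across the retention window, counting replica copies."""
    return gb_per_day * retention_days * (1 + replicas)


# Bytes per span implied by "1000 spans/s -> ~10 GB/day"
implied = 10e9 / (1000 * SECONDS_PER_DAY)
print(round(implied))      # 116
print(retained_gb(10, 7))  # 140
```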
3. Optimization tips
- Use the Agent's local buffering
- Send spans in batches
- Set sensible span size limits
- Clean up old data regularly
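Summary
Jaeger's core capabilities:
| Feature | Description | Value |
|---|---|---|
| End-to-end tracing | Full request path | Locate problems quickly |
| Performance analysis | Per-service latency | Optimize performance bottlenecks |
| Dependency analysis | Service call relationships | Understand the system architecture |
| Root-cause analysis | Error tracing | Fast fault localization |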
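Implementation steps:
- Deploy Jaeger (All-in-One or Production)
- Configure the Istio integration (automatic proxy-level tracing; applications must still forward trace headers)
- Integrate OpenTelemetry into the applications
- Configure sampling strategies
- Set up monitoring and alerting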