- `uri_group` / `event_class`: real-time request-rate statistics with Grafana alerting
- `order` event labels: queries on order success rate, failure rate, and failure reasons
- `duration` field extraction: latency visibility without a separate APM

| Stage | CPU | Memory | Collection latency |
|---|---|---|---|
| Raw full JSON parsing (Promtail, no filtering) | 150-200% | 400MB | >30s |
| This design (Vector + AsyncAppender) | 8-15% | 80-100MB | <2s |
┌─────────────────────────────────────────────────────────────┐
│ Application side (Spring microservices)                     │
│                                                             │
│ request in → LoggingFilter/Interceptor (sets MDC uniformly) │
│   → traceId, uri, uri_group, event, start_time              │
│   → business code (no changes)                              │
│   → log.info etc. emit JSON (Logback async appender)        │
└─────────────────────────────────────────────────────────────┘
        ↓
┌──────────────┐               ┌──────────────────────────────┐
│ Container    │ → log stream →│ Collector (Vector Agent)     │
│ runtime      │               │ parse_json → remap VRL       │
└──────────────┘               │   → filter                   │
                               │ ├─ Loki labels (env/app/     │
                               │ │  event_class/uri_group/    │
                               │ │  status)                   │
                               │ ├─ Prometheus metrics        │
                               │ │  (requests_total/          │
                               │ │   orders_failed)           │
                               │ └─ sample/rate-limit/drop    │
                               │    DEBUG and TRACE           │
                               └──────────────┬───────────────┘
                                              ↓
                               ┌───────────────────────────────┐
                               │ Loki (log storage)            │
                               │ Prometheus (metric store)     │
                               └──────────────┬────────────────┘
                                              ↓
                               ┌───────────────────────────────┐
                               │ Grafana (dashboards + alerts) │
                               │ • QPS / orders / latency      │
                               │ • cardinality-gate visibility │
                               └───────────────────────────────┘
| Name | Description | Expected cardinality | Used as label |
|---|---|---|---|
| env | Environment: prod/stage/dev | ~3 | Yes |
| app | Service name (shop-recycle-xxx) | <50 | Yes |
| level | Log level INFO/WARN/ERROR | ~5 | Yes |
| event_class | Business event class (order/api/audit) | <20 | Yes |
| uri_group | Normalized URI (/order/*) | <100 | Yes |
| status | success/client_error/server_error | ~5 | Yes |
Fields that stay in the JSON body instead of becoming labels:

- userId, orderId: high cardinality; kept in the message body for querying
- traceId, spanId: used by the tracing system, not labels
- error, exception: free text, kept verbatim
- msg: the application log message itself, never a label

Before every deployment, a script counts the 24-hour unique values of candidate labels (event_class, uri_group, etc.); any label exceeding 5000 blocks the release.
#!/bin/bash
# Cardinality gate: block the release if any candidate label
# has more than 5000 unique values over the last 24 hours.
set -euo pipefail
LABELS=("event_class" "uri_group" "status")
for label in "${LABELS[@]}"; do
  COUNT=$(logcli query '{env="prod"}' --since=24h -o raw |
    jq -r --arg l "$label" '.[$l] // empty' | sort -u | wc -l)
  if [ "$COUNT" -gt 5000 ]; then
    echo "❌ label $label cardinality $COUNT exceeds 5000, refusing release"
    exit 1
  fi
done
echo "✅ label cardinality check passed"
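The same gate logic can also run as a plain Java step in CI when the logcli output is already on disk. A minimal sketch, assuming one JSON log event per line; the regex extraction is a deliberately crude stand-in for a real JSON parser, and `CardinalityGate` is a hypothetical class name:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class CardinalityGate {
    static final int MAX_CARDINALITY = 5000;

    // Counts distinct values of `label` across JSON log lines.
    // Crude string extraction; a real implementation would use a JSON parser.
    public static int distinctValues(List<String> jsonLines, String label) {
        Pattern p = Pattern.compile("\"" + Pattern.quote(label) + "\"\\s*:\\s*\"([^\"]*)\"");
        Set<String> values = new HashSet<>();
        for (String line : jsonLines) {
            Matcher m = p.matcher(line);
            if (m.find()) {
                values.add(m.group(1));
            }
        }
        return values.size();
    }

    // True when the label is cheap enough to index.
    public static boolean passesGate(List<String> jsonLines, String label) {
        return distinctValues(jsonLines, label) <= MAX_CARDINALITY;
    }
}
```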
package com.ssm.shop.common.web.interceptor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.web.servlet.HandlerInterceptor;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.util.UUID;

public class LoggingMdcInterceptor implements HandlerInterceptor {

    private static final Logger log = LoggerFactory.getLogger(LoggingMdcInterceptor.class);

    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
        // Reuse an incoming trace id (B3 propagation) or mint a new one.
        String traceId = MDC.get("traceId");
        if (traceId == null) {
            traceId = request.getHeader("X-B3-TraceId");
            if (traceId == null) {
                traceId = UUID.randomUUID().toString();
            }
            MDC.put("traceId", traceId);
        }
        String uri = request.getRequestURI();
        MDC.put("uri", uri);
        MDC.put("uri_group", normalizeUri(uri));
        String userId = request.getHeader("X-User-Id");
        if (userId != null) {
            MDC.put("userId", userId);
        }
        MDC.put("event_class", deriveEventClass(uri));
        MDC.put("start_time", String.valueOf(System.currentTimeMillis()));
        return true;
    }

    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) {
        try {
            String startTime = MDC.get("start_time");
            if (startTime != null) {
                long duration = System.currentTimeMillis() - Long.parseLong(startTime);
                MDC.put("duration", String.valueOf(duration));
            }
            if (ex != null || response.getStatus() >= 400) {
                MDC.put("error", ex != null ? ex.toString() : "HTTP_" + response.getStatus());
            }
            if (response.getStatus() >= 400) {
                MDC.put("status", response.getStatus() >= 500 ? "server_error" : "client_error");
            } else {
                MDC.put("status", "success");
            }
            // Emit one access-log line while duration/status/error are still in the
            // MDC; clearing first would mean these fields never reach the encoder.
            log.info("request completed");
        } finally {
            // Always clear, or MDC values leak into the next request on this pooled thread.
            MDC.clear();
        }
    }

    private String normalizeUri(String uri) {
        if (uri.startsWith("/order/")) {
            return "/order/*";
        }
        if (uri.startsWith("/payment/")) {
            return "/payment/*";
        }
        return uri;
    }

    private String deriveEventClass(String uri) {
        if (uri.contains("order")) {
            return "order";
        }
        if (uri.contains("login")) {
            return "auth";
        }
        return "api";
    }
}
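One caveat with `normalizeUri` above: for any path it does not recognize it returns the raw URI, which can silently blow the `uri_group` cardinality budget the gate script enforces. A table-driven variant that collapses everything unknown into `/other` keeps the label bounded by construction. A minimal sketch; the prefix list and the `UriNormalizer` name are illustrative, not the project's real route table:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class UriNormalizer {
    // Known prefixes and their buckets; anything else collapses into "/other",
    // so uri_group cardinality is bounded by the table size + 1.
    private static final Map<String, String> PREFIXES = new LinkedHashMap<>();
    static {
        PREFIXES.put("/order/", "/order/*");
        PREFIXES.put("/payment/", "/payment/*");
        PREFIXES.put("/user/", "/user/*");
    }

    public static String normalize(String uri) {
        for (Map.Entry<String, String> e : PREFIXES.entrySet()) {
            if (uri.startsWith(e.getKey())) {
                return e.getValue();
            }
        }
        return "/other"; // unknown path: never leak raw URIs into a label
    }
}
```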
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <!-- scope="context" makes the Spring properties visible to all appenders -->
    <springProperty scope="context" name="app" source="spring.application.name" />
    <springProperty scope="context" name="env" source="spring.profiles.active" defaultValue="prod" />
    <!-- JSON must be defined before ASYNC_JSON so the appender-ref can resolve -->
    <appender name="JSON" class="ch.qos.logback.core.ConsoleAppender">
        <encoder class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
            <providers>
                <pattern>
                    <pattern>
                        {
                        "ts":"%d{yyyy-MM-dd'T'HH:mm:ss.SSS'Z', UTC}",
                        "level":"%level",
                        "logger":"%logger",
                        "msg":"%msg",
                        "app":"${app}",
                        "env":"${env}",
                        "traceId":"%X{traceId:-}",
                        "uri":"%X{uri:-}",
                        "uri_group":"%X{uri_group:-}",
                        "duration":"%X{duration:-}",
                        "userId":"%X{userId:-}",
                        "event":"%X{event_class:-}",
                        "error":"%X{error:-}",
                        "status":"%X{status:-}",
                        "thread":"%thread"
                        }
                    </pattern>
                </pattern>
            </providers>
        </encoder>
    </appender>
    <appender name="ASYNC_JSON" class="ch.qos.logback.classic.AsyncAppender">
        <queueSize>1024</queueSize>
        <!-- 0 = never discard on a full queue: accept backpressure instead of losing events -->
        <discardingThreshold>0</discardingThreshold>
        <appender-ref ref="JSON" />
    </appender>
    <root level="INFO">
        <appender-ref ref="ASYNC_JSON" />
    </root>
</configuration>
[sources.kubernetes_logs]
type = "kubernetes_logs"

[transforms.parse_json]
type = "remap"
inputs = ["kubernetes_logs"]
source = '''
# parse_json! aborts the event on malformed JSON; failures show up
# in Vector's internal parsing-error metrics.
parsed = parse_json!(string!(.message))
.ts = parsed.ts
.level = parsed.level
.app = parsed.app
.env = parsed.env
.traceId = parsed.traceId
.uri = parsed.uri
.uri_group = parsed.uri_group
.duration_ms = to_int(parsed.duration) ?? 0
.userId = parsed.userId
.event = parsed.event
.error = parsed.error
.status = parsed.status
.event_class = parsed.event
if starts_with(to_string(.uri_group) ?? "", "/order") {
  .event_class = "order"
}
'''

[transforms.filter_levels]
type = "filter"
inputs = ["parse_json"]
condition = '.level != "DEBUG" && .level != "TRACE"'
[sinks.loki]
type = "loki"
inputs = ["filter_levels"]
endpoint = "http://loki:3100"
encoding.codec = "json"
labels.env = "{{ env }}"
labels.app = "{{ app }}"
labels.level = "{{ level }}"
labels.event_class = "{{ event_class }}"
labels.uri_group = "{{ uri_group }}"
labels.status = "{{ status }}"
# log_to_metric has no per-metric filter, so the error and order subsets
# are split out with dedicated filter transforms first. Note that status
# values are success/client_error/server_error, never a literal "error".
[transforms.filter_errors]
type = "filter"
inputs = ["filter_levels"]
condition = '.status == "client_error" || .status == "server_error"'

[transforms.filter_orders]
type = "filter"
inputs = ["filter_levels"]
condition = '.event_class == "order"'

[transforms.filter_orders_failed]
type = "filter"
inputs = ["filter_orders"]
condition = '.status != "success"'

[transforms.to_metrics]
type = "log_to_metric"
inputs = ["filter_levels"]

[[transforms.to_metrics.metrics]]
type = "counter"
field = "message"
name = "shop_recycle_requests_total"
tags.app = "{{ app }}"
tags.env = "{{ env }}"
tags.uri_group = "{{ uri_group }}"

[[transforms.to_metrics.metrics]]
type = "histogram"
field = "duration_ms"
name = "shop_recycle_request_duration_ms"
tags.app = "{{ app }}"
tags.uri_group = "{{ uri_group }}"

[transforms.error_metrics]
type = "log_to_metric"
inputs = ["filter_errors"]

[[transforms.error_metrics.metrics]]
type = "counter"
field = "message"
name = "shop_recycle_requests_errors_total"
tags.app = "{{ app }}"
tags.env = "{{ env }}"

[transforms.order_metrics]
type = "log_to_metric"
inputs = ["filter_orders"]

[[transforms.order_metrics.metrics]]
type = "counter"
field = "message"
name = "shop_recycle_orders_total"
tags.app = "{{ app }}"

[transforms.order_failed_metrics]
type = "log_to_metric"
inputs = ["filter_orders_failed"]

[[transforms.order_failed_metrics.metrics]]
type = "counter"
field = "message"
name = "shop_recycle_orders_failed_total"
tags.app = "{{ app }}"

[sinks.prometheus]
type = "prometheus_exporter"
inputs = ["to_metrics", "error_metrics", "order_metrics", "order_failed_metrics"]
address = "0.0.0.0:9598"
default_namespace = "shop_recycle"
@Service
public class OrderService {

    private static final Logger log = LoggerFactory.getLogger(OrderService.class);

    public void createOrder(OrderCreateRequest req) {
        log.info("order creation started");
        try {
            // ... persist the order ...
            // Set the event tag before logging, so the encoder picks it up.
            MDC.put("event", "order_created");
            log.info("order persisted");
        } catch (Exception e) {
            MDC.put("event", "order_create_failed");
            log.error("order creation failed", e);
            throw e;
        } finally {
            // Remove the per-call key; request-level MDC is cleared by the interceptor.
            MDC.remove("event");
        }
    }
}
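The put/remove pairing around each event is easy to forget in larger services. A small helper can scope an event tag to a block and guarantee cleanup even when the body throws. A dependency-free sketch: the `EventScope` name is hypothetical, and the `ThreadLocal` map is a stand-in for slf4j's MDC so the example runs without the slf4j dependency; in the real service the two put/remove calls would target MDC instead:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

public class EventScope {
    // Stand-in for slf4j's MDC, which is also a thread-local string map.
    static final ThreadLocal<Map<String, String>> CTX =
            ThreadLocal.withInitial(HashMap::new);

    // Runs `body` with the event tag set, and always removes it afterwards,
    // even if the body throws.
    public static <T> T withEvent(String event, Supplier<T> body) {
        CTX.get().put("event", event);
        try {
            return body.get();
        } finally {
            CTX.get().remove("event");
        }
    }
}
```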
QPS (by uri_group):
sum by (uri_group) (
  rate({app="shop-recycle-order-center", level!="DEBUG"}[1m])
)
Order success rate:
sum(rate({event_class="order", status="success"}[5m]))
/
sum(rate({event_class="order"}[5m]))
Latency distribution (P95):
quantile_over_time(0.95, {app="shop-recycle-order-center"} | json | unwrap duration_ms [5m]) by (uri_group)
groups:
  - name: shop_recycle_alerts
    rules:
      - alert: HighErrorRate
        expr: |
          (
            rate(shop_recycle_requests_errors_total[5m])
            /
            rate(shop_recycle_requests_total[5m])
          ) > 0.05
        for: 2m
        annotations:
          summary: "{{ $labels.app }} error rate above 5%"
      - alert: OrderCreationFailureRate
        expr: |
          (
            rate(shop_recycle_orders_failed_total[5m])
            /
            rate(shop_recycle_orders_total[5m])
          ) > 0.01
        for: 2m
        annotations:
          summary: "order failure rate above 1%"
      - alert: SlowRequests
        # the metric is in milliseconds, so 2 seconds = 2000
        expr: histogram_quantile(0.99, sum by (le, uri_group) (rate(shop_recycle_request_duration_ms_bucket[5m]))) > 2000
        for: 3m
        annotations:
          summary: "{{ $labels.uri_group }} P99 latency above 2 seconds"
Rollback: watch Vector's internal parsing-error metrics after a rollout; if they spike, revert with Helm (or undo the deployment directly):

helm rollback vector -n logging
kubectl rollout undo deployment/vector -n logging
| Field | Expected cardinality | Checked by | Allowed as label |
|---|---|---|---|
| env | ~3 | enum | ✅ |
| app | <50 | service list | ✅ |
| level | ~5 | enum | ✅ |
| event_class | <20 | business taxonomy | ✅ |
| uri_group | <100 | normalization rules | ✅ |
| status | ~5 | enum | ✅ |
| userId | ~millions | kept in JSON | ❌ |
| orderId | ~tens of millions | kept in JSON | ❌ |
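The budget in this table can be enforced mechanically rather than by convention. A hypothetical helper (names are illustrative) decides label vs JSON-only from a field's expected cardinality, reusing the 5000 threshold from the gate script:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class LabelBudget {
    // Same threshold as the pre-deploy cardinality gate script.
    static final long MAX_LABEL_CARDINALITY = 5000;

    // True when a field is cheap enough to index as a Loki label;
    // anything above the threshold stays in the JSON body.
    public static boolean labelSafe(long expectedCardinality) {
        return expectedCardinality <= MAX_LABEL_CARDINALITY;
    }

    // Classifies a field -> expected-cardinality map into label (true) / JSON-only (false).
    public static Map<String, Boolean> classify(Map<String, Long> expected) {
        Map<String, Boolean> out = new LinkedHashMap<>();
        expected.forEach((field, card) -> out.put(field, labelSafe(card)));
        return out;
    }
}
```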
Application-side overhead (Interceptor + AsyncAppender):
Collector-side overhead (Vector vs Promtail):