# ELK Stack: Log Analysis Platform

## Architecture Overview

ELK = Elasticsearch + Logstash + Kibana. Modern deployments usually add Beats and Kafka:

```
Application logs
  ├──► Filebeat  (lightweight log shipper)
  └──► Logstash  (heavyweight collection/processing)
            │
            ▼
      Kafka (buffer, protects Elasticsearch from overload)
            │
            ▼
      Logstash (parse, filter, transform)
            │
            ▼
      Elasticsearch (storage, indexing)
            │
            ▼
      Kibana (visualization, querying)
```

## Elasticsearch Core Concepts

- Index: roughly analogous to a table in a relational database
  - Shard: the unit of data distribution
    - Primary shard
    - Replica shard
- Document: a JSON-formatted unit of data
- Mapping: field type definitions (similar to a schema)
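
To make these concepts concrete, here is a minimal sketch using the official `elasticsearch` Python client (the endpoint, index name, and field values are placeholders, and the package is assumed to be installed): it indexes one JSON document into a dated index and reads it back by id.

```python
from elasticsearch import Elasticsearch

# Endpoint and credentials are placeholders; adjust for your cluster.
es = Elasticsearch("http://localhost:9200")

# A document is just a JSON object; the index groups documents, and the
# mapping (defined below under "Index Design") types each field.
doc = {
    "@timestamp": "2024-01-15T10:23:45.000Z",
    "level": "ERROR",
    "service": "order-service",
    "message": "NullPointerException while creating order",
    "trace_id": "abc123def456",
    "duration_ms": 1342,
}

resp = es.index(index="app-logs-2024.01.15", document=doc)
print(resp["result"], resp["_id"])  # e.g. "created" plus an auto-generated id

# Fetch the document back by id.
print(es.get(index="app-logs-2024.01.15", id=resp["_id"])["_source"])
```
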
## Index Design

```json
// Create an index (rolled over by date, one per day)
PUT /app-logs-2024.01.15
{
"settings": {
"number_of_shards": 3,
"number_of_replicas": 1,
"index.refresh_interval": "5s"
},
"mappings": {
"properties": {
"@timestamp": { "type": "date" },
"level": { "type": "keyword" },
"service": { "type": "keyword" },
"message": { "type": "text", "analyzer": "standard" },
"trace_id": { "type": "keyword" },
"duration_ms": { "type": "long" },
"error": {
"properties": {
"type": { "type": "keyword" },
"message": { "type": "text" },
"stack_trace": { "type": "text", "index": false }
}
}
}
}
}
```

## ILM (Index Lifecycle Management)

```json
// Create an ILM policy
PUT _ilm/policy/logs-policy
{
"policy": {
"phases": {
"hot": {
"min_age": "0ms",
"actions": {
"rollover": {
"max_size": "50gb",
"max_age": "1d"
},
"set_priority": { "priority": 100 }
}
},
"warm": {
"min_age": "7d",
"actions": {
"shrink": { "number_of_shards": 1 },
"forcemerge": { "max_num_segments": 1 },
"set_priority": { "priority": 50 }
}
},
"cold": {
"min_age": "30d",
"actions": {
"freeze": {},
"set_priority": { "priority": 0 }
}
},
"delete": {
"min_age": "90d",
"actions": {
"delete": {}
}
}
}
}
}
```
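
The policy on its own manages nothing; new indices have to reference it. A minimal sketch, assuming the official `elasticsearch` Python client and the `app-logs` rollover alias used by the Logstash output below, that wires the policy into an index template and bootstraps the first write index (the endpoint and template name are placeholders):

```python
from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")  # placeholder endpoint

# Index template: every new app-logs-* index inherits these settings and,
# via index.lifecycle.*, is managed by the "logs-policy" created above.
es.indices.put_index_template(
    name="app-logs-template",          # illustrative template name
    index_patterns=["app-logs-*"],
    template={
        "settings": {
            "number_of_shards": 3,
            "number_of_replicas": 1,
            "index.lifecycle.name": "logs-policy",
            "index.lifecycle.rollover_alias": "app-logs",
        }
    },
)

# Bootstrap the first write index behind the rollover alias so the
# hot-phase rollover action has an alias to act on.
es.indices.create(
    index="app-logs-000001",
    aliases={"app-logs": {"is_write_index": True}},
)
```

Logstash's elasticsearch output with `ilm_enabled => true` typically performs this bootstrap itself; the sketch is mainly useful when other clients write to the cluster directly.
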
## Logstash Configuration

```ruby
# logstash.conf
input {
kafka {
bootstrap_servers => "kafka:9092"
topics => ["app-logs"]
group_id => "logstash-consumer"
codec => "json"
}
}
filter {
# Parse JSON-formatted log lines
json {
source => "message"
target => "parsed"
}
# Use the application timestamp as @timestamp
date {
match => ["[parsed][timestamp]", "ISO8601"]
target => "@timestamp"
}
# Java exception stack traces are stitched into a single event on the
# Filebeat side (see the multiline settings in filebeat.yml below); the
# legacy logstash-filter-multiline plugin is deprecated, so it is not used here.
# Grok-parse unstructured log lines
grok {
match => {
"message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} \[%{DATA:thread}\] %{DATA:logger} - %{GREEDYDATA:log_message}"
}
}
# Enrich with GeoIP information based on client_ip
geoip {
source => "client_ip"
target => "geoip"
}
# Drop fields we do not need
mutate {
remove_field => ["@version", "host", "path"]
}
}
output {
elasticsearch {
hosts => ["es1:9200", "es2:9200", "es3:9200"]
index => "app-logs-%{+YYYY.MM.dd}"
template_name => "app-logs"
ilm_enabled => true
ilm_rollover_alias => "app-logs"
ilm_policy => "logs-policy"
}
}
```

## Filebeat Configuration

```yaml
# filebeat.yml
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/app/*.log
fields:
service: order-service
env: production
fields_under_root: true
multiline:
pattern: '^\d{4}-\d{2}-\d{2}'
negate: true
match: after
# Docker container logs
- type: container
paths:
- /var/lib/docker/containers/*/*.log
processors:
- add_docker_metadata:
host: "unix:///var/run/docker.sock"
processors:
- add_host_metadata: ~
- add_cloud_metadata: ~
output.kafka:
hosts: ["kafka:9092"]
topic: "app-logs"
partition.round_robin:
reachable_only: false
required_acks: 1
compression: gzip
max_message_bytes: 1000000
```

## Kibana Queries (KQL)

```
# Error logs
level: "ERROR"

# Errors from a specific service
level: "ERROR" AND service: "order-service"

# Logs containing a keyword
message: "NullPointerException"

# Time ranges are set with the Kibana time picker

# Look up a specific trace_id (end-to-end tracing)
trace_id: "abc123def456"

# Slow requests (longer than 1 second)
duration_ms > 1000

# Compound query
service: "order-service" AND level: "ERROR" AND NOT message: "timeout"
```

## Elasticsearch Query DSL

```json
// Full-text search combined with filters
GET /app-logs-*/_search
{
"query": {
"bool": {
"must": [
{ "match": { "message": "NullPointerException" } }
],
"filter": [
{ "term": { "level": "ERROR" } },
{ "term": { "service": "order-service" } },
{
"range": {
"@timestamp": {
"gte": "now-1h",
"lte": "now"
}
}
}
]
}
},
"sort": [{ "@timestamp": "desc" }],
"size": 20
}
// Aggregation: error count per service (each bucket's doc_count is the error count)
GET /app-logs-*/_search
{
  "size": 0,
  "query": {
    "term": { "level": "ERROR" }
  },
  "aggs": {
    "by_service": {
      "terms": { "field": "service", "size": 10 }
    }
  }
}
```
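
For completeness, a small sketch issuing the same filtered search and aggregation through the Python client (endpoint is a placeholder; assumes the `elasticsearch` package):

```python
from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")  # placeholder endpoint

# Recent NullPointerException errors from order-service, newest first.
resp = es.search(
    index="app-logs-*",
    query={
        "bool": {
            "must": [{"match": {"message": "NullPointerException"}}],
            "filter": [
                {"term": {"level": "ERROR"}},
                {"term": {"service": "order-service"}},
                {"range": {"@timestamp": {"gte": "now-1h", "lte": "now"}}},
            ],
        }
    },
    sort=[{"@timestamp": "desc"}],
    size=20,
)
for hit in resp["hits"]["hits"]:
    print(hit["_source"]["@timestamp"], hit["_source"]["message"])

# Error counts per service (doc_count of each terms bucket).
agg = es.search(
    index="app-logs-*",
    size=0,
    query={"term": {"level": "ERROR"}},
    aggs={"by_service": {"terms": {"field": "service", "size": 10}}},
)
for bucket in agg["aggregations"]["by_service"]["buckets"]:
    print(bucket["key"], bucket["doc_count"])
```
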
## Troubleshooting Case Studies

### Case 1: Cluster Status Red

Symptom: `GET _cluster/health` returns `status: red` and some indices are unavailable.

Investigation:

```bash
# Check cluster health
GET _cluster/health?pretty
# List unassigned shards
GET _cat/shards?v&h=index,shard,prirep,state,unassigned.reason
# Explain why a shard could not be allocated
GET _cluster/allocation/explain
```

Common causes:

- A node went down, leaving primary shards unassigned
- Disk usage crossed `cluster.routing.allocation.disk.watermark.high`, so shards cannot be placed
- The replica count exceeds the number of available nodes

Resolution:

```json
// Temporarily reduce the replica count
PUT /my-index/_settings
{
"number_of_replicas": 0
}
// Manually retry failed shard allocations
POST _cluster/reroute?retry_failed=true
```
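
The same investigation can be scripted. A diagnostic sketch with the Python client (endpoint is a placeholder) that lists unassigned shards and asks the allocation-explain API why the first one is stuck:

```python
from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")  # placeholder endpoint

health = es.cluster.health()
print("cluster status:", health["status"],
      "unassigned shards:", health["unassigned_shards"])

# List unassigned shards with the reason recorded by the cat API.
shards = es.cat.shards(h="index,shard,prirep,state,unassigned.reason", format="json")
unassigned = [s for s in shards if s["state"] == "UNASSIGNED"]
for s in unassigned:
    print(s["index"], s["shard"], s["prirep"], s["unassigned.reason"])

# Ask the allocation-explain API about the first problematic shard, if any.
if unassigned:
    explain = es.cluster.allocation_explain(
        index=unassigned[0]["index"],
        shard=int(unassigned[0]["shard"]),
        primary=unassigned[0]["prirep"] == "p",
    )
    print(explain["unassigned_info"]["reason"])
```
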
### Case 2: Degraded Write Performance

Symptom: Logstash writes to Elasticsearch with increasing latency and its queue backs up.

Investigation:

```bash
# Check per-index indexing statistics
GET _cat/indices?v&h=index,indexing.index_total,indexing.index_time
# Check node-level indexing statistics
GET _nodes/stats/indices/indexing
```

Optimization:

```json
// Increase refresh_interval (fewer refreshes mean fewer small segments to merge)
PUT /app-logs-*/_settings
{
"refresh_interval": "30s"
}
```

```yaml
# Increase the Logstash pipeline batch size (logstash.yml) so each bulk
# request sent to Elasticsearch carries more events
pipeline.batch.size: 5000
```

### Case 3: Disk Space Alert

```bash
# Check disk usage per node
GET _cat/allocation?v
# Adjust the disk watermarks
PUT _cluster/settings
{
"persistent": {
"cluster.routing.allocation.disk.watermark.low": "85%",
"cluster.routing.allocation.disk.watermark.high": "90%",
"cluster.routing.allocation.disk.watermark.flood_stage": "95%"
}
}
```

## Monitoring Metrics

| Metric | Description | Alert Threshold |
|---|---|---|
| cluster_status | Cluster health status | yellow / red |
| unassigned_shards | Number of unassigned shards | > 0 |
| jvm_heap_used_percent | JVM heap usage | > 85% |
| disk_used_percent | Disk usage | > 80% |
| indexing_rate | Indexing (write) rate | approaching the benchmarked maximum |
| search_latency | Query latency | P99 > 1s |
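
A minimal collection sketch for these metrics using the Python client; the endpoint is a placeholder and the thresholds simply mirror the table above.

```python
from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")  # placeholder endpoint

alerts = []

# Cluster status and unassigned shards.
health = es.cluster.health()
if health["status"] != "green":
    alerts.append(f"cluster_status is {health['status']}")
if health["unassigned_shards"] > 0:
    alerts.append(f"{health['unassigned_shards']} unassigned shards")

# Per-node JVM heap and disk usage.
stats = es.nodes.stats(metric="jvm,fs")
for node_id, node in stats["nodes"].items():
    heap = node["jvm"]["mem"]["heap_used_percent"]
    fs = node["fs"]["total"]
    disk_used = 100 * (1 - fs["available_in_bytes"] / fs["total_in_bytes"])
    if heap > 85:
        alerts.append(f"{node['name']}: JVM heap at {heap}%")
    if disk_used > 80:
        alerts.append(f"{node['name']}: disk at {disk_used:.0f}%")

for a in alerts:
    print("ALERT:", a)
```
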