
# ELK Stack: Log Analytics Platform

## Architecture Overview

ELK = Elasticsearch + Logstash + Kibana; modern deployments typically add Beats for collection and Kafka for buffering:

```
App logs
  ├──► Filebeat (lightweight shipper)
  └──► Logstash (heavyweight processing)
              │
              ▼
        Kafka (buffer; keeps Elasticsearch from being overloaded)
              │
              ▼
        Logstash (parse, filter, transform)
              │
              ▼
        Elasticsearch (storage, indexing)
              │
              ▼
        Kibana (visualization, queries)
```

## Elasticsearch Core Concepts

```
Index: analogous to a table in a database
  └── Shard: unit of data distribution
        ├── Primary shard
        └── Replica shard

Document: a unit of data, stored as JSON
Mapping: field type definitions (analogous to a schema)
```
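Shards are what make an index horizontally scalable: Elasticsearch picks the primary shard for a document by taking a hash of the routing value (the `_id` by default) modulo the number of primary shards. A minimal sketch of that routing rule, using CRC32 in place of the murmur3 hash Elasticsearch actually uses:

```python
import zlib

def route_to_shard(routing_value: str, num_primary_shards: int) -> int:
    """Pick the primary shard for a document: hash(routing) % shard count.

    Elasticsearch hashes with murmur3; CRC32 stands in here so the
    sketch stays stdlib-only. The principle is identical.
    """
    return zlib.crc32(routing_value.encode("utf-8")) % num_primary_shards
```

Because the shard count appears in this formula, it is fixed at index creation: changing it would remap every document, which is why resizing an index requires a reindex or the shrink/split APIs.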

## Index Design

```json
// Create an index (one per day, rolled by date)
PUT /app-logs-2024.01.15
{
  "settings": {
    "number_of_shards": 3,
    "number_of_replicas": 1,
    "index.refresh_interval": "5s"
  },
  "mappings": {
    "properties": {
      "@timestamp": { "type": "date" },
      "level": { "type": "keyword" },
      "service": { "type": "keyword" },
      "message": { "type": "text", "analyzer": "standard" },
      "trace_id": { "type": "keyword" },
      "duration_ms": { "type": "long" },
      "error": {
        "properties": {
          "type": { "type": "keyword" },
          "message": { "type": "text" },
          "stack_trace": { "type": "text", "index": false }
        }
      }
    }
  }
}
```
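The date suffix in the index name is what daily rollover hangs on; Logstash produces it with the sprintf pattern `app-logs-%{+YYYY.MM.dd}`. A hypothetical helper showing the equivalent naming rule:

```python
from datetime import date

def daily_index(day: date, prefix: str = "app-logs") -> str:
    # Mirrors Logstash's "app-logs-%{+YYYY.MM.dd}" sprintf reference
    return f"{prefix}-{day.strftime('%Y.%m.%d')}"
```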

## ILM (Index Lifecycle Management)

```json
// Create an ILM policy
PUT _ilm/policy/logs-policy
{
  "policy": {
    "phases": {
      "hot": {
        "min_age": "0ms",
        "actions": {
          "rollover": {
            "max_size": "50gb",
            "max_age": "1d"
          },
          "set_priority": { "priority": 100 }
        }
      },
      "warm": {
        "min_age": "7d",
        "actions": {
          "shrink": { "number_of_shards": 1 },
          "forcemerge": { "max_num_segments": 1 },
          "set_priority": { "priority": 50 }
        }
      },
      "cold": {
        "min_age": "30d",
        "actions": {
          // freeze is deprecated since 7.14 and a no-op on 8.x; omit it there
          "freeze": {},
          "set_priority": { "priority": 0 }
        }
      },
      "delete": {
        "min_age": "90d",
        "actions": {
          "delete": {}
        }
      }
    }
  }
}
```
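The four phases tile an index's lifetime by `min_age`. As an illustration (not an API call), the age-to-phase mapping the policy above encodes:

```python
def ilm_phase(age_days: float) -> str:
    """Phase for an index of the given age under logs-policy:
    hot 0-7d, warm 7-30d, cold 30-90d, deleted at 90d."""
    if age_days >= 90:
        return "delete"
    if age_days >= 30:
        return "cold"
    if age_days >= 7:
        return "warm"
    return "hot"
```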

## Logstash Configuration

```ruby
# logstash.conf
input {
  kafka {
    bootstrap_servers => "kafka:9092"
    topics => ["app-logs"]
    group_id => "logstash-consumer"
    codec => "json"
  }
}

filter {
  # Parse JSON-formatted log events
  json {
    source => "message"
    target => "parsed"
  }

  # Parse the timestamp
  date {
    match => ["[parsed][timestamp]", "ISO8601"]
    target => "@timestamp"
  }

  # Java stack traces are assembled upstream in Filebeat (see
  # filebeat.yml below); the old multiline *filter* plugin has been
  # removed from Logstash, so use Beats or the multiline codec instead.

  # Grok-parse unstructured log lines
  grok {
    match => {
      "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} \[%{DATA:thread}\] %{DATA:logger} - %{GREEDYDATA:log_message}"
    }
  }

  # Enrich with GeoIP information
  geoip {
    source => "client_ip"
    target => "geoip"
  }

  # Drop fields we do not need
  mutate {
    remove_field => ["@version", "host", "path"]
  }
}

output {
  elasticsearch {
    hosts => ["es1:9200", "es2:9200", "es3:9200"]
    # With ILM enabled, events are written through the rollover alias
    # and the `index` setting below is ignored.
    index => "app-logs-%{+YYYY.MM.dd}"
    template_name => "app-logs"
    ilm_enabled => true
    ilm_rollover_alias => "app-logs"
    ilm_policy => "logs-policy"
  }
}
```
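The grok pattern in the filter is just named regex captures. A rough Python equivalent (the real `TIMESTAMP_ISO8601` and `LOGLEVEL` grok definitions are more permissive than the simplified stand-ins here):

```python
import re
from typing import Optional

# Simplified stand-ins for grok's TIMESTAMP_ISO8601 / LOGLEVEL / DATA /
# GREEDYDATA patterns, with the same capture names.
LOG_LINE = re.compile(
    r"(?P<timestamp>\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}(?:[.,]\d+)?) "
    r"(?P<level>TRACE|DEBUG|INFO|WARN|ERROR|FATAL) "
    r"\[(?P<thread>[^\]]+)\] "
    r"(?P<logger>\S+) - "
    r"(?P<log_message>.*)"
)

def parse_line(line: str) -> Optional[dict]:
    """Return the named fields for a structured line, else None
    (grok would tag such events with _grokparsefailure)."""
    m = LOG_LINE.match(line)
    return m.groupdict() if m else None
```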

## Filebeat Configuration

```yaml
# filebeat.yml
filebeat.inputs:
  # Note: the `log` input is deprecated since Filebeat 7.16;
  # use `filestream` on newer versions.
  - type: log
    enabled: true
    paths:
      - /var/log/app/*.log
    fields:
      service: order-service
      env: production
    fields_under_root: true
    # Join continuation lines (anything not starting with a date)
    # onto the preceding event
    multiline:
      pattern: '^\d{4}-\d{2}-\d{2}'
      negate: true
      match: after

  # Docker container logs
  - type: container
    paths:
      - /var/lib/docker/containers/*/*.log
    processors:
      - add_docker_metadata:
          host: "unix:///var/run/docker.sock"

processors:
  - add_host_metadata: ~
  - add_cloud_metadata: ~

output.kafka:
  hosts: ["kafka:9092"]
  topic: "app-logs"
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000
```
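The `multiline` settings above (`negate: true`, `match: after`) mean: a line that does not start with a date is appended to the event before it. A small simulation of that grouping rule:

```python
import re

# Same pattern as filebeat.yml: a new event starts with a date
NEW_EVENT = re.compile(r"^\d{4}-\d{2}-\d{2}")

def assemble_events(lines):
    """negate: true, match: after -- non-matching lines (e.g.
    stack-trace continuations) are glued onto the previous event."""
    events = []
    for line in lines:
        if NEW_EVENT.match(line) or not events:
            events.append(line)
        else:
            events[-1] += "\n" + line
    return events
```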

## Kibana Queries (KQL)

```
# Error logs only
level: "ERROR"

# Errors from a specific service
level: "ERROR" AND service: "order-service"

# Logs containing a keyword
message: "NullPointerException"

# Time ranges are set with the Kibana time picker, not in KQL

# Follow one trace_id end to end (distributed tracing)
trace_id: "abc123def456"

# Slow requests (over 1 second)
duration_ms > 1000

# Compound query
service: "order-service" AND level: "ERROR" AND NOT message: "timeout"
```

## Elasticsearch Query DSL

```json
// Full-text search combined with filters
GET /app-logs-*/_search
{
  "query": {
    "bool": {
      "must": [
        { "match": { "message": "NullPointerException" } }
      ],
      "filter": [
        { "term": { "level": "ERROR" } },
        { "term": { "service": "order-service" } },
        {
          "range": {
            "@timestamp": {
              "gte": "now-1h",
              "lte": "now"
            }
          }
        }
      ]
    }
  },
  "sort": [{ "@timestamp": "desc" }],
  "size": 20
}

// Aggregation: error count per service
GET /app-logs-*/_search
{
  "size": 0,
  "query": {
    "term": { "level": "ERROR" }
  },
  "aggs": {
    "by_service": {
      // each terms bucket already reports doc_count (the error count),
      // so no sub-aggregation is needed; _id cannot be aggregated on
      "terms": { "field": "service", "size": 10 }
    }
  }
}
```
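The split between `must` and `filter` is the key design choice here: `must` clauses score and rank results, while `filter` clauses only include or exclude and can be cached. A hypothetical helper that assembles the first query above for any service and search term:

```python
def error_query(service: str, term: str, window: str = "now-1h") -> dict:
    """Build the bool query: full-text match in `must` (scored),
    exact terms and the time range in `filter` (cached, unscored)."""
    return {
        "query": {
            "bool": {
                "must": [{"match": {"message": term}}],
                "filter": [
                    {"term": {"level": "ERROR"}},
                    {"term": {"service": service}},
                    {"range": {"@timestamp": {"gte": window, "lte": "now"}}},
                ],
            }
        },
        "sort": [{"@timestamp": "desc"}],
        "size": 20,
    }
```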

## Troubleshooting Case Studies

### Case 1: Cluster Status Red

**Symptom:** `GET _cluster/health` returns `status: red` and some indices are unavailable.

**Diagnosis:**

```bash
# Cluster health
GET _cluster/health?pretty

# List unassigned shards
GET _cat/shards?v&h=index,shard,prirep,state,unassigned.reason

# Explain why a shard cannot be allocated
GET _cluster/allocation/explain
```

**Common causes:**

- A node is down, so its primary shards cannot be allocated
- Disk usage has crossed `cluster.routing.allocation.disk.watermark.high`
- More replicas configured than there are data nodes

**Fix:**

```json
// Temporarily drop the replica count
PUT /my-index/_settings
{
  "number_of_replicas": 0
}

// Retry shard allocations that previously failed
POST _cluster/reroute?retry_failed=true
```

### Case 2: Degraded Write Performance

**Symptom:** Logstash indexing latency into Elasticsearch rises and its queue backs up.

**Diagnosis:**

```bash
# Per-index indexing stats
GET _cat/indices?v&h=index,indexing.index_total,indexing.index_time

# Per-node indexing stats
GET _nodes/stats/indices/indexing
```

**Optimization:**

```json
// Raise refresh_interval: fewer refreshes mean fewer tiny segments to merge
PUT /app-logs-*/_settings
{
  "refresh_interval": "30s"
}
```

```yaml
# logstash.yml: larger pipeline batches produce larger bulk
# requests to Elasticsearch (default is 125 events per batch)
pipeline.batch.size: 2000
```

### Case 3: Disk Space Alerts

```bash
# Disk usage per node
GET _cat/allocation?v

# Raise the disk watermarks
PUT _cluster/settings
{
  "persistent": {
    "cluster.routing.allocation.disk.watermark.low": "85%",
    "cluster.routing.allocation.disk.watermark.high": "90%",
    "cluster.routing.allocation.disk.watermark.flood_stage": "95%"
  }
}
```

## Monitoring Metrics

| Metric | Description | Alert threshold |
| --- | --- | --- |
| cluster_status | Cluster status | yellow / red |
| unassigned_shards | Number of unassigned shards | > 0 |
| jvm_heap_used_percent | JVM heap usage | > 85% |
| disk_used_percent | Disk usage | > 80% |
| indexing_rate | Indexing rate | approaching node capacity |
| search_latency | Search latency | P99 > 1s |
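Wired into an alerting script, the thresholds above reduce to simple predicates. A sketch using hypothetical metric names taken from the table (collecting the values, e.g. from `_cluster/health` and `_nodes/stats`, is out of scope here):

```python
# Predicates mirroring the alert thresholds in the table above;
# the metric names come from this table, not an Elasticsearch API.
RULES = {
    "cluster_status": lambda v: v in ("yellow", "red"),
    "unassigned_shards": lambda v: v > 0,
    "jvm_heap_used_percent": lambda v: v > 85,
    "disk_used_percent": lambda v: v > 80,
}

def breached(metrics: dict) -> list:
    """Return the names of metrics whose alert threshold is crossed."""
    return [name for name, check in RULES.items()
            if name in metrics and check(metrics[name])]
```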
