轩辕李的博客 轩辕李的博客
首页
  • Java
  • Spring
  • 其他语言
  • 工具
  • HTML&CSS
  • JavaScript
  • 分布式
  • 代码质量管理
  • 基础
  • 操作系统
  • 计算机网络
  • 编程范式
  • 安全
  • 中间件
  • 心得
关于
  • 分类
  • 标签
  • 归档
GitHub (opens new window)

轩辕李

勇猛精进,星辰大海
首页
  • Java
  • Spring
  • 其他语言
  • 工具
  • HTML&CSS
  • JavaScript
  • 分布式
  • 代码质量管理
  • 基础
  • 操作系统
  • 计算机网络
  • 编程范式
  • 安全
  • 中间件
  • 心得
关于
  • 分类
  • 标签
  • 归档
GitHub (opens new window)
  • 分布式

  • 代码质量管理

  • 基础

  • 操作系统

  • 计算机网络

  • 编程范式

  • 安全

  • 中间件

    • Tomcat体系介绍及应用
    • Redis实践应用
    • Elasticsearch实战
    • 理解MySQL
    • Rabbitmq介绍与应用
    • Kafka实战
    • ELK实践之采集并分析Nginx与应用程序日志
      • 一、前言
      • 二、ELK安装
      • 三、采集Nginx日志
        • 1、filebeat采集Nginx日志
        • 2、Logstash配置
        • 2.1、gork介绍
        • 2.2、重复数据处理
      • 四、采集应用日志
        • 1、定义Logstash Pipeline
        • 2、采集Java应用日志
      • 五、Kibana的使用
        • 1、到期之后,索引并没有自动删除的问题
        • 2、Discover补充说明
      • 六、使用Kafka作为中间件
      • 七、其他资料
      • 八、总结
  • 心得

  • 架构
  • 中间件
轩辕李
2023-05-29
目录

ELK实践之采集并分析Nginx与应用程序日志

# 一、前言

ELK 是一个开源软件栈,由 Elasticsearch、Logstash 和 Kibana 三个主要组件组成,用于处理和可视化大规模的日志数据。

  1. Elasticsearch:Elasticsearch 是一个实时的分布式搜索和分析引擎。它基于 Apache Lucene 引擎构建,具有快速的搜索、分析和聚合能力。Elasticsearch 可以处理海量数据,并提供强大的全文搜索、结构化搜索、地理位置搜索等功能。它支持水平扩展和高可用性,并具有 RESTful API,使其易于集成和使用。
  2. Logstash:Logstash 是一个开源的数据收集和处理工具。它能够从各种来源(如文件、数据库、消息队列等)收集和处理日志数据,并将其转换为可索引的格式,如 JSON。Logstash 提供丰富的插件生态系统,可以进行数据过滤、转换和增强,以适应不同的数据源和需求。
  3. Kibana:Kibana 是一个用于数据可视化和分析的开源工具。它与 Elasticsearch 紧密集成,提供了一个直观的 Web 界面,用于创建交互式的仪表板、图表和可视化报表。Kibana 可以实时地检索和可视化 Elasticsearch 中的数据,帮助用户理解和分析日志数据,发现趋势、异常和关联关系。

ELK 被广泛用于日志管理和分析领域。它可以帮助组织和企业实时监控系统的运行状况、故障排查、安全分析、业务洞察等。通过将 Logstash 用于数据收集和处理,将数据存储在 Elasticsearch 中,并使用 Kibana 进行数据可视化,用户可以轻松地探索和分析日志数据,提取有价值的信息,并进行故障诊断、性能优化和决策制定等工作。

关于Elasticsearch,可以参考我以前的文章: Elasticsearch实战。

本文主要从实操的角度介绍一下ELK收集Nginx日志和应用程序日志,并对日志进行分析的过程。

# 二、ELK安装

实操之前,需要先安装Elasticsearch (opens new window)和Kibana (opens new window)。

# 三、采集Nginx日志

我们需要用到Filebeat,它和 Logstash 都是用于日志数据收集和传输的工具,但它们在功能和设计上有一些区别。

Filebeat 是 Elastic 公司提供的一款轻量级日志数据收集器。它专注于实时读取和传输日志文件,将日志数据发送到指定的目标,通常是通过网络将数据发送给中央日志存储或分析系统。Filebeat 部署简单、资源消耗低,并具有高效的数据传输能力。它支持多种日志文件格式,能够监视文件的变化并实时发送新的日志事件。Filebeat 本身不具备数据处理和转换的能力,它的主要职责是快速、可靠地将日志数据传输到下游处理工具,如 Elasticsearch 或 Logstash。

Logstash 是 Elastic 公司提供的一个功能强大的数据收集、处理和传输工具。它是一个可扩展的数据管道,支持从多种数据源(包括文件、数据库、消息队列等)收集数据,进行丰富的数据处理和转换,然后将数据发送到多个目标,如 Elasticsearch、文件、消息队列等。Logstash 的数据处理能力非常丰富,可以进行数据过滤、解析、转换、增强等操作,还支持插件机制,可以通过插件扩展其功能。Logstash 提供了多种输入和输出插件,使其与各种数据源和目标兼容。

# 1、filebeat采集Nginx日志

先创建/etc/filebeat/filebeat.yml文件,内容如下:

filebeat.inputs:
- type: log
  paths:
    - /usr/local/nginx/logs/access.log  
  fields:
     input_type_source: nginx
output.logstash:
  hosts: ["172.28.0.1:5044"]

假如你的Nginx在/usr/local/nginx/目录下,那么你需要将/usr/local/nginx/目录挂载到容器中。

启动filebeat容器:

docker run -d --name filebeat-nginx -v /usr/local/nginx/:/usr/local/nginx/ -v /etc/filebeat/filebeat.yml:/usr/share/filebeat/filebeat.yml docker.elastic.co/beats/filebeat:7.3.1 filebeat -e  -d "publish"

这样我们把Nginx的日志采集到了Logstash中,接下来我们需要配置Logstash。

# 2、Logstash配置

首先,我们需要创建一个/etc/logstash/pipeline/nginx.conf文件,内容如下:

input {
    beats {
        port => "5044"
    }
}
filter {
    // 使用正则表达式模式匹配来解析日志中的字段。匹配到的字段会被添加到事件中,并从事件中移除原始的message字段。
    grok {
        match => { "message" => "%{IPORHOST:remote_addr} - %{DATA:remote_user} \[%{HTTPDATE:time_local}\] \"%{NUMBER:request_time}\" \"%{NUMBER:upstream_response_time}?\" \"%{DATA:http_host}\" \"%{DATA:request_uri}\" %{NUMBER:status} %{NUMBER:body_bytes_sent} \"%{DATA:http_referer}\" \"%{DATA:http_user_agent}\" \"%{WORD:request_method}\" \"%{GREEDYDATA:request_body}\""}
        remove_field => "message"
    }
    // 用于添加和转换字段
    mutate {
        // 添加自定义字段,这里是添加了一个字段,值为@timestamp
        add_field => { "read_timestamp" => "%{@timestamp}" }
        convert => {
            "request_time" => "float"
            "upstream_response_time" => "float"
            "status" => "integer"
            "body_bytes_sent" => "integer"
        }
    }
    // 用于解析http_user_agent字段,获取用户代理信息
    useragent {
        source => "http_user_agent"
        target => "nginx_http_user_agent"
    }
    // 会把time_local字段的值转换为@timestamp的值
    date {
        match => [ "time_local", "dd/MMM/YYYY:H:m:s Z" ]
        remove_field => "time_local"
    }
    // 用于通过客户端IP地址获取地理位置信息
    geoip {
        source => "remote_addr"
    }
}
output {
    elasticsearch {
        hosts => [ "elasticsearch:9200" ]
        user => "elastic"
        password => "elastic"
        index => "logstash-%{[input][type]}-%{[fields][input_type_source]}-%{+YYYY.MM.dd}"
    }
}

创建/etc/logstash/logstash.yml文件,内容如下:

http.host: "0.0.0.0"
# 禁用了 X-Pack Monitoring,你可以通过配置 xpack.monitoring.enabled: true 来启用它
xpack.monitoring.enabled: false
#xpack.monitoring.elasticsearch.hosts: [ "http://elasticsearch:9200" ]
#xpack.monitoring.elasticsearch.username: elastic
#xpack.monitoring.elasticsearch.password: elastic
# 批处理大小
pipeline.batch.size: 1000
# 批处理延时(ms)
pipeline.batch.delay: 500

启动Logstash容器:

docker run --name logstash --restart=on-failure:3 --net=host -d -v /etc/logstash/pipeline/:/usr/share/logstash/pipeline/ -v /etc/logstash/logstash.yml:/usr/share/logstash/config/logstash.yml docker.elastic.co/logstash/logstash:7.3.1

我们创建了一个 Logstash Pipeline,在 Logstash 中,Pipeline(管道)是数据处理的核心概念。它定义了一系列的数据处理步骤,用于接收、处理和转发数据。每个 Pipeline 包含一组插件,按照定义的顺序依次处理数据。

它通常包含以下三个阶段:

  1. 输入阶段(Input Stage):在输入阶段,Logstash 从指定的数据源接收数据。数据源可以是各种来源,如文件、消息队列、网络服务等。Logstash 提供了多种输入插件,用于接收不同类型的数据。输入插件负责接收数据,并将其传递到下一个阶段。
  2. 过滤阶段(Filter Stage):在过滤阶段,Logstash 对输入的数据进行处理、过滤和转换。您可以使用各种过滤器插件,如 grok、mutate、date 等,对数据进行解析、添加字段、修改字段值、删除字段等操作。通过配置不同的过滤器插件和定义适当的过滤规则,您可以根据需求对数据进行结构化处理和准备,使其适应后续的操作和存储需求。
  3. 输出阶段(Output Stage):在输出阶段,经过过滤处理后的数据被发送到指定的目标。目标可以是 Elasticsearch、文件、消息队列、数据库等。Logstash 提供了多种输出插件,用于将数据发送到不同的目标。输出插件负责将数据传递给指定的目标,并完成数据的最终存储或转发。

Logstash Pipeline 的配置文件通常以 .conf 后缀名保存,其中定义了输入插件、过滤器插件和输出插件的配置以及它们的顺序。在运行 Logstash 时,它会加载配置文件,并按照定义的顺序逐步处理数据。

通过定义和配置 Logstash Pipeline,您可以实现灵活的数据处理和转发流程,将数据从不同的来源收集、加工和分发到指定的目标,满足数据集成、清洗、转换和分析的需求。

在上面的例子中,我们定义了一个输入插件,用于接收来自 Filebeat 的数据。然后,我们使用 grok 插件对日志进行解析,使用 mutate 插件对字段进行添加和转换,使用 useragent 插件对用户代理信息进行解析,使用 date 插件对时间进行转换,使用 geoip 插件对客户端 IP 地址进行解析。最后,我们使用 elasticsearch 插件将数据发送到 Elasticsearch 中。

注意:需要开启Elasticsearch的自动创建索引功能,否则logstash会报错。

如果要修改logstash的jvm配置,可以通过-v /etc/logstash/jvm.options:/usr/share/logstash/config/jvm.options挂载配置文件。前提是你需要先创建/etc/logstash/jvm.options文件。

# 2.1、gork介绍

在 Logstash 中,Grok 是一种用于日志解析和结构化的强大模式匹配工具。它允许您根据自定义的模式来解析和提取非结构化的日志数据,并将其转换为结构化的字段。Grok 通过匹配文本模式,将日志中的各个部分解析为具有特定含义的字段,使得日志数据更易于搜索、过滤和分析。

Grok 使用一种简洁而强大的语法来定义模式,其中包含预定义的模式和正则表达式。您可以使用预定义的模式(如数字、IP 地址、日期等)或编写自定义的模式来匹配日志中的特定部分。Grok 的模式匹配基于正则表达式,通过定义模式和命名捕获组,可以将匹配到的文本部分提取为字段。

示例中用到的gork模式:

  • %{IPORHOST:remote_addr}:匹配并提取 IP 地址或主机名,将其作为 remote_addr 字段。
  • %{DATA:remote_user}:匹配并提取任意非空白字符序列,将其作为 remote_user 字段。
  • %{HTTPDATE:time_local}:匹配并提取 HTTP 格式的日期和时间,将其作为 time_local 字段。
  • %{NUMBER:request_time}:匹配并提取数字,将其作为 request_time 字段。
  • %{NUMBER:upstream_response_time}?:匹配并提取数字,将其作为 upstream_response_time 字段。这里的问号表示该字段是可选的,因为有时候会不存在这个值。
  • %{DATA:http_host}:匹配并提取任意非空白字符序列,将其作为 http_host 字段。注意:因为logstash会自动收集host信息,所以这里一定不能配置为host字段,否则将会被覆盖。
  • %{DATA:request_uri}:匹配并提取任意非空白字符序列,将其作为 request_uri 字段。
  • %{NUMBER:status}:匹配并提取数字,将其作为 status 字段。
  • %{NUMBER:body_bytes_sent}:匹配并提取数字,将其作为 body_bytes_sent 字段。
  • %{DATA:http_referer}:匹配并提取任意非空白字符序列,将其作为 http_referer 字段。
  • %{DATA:http_user_agent}:匹配并提取任意非空白字符序列,将其作为 http_user_agent 字段。
  • %{WORD:request_method}:匹配并提取由字母组成的单词,将其作为 request_method 字段。
  • %{GREEDYDATA:request_body}:匹配并提取任意字符序列,将其作为 request_body 字段。

其他常用的gork模式:

  • %{PATTERN:fieldname}:将匹配的文本部分提取为名为 fieldname 的字段。
  • %{NUMBER:bytes:int}:将匹配的数字提取为整数类型的字段 bytes。
  • %{TIMESTAMP_ISO8601:timestamp}:将匹配的 ISO 8601 格式的时间戳提取为字段 timestamp。
  • %{USERNAME:username}:匹配并提取用户名,将其作为 username 字段。
  • %{HOSTNAME:hostname}:匹配并提取主机名,将其作为 hostname 字段。
  • %{EMAILADDRESS:email}:匹配并提取电子邮件地址,将其作为 email 字段。
  • %{URI:uri}:匹配并提取 URI(统一资源标识符),将其作为 uri 字段。

前面的nginx pipeline配置的gork,对应的nginx日志的格式如下:

    log_format main escape=json '$remote_addr - $remote_user [$time_local] "$request_time" "$upstream_response_time" "$host" "$request_uri" '
    '$status $body_bytes_sent "$http_referer" '
    '"$http_user_agent" "$request_method" "$request_body" ';

nginx日志输出:

59.11.219.0 -  [29/May/2023:17:30:35 +0800] "0.002" "0.002" "m.x.cn" "/apis/test?Id=29493" 200 8 "https://m.x.cn/auction/live/29493?s=11f231ff" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) HeadlessChrome/99.0.4844.51 Safari/537.36" "GET" "" 
42.80.16.1 -  [29/May/2023:17:30:35 +0800] "0.002" "0.002" "m.x.cn" "/apis/live/test?Id=29567" 200 8 "https://m.x.cn/auction/live/29567?s=5a9ac8c2" "Mozilla/5.0 (iPhone; CPU iPhone OS 15_7_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148" "GET" "" 
45.13.18.1 -  [29/May/2023:17:37:52 +0800] "0.000" "0.000" "m.x.cn" "/apis/code" 403 37 "" "LangShen" "POST" "{\"mobile\":\"13700000000\",\"zoneCode\":\"86\"}" 

你可以使用[Grok Constructor](https://grokconstructor.appspot.com/do/match 验证)来验证你的gork是否正确。

更多gork信息,参考 Grok filter plugin (opens new window)

# 2.2、重复数据处理

在采集Nginx日志的时候,发现导入到ES的数据存在重复。可以通过Logstash的fingerprint插件来解决问题,需要在配置文件的filter部分增加如下配置:

filter{
    ...
    # 指纹插件
    fingerprint {
      source => "message"
      target => "[@metadata][fingerprint]"
      method => "MD5"  # 签名方法,可以是MD5、SHA1
    }
}

然后在output的elasticsearch增加document_id配置:

output {
    elasticsearch {
        ...
        document_id => "%{[@metadata][fingerprint]}"  # 使用指纹作为文档的唯一标识符
    }
}

# 四、采集应用日志

# 1、定义Logstash Pipeline

首先,我们需要创建一个/etc/logstash/pipeline/app.conf文件,内容如下:

input {
  tcp {
    port => 5045
    codec => json_lines
  }
}
output {
    elasticsearch {
        hosts => [ "elasticsearch:9200" ]
        user => "elastic"
        password => "elastic"
        index => "logstash-%{[input][type]}-%{[fields][input_type_source]}-%{+YYYY.MM.dd}"
    }
}

这个pipeline非常简单,它从TCP端口5055接收JSON格式的日志,然后将日志输出到Elasticsearch中。

# 2、采集Java应用日志

要将Logback的日志输出发送到Logstash,可以通过使用Logstash的Logback插件来实现。以下是配置步骤:

  1. 在项目的依赖管理中添加Logstash Logback插件的依赖。示例中使用Maven:
<dependency>
    <groupId>net.logstash.logback</groupId>
    <artifactId>logstash-logback-encoder</artifactId>
    <version>7.3</version>
</dependency>
  1. 在Logback的配置文件中进行相应的配置,以将日志输出发送给Logstash。示例配置如下:
<configuration>
    <appender name="LOGSTASH" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
        <destination>172.28.0.1:5045</destination>
        <encoder class="net.logstash.logback.encoder.LogstashEncoder" />
    </appender>
    
    <root level="INFO">
        <appender-ref ref="LOGSTASH" />
    </root>
</configuration>

在配置中,your-logstash-host和your-logstash-port需要替换为实际的Logstash主机和端口。

  1. 重启Logstash,确保pipeline起作用。

如果你觉得这样的配置太麻烦,可以使用logstash-logging-spring-boot-starter (opens new window)来简化配置。

# 五、Kibana的使用

进入管理页面,需要先创建index templates,以nginx为例,配置如下:
image

创建完成后,启动logstash,可以看到index已经创建成功了,如下图所示:
image

创建索引模式:
image

进入Kibana的首页,点击左侧的Discover,可以看到nginx日志信息,如下图所示:
image

还可以为index templates创建index lifecycle policy(生命周期策略),以nginx为例,配置如下:
image

表示当index的日期超过30天时,就会被删除。

还需要把index lifecycle policy(生命周期策略)关联到对应的index template上,操作policy,将策略 “logstash-nginx-lifecycle” 添加到索引模板 “logstash-log-nginx” 上。

你还可以创建Dashboard,将多个图表组合在一起,以便更好地展示数据。参考:在Kibana中可视化NGINX访问日志 (opens new window)

注意:最好先在Kibana中创建index template,这样新建的index就会自动应用index template的配置。

# 1、到期之后,索引并没有自动删除的问题

实践中发现,索引到了生命周期删除阶段,并没有被实际删除,这应该是因为Kibana的Bug,它请求的时候没有定义action。

请参考 Elastic生命周期策略索引不删除的问题 (opens new window) 来解决此问题。

# 2、Discover补充说明

如果日志搜索的结果和你期望的不符,你可以在右上角的”检查“看到完整的请求,比如:
image 可以看到这里默认按照时间倒序排序。你可以点击“Time”字段切换为正序排序。

默认情况下,text字段会使用standard分词器进行分词。你也可以在索引模式处对某个字段进行单独设置。

比如对message字段设置了IK分词器,不过因为Discover中默认是有排序的(且无法更改),可能结果并不符合你的期望。你可以在开发工具中进行手动查询,但这很不方便。

GET /logstash-applog-*/_search
{
  "size": 500,
  "query": {
    "match": {
      "message": "全失败"
    }
  }
}

不管是standard分词还是IK分词,在Discover中都无法实现精确匹配,因为KQL默认使用了Terms查询 (opens new window),而Terms查询是不支持分词的,所以无法实现精确匹配。

即使你在Discover中使用Lucene查询,比如直接输入Query DSL:

    "match": {
      "message": "全失败"
    }

这样也不行,因为排序会覆盖match搜索的评分,可能也不符合期望。

有一种变通的方式,就是将message字段设置为wildcard类型,然后使用wildcard查询,比如:

message : *全失败*

这样的话我们就能得到精确匹配。

# 六、使用Kafka作为中间件

如果日志的数据量很大,可以考虑使用Kafka作为中间件,将日志数据先缓存到Kafka中,然后再由Logstash从Kafka中读取数据,这样可以有效地减轻Logstash的压力。

filebeat的output配置修改如下:

output.kafka:	
  hosts: ["192.168.0.1:9092"]
  topic: 'nginx'

前提是你已经安装了Kafka,参考:Kafka安装

logstash的input配置修改如下:

input {
  kafka {
    type => "kafka"
    bootstrap_servers => "192.168.0.1:9092"
    topics => "nginx"
    group_id => "logstash"
    consumer_threads => 2
  }
}

具体参考:亿级 ELK 日志平台构建实践 (opens new window) 和 使用Logstash从Kafka到Elasticsearch (opens new window)

# 七、其他资料

  • 日志的艺术 (opens new window) 补充:文中提到超大型集群和面向分析的场景,其实Doris 2.0开始支持倒排索引 (opens new window),可以考虑把Clickhouse换成Doris。

# 八、总结

本文从实操角度介绍了如何使用ELK搭建日志平台,主要讲解了采集nginx日志和Java应用日志的方法,以及如何使用Kibana进行可视化展示。希望对你有所帮助。

祝你变得更强!

编辑 (opens new window)
#ELK
上次更新: 2025/04/01
Kafka实战
关于“编程的本质”的探讨

← Kafka实战 关于“编程的本质”的探讨→

最近更新
01
Spring Boot版本新特性
09-15
02
Spring框架版本新特性
09-01
03
Spring Boot开发初体验
08-15
更多文章>
Theme by Vdoing | Copyright © 2018-2025 京ICP备2021021832号-2 | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式