Filebeat 是使用 Golang 实现的轻量型日志采集器,是基于原先 logstash-forwarder 的源码改造出来的,没有任何依赖,可以单独存在的搞性能采集工具。

认识beats

Beats是轻量级(资源高效,无依赖性,小型)采集程序的集合。这些可以是日志文件(Filebeat),网络数据(Packetbeat),服务器指标(Metricbeat)等,Beats建立在名为libbeat的Go框架之上,该框架主要用于数据转发,Elastic和社区开发的越来越多的Beats可以收集的任何其他类型的数据,收集后,数据将直接发送到Elasticsearch或Logstash中进行其他处理。

Filebeat

顾名思义,Filebeat用于收集和传送日志文件,它也是最常用的Beat。 Filebeat如此高效的事实之一就是它处理背压的方式,如果Logstash繁忙,Filebeat会减慢其读取速率,并在减速结束后加快节奏。 Filebeat几乎可以安装在任何操作系统上,包括作为Docker容器安装,还随附用于特定平台(例如Apache,MySQL,Docker等)的内部模块,其中包含这些平台的默认配置和Kibana对象。

Packetbeat

网络数据包分析器Packetbeat是第一个引入的beat。 Packetbeat捕获服务器之间的网络流量,因此可用于应用程序和性能监视。 Packetbeat可以安装在受监视的服务器上,也可以安装在其专用服务器上。 Packetbeat跟踪网络流量,解码协议并记录每笔交易的数据。 Packetbeat支持的协议包括:DNS,HTTP,ICMP,Redis,MySQL,MongoDB,Cassandra等。

Metricbeat

Metricbeat是一种非常受欢迎的beat,它收集并报告各种系统和平台的各种系统级度量。 Metricbeat还支持用于从特定平台收集统计信息的内部模块。您可以使用这些模块和称为指标集的metricsets来配置Metricbeat收集指标的频率以及要收集哪些特定指标。

Heartbeat

Heartbeat是用于“uptime monitoring”的。本质上,Heartbeat是探测服务以检查它们是否可访问的功能,例如,它可以用来验证服务的正常运行时间是否符合您的SLA。 您要做的就是为Heartbeat提供URL和正常运行时间指标的列表。

Auditbeat

Auditbeat可用于操作Linux服务器上的用户和进程活动。 与其他传统的系统工具(systemd,auditd)类似,Auditbeat可用于识别安全漏洞-文件更改,配置更改,恶意行为等。

Winlogbeat

Winlogbeat仅会引起Windows系统管理员或工程师的兴趣,因为它是专门为收集Windows事件日志而设计的。 它可用于分析安全事件,已安装的更新等。

Functionbeat

Functionbeat主要是为“serverless”而设计,可以将其部署为收集数据并将其发送到ELK堆栈的功能。 Functionbeat专为监视云环境而设计,目前已针对Amazon设置量身定制,可以部署为Amazon Lambda函数,以从Amazon CloudWatch,Kinesis和SQS收集数据。

filebeat

为什么选择filebeat

  • 性能好
  • 基于golang的技术站,对于容器生态友好
  • 使用部署方便,功能齐全
  • 其他技术方案,主要是社区的fluentd,使用的ruby+c,使用起来很复杂,各种功能都需要写插件来完成。

基本使用

下载

直接去github上可以下载二进制文件,可以直接运行

curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.10.2-linux-x86_64.tar.gz
tar xzvf filebeat-7.10.2-linux-x86_64.tar.gz

也可以使用源码编译

After installing Go, set the GOPATH environment variable to point to your workspace location, and make sure $GOPATH/bin is in your PATH.

mkdir -p ${GOPATH}/src/github.com/elastic
git clone https://github.com/elastic/beats ${GOPATH}/src/github.com/elastic/beats

If you have multiple go paths, use ${GOPATH%%:*} instead of ${GOPATH}.

Then you can compile a particular Beat by using the Makefile. For example, for Packetbeat:

cd beats/packetbeat
make

Some of the Beats might have extra development requirements, in which case you’ll find a CONTRIBUTING.md file in the Beat directory.

We use an EditorConfig file in the beats repository to standardise how different editors handle whitespace, line endings, and other coding styles in our files. Most popular editors have a plugin for EditorConfig and we strongly recommend that you install it.

配置文件

FileBeat 的配置文件定义了在读取文件的位置,输出流的位置以及相应的性能参数,本实例是以 Kafka 消息中间件作为缓冲,所有的日志收集器都向 Kafka 输送日志流,相应的最简单的配置项如下:

$ vim fileat.yml

filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /wls/applogs/rtlog/app.log
  fields:
    log_topic: appName
  multiline:
        # pattern for error log, if start with space or cause by
        pattern: '^[[:space:]]+(at|\.{3})\b|^Caused by:'
        negate:  false
        match:   after

output.kafka:
   enabled: true
   hosts: ["kafka-1:9092","kafka-2:9092"]
   topic: applog
   version: "0.10.2.0"
   compression: gzip

processors:
- drop_fields:
   fields: ["beat", "input", "source", "offset"]

logging.level: error
name: app-server-ip
  • paths:定义了日志文件路径,可以采用模糊匹配模式,如*.log
  • fields:topic 对应的消息字段或自定义增加的字段。
  • output.kafka:filebeat 支持多种输出,支持向 kafka,logstash,elasticsearch 输出数据,此处设置数据输出到 kafka。
  • enabled:这个启动这个模块。
  • topic:指定要发送数据给 kafka 集群的哪个 topic,若指定的 topic 不存在,则会自动创建此 topic。
  • version:指定 kafka 的版本。
  • drop_fields:舍弃字段,filebeat 会 json 日志信息,适当舍弃无用字段节省空间资源。
  • name:收集日志中对应主机的名字,建议 name 这里设置为 IP,便于区分多台主机的日志信息。

每一个不同的输出都用不同的配置项,还有很多功能性能的配置项,比如

合并规则

multiline:
        pattern: '^[[:space:]]+(at|\.{3})\b|^Caused by:'
        negate:  false
        match:   after

常用的合并规则

[0-9]{4}-[0-9]{2}-[0-9]{2} 按照yyyy-mm-dd时间戳合并,日志不是以这个时间戳开头的都合并
[[0-9]{4}-[0-9]{2}-[0-9]{2} 按照[yyyy-mm-dd时间戳合并,日志不是以这个时间戳开头的都合并
* 不合并----不合并最后处理出来的结果就是没有这一段的配置

过滤规则

include_lines: ["FATAL","ERROR","WARN","INFO","fatal","error","warn","info","Fatal","Error","Warn","Info"]

正常用于日志级别的选择,比如上面只是采集包含这些字段的日志,其实也就是INFO级别的日志的采集。当然还是使用exclude_lines,不包含,和白名单黑名单一样的概念。

资源限制

logging.level: debug    日志级别
max_procs: 2            cpu核数限制

queue:
      mem:
        events: 32768               pipeline队列长度
        flush.min_events: 1024

采集配置

close_eof: false
close_inactive: 5m(5分钟没有活跃,就会停止采集)
close_removed: false
close_renamed: false
ignore_older: 48h(即将开启的采集文件如果大于48H,就不要采集了)
# State options
clean_removed: true
clean_inactive: 72h(如果文件72h没有活跃就删除采集记录)

输出配置

output.kafka:
      topic: "%{[topic]}"
      version: "0.8.2.2"
      codec.format:
        ignoreNotFound: true
        string: '%{[message]}'
      metadata:
        retry.max: 2
        full: true
      worker: 10(The number of concurrent load-balanced Kafka output workers.)
      channel_buffer_size: 30000(每一个连接可以缓存消息的长度,默认是256)
      ##bulk_max_size: 20480(一次发送kafka请求最多的事件数量,默认是2048)
      #keep_alive: 0(是否保持连接,默认0,不保持)
      ##required_acks: 0(代表kafka是否需要等待回复,有1,0,-1,默认是1,需要等待主节点回复)
      compression: none(代表压缩级别,none代表不压缩,默认压缩是gzip)

更多详细的配置可以去查看完整的配置文件说明。

启动

调试模式下采用:终端启动(退出终端或 ctrl+c 会退出运行)

./filebeat -e -c filebeat.yml

线上环境配合 error 级别使用:以后台守护进程启动启动 filebeats

nohup ./filebeat -e -c filebeat.yml &

零输出启动(不推荐):将所有标准输出及标准错误输出到/dev/null空设备,即没有任何输出信息。

nohup ./filebeat -e -c filebeat.yml >/dev/null 2>&1 &

停止运行 FileBeat 进程

ps -ef | grep filebeat
Kill -9 线程号

特性

  1. 采集路径可以使用正则表达式,来采集当前目录下所有的文件,包括子目录下,比如/k8s_log/*/.log*

  2. 可以动态加载,可以在配置文件中配置reload的时间,filebeat本身自动加载,但是这个加载不能更新output

  3. filebeat本身日志支持备份切换,默认一个文件10M,保留8个文件。

  4. filebaet支持句柄保持和checkpoint功能。

  5. filebeat输出到kafka可以不支持多个kafka集群,可以改造多pipeline来发送到不同的Kafka集群。

原理

filebeat创建一个beater

filebaet创建了一个beater实例启动

  • 1.启动了一个Crawler,用于
    • 1.启动静态的 input (写在主配置里的)
    • 2.启动 reloader,动态的 input 由 reloader 管理。
    • 3.启动Registrar
  • 2.每个input又启动了Harvester,Harvester就是负责采集日志
  • 3.Harvester 连接 pipeline 时,调用 outlet factory 创建一个 outleter,Outleter 封装了 pipeline 的 producer,调用 outleter OnEvent 方法发送数据到 pipeline
  • 4.Registrar 负责 checkpoint 文件的更新
  • 5.启动Pipeline 模块,Pipeline 是一个大的功能模块,包含 queue, outputController, consumer, output
    • 1.Queue (memqueue) 真正的 queue 对象时 Broker。创建 broker 时启动事件循环,负责处理 publish 和 consume 等各种请求
      • 创建 broker
      • 事件循环
      • Consumer
      • Producer 真实创建的是 ackProducer
    • 2.Output Controller 负责管理 consumer。Consumer 负责从 queue 中获取 batch,然后发送到 workQueue(chan publisher.Batch)。
    • 3.Consumer Consumer 会启动多个 outputWorker。outputWorker 负责从 workQueue 获取 batch,然后调用 output client 的 Publish 方法发送出去。
    • 4.Retryer Retry 负责重试发送失败的请求
    • 5.Output(kafka) Connect 调用 sarama 库,创建一个 kafka AsyncProducer。连接时会启动两个循环,处理成功响应和失败响应。
    • 6.msgRef msgRef 注入到发送给 AsyncProducer 的事件中。在处理响应的时候回调。如果成功就调用 batch.ACK()。失败就调用 batch.OnRetry 重试。

核心代码模块

filebeat

beater 启动 https://github.com/elastic/beats/blob/v6.3.2/filebeat/beater/filebeat.go#L273

Crawler 启动时加载配置文件中的 inputs。启动 reloader,定期加载动态配置文件目录中的 inputs。启动Registrar,负责checkpoint https://github.com/elastic/beats/blob/v6.3.2/filebeat/crawler/crawler.go#L33

  • 启动静态的 input (写在主配置里的)
  • 启动 reloader,动态的 input 由 reloader 管理

InputInput 是一个用来包装 harvester 的数据结构,对外提供生命周期管理接口。Input 在建立起来时,会调用 pipeline 的 ConnectWith 方法获取一个 client,用于发送 events。 https://github.com/elastic/beats/blob/v6.3.2/filebeat/input/input.go#L35

Harvester 负责采集日志 https://github.com/elastic/beats/blob/v6.3.2/filebeat/input/log/harvester.go#L88

Outleter: Harvester 连接 pipeline 时,调用 outlet factory 创建一个 outleter Outleter 封装了 pipeline 的 producer,调用 outleter OnEvent 方法发送数据到 pipeline

Outleter 创建 https://github.com/elastic/beats/blob/v6.3.2/filebeat/channel/factory.go#L70

Registrar 负责 checkpoint 文件的更新 https://github.com/elastic/beats/blob/v6.3.2/filebeat/registrar/registrar.go#L19

Libbeat

Pipeline 模块启动 https://github.com/elastic/beats/blob/v6.3.2/libbeat/publisher/pipeline/module.go#L30

加载 output https://github.com/elastic/beats/blob/v6.3.2/libbeat/publisher/pipeline/module.go#L85

Pipeline Pipeline 包含 queue, output controller, consumer, output

工作机制:

  • 队列容量为 Events
  • 当队列中的 events 数量大于 FlushMinEvents 开始 flush
  • 当队列中有 events 并且离上一次 flush 过了 FlushTimeout 时间,开始 flush

这边简单梳理了filebeat的模块,核心的原理包括一些beats的原理可以看我写的另一篇filebeat原理