Deploying the ELK Logstash Component on Linux
Logstash Deployment
- Install and configure Logstash
- Test configurations
- Configuration
- Manually entering log data
  - Data pipeline
- Manual input stored to ES
  - Data pipeline
- Custom log 1
  - Data pipeline
- Custom log 2
  - Data pipeline
- nginx access log
  - Data pipeline
- nginx error log
  - Data pipeline
- filebeat to Logstash
- filebeat log template
Install and configure Logstash
Like Elasticsearch, Logstash requires a JDK to run.

tar zxf /usr/local/package/logstash-7.13.2.tar.gz -C /usr/local/
./logstash-7.13.2/bin/logstash -f logstash-7.13.2/conf/set.conf    # start Logstash
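Before wiring up real pipelines, a quick smoke test helps. A minimal sketch, assuming the tarball layout above and that a JDK is already on the PATH; -e passes an inline pipeline so no config file is needed:

java -version                                                # Logstash will not start without a JDK
cd /usr/local/logstash-7.13.2
bin/logstash -e 'input { stdin {} } output { stdout {} }'    # type a line, see it echoed back as an event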
Test configurations
Standard input => standard output
1. Start Logstash; -f specifies the configuration file
2. Once Logstash is up, type data directly on standard input
3. Logstash processes each line and prints the result
input {
  stdin {}
}
output {
  stdout {
    codec => rubydebug
  }
}
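For a typed line such as hello, the rubydebug codec prints the event roughly like this (the host value and timestamp will of course differ):

{
       "message" => "hello",
      "@version" => "1",
    "@timestamp" => 2021-07-15T02:21:30.000Z,
          "host" => "elk"
}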
Standard input => standard output and ES cluster
1. Start Logstash
2. Once it is up, type data directly in the terminal
3. Logstash processes each line, prints the result, and stores it in the ES cluster
input {
  stdin {}
}
output {
  stdout {
    codec => rubydebug
  }
  elasticsearch {
    hosts => ["10.3.145.14","10.3.145.56","10.3.145.57"]
    index => 'logstash-debug-%{+YYYY-MM-dd}'
  }
}
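To confirm the events actually reached the cluster, ask any node to list the matching indices; this assumes ES is listening on the standard HTTP port 9200:

curl -s 'http://10.3.145.14:9200/_cat/indices/logstash-debug-*?v'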
Port input => field matching => standard output and ES cluster
1. Logs are sent to Logstash over TCP port 8888
2. grok applies regex matching to each event
3. The processed data is printed to the terminal and stored in ES
input {               # input
  tcp {
    port => 8888
  }
}
filter {              # processing
  grok {
    match => { "message" => "%{DATA:key} %{NUMBER:value:int}" }
  }
}
output {              # output
  stdout {
    codec => rubydebug
  }
  elasticsearch {
    hosts => ["10.3.145.14","10.3.145.56","10.3.145.57"]
    index => 'logstash-debug-%{+YYYY-MM-dd}'
  }
}

# yum install -y nc
# free -m | awk 'NF==2{print $1,$2}' | nc logstash_ip 8888
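Any "word number" line satisfies the grok pattern, so a hypothetical hand-crafted event is the simplest check:

echo "cpu_load 75" | nc logstash_ip 8888
# expected fields: key => "cpu_load", value => 75 (an integer, thanks to :int)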
File input => field matching and date format conversion => ES cluster
1. Pull local log data directly into Logstash
2. Process the logs and store them in ES
input {
  file {
    type => "nginx-log"
    path => "/var/log/nginx/error.log"
    start_position => "beginning"    # read the log from the start on first pickup
    # sincedb_path => "custom location"
    # sincedb records the read offset; by default it lives under data/plugins/inputs/file/.sincedb*
  }
}
filter {
  grok {
    # brackets are escaped so the [error]/[warn] level matches literally
    match => { "message" => '%{DATESTAMP:date} \[%{WORD:level}\] %{DATA:msg} client: %{IPV4:cip},%{DATA}"%{DATA:url}"%{DATA}"%{IPV4:host}"' }
  }
  date {
    # nginx error_log timestamps look like 2021/07/15 10:21:30
    match => [ "date" , "yyyy/MM/dd HH:mm:ss" ]
  }
}
output {
  if [type] == "nginx-log" {
    elasticsearch {
      hosts => ["192.168.249.139:9200","192.168.249.149:9200","192.168.249.159:9200"]
      index => 'logstash-audit_log-%{+YYYY-MM-dd}'
    }
  }
}
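Note that start_position => "beginning" only applies to files Logstash has never seen; once a sincedb entry exists, the recorded offset wins. A sketch for forcing a full re-read while testing, assuming the default data directory under the install root:

# stop Logstash first, then drop the sincedb entries and restart
rm -f /usr/local/logstash-7.13.2/data/plugins/inputs/file/.sincedb_*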
filebeat => field matching => standard output and ES
input {
  beats {
    port => 5000
  }
}
filter {
  grok {
    match => { "message" => "%{IPV4:cip}" }
  }
}
output {
  elasticsearch {
    hosts => ["192.168.249.139:9200","192.168.249.149:9200","192.168.249.159:9200"]
    index => 'test-%{+YYYY-MM-dd}'
  }
  stdout {
    codec => rubydebug
  }
}
Configuration
Create a directory and place all input, filter, and output configuration files in it.
[root@elk ~]# mkdir -p /usr/local/logstash-7.13.2/etc/conf.d
[root@elk ~]# vim /usr/local/logstash-7.13.2/etc/conf.d/input.conf
input {
  kafka {
    type => "audit_log"
    codec => "json"
    topics => "nginx"
    decorate_events => true
    bootstrap_servers => "10.3.145.41:9092, 10.3.145.42:9092, 10.3.145.43:9092"
  }
}
[root@elk ~]# vim /usr/local/logstash-7.13.2/etc/conf.d/filter.conf
filter {
  json {
    # use the json filter when the source log lines are JSON
    source => "message"
    target => "nginx"    # target field name to nest the parsed fields under
  }
}
[root@elk ~]# vim /usr/local/logstash-7.13.2/etc/conf.d/output.conf
output {
  if [type] == "audit_log" {
    elasticsearch {
      hosts => ["10.3.145.14","10.3.145.56","10.3.145.57"]
      index => 'logstash-audit_log-%{+YYYY-MM-dd}'
    }
  }
}
Start
[root@elk ~]# cd /usr/local/logstash-7.13.2
[root@elk ~]# nohup bin/logstash -f etc/conf.d/ --config.reload.automatic &
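After editing anything under etc/conf.d/, the configuration can be syntax-checked without starting a pipeline; --config.test_and_exit is the stock flag for this:

[root@elk ~]# bin/logstash -f etc/conf.d/ --config.test_and_exit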
Manually entering log data
Data pipeline
1. Start Logstash
2. Once Logstash is up, type data directly on standard input
3. Logstash processes each line and prints the result
input {
  stdin {}
}
output {
  stdout {
    codec => rubydebug
  }
}
Manual input stored to ES
Data pipeline
1. Start Logstash
2. Once it is up, type data directly in the terminal
3. Logstash processes each line, prints the result, and stores it in the ES cluster
input {
  stdin {}
}
output {
  stdout {
    codec => rubydebug
  }
  elasticsearch {
    hosts => ["10.3.145.14","10.3.145.56","10.3.145.57"]
    index => 'logstash-debug-%{+YYYY-MM-dd}'
  }
}
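To see what a stored event looks like on the ES side, pull a single document back from the day's index (any of the three nodes works; port 9200 assumed):

curl -s 'http://10.3.145.14:9200/logstash-debug-*/_search?pretty&size=1'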
Custom log 1
Data pipeline
1. Logs are sent to Logstash over TCP port 8888
2. grok applies regex matching to each event
3. The processed data is printed to the terminal and stored in ES
input {
  tcp {
    port => 8888
  }
}
filter {
  grok {
    match => { "message" => "%{DATA:key} %{NUMBER:value:int}" }
  }
}
output {
  stdout {
    codec => rubydebug
  }
  elasticsearch {
    hosts => ["10.3.145.14","10.3.145.56","10.3.145.57"]
    index => 'logstash-debug-%{+YYYY-MM-dd}'
  }
}

# yum install -y nc
# free -m | awk 'NF==2{print $1,$2}' | nc logstash_ip 8888
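The :int suffix in %{NUMBER:value:int} stores value as an integer rather than a string, which matters later for ES aggregations. A hypothetical probe from another host:

echo "disk_used 81" | nc logstash_ip 8888
# rubydebug should show value => 81, not value => "81"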
Custom log 2
Data pipeline
1. Logs are sent to Logstash over TCP port 8888
2. grok applies regex matching to each event
3. The processed data is printed to the terminal
input {
  tcp {
    port => 8888
  }
}
filter {
  grok {
    match => { "message" => "%{WORD:username}\:%{WORD:passwd}\:%{INT:uid}\:%{INT:gid}\:%{DATA:describe}\:%{DATA:home}\:%{GREEDYDATA:shell}" }
  }
}
output {
  stdout {
    codec => rubydebug
  }
}

# cat /etc/passwd | nc logstash_ip 8888
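For a standard passwd entry the pattern splits every colon-separated field; a single-line check:

echo "root:x:0:0:root:/root:/bin/bash" | nc logstash_ip 8888
# expected fields: username => "root", passwd => "x", uid => "0", gid => "0",
#                  describe => "root", home => "/root", shell => "/bin/bash"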
nginx access log
Data pipeline
1. In the filebeat configuration, point output.kafka at the Kafka cluster IPs and the designated topic
2. In the Logstash configuration, use the kafka input with the cluster IPs and the matching topic
3. Add a Logstash filter to clean the data
4. Store the data in the designated ES index via the output section
5. Add the ES index in Kibana to visualize the data
input {
  kafka {
    type => "audit_log"
    codec => "json"
    topics => "haha"    # must match the topic configured in filebeat
    #decorate_events => true
    #enable_auto_commit => true
    auto_offset_reset => "earliest"
    bootstrap_servers => ["192.168.52.129:9092,192.168.52.130:9092,192.168.52.131:9092"]
  }
}
filter {
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG} %{QS:x_forwarded_for}" }
  }
  date {
    match => [ "timestamp" , "dd/MMM/YYYY:HH:mm:ss Z" ]
  }
  geoip {
    source => "lan_ip"
  }
}
output {
  if [type] == "audit_log" {
    stdout {
      codec => rubydebug
    }
    elasticsearch {
      hosts => ["192.168.52.129","192.168.52.130","192.168.52.131"]
      index => 'tt-%{+YYYY-MM-dd}'
    }
  }
}

# filebeat configuration
filebeat.inputs:
- type: log
  paths:
    - /opt/logs/server/nginx.log
  json.keys_under_root: true
  json.add_error_key: true
  json.message_key: log

output.kafka:
  hosts: ["10.12.153.8:9092","10.12.153.9:9092","10.12.153.10:9092"]
  topic: 'nginx'

# nginx configuration
log_format main '{"user_ip":"$http_x_real_ip","lan_ip":"$remote_addr","log_time":"$time_iso8601","user_req":"$request","http_code":"$status","body_bytes_sents":"$body_bytes_sent","req_time":"$request_time","user_ua":"$http_user_agent"}';
access_log /var/log/nginx/access.log main;
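If nothing shows up in ES, the first thing to check is whether filebeat is actually publishing to the topic. A sketch using the stock console consumer, assuming Kafka is installed under /usr/local/kafka:

/usr/local/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server 192.168.52.129:9092 \
  --topic haha --from-beginning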
nginx error log
Data pipeline
1. Pull local log data directly into Logstash
2. Process the logs and store them in ES
input {
  file {
    type => "nginx-log"
    path => "/var/log/nginx/error.log"
    start_position => "beginning"
  }
}
filter {
  grok {
    # brackets are escaped so the [error]/[warn] level matches literally
    match => { "message" => '%{DATESTAMP:date} \[%{WORD:level}\] %{DATA:msg} client: %{IPV4:cip},%{DATA}"%{DATA:url}"%{DATA}"%{IPV4:host}"' }
  }
  date {
    # nginx error_log timestamps look like 2021/07/15 10:21:30
    match => [ "date" , "yyyy/MM/dd HH:mm:ss" ]
  }
}
output {
  if [type] == "nginx-log" {
    elasticsearch {
      hosts => ["192.168.249.139:9200","192.168.249.149:9200","192.168.249.159:9200"]
      index => 'logstash-audit_log-%{+YYYY-MM-dd}'
    }
  }
}
filebeat to Logstash
input {
  beats {
    port => 5000
  }
}
filter {
  grok {
    match => { "message" => "%{IPV4:cip}" }
  }
}
output {
  elasticsearch {
    hosts => ["192.168.249.139:9200","192.168.249.149:9200","192.168.249.159:9200"]
    index => 'test-%{+YYYY-MM-dd}'
  }
  stdout {
    codec => rubydebug
  }
}

filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/nginx/access.log

output.logstash:
  hosts: ["192.168.52.134:5000"]
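To exercise this pipeline, run filebeat in the foreground so its own log stays visible; the config path is an assumption, adjust it to wherever the file above was saved:

filebeat -e -c /etc/filebeat/filebeat.yml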
filebeat log template
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/nginx/access.log

output.kafka:
  hosts: ["192.168.52.129:9092","192.168.52.130:9092","192.168.52.131:9092"]
  topic: haha
  partition.round_robin:
    reachable_only: true
  required_acks: 1
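filebeat can sanity-check the template before being left to run; the config path below is an assumption, and the output check only applies where the installed version supports testing the kafka output:

filebeat test config -c /etc/filebeat/filebeat.yml   # syntax check of the YAML
filebeat test output -c /etc/filebeat/filebeat.yml   # connectivity check against the brokers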