linux系统elk组件logstash部署

小明 2025-04-29 17:32:32 5

Logstash部署

      • 安装配置Logstash
      • 测试文件
        • 配置
        • 手动输入日志数据
          • 数据链路
          • 手动输入数据,并存储到es
            • 数据链路
            • 自定义日志1
              • 数据链路
              • 自定义日志2
                • 数据链路
                • nginx access 日志
                  • 数据链路
                  • nginx error日志
                    • 数据链路
                    • filebate 传输给 logstash
                    • filebeat 日志模板

                      安装配置Logstash

                      Logstash运行同样依赖jdk

                      ()
                      tar zxf /usr/local/package/logstash-7.13.2.tar.gz -C /usr/local/
                      ./logstash-7.13.2/bin/logstash -f logstash-7.13.2/conf/set.conf    //启动
                      

                      测试文件

                      标准输入=>标准输出

                      1、启动logstash:-f 指定配置文件

                      ()

                      2、logstash启动后,直接进行数据输入

                      3、logstash处理后,直接进行返回

                      input {
                      	stdin {}
                      }
                      output {
                      	stdout {
                      		codec => rubydebug
                      	}
                      }
                      

                      标准输入=>标准输出及es集群

                      1、启动logstash

                      2、启动后直接在终端输入数据

                      3、数据会由logstash处理后返回并存储到es集群中

                      input {
                      	stdin {}
                      }
                      output {
                      	stdout {
                      		codec => rubydebug
                      	}
                      	elasticsearch {
                            hosts => ["10.3.145.14","10.3.145.56","10.3.145.57"]
                            index => 'logstash-debug-%{+YYYY-MM-dd}'
                          }
                      }
                      

                      端口输入=>字段匹配=>标准输出及es集群

                      1、由tcp 的8888端口将日志发送到logstash

                      2、数据被grok进行正则匹配处理

                      3、处理后,数据将被打印到终端并存储到es

                      input {  #输入
                      	tcp {
                      		port => 8888
                      	}
                      }
                      filter {  #数据处理
                      	grok {
                      		match => {"message" => "%{DATA:key} %{NUMBER:value:int}"} 
                      			
                      	}
                      }
                      output { #输出
                      	stdout {
                      		codec => rubydebug
                      	}
                      	elasticsearch {
                            hosts => ["10.3.145.14","10.3.145.56","10.3.145.57"]
                            index => 'logstash-debug-%{+YYYY-MM-dd}'
                          }
                      }
                      # yum install -y nc
                      # free -m |awk 'NF==2{print ,}' |nc logstash_ip 8888
                      

                      文件输入=>字段匹配及修改时间格式修改=>es集群

                      1、直接将本地的日志数据拉去到logstash当中

                      2、将日志进行处理后存储到es

                      input {
                      	file {
                      		type => "nginx-log"
                      		path => "/var/log/nginx/error.log"
                      		start_position => "beginning" # 此参数表示在第一次读取日志时从头读取
                      		# sincedb_path => "自定义位置"  # 此参数记录了读取日志的位置,默认在 data/plugins/inputs/file/.sincedb*
                      	}
                      }
                      filter {
                          grok {
                              match => { "message" => '%{DATESTAMP:date} [%{WORD:level}] %{DATA:msg} client: %{IPV4:cip},%{DATA}"%{DATA:url}"%{DATA}"%{IPV4:host}"'}    
                          }    
                          date {
                              match => [ "timestamp" , "dd/MMM/YYYY:HH:mm:ss Z" ]    
                          }    
                      }
                      output {
                        if [type] == "nginx-log" {
                              elasticsearch {
                            		hosts => ["192.168.249.139:9200","192.168.249.149:9200","192.168.249.159:9200"]
                            		index => 'logstash-audit_log-%{+YYYY-MM-dd}'
                            }
                          }
                        }
                      

                      filebeat => 字段匹配 => 标准输出及es

                      input {
                        beats {
                          port => 5000
                        }
                      }
                      filter {
                      	grok {
                      		match => {"message" => "%{IPV4:cip}"}	
                      	}
                      }
                      output {
                              elasticsearch {
                            		hosts => ["192.168.249.139:9200","192.168.249.149:9200","192.168.249.159:9200"]
                            		index => 'test-%{+YYYY-MM-dd}'
                            }
                      	stdout { codec => rubydebug }
                      }
                      
                      配置

                      创建目录,我们将所有input、filter、output配置文件全部放到该目录中。

                      [root@elk ~]# mkdir -p /usr/local/logstash-7.13.2/etc/conf.d
                      [root@elk ~]# vim /usr/local/logstash-7.13.2/etc/conf.d/input.conf
                      input { 
                      kafka {
                          type => "audit_log"
                          codec => "json"
                          topics => "nginx"
                          decorate_events => true
                          bootstrap_servers => "10.3.145.41:9092, 10.3.145.42:9092, 10.3.145.43:9092"
                        }
                      }
                      [root@elk ~]# vim /usr/local/logstash-7.13.2/etc/conf.d/filter.conf
                      filter {
                      	json { # 如果日志原格式是json的,需要用json插件处理
                      		source => "message"
                      		target => "nginx" # 组名
                      	}
                      }
                      [root@elk ~]# vim /usr/local/logstash-7.13.2/etc/conf.d/output.conf
                      output {
                        if [type] == "audit_log" {
                            elasticsearch {
                            hosts => ["10.3.145.14","10.3.145.56","10.3.145.57"]
                            index => 'logstash-audit_log-%{+YYYY-MM-dd}'
                            }
                          }
                        }
                      

                      启动

                      [root@elk ~]# cd /usr/local/logstash-7.13.2
                      [root@elk ~]# nohup bin/logstash -f etc/conf.d/  --config.reload.automatic &
                      
                      手动输入日志数据
                      数据链路

                      1、启动logstash

                      2、logstash启动后,直接进行数据输入

                      3、logstash处理后,直接进行返回

                      input {
                      	stdin {}
                      }
                      output {
                      	stdout {
                      		codec => rubydebug
                      	}
                      }
                      
                      手动输入数据,并存储到es
                      数据链路

                      1、启动logstash

                      2、启动后直接在终端输入数据

                      3、数据会由logstash处理后返回并存储到es集群中

                      input {
                      	stdin {}
                      }
                      output {
                      	stdout {
                      		codec => rubydebug
                      	}
                      	elasticsearch {
                            hosts => ["10.3.145.14","10.3.145.56","10.3.145.57"]
                            index => 'logstash-debug-%{+YYYY-MM-dd}'
                          }
                      }
                      
                      自定义日志1
                      数据链路

                      1、由tcp 的8888端口将日志发送到logstash

                      2、数据被grok进行正则匹配处理

                      3、处理后,数据将被打印到终端并存储到es

                      input {
                      	tcp {
                      		port => 8888
                      	}
                      }
                      filter {
                      	grok {
                      		match => {"message" => "%{DATA:key} %{NUMBER:value:int}"} 	
                      	}
                      }
                      output {
                      	stdout {
                      		codec => rubydebug
                      	}
                      	elasticsearch {
                            hosts => ["10.3.145.14","10.3.145.56","10.3.145.57"]
                            index => 'logstash-debug-%{+YYYY-MM-dd}'
                          }
                      }
                      # yum install -y nc
                      # free -m |awk 'NF==2{print ,}' |nc logstash_ip 8888
                      
                      自定义日志2
                      数据链路

                      1、由tcp 的8888端口将日志发送到logstash

                      2、数据被grok进行正则匹配处理

                      3、处理后,数据将被打印到终端

                      input {
                      	tcp {
                      		port => 8888
                      	}
                      }
                      filter {
                      	grok {
                      		match => {"message" => "%{WORD:username}\:%{WORD:passwd}\:%{INT:uid}\:%{INT:gid}\:%{DATA:describe}\:%{DATA:home}\:%{GREEDYDATA:shell}"}
                      			
                      	}
                      }
                      output {
                      	stdout {
                      		codec => rubydebug
                      	}
                      }
                      # cat /etc/passwd | nc logstash_ip 8888
                      
                      nginx access 日志
                      数据链路

                      1、在filebeat配置文件中,指定kafka集群ip [output.kafka] 的指定topic当中

                      2、在logstash配置文件中,input区域内指定kafka接口,并指定集群ip和相应topic

                      3、logstash 配置filter 对数据进行清洗

                      4、将数据通过 output 存储到es指定index当中

                      5、kibana 添加es 索引,展示数据

                      input {
                          kafka {
                              type => "audit_log"
                              codec => "json"
                              topics => "haha"    #与filebeat中配置kafka一致
                              #decorate_events => true
                              #enable_auto_commit => true
                              auto_offset_reset => "earliest"
                              bootstrap_servers => ["192.168.52.129:9092,192.168.52.130:9092,192.168.52.131:9092"]
                            }
                      }
                      filter {
                          grok {
                              match => { "message" => "%{COMBINEDAPACHELOG} %{QS:x_forwarded_for}"}    
                          }    
                          date {
                              match => [ "timestamp" , "dd/MMM/YYYY:HH:mm:ss Z" ]    
                          }    
                          geoip {
                              source => "lan_ip"    
                          }
                      }
                      output {
                        if [type] == "audit_log" {
                              stdout {
                                      codec => rubydebug
                              }
                            elasticsearch {
                            hosts => ["192.168.52.129","192.168.52.130","192.168.52.131"]
                            index => 'tt-%{+YYYY-MM-dd}'
                            }
                          }
                        }
                        
                      #filebeat 配置
                        filebeat.prospectors:
                      - input_type: log
                        paths:
                          -  /opt/logs/server/nginx.log
                        json.keys_under_root: true
                        json.add_error_key: true
                        json.message_key: log
                      output.kafka:   
                        hosts: ["10.12.153.8:9092","10.12.153.9:9092","10.12.153.10:9092"]
                        topic: 'nginx'
                      # nginx 配置
                        log_format main        '{"user_ip":"$http_x_real_ip","lan_ip":"$remote_addr","log_time":"$time_iso8601","user_req":"$request","http_code":"$status","body_bytes_sents":"$body_bytes_sent","req_time":"$request_time","user_ua":"$http_user_agent"}';
                          access_log  /var/log/nginx/access.log  main;
                      
                      nginx error日志
                      数据链路

                      1、直接将本地的日志数据拉去到logstash当中

                      2、将日志进行处理后存储到es

                      input {
                      	file {
                      		type => "nginx-log"
                      		path => "/var/log/nginx/error.log"
                      		start_position => "beginning"
                      	}
                      }
                      filter {
                          grok {
                              match => { "message" => '%{DATESTAMP:date} [%{WORD:level}] %{DATA:msg} client: %{IPV4:cip},%{DATA}"%{DATA:url}"%{DATA}"%{IPV4:host}"'}    
                          }    
                          date {
                              match => [ "timestamp" , "dd/MMM/YYYY:HH:mm:ss Z" ]    
                          }    
                      }
                      output {
                        if [type] == "nginx-log" {
                              elasticsearch {
                            		hosts => ["192.168.249.139:9200","192.168.249.149:9200","192.168.249.159:9200"]
                            		index => 'logstash-audit_log-%{+YYYY-MM-dd}'
                            }
                          }
                        }
                      
                      filebate 传输给 logstash
                      input {
                        beats {
                          port => 5000
                        }
                      }
                      filter {
                      	grok {
                      		match => {"message" => "%{IPV4:cip}"}	
                      	}
                      }
                      output {
                              elasticsearch {
                            		hosts => ["192.168.249.139:9200","192.168.249.149:9200","192.168.249.159:9200"]
                            		index => 'test-%{+YYYY-MM-dd}'
                            }
                      	stdout { codec => rubydebug }
                      }
                      filebeat.inputs:
                      - type: log
                        enabled: true
                        paths:
                          - /var/log/nginx/access.log
                      output.logstash:
                        hosts: ["192.168.52.134:5000"]
                      
                      filebeat 日志模板
                      filebeat.inputs:
                      - type: log
                        enabled: true
                        paths:
                          - /var/log/nginx/access.log
                          
                          
                      output.kafka:
                        hosts: ["192.168.52.129:9092","192.168.52.130:9092","192.168.52.131:9092"]
                        topic: haha
                        partition.round_robin:
                          reachable_only: true
                        required_acks: 1
                      
The End
微信