ELK 日志分析实例

网海过客
www.chinasa.net

ELK 日志分析实例
一、ELK-web日志分析
二、ELK-MySQL 慢查询日志分析
三、ELK-SSH登陆日志分析
四、ELK-vsftpd 日志分析

一、ELK-web日志分析

通过logstash grok正则将web日志过滤出来,输出到Elasticsearch 搜索引擎里,通过Kibana前端展示。

 1.png

1.1、创建logstash grok 过滤规则

#cat ./logstahs/patterns/nginx

NGINXACCESS %{IPORHOST:remote_addr} – – \[%{HTTPDATE:time_local}\] "%{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion}" %{INT:status} %{INT:body_bytes_sent} %{QS:http_referer} %{QS:http_user_agent}

1.2、创建logstash web日志配置文件

#cat ./logstash/conf/ngx_log.conf

input {
        file {
                type => "nginx_log"
                path => "/opt/nginx/logs/access.log"
        }
}  
filter {
  if [type] == "nginx_log" {
    grok {
      match => { "message" => "%{NGINXACCESS}" }
    }
    geoip {
      source => "remote_addr"
      target => "geoip"
      database => "/opt/logstash-2.0.0/conf/GeoLiteCity.dat"
      add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
      add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}"  ]
    }
    mutate {
      convert => [ "[geoip][coordinates]","float", "body_bytes_sent","float", \
          "body_bytes_sent.raw","float"]
    }
  }
}

output {
    stdout { codec => rubydebug }
    elasticsearch {
        hosts => "elk.test.com:9200"
        index => "ngx_log-%{+YYYY.MM}"
    }
}

1.3、创建Kibana图形

统计httpcode状态码

选择【Visualize】菜单,选择 Pie chart】选项。字段选择status.raw,如下图所示:

2.png

统计访问前50 IP

选择【Visualize】菜单,选择 Vertical bar chart】选项。字段选择remote_addr.raw,如下图所示:

3.png


统计 403-405 状态码

选择【Visualize】菜单,选择 Line chart】选项。字段选择status.raw,如下图所示:

4.png

其它图形统计,就不详细举例了。

详细图形展示如下:

5.png

6.png

7.png

二、ELK-MySQL 慢查询日志分析

2.1、创建logstash grok 过滤规则

#cat ./logstahs/patterns/mysql_slow

MYSQLSLOW "# User@Host: %{WORD:user}\[%{WORD}\] @ (%{HOST:client_hostname}|) \[(%{IP:client_ip}|)\]",
"# Thread_id: %{NUMBER:thread_id:int} \s*Schema: (%{WORD:schema}| ) \s*Last_errno: \
%{NUMBER:last_errno:int} \s*Killed: %{NUMBER:killed:int}",
"# Query_time: %{NUMBER:query_time:float} \s*Lock_time: %{NUMBER:lock_time:float} \
\s*Rows_sent: %{NUMBER:rows_sent:int} \s*Rows_examined: %{NUMBER:rows_examined:int}",
"# Bytes_sent: %{NUMBER:bytes_sent:int}",
"(?m)SET timestamp=%{NUMBER:timestamp};%{GREEDYDATA:mysql_query}"

2.2、创建logstash MySQL-Slow慢查询配置文件

#cat ./logstash/conf/MySQL-Slow.conf

input {
  file {
    type => "mysql-slow"
    path => "/var/log/mysql_slow_log.log"  
  }
}  
filter {
if [type] == "mysql-slow" {  
multiline {
    pattern => "^#|^SET"
    negate => true
    what => "previous"
}  
grok {
    match => { "message" => "%{MYSQLSLOW}"  }
}
mutate {
         gsub => [ "mysql_query", "\n", " " ]
         gsub => [ "mysql_query", "  ", " " ]
         add_tag => "mutated_mysql_query"
}
multiline {
    pattern => "(# User|# Thread|# Query|# Time|# Bytes)"
    negate => false
    what => "next"
}
date {
    match => [ "timestamp","UNIX" ]
}
mutate {
    remove_field => [ "timestamp" ]
}
}
}  
output {
    stdout { codec => rubydebug }
    elasticsearch {
        hosts => "elk.test.com:9200"
        index => "mysql_slow_log-%{+YYYY.MM}"
    }
}

2.3、详细图形展示如下:

8.png

三、ELK-SSH登陆日志分析

3.1、创建logstash grok 过滤规则

#cat ./logstahs/patterns/ssh

SECURELOG %{WORD:program}\[%{DATA:pid}\]: %{WORD:status} password for ?(invalid user)? %{WORD:USER} from %{DATA:IP} port

SYSLOGPAMSESSION %{SYSLOGBASE} (?=%{GREEDYDATA:message})%{WORD:pam_module}\(%{DATA:pam_caller}\): session %{WORD:pam_session_state} for user %{USERNAME:username}(?: by %{GREEDYDATA:pam_by})?

SYSLOGBASE2 (?:%{SYSLOGTIMESTAMP:timestamp}|%{TIMESTAMP_ISO8601:timestamp8601}) (?:%{SYSLOGFACILITY} )?%{SYSLOGHOST:logsource} %{SYSLOGPROG}:


3.2、创建logstash ssh配置文件

#cat ./logstash/conf/ssh.conf

input {
    file {
        type => "seclog"
        path => "/var/log/secure"
   }
}
filter {
if [type] == "seclog" {
    grok {
        match => { "message" => "%{SYSLOGPAMSESSION}" }
        match => { "message" => "%{SECURELOG}" }
        match => { "message" => "%{SYSLOGBASE2}" }
    }
    geoip {
        source => "IP"
        fields => ["city_name"]
        database => "/opt/logstash-2.0.0/conf/GeoLiteCity.dat"
    }
    if ([status] == "Accepted") {
        mutate {
        add_tag => ["Success"]
        }
    }
    else if ([status] == "Failed") {
        mutate {
        add_tag => ["Failed"]
        }
    }
}
output {
    stdout { codec => rubydebug }
    elasticsearch {
        hosts => "elk.test.com:9200"
        index => "sshd_log-%{+YYYY.MM}"
    }
}

PS:添加状态标签,便于Kibana 统计

if ([status] == "Accepted") {              #判断字段[status]值,匹配[Accepted]
        mutate {
        add_tag => ["Success"]      #添加标签[Success]
        }
}
else if ([status] == "Failed") {           #判断字段[status]值,匹配[Failed]
        mutate {
        add_tag => ["Failed"]       #添加标签[Failed]
        }
}


3.3、详细图形展示如下:

9.png

四、ELK-vsftpd 日志分析

4.1、创建logstash grok 过滤规则

#cat ./logstahs/patterns/vsftpd

VSFTPDCONNECT \[pid %{WORD:pid}\] %{WORD:action}: Client \"%{DATA:IP}\"
VSFTPDLOGIN \[pid %{WORD:pid}\] \[%{WORD:user}\] %{WORD:status} %{WORD:action}: Client \"%{DATA:IP}\"VSFTPDACTION \[pid %{DATA:pid}\] \[%{DATA:user}\] %{WORD:status} %{WORD:action}: Client \"%{DATA:IP}\", \"%{DATA:file}\", %{DATA:bytes} bytes, %{DATA:Kbyte_sec}Kbyte/sec 

4.2、创建logstash vsftpd配置文件

#cat ./logstash/conf/vsftpd.conf

input {
    file {
        type => "vsftpd_log"
        path => "/var/log/vsftpd.log"
    }
}
filter {
    if [type] == "vsftpd_log" {
        grok {
            match => { "message" => "%{VSFTPDACTION}" }
            match => { "message" => "%{VSFTPDLOGIN}" }
            match => { "message" => "%{VSFTPDCONNECT}" }
        }
    }
}
output {
    stdout { codec => rubydebug }
    elasticsearch {
        hosts => "elk.test.com:9200"
        index => "vsftpd_log-%{+YYYY.MM}"
    }
}

4.3、详细图形展示如下:

10.png

原创文章,作者:wubin,如若转载,请注明出处:http://www.178linux.com/17395

(24)
wubinwubin
上一篇 2016-06-03 09:19
下一篇 2016-06-03

相关推荐

  • RAID

    RAID:       Redunant ARRAYS OF Inexpensive Disks       廉价磁盘阵列 Independent        Berkeley: A case for Redundent Arrays of Inexpens…

    Linux干货 2016-12-23
  • 深入理解java异常处理机制

     1. 引子        try…catch…finally恐怕是大家再熟悉不过的语句了,而且感觉用起来也是很简单,逻辑上似乎也是很容易理解。不过,我亲自体验的“教训”告诉我,这个东西可不是想象中的那么简单、听话。不信?那你看看下面的代码,“猜猜”它执行后的结果会是什么?不要往后看答案、也不许执行代码看真正…

    Linux干货 2015-04-12
  • PHP进阶知识总结

    周末梳理了下这段时间看书的一些知识点,进步的过程不仅要实践,还要安排多看书、思考、总结。 只针对知识点进行了罗列和简单说明,很多细节还未整理好,待后面再专门详细写。   基础易忽略概念   PHP是一个支持面向对象开发的语言,而不是一个纯面向对象的语言 PHP5中保留了对var的支持,但会将var自动转换为public 类型检查函数: i…

    Linux干货 2015-03-10
  • 抓包获取QQ好友IP地址

    作者:网海过客 原文连接:https://www.chinasa.net/archives/326.html 原理:通过抓包软件,抓取QQ进程,向QQ好向发送UDP数据包,获取QQ好友IP地址 抓包软件:科来网络分析系统 步骤: 1、打开抓包软件,选择网卡,本地进程分析。 2、向QQ好友发起语音通话 3、在抓包软件里,找到QQ进程,数据包,过滤UDP协议,在…

    Linux干货 2017-06-30
  • Linux系统的基础命令及事例讲解

    Linux操作系统是系统运维领域里逐步强大,逐步受到关注的一款开源系统,它包含里大量的命令及程序,以下是我在这一周内学到的几种命令及获取帮助命令的方法: 1  tty    tty是查看终端设备的一个命令,输入之后系统会提示如下:    /dev/pts/1   其中pts代表是在命令行接口下终端类…

    Linux干货 2016-10-30
  • corosync+pacemaker+pcs 使用ansible配置高可用LAMP构架

    前言: 这篇博客的实验主要是配置两个节点基于corosync + pacemaker的高考用lamp, 是我搞得最痛苦的一次,并且结果还不稳定。主要问题是corosync 1.x + pacemaker 时,如果把pacemaker当成插件使用,尝试很多次都不成功,后来把pacemaker当成半独立的服务进行配置。 但是如此一来crm就没办法进行资源配置,只…

    Linux干货 2016-01-27