ELK Log Analysis Examples
1. ELK Web Log Analysis
2. ELK MySQL Slow Query Log Analysis
3. ELK SSH Login Log Analysis
4. ELK vsftpd Log Analysis
1. ELK Web Log Analysis
A logstash grok pattern parses the web access log into structured fields, the events are shipped to the Elasticsearch search engine, and Kibana presents them on the front end.
1.1 Create the logstash grok filter rule
#cat ./logstash/patterns/nginx
NGINXACCESS %{IPORHOST:remote_addr} - - \[%{HTTPDATE:time_local}\] "%{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion}" %{INT:status} %{INT:body_bytes_sent} %{QS:http_referer} %{QS:http_user_agent}
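For reference, this pattern targets nginx's default combined log format; a sample access log line it should match (hypothetical values) looks like this:

192.168.1.100 - - [18/Jan/2016:10:30:00 +0800] "GET /index.html HTTP/1.1" 200 612 "-" "Mozilla/5.0 (X11; Linux x86_64)"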
1.2 Create the logstash web log configuration file
#cat ./logstash/conf/ngx_log.conf
input {
    file {
        type => "nginx_log"
        path => "/opt/nginx/logs/access.log"
    }
}

filter {
    if [type] == "nginx_log" {
        grok {
            # assumes the custom pattern file from 1.1; adjust patterns_dir to its real location
            patterns_dir => ["./logstash/patterns"]
            match => { "message" => "%{NGINXACCESS}" }
        }
        geoip {
            source => "remote_addr"
            target => "geoip"
            database => "/opt/logstash-2.0.0/conf/GeoLiteCity.dat"
            add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
            add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
        }
        mutate {
            convert => [ "[geoip][coordinates]", "float",
                         "body_bytes_sent", "float",
                         "body_bytes_sent.raw", "float" ]
        }
    }
}

output {
    stdout { codec => rubydebug }
    elasticsearch {
        hosts => "elk.test.com:9200"
        index => "ngx_log-%{+YYYY.MM}"
    }
}
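A minimal sketch for checking and starting the pipeline, assuming logstash 2.0 is installed under /opt/logstash-2.0.0 (the path matches the GeoLiteCity database location above):

#/opt/logstash-2.0.0/bin/logstash -f ./logstash/conf/ngx_log.conf --configtest
#/opt/logstash-2.0.0/bin/logstash -f ./logstash/conf/ngx_log.conf

With stdout { codec => rubydebug } enabled, every parsed event is printed to the console, which makes it easy to confirm that the NGINXACCESS fields and the geoip coordinates are extracted correctly before building charts in Kibana.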
1.3 Create Kibana charts
Count HTTP status codes
Open the "Visualize" menu and choose the "Pie chart" option. Select the status.raw field, as shown in the figure below:
Count the top 50 client IPs
Open the "Visualize" menu and choose the "Vertical bar chart" option. Select the remote_addr.raw field, as shown in the figure below:
Count 403-405 status codes
Open the "Visualize" menu and choose the "Line chart" option. Select the status.raw field, as shown in the figure below:
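To restrict the chart to that range, a Lucene query can be entered in the Kibana search bar before saving the visualization; a hypothetical example (either form works for three-digit codes):

status:[403 TO 405]
status:(403 OR 404 OR 405)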
Other chart types follow the same approach and are not covered in detail here.
The detailed charts are shown below:
2. ELK MySQL Slow Query Log Analysis
2.1 Create the logstash grok filter rule
#cat ./logstash/patterns/mysql_slow
MYSQLSLOW "# User@Host: %{WORD:user}\[%{WORD}\] @ (%{HOST:client_hostname}|) \[(%{IP:client_ip}|)\]",
"# Thread_id: %{NUMBER:thread_id:int} \s*Schema: (%{WORD:schema}| ) \s*Last_errno: %{NUMBER:last_errno:int} \s*Killed: %{NUMBER:killed:int}",
"# Query_time: %{NUMBER:query_time:float} \s*Lock_time: %{NUMBER:lock_time:float} \s*Rows_sent: %{NUMBER:rows_sent:int} \s*Rows_examined: %{NUMBER:rows_examined:int}",
"# Bytes_sent: %{NUMBER:bytes_sent:int}",
"(?m)SET timestamp=%{NUMBER:timestamp};%{GREEDYDATA:mysql_query}"
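These rules target the extended (Percona-style) slow query log, which carries the Thread_id/Schema/Last_errno/Killed and Bytes_sent header lines; a hypothetical entry they should parse looks like:

# User@Host: appuser[appuser] @ web01 [192.168.1.50]
# Thread_id: 4168  Schema: orders  Last_errno: 0  Killed: 0
# Query_time: 3.141592  Lock_time: 0.000123  Rows_sent: 10  Rows_examined: 987654
# Bytes_sent: 1024
SET timestamp=1453086600;
SELECT * FROM orders WHERE customer_id = 42;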
2.2 Create the logstash MySQL slow query configuration file
#cat ./logstash/conf/MySQL-Slow.conf
input {
    file {
        type => "mysql-slow"
        path => "/var/log/mysql_slow_log.log"
    }
}

filter {
    if [type] == "mysql-slow" {
        # stitch lines that start with neither "#" nor "SET" onto the previous event
        multiline {
            pattern => "^#|^SET"
            negate => true
            what => "previous"
        }
        grok {
            # assumes the custom pattern file from 2.1; adjust patterns_dir to its real location
            patterns_dir => ["./logstash/patterns"]
            match => { "message" => "%{MYSQLSLOW}" }
        }
        mutate {
            # one gsub array: flatten newlines, then collapse double spaces
            gsub => [ "mysql_query", "\n", " ",
                      "mysql_query", "  ", " " ]
            add_tag => "mutated_mysql_query"
        }
        multiline {
            pattern => "(# User|# Thread|# Query|# Time|# Bytes)"
            negate => false
            what => "next"
        }
        date {
            match => [ "timestamp", "UNIX" ]
        }
        mutate {
            remove_field => [ "timestamp" ]
        }
    }
}

output {
    stdout { codec => rubydebug }
    elasticsearch {
        hosts => "elk.test.com:9200"
        index => "mysql_slow_log-%{+YYYY.MM}"
    }
}
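With the rubydebug output enabled, a successfully parsed event should carry the typed fields from MYSQLSLOW; an abridged, hypothetical example:

{
             "type" => "mysql-slow",
             "user" => "appuser",
       "query_time" => 3.141592,
        "rows_sent" => 10,
    "rows_examined" => 987654,
      "mysql_query" => "SELECT * FROM orders WHERE customer_id = 42;",
       "@timestamp" => "2016-01-18T02:30:00.000Z"
}

Note that the date filter rewrites @timestamp from the SET timestamp value, so events are indexed at the time the query ran rather than the time logstash read the log.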
2.3 The detailed charts are shown below:
3. ELK SSH Login Log Analysis
3.1 Create the logstash grok filter rule
#cat ./logstash/patterns/ssh
SECURELOG %{WORD:program}\[%{DATA:pid}\]: %{WORD:status} password for ?(invalid user)? %{WORD:USER} from %{DATA:IP} port
SYSLOGPAMSESSION %{SYSLOGBASE} (?=%{GREEDYDATA:message})%{WORD:pam_module}\(%{DATA:pam_caller}\): session %{WORD:pam_session_state} for user %{USERNAME:username}(?: by %{GREEDYDATA:pam_by})?
SYSLOGBASE2 (?:%{SYSLOGTIMESTAMP:timestamp}|%{TIMESTAMP_ISO8601:timestamp8601}) (?:%{SYSLOGFACILITY} )?%{SYSLOGHOST:logsource} %{SYSLOGPROG}:
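For context, typical /var/log/secure lines that these three patterns are meant to match (hypothetical host names and addresses):

Jan 18 10:30:01 web01 sshd[12345]: Accepted password for root from 192.168.1.50 port 51422 ssh2
Jan 18 10:30:09 web01 sshd[12346]: Failed password for invalid user admin from 10.0.0.99 port 40022 ssh2
Jan 18 10:30:11 web01 sshd[12345]: pam_unix(sshd:session): session opened for user root by (uid=0)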
3.2 Create the logstash ssh configuration file
#cat ./logstash/conf/ssh.conf
input {
    file {
        type => "seclog"
        path => "/var/log/secure"
    }
}

filter {
    if [type] == "seclog" {
        grok {
            # assumes the custom pattern file from 3.1; patterns are tried in order
            patterns_dir => ["./logstash/patterns"]
            match => { "message" => [ "%{SYSLOGPAMSESSION}", "%{SECURELOG}", "%{SYSLOGBASE2}" ] }
        }
        geoip {
            source => "IP"
            fields => ["city_name"]
            database => "/opt/logstash-2.0.0/conf/GeoLiteCity.dat"
        }
        if [status] == "Accepted" {
            mutate { add_tag => ["Success"] }
        } else if [status] == "Failed" {
            mutate { add_tag => ["Failed"] }
        }
    }
}

output {
    stdout { codec => rubydebug }
    elasticsearch {
        hosts => "elk.test.com:9200"
        index => "sshd_log-%{+YYYY.MM}"
    }
}
PS: add status tags so that Kibana can aggregate successes and failures:
if [status] == "Accepted" {             # the [status] field matched "Accepted"
    mutate { add_tag => ["Success"] }   # add the tag "Success"
} else if [status] == "Failed" {        # the [status] field matched "Failed"
    mutate { add_tag => ["Failed"] }    # add the tag "Failed"
}
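Once tagged, the two outcomes can be filtered straight from the Kibana search bar and charted separately, for example:

tags:Success
tags:Failed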
3.3 The detailed charts are shown below:
4. ELK vsftpd Log Analysis
4.1 Create the logstash grok filter rule
#cat ./logstash/patterns/vsftpd
VSFTPDCONNECT \[pid %{WORD:pid}\] %{WORD:action}: Client \"%{DATA:IP}\"
VSFTPDLOGIN \[pid %{WORD:pid}\] \[%{WORD:user}\] %{WORD:status} %{WORD:action}: Client \"%{DATA:IP}\"
VSFTPDACTION \[pid %{DATA:pid}\] \[%{DATA:user}\] %{WORD:status} %{WORD:action}: Client \"%{DATA:IP}\", \"%{DATA:file}\", %{DATA:bytes} bytes, %{DATA:Kbyte_sec}Kbyte/sec
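For context, the vsftpd.log lines these three patterns correspond to look roughly like this (hypothetical client and file names):

Mon Jan 18 10:30:00 2016 [pid 2843] CONNECT: Client "192.168.1.50"
Mon Jan 18 10:30:02 2016 [pid 2842] [ftpuser] OK LOGIN: Client "192.168.1.50"
Mon Jan 18 10:30:30 2016 [pid 2844] [ftpuser] OK DOWNLOAD: Client "192.168.1.50", "/home/ftpuser/backup.tar.gz", 1048576 bytes, 512.00Kbyte/sec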
4.2 Create the logstash vsftpd configuration file
#cat ./logstash/conf/vsftpd.conf
input {
    file {
        type => "vsftpd_log"
        path => "/var/log/vsftpd.log"
    }
}

filter {
    if [type] == "vsftpd_log" {
        grok {
            # assumes the custom pattern file from 4.1; the most specific pattern is tried first
            patterns_dir => ["./logstash/patterns"]
            match => { "message" => [ "%{VSFTPDACTION}", "%{VSFTPDLOGIN}", "%{VSFTPDCONNECT}" ] }
        }
    }
}

output {
    stdout { codec => rubydebug }
    elasticsearch {
        hosts => "elk.test.com:9200"
        index => "vsftpd_log-%{+YYYY.MM}"
    }
}
4.3 The detailed charts are shown below:
Original article by wubin. Please credit the source when republishing: http://www.178linux.com/17395