一、日志分析项目
1、概述
生产中会生成大量的系统日志、应用程序日志、安全日志等等日志,通过对日志的分析可以了解服务器的负载,健康情况,可以分析客服的分部情况、客户行为,还可以做出预测等。
一般采集流程
日志产出——》采集(logstash、flume、scribe)——》存储——》分析——》存储(数据库、NoSQL)——》可视化
开源实时日志分析ELK平台
Logstash收集日志,并存放到ElasticSearch集群中,Kibana则是从ES集群中查询数据生成图表,返回到浏览器的端。
2、分析的前提
1)半结构化数据
日志是半结构化数据,是有组织的,有格式的数据,可以分割成行和列,就可以当做表理解和处理,当然也可以分析里面的数据。
2)结构化数据
结构化数据就是数据库里面的数据,定义行列还可以定义数据的类型。Html。
3)结构化数据
非结构化数据,音频和视频等。
3、文本分析
日志是文本文件,需要依赖文件IO、字符串操作、正则表达式技术。
通过这些就可以将日志中的数据提取出来。
183.60.212.153 – – [19/Feb/2013:10:23:29 +0800] \
“GET /o2o/media.html?menu=3 HTTP/1.1” 200 16691 “-” \
“Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)”
里面的数据对后期的分析都是必须的。
4、提取数据
- 按照空格分隔
line = ”’183.60.212.153 – – [19/Feb/2013:10:23:29 +0800] \
“GET /o2o/media.html?menu=3 HTTP/1.1” 200 16691 “-” \
“Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)””’
for word in line.split():
print(word)
切割情况:
183.60.212.153
–
–
[19/Feb/2013:10:23:29
+0800]
“GET
/o2o/media.html?menu=3
HTTP/1.1”
200
16691
“-”
“Mozilla/5.0
(compatible;
EasouSpider;
+http://www.easou.com/search/spider.html)”
缺点:没有按照要求的格式分隔好,所需要的数据多都是按照空格分隔开了。所以,定义的时候不选用在文件中出现的字符就可以省下好多事。
改进:依旧按照空格分隔,但是遇到双引号、中括号特殊处理一下。
先按照空格切分,然后迭代一个个字符,如果发现是[ 或者” ,则就不判断是是否是空格,直到发现] 或者”结尾等,这个区间获取的就是时间等数据。
line = ”’183.60.212.153 – – [19/Feb/2013:10:23:29 +0800] \
“GET /o2o/media.html?menu=3 HTTP/1.1” 200 16691 “-” \
“Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)””’
chars = set(” \t”)
def makekey(line:str):
start = 0
skip =False
for i,c in enumerate(line):
if not skip and c in ‘”[‘ : #遇到” 或者[
start = i + 1
skip = True
elif skip and c in ‘”]’: #遇到 ” 或者]
skip = False
yield line[start:i]
start = i + 1
continue
if skip:
continue
if c in chars:
if start == i:
start = i + 1
continue
yield line[start:i]
start = i + 1
else:
if start < len(line):
yield line[start:]
print(list(makekey(line)))
[‘183.60.212.153’, ‘-‘, ‘-‘, ’19/Feb/2013:10:23:29 +0800’, ‘GET /o2o/media.html?menu=3 HTTP/1.1’, ‘200’, ‘16691’, ‘-‘, ‘Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)’]
5、类型转换
文件中的数据是有类型的,例如时间、状态吗,对不同的文件进行不同的类型转换.自定义转换等。
1)时间转换
19/Feb/2013:10:23:29 +0800 对应的格式是
%d/%b/%Y:%H:%M:%S %z
使用的函数应该是datetime类中的strptime方法。
import datetime
timestr = ’19/Feb/2013:10:23:29 +0800′
def conver_time(timestr):
return datetime.datetime.strptime(timestr,’%d/%b/%Y:%H:%M:%S %z’)
print(conver_time(timestr))
转换结果:
2013-02-19 10:23:29+08:00
利用lanbda可以转换为一行的函数。
lambda timestr:datetime.datetime.strptime(timestr,’%d/%b/%Y:%H:%M:%S %z’)
2)状态码和字节数
都是int整型,使用int函数进行转换。
3)请求信息的解析
‘GET /o2o/media.html?menu=3 HTTP/1.1′
request = ‘GET /o2o/media.html?menu=3 HTTP/1.1’
def get_request(request:str):
return dict(zip([‘method’,’url’,’protocol’],request.split()))
lambda request:dict(zip([‘method’,’url’,’protocol’],request.split()))
利用zip函数组建字典,三项利用split()空格进行分割。
输出的结果:
{‘method’: ‘GET’, ‘url’: ‘/o2o/media.html?menu=3’, ‘protocol’: ‘HTTP/1.1′}
4)映射
对每一个字段进行命名,然后与值和类型转换的方法对应,解析每一行是必须要有顺序的。
import datetime
line = ”’183.60.212.153 – – [19/Feb/2013:10:23:29 +0800] \
“GET /o2o/media.html?menu=3 HTTP/1.1” 200 16691 “-” \
“Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)””’
chars = set(” \t”)
def makekey(line:str):
start = 0
skip =False
for i,c in enumerate(line):
if not skip and c in ‘”[‘ : #遇到” 或者[
start = i + 1
skip = True
elif skip and c in ‘”]’: #遇到 ” 或者]
skip = False
yield line[start:i]
start = i + 1
continue
if skip:
continue
if c in chars:
if start == i:
start = i + 1
continue
yield line[start:i]
start = i + 1
else:
if start < len(line):
yield line[start:]
# print(list(makekey(line)))
names = (‘remote’,”,”,’dateime’,’request’,’status’,’size’,”,’useragent’)
ops = (None,None,None,
lambda timestr:datetime.datetime.strptime(timestr,’%d/%b/%Y:%H:%M:%S %z’),
lambda request:dict(zip([‘method’,’url’,’protocol’],request.split())),
int,int,None,None
)
def extract(line:str):
return dict(map(lambda item:(item[0],item[2](item[1])if item[2] is not None else item[1]),zip(names,makekey(line),ops)))
print(extract(line))
{‘remote’: ‘183.60.212.153’, ”: ‘-‘, ‘dateime’: datetime.datetime(2013, 2, 19, 10, 23, 29, tzinfo=datetime.timezone(datetime.timedelta(0, 28800))), ‘request’: {‘method’: ‘GET’, ‘url’: ‘/o2o/media.html?menu=3’, ‘protocol’: ‘HTTP/1.1’}, ‘status’: 200, ‘size’: 16691, ‘useragent’: ‘Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)’}
names = (‘remote’,”,”,’dateime’,’request’,’status’,’size’,”,’useragent’)
ops = (None,None,None,
lambda timestr:datetime.datetime.strptime(timestr,’%d/%b/%Y:%H:%M:%S %z’),
lambda request:dict(zip([‘method’,’url’,’protocol’],request.split())),
int,int,None,None
)
def extract(line:str):
return dict(map(lambda item:(item[0],item[2](item[1])if item[2] is not None else item[1]),zip(names,makekey(line),ops)))
print(extract(line))
6、正则表达式提取:
1)构造一个正则表达式提取需要的字段,
pattern = ”'([\d.]{7,}) – – \[([/\w +:]+)\] “(\w+) (\S+) ([\w/\d.]+)” (\d+)(\d+).+”(.+)” ”’
2)进一步改造pattern分组,ops和名词对象,不需要names了。
pattern = ”'(?P<remote>[\d.]{7,}) – – \[(?P<datetime>[/\w +:]+)\] “(?P<method>\w+) (?P<url>\S+) (?P<procotol>[\w/\d.]+)” (?P<status>\d+)(?P<size>\d+).+”(?P<useragent>.+)” ”’
命名分组:
ops = (
‘dateime’:lambda timestr:datetime.datetime.strptime(timestr,’%d/%b/%Y:%H:%M:%S %z’),
‘request’:lambda request:dict(zip([‘method’,’url’,’protocol’],request.split())),
‘status’:int,
‘size’:int
)
3)完整代码:
import datetime
import re
line = ”’183.60.212.153 – – [19/Feb/2013:10:23:29 +0800] \
“GET /o2o/media.html?menu=3 HTTP/1.1” 200 16691 “-” \
“Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)””’
ops = {
‘datetime’: lambda timestr : datetime.datetime.strptime(timestr,’%d/%b/%Y:%H:%M:%S %z’),
# ‘request’: lambda request:dict(zip([‘method’,’url’,’protocol’],request.split())),
‘status’: int,
‘size’: int
}
pattern = ”'(?P<remote>[\d.]{7,})\s-\s-\s\[(?P<datetime>[^\[\]]+)\]\s”(?P<method>\w+)\s(?P<url>\S+)\s(?P<procotol>[\w/\d.]+)”\s(?P<status>\d+)\s(?P<size>\d+).+\s”(?P<useragent>.+)””’
# mathcer = re.match(pattern,line)
# if mathcer:
# print(mathcer.groupdict())
regex = re.compile(pattern)
def extract(line:str):
matcher = regex.match(line)
return {k:ops.get(k,lambda x:x)(v)for k,v in matcher.groupdict().items()}
print(extract(line))
{‘remote’: ‘183.60.212.153’, ‘datetime’: datetime.datetime(2013, 2, 19, 10, 23, 29, tzinfo=datetime.timezone(datetime.timedelta(0, 28800))), ‘method’: ‘GET’, ‘url’: ‘/o2o/media.html?menu=3’, ‘procotol’: ‘HTTP/1.1’, ‘status’: 200, ‘size’: 16691, ‘useragent’: ‘Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)’}
7、异常处理:
import datetime
import re
line = ”’183.60.212.153 – – [19/Feb/2013:10:23:29 +0800] \
“GET /o2o/media.html?menu=3 HTTP/1.1” 200 16691 “-” \
“Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)””’
ops = {
‘datetime’: lambda timestr : datetime.datetime.strptime(timestr,’%d/%b/%Y:%H:%M:%S %z’),
# ‘request’: lambda request:dict(zip([‘method’,’url’,’protocol’],request.split())),
‘status’: int,
‘size’: int
}
pattern = ”'(?P<remote>[\d.]{7,})\s-\s-\s\[(?P<datetime>[^\[\]]+)\]\s”(?P<method>\w+)\s(?P<url>\S+)\s(?P<procotol>[\w/\d.]+)”\s(?P<status>\d+)\s(?P<size>\d+).+\s”(?P<useragent>.+)””’
# mathcer = re.match(pattern,line)
# if mathcer:
# print(mathcer.groupdict())
regex = re.compile(pattern)
def extract(line:str):
matcher = regex.match(line)
if matcher:
return {k:ops.get(k,lambda x:x)(v)for k,v in matcher.groupdict().items()}
print(extract(line))
日志中出现一些不匹配的行,需要处理。
regex.match()可能匹配不上,所以增加一个判断,采用抛出异常等形式。或者返回一个特殊值得方式,告知调用者没有匹配。
8、异常处理:
1) 数据载入
对于本项目,数据就是日志的一行行记录,载入数据就是文件IO的读取,将或者数据的方法封装成函数。
import datetime
import re
line = ”’183.60.212.153 – – [19/Feb/2013:10:23:29 +0800] \
“GET /o2o/media.html?menu=3 HTTP/1.1” 200 16691 “-” \
“Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)””’
ops = {
‘datetime’: lambda timestr : datetime.datetime.strptime(timestr,’%d/%b/%Y:%H:%M:%S %z’),
# ‘request’: lambda request:dict(zip([‘method’,’url’,’protocol’],request.split())),
‘status’: int,
‘size’: int
}
pattern = ”'(?P<remote>[\d.]{7,})\s-\s-\s\[(?P<datetime>[^\[\]]+)\]\s”(?P<method>\w+)\s(?P<url>\S+)\s(?P<procotol>[\w/\d.]+)”\s(?P<status>\d+)\s(?P<size>\d+).+\s”(?P<useragent>.+)””’
regex = re.compile(pattern)
def extract(line:str):
matcher = regex.match(line)
if matcher:
return {k:ops.get(k,lambda x:x)(v)for k,v in matcher.groupdict().items()}
print(extract(line))
def load(path):
“””装载日志文件”””
with open(path)as f:
for line in f:
filds = extract(line)
if fields:
yield fields
else:
continue
9、时间窗口分析:
1) 概念
许多数据,例如日志,都是和时间相关的,都是按照时间顺序产生的。
产生的数据分析的时候,要按照时间求值。
Interval表示每一次求值的时间间隔。
Width时间窗口的宽度,指的是一次求值的时间窗口宽度。
2) 当width > interval
数据求值的时候会有重叠
3) 当width = interval
4) 当width < interval
一般不采纳这种方案,会有数据缺失。
5) 时序数据
运维环境中,日志、监控等产生的数据都是与时间相关的数据,按照时间的先后产生并记录下来数据,所以一般按照时间对数据进行分析。
6) 数据分析基本程序结构
无限的生成随机函数,产生时间相关的数据,返回时间和随机数的字典。
每次取3个数据,求平均值。
import random
import datetime
import time
def source():
while True:
yield {‘value’:random.randint(1,100),’datetime’:datetime.datetime.now()}
time.sleep(1)
#获取数据
s = source()
items = [next(s)for _ in range(3)]
#处理函数
def handler(iterable):
return sum(map(lambda item:item[‘value’],iterable)) / len(iterable)
print(items)
print(“{:.2f}”.format(handler(items)))
[{‘value’: 87, ‘datetime’: datetime.datetime(2018, 5, 3, 19, 33, 29, 556430)}, {‘value’: 32, ‘datetime’: datetime.datetime(2018, 5, 3, 19, 33, 30, 557127)}, {‘value’: 2, ‘datetime’: datetime.datetime(2018, 5, 3, 19, 33, 31, 557792)}]
40.33
上面代码模拟一段时间内产生了数据,等一段固定时间取数据来计算平均值。
7) 窗口函数实现
将上面的获取数据的程序拓展为window函数,使用重叠的方案。
import random
import datetime
import time
def source(second=1):
while True:
yield {
‘datetime’:datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=8))),
‘value’: random.randint(1, 100)
}
time.sleep(second)
# #获取数据
# s = source()
# items = [next(s)for _ in range(3)]
# print(items)
# print(“{:.2f}”.format(handler(items)))
def window(iterator,handler,width:int,interval:int):
“””
窗口函数
:param iterator: 数据源,生成器,用来拿数据
:param handler: 数据处理函数
:param width: 时间窗口宽度,秒。
:param interval: 处理时间间隔,秒
:return:
“””
start = datetime.datetime.strptime(‘20170101 000000 +0800′,’%Y%m%d %H%M%S %z’)
current = datetime.datetime.strptime(‘20170101 010000 +0800′,’%Y%m%d %H%M%S %z’)
buffer = []#窗口中的待计算数据
delta = datetime.timedelta(seconds=width-interval)
while True:
#从数据源获取数据
data = next(iterator)
if data:
buffer.append(data)
current = data[‘datetime’]
#每隔interval计算buffer中的数据一次。
if (current – start).total_seconds() >= interval:
ret = handler(buffer)
print(‘{:.2f}’.format(ret))
start = current
#清除超出width数据
buffer = [x for x in buffer if x[‘datetime’]>current – delta]
# 处理函数
def handler(iterable):
return sum(map(lambda item: item[‘value’], iterable)) / len(iterable)
window(source(),handler,10,5)
时间计算:
10、分发:
1)生产者消费者模型
对于一个监控系统,需要处理很多数据,包括日志,对其中已有的数据采集、分析。被监控对象就是数据的生产者producer,数据的处理程序就是数据的消费者consumer。
生产者消费者传统模型。
传统的生产者消费者模式,生产者生产,消费者消费,这种模型存在问题,开发代码的耦合性太高,如果生成规模扩大,不易扩展,生产和消费的速度很难匹配等。
解决的办法就是——队列queue
作用——解耦、缓冲。
生产者往往会部署好几个程序,日志也会产生好多,而消费者也会有多个程序,去提取日志分析处理。
数据的生产是不稳定的,会造成短时间数据的”潮涌”,需要缓冲。
消费者的消费能力不一样,有快有慢,消费者可以自己决定消费缓冲区中的数据。
单机可以使用queue内建的模块构件进程内的队列,满足多个线程之间的消费需要。
大型系统可以使用第三方消息中间件:RabbitMQ RocketMQ Kafka.
2)queue模块–队列
queue模块提供了一个先进先出的队列Queue。
Queue.Queue(maxsize=0)
创建FIFO队列,返回Queue对象。
maxsixe小于等于0,队列长度没有限制。
Queue.get(block=True,timeout=None)
从队列中移除元素并返回这个元素。
Block为阻塞。Timeout为超时。
如果block为True,是阻塞,timeout为None就是一直阻塞。
如果block为True,是阻塞,timeout有值的话就会阻塞到一定秒数抛出异常。
Block为False,是非阻塞,timeout就被忽略,要么成功返回一个元素,严么抛出empty异常。
Queue.get_nowait()
等价于get(False),也就是说要么成功返回一个元素,要么抛出异常。
但是queue的这种阻塞效果,需要多线程的时候演示。
Queue.put(item,block=True,timeout=None)
把一个元素加入到队列中去。
Block=True,timeout=None,一直阻塞至有空位置防元素。
Block=True,timeout=5,阻塞5秒就抛出full异常。
Block=True,timeout实效,立即返回,能塞进去就塞,不能塞就返回抛出异常。
Queue.put_nowait(item)
等价于put(item,False),也就是能塞进去就塞,不能就抛出full异常。
#Queue测试。
from queue import Queue
import random
q = Queue()
q.put(random.randint(1,100))
q.put(random.randint(1,100))
print(q.get())
print(q.get())
#print(q.get())
print(q.get(timeout=3))
第一个print : 68
第二个print:15
第三个print :阻塞
第四个print:超过timeout报错,empty。
11、分发器实现:
生产者(数据源)生产数据,缓冲到消息队列中。
数据处理流程:
数据加载-》 提取-》 分析(滑动窗口函数)
处理大量数据的时候,对于一个数据源来说,需要多个消费者处理,但是如果分配就是个问题了。
需要一个分发器(调度器),把数据分发给不同的消费者处理。
每一个消费者拿到数据后,有自己的处理函数,要有注册机制。
数据加载——》提取——》分发——》 分析函数1
| —–》 分析函数2
分析1 和分析2是不同的handler,不同的窗口宽度,间隔时间。
如何分发?
轮询策略。
一对多的副本发送,一个数据通过分发器,发送到n个消费者。
消息队列
在生产者和消费者之间使用消费队列,那么所有消费者公用一个消息队列,还是各自拥有一个队列呢?
共用一个队列也是可以的,但是需要解决争抢的问题,相对来说每个消费者自己拥有一个队列,何为容易。
如何注册;
在调度器内部记录有哪些消费者,每一个消费者拥有自己的队列。
线程。
由于一个数据会被多个不同的注册过的handler处理,最好的方式就是线程。
线程使用举例。
线程使用举例:
import threading
#定义线程
# target线程中运行的函数,args这个啊哈双女户运行时候需要的实参元组。
t = threading.Thread(target=window,args=(src,handler,width,interval))
#启动线程
t.start()
12、分发器实现代码:
import random
import datetime
import time
import threading
from queue import Queue
def source(second=1):
“””
生成数据
:param second:
:return:
“””
while True:
yield {
‘datetime’:datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=8))),
‘value’: random.randint(1, 100)
}
time.sleep(second)
# #获取数据
# s = source()
# items = [next(s)for _ in range(3)]
# print(items)
# print(“{:.2f}”.format(handler(items)))
def window(iterator,handler,width:int,interval:int):
“””
窗口函数
:param iterator: 数据源,生成器,用来拿数据
:param handler: 数据处理函数
:param width: 时间窗口宽度,秒。
:param interval: 处理时间间隔,秒
:return:
“””
start = datetime.datetime.strptime(‘20170101 000000 +0800′,’%Y%m%d %H%M%S %z’)
current = datetime.datetime.strptime(‘20170101 010000 +0800′,’%Y%m%d %H%M%S %z’)
buffer = []#窗口中的待计算数据
delta = datetime.timedelta(seconds=width-interval)
while True:
#从数据源获取数据
data = next(iterator)
if data:
buffer.append(data)
current = data[‘datetime’]
#每隔interval计算buffer中的数据一次。
if (current – start).total_seconds() >= interval:
ret = handler(buffer)
print(‘{:.2f}’.format(ret))
start = current
#清除超出width数据
buffer = [x for x in buffer if x[‘datetime’]>current – delta]
# 处理函数
def handler(iterable):
return sum(map(lambda item: item[‘value’], iterable)) / len(iterable)
window(source(),handler,10,5)
def dispatcher(src):
#分发器中记录handler,同时保存各自的队列
handlers = []
queues = []
def reg(handler,width:int,interval:int):
“””
注册窗口处理函数
:param handler:注册的数据处理函数
:param width: 时间窗口宽度
:param interval: 时间间隔
:return:
“””
q = Queue()
queues.append(q)
h = threading.Thread(target=window,args=(q,handler,width,interval))
handlers.append(h)
def run():
for t in handlers:
t.start() #启动线程处理数据
for item in src: #将数据源提取到的数据分发到所有队列中。
for q in queues:
q.put(item)
return reg,run
reg,run = dispatcher(source())
reg(handler,10,5) #注册
run() #运行
13、整合代码
import random
import datetime
import time
import threading
from queue import Queue
line = ”’183.60.212.153 – – [19/Feb/2013:10:23:29 +0800] \
“GET /o2o/media.html?menu=3 HTTP/1.1” 200 16691 “-” \
“Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)””’
regex = re.compile(pattern) #编译
ops = {
‘datetime’: lambda timestr : datetime.datetime.strptime(timestr,‘%d/%b/%Y:%H:%M:%S %z’),
# ‘request’: lambda request:dict(zip([‘method’,’url’,’protocol’],request.split())),
‘status’: int,
‘size’: int
}
pattern = ”'(?P<remote>[\d.]{7,})\s-\s-\s\[(?P<datetime>[^\[\]]+)\]\s”(?P<method>\w+)\s(?P<url>\S+)\s(?P<procotol>[\w/\d.]+)”\s(?P<status>\d+)\s(?P<size>\d+).+\s”(?P<useragent>.+)””’
def extract(line:str):
matcher = regex.match(line)
if matcher:
return {k:ops.get(k,lambda x:x)(v)for k,v in matcher.groupdict().items()}
print(extract(line))
def load(path):
“””装载日志文件”””
with open(path)as f:
for line in f:
fields = extract(line)
if fields:
yield fields
else:
continue
#数据处理
def source(second=1):
“””
生成数据
:param second:
:return:
“””
while True:
yield {
‘datetime’:datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=8))),
‘value’: random.randint(1, 100)
}
time.sleep(second)
#滑动窗口函数
def window(iterator,handler,width:int,interval:int):
“””
窗口函数
:param iterator: 数据源,生成器,用来拿数据
:param handler: 数据处理函数
:param width: 时间窗口宽度,秒。
:param interval: 处理时间间隔,秒
:return:
“””
start = datetime.datetime.strptime(‘20170101 000000 +0800’,‘%Y%m%d %H%M%S %z’)
current = datetime.datetime.strptime(‘20170101 010000 +0800’,‘%Y%m%d %H%M%S %z’)
buffer = []#窗口中的待计算数据
delta = datetime.timedelta(seconds=width-interval)
while True:
#从数据源获取数据
data = next(iterator)
if data:
buffer.append(data)
current = data[‘datetime’]
#每隔interval计算buffer中的数据一次。
if (current – start).total_seconds() >= interval:
ret = handler(buffer)
print(‘{:.2f}’.format(ret))
start = current
#清除超出width数据
buffer = [x for x in buffer if x[‘datetime’]>current – delta]
# 处理函数
#随机数平均数测试函数
def handler(iterable):
return sum(map(lambda item: item[‘value’], iterable)) / len(iterable)
def donothing_handler(iterable):
return iterable
def dispatcher(src):
#分发器中记录handler,同时保存各自的队列
handlers = []
queues = []
def reg(handler,width:int,interval:int):
“””
注册窗口处理函数
:param handler:注册的数据处理函数
:param width: 时间窗口宽度
:param interval: 时间间隔
:return:
“””
q = Queue()
queues.append(q)
h = threading.Thread(target=window,args=(q,handler,width,interval))
handlers.append(h)
def run():
for t in handlers:
t.start() #启动线程处理数据
for item in src: #将数据源提取到的数据分发到所有队列中。
for q in queues:
q.put(item)
return reg,run
# reg,run = dispatcher(source())
if __name__ == “__main__”
import sys
path = ‘test.log’
reg,run = dispatcher(load(path))
reg(donothing_handler,10,5)
run() #运行
reg(handler,10,5) #注册
run() #运行
14、完成分析功能
分析日志很重要,通过海量数据分析就能够知道是否遭受了攻击,是否被爬取及爬取高峰期,是否有盗链等。
百度(baidu)爬虫名称(baiduspider)
谷歌(goole)爬虫名称(Googlebot)
15、状态码分析
状态码分析:
304 服务器收到客户端提交的请求参数,发现资源未变化,要求浏览器使用静态资源的缓存。
404 服务器找不到请求的资源。
304占比大,说明静态缓存效果明显,404占比大,说明网站出现了问题。或者尝试嗅探资源。
如果400、500占比突然开始增大,网站一定出问题了。
def status_hanler(iterable):
#时间窗口内的一批函数
status = {}
for item in iterable:
key = item[‘status’]
status[key] = status.get(key,0)+1
total = len(iterable)
return {k:status[k]/total for k,v in status.items()}
16、日志文件的加载
目前实现的代码中,只能接受一个路径,修改为一批路径。
可以约定一下路径下文件的存放方式:
如果送来的是一批路径,就迭代其中的路径。
如果路径是一个普通文件,就按照行读取文件。
如果路径是一个目录,就遍历路径下所有普通文件,每一个文件按照行处理,不递归处理子目录:
from pathlib import Path
def openfile(path:str):
with open(str(p))as f:
for line in f:
fields = extract(line)
if fields:
yield fields
else:
continue
def load(*paths):
for item in paths:
p = Path(item)
if not p.exists():
continue
if p.is_dir():
for file in p.iterdir():
if file.is_file():
pass
elif p.is_file():
yield from openfile(str(p))
17、完整代码
import random
import datetime
import time
import threading
from queue import Queue
line = ”’183.60.212.153 – – [19/Feb/2013:10:23:29 +0800] \
“GET /o2o/media.html?menu=3 HTTP/1.1” 200 16691 “-” \
“Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)””’
regex = re.compile(pattern) #编译
ops = {
‘datetime’: lambda timestr : datetime.datetime.strptime(timestr,‘%d/%b/%Y:%H:%M:%S %z’),
# ‘request’: lambda request:dict(zip([‘method’,’url’,’protocol’],request.split())),
‘status’: int,
‘size’: int
}
pattern = ”'(?P<remote>[\d.]{7,})\s-\s-\s\[(?P<datetime>[^\[\]]+)\]\s”(?P<method>\w+)\s(?P<url>\S+)\s(?P<procotol>[\w/\d.]+)”\s(?P<status>\d+)\s(?P<size>\d+).+\s”(?P<useragent>.+)””’
def extract(line:str):
matcher = regex.match(line)
if matcher:
return {k:ops.get(k,lambda x:x)(v)for k,v in matcher.groupdict().items()}
print(extract(line))
def openfile(path:str):
with open(str(p))as f:
for line in f:
fields = extract(line)
if fields:
yield fields
else:
continue
def load(*paths):
for item in paths:
p = Path(item)
if not p.exists():
continue
if p.is_dir():
for file in p.iterdir():
if file.is_file():
pass
elif p.is_file():
yield from openfile(str(p))
#数据处理
def source(second=1):
“””
生成数据
:param second:
:return:
“””
while True:
yield {
‘datetime’:datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=8))),
‘value’: random.randint(1, 100)
}
time.sleep(second)
#滑动窗口函数
def window(iterator,handler,width:int,interval:int):
“””
窗口函数
:param iterator: 数据源,生成器,用来拿数据
:param handler: 数据处理函数
:param width: 时间窗口宽度,秒。
:param interval: 处理时间间隔,秒
:return:
“””
start = datetime.datetime.strptime(‘20170101 000000 +0800’,‘%Y%m%d %H%M%S %z’)
current = datetime.datetime.strptime(‘20170101 010000 +0800’,‘%Y%m%d %H%M%S %z’)
buffer = []#窗口中的待计算数据
delta = datetime.timedelta(seconds=width-interval)
while True:
#从数据源获取数据
data = next(iterator)
if data:
buffer.append(data)
current = data[‘datetime’]
#每隔interval计算buffer中的数据一次。
if (current – start).total_seconds() >= interval:
ret = handler(buffer)
print(‘{:.2f}’.format(ret))
start = current
#清除超出width数据
buffer = [x for x in buffer if x[‘datetime’]>current – delta]
# 处理函数
#随机数平均数测试函数
def handler(iterable):
return sum(map(lambda item: item[‘value’], iterable)) / len(iterable)
def donothing_handler(iterable):
return iterable
def dispatcher(src):
#分发器中记录handler,同时保存各自的队列
handlers = []
queues = []
def reg(handler,width:int,interval:int):
“””
注册窗口处理函数
:param handler:注册的数据处理函数
:param width: 时间窗口宽度
:param interval: 时间间隔
:return:
“””
q = Queue()
queues.append(q)
h = threading.Thread(target=window,args=(q,handler,width,interval))
handlers.append(h)
def run():
for t in handlers:
t.start() #启动线程处理数据
for item in src: #将数据源提取到的数据分发到所有队列中。
for q in queues:
q.put(item)
return reg,run
# reg,run = dispatcher(source())
if __name__ == “__main__”
import sys
path = ‘test.log’
reg,run = dispatcher(load(path))
reg(donothing_handler,10,5)
run() #运行
reg(handler,10,5) #注册
run() #运行
可以指定文件或目录,对日志进行数据分析。
分析函数可以动态注册
数据可以分发给不同的分析处理程序处理。
18、浏览器分析
1)Useragent
这里指的是,软件按照一定的格式想远端的服务器提供一个标识自己的字符串。
在HTTP协议中,使用user-agent字段传送这个字符串。
2)信息提取
Pyyaml uaparser user-agent模块
安装 pip install Pyyaml uaparser user-agent
使用
from user_agents import parse
useragents = [
“Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)”\
“Dalvik/2.1.0 (Linux; U; Android 5.1.1; SM-G9250 Build/LMY47X)”\
“Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.221 Safari/537.36 SE 2.X MetaSr 1.0”
]
for uastring in useragents:
ua = parse(uastring)
print(ua.brower,ua.brower.family,ua.brower.version,ua.brower.version_string)
#运行结构
数据分析代码:
from user_agents import parse
ops = {
‘datetime’: lambda timestr : datetime.datetime.strptime(timestr,‘%d/%b/%Y:%H:%M:%S %z’),
‘request’: lambda request:dict(zip([‘method’,‘url’,‘protocol’],request.split())),
‘status’: int,
‘size’: int,
‘useragent’:lambda useragent: parse(useragent)
}
增加浏览器分析函数:
from user_agents import parse
ops = {
‘datetime’: lambda timestr : datetime.datetime.strptime(timestr,‘%d/%b/%Y:%H:%M:%S %z’),
‘request’: lambda request:dict(zip([‘method’,‘url’,‘protocol’],request.split())),
‘status’: int,
‘size’: int,
‘useragent’:lambda useragent: parse(useragent)
}
from user_agents import parse
ops = {
‘datetime’: lambda timestr : datetime.datetime.strptime(timestr,‘%d/%b/%Y:%H:%M:%S %z’),
‘request’: lambda request:dict(zip([‘method’,‘url’,‘protocol’],request.split())),
‘status’: int,
‘size’: int,
‘useragent’:lambda ua: parse(ua)
}
#浏览器分析
def browser_handler(iterable):
browers = {}
for item in iterable:
us = item[‘useragent’]
key = (ua.brower.family,ua.brower.version_string)
browers[key] = browers.get(key,0)+1
return browers
统计所有浏览器:
allbrowers = {}
def browser_handler(iterable):
browers = {}
for item in iterable:
us = item[‘useragent’]
key = (ua.brower.family,ua.brower.version_string)
browers[key] = browers.get(key,0)+1
allbrowers[key] = allbrowers.get(key,0)+1
print(sorted(allbrowers.items(),key =lambda x:x[1],reversed=True))[:10]
return browers
19、完整版代码(最终版)
import random
import datetime
import time
import threading
from queue import Queue
line = ”’183.60.212.153 – – [19/Feb/2013:10:23:29 +0800] \
“GET /o2o/media.html?menu=3 HTTP/1.1” 200 16691 “-” \
“Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)””’
regex = re.compile(pattern) #编译
ops = {
‘datetime’: lambda timestr : datetime.datetime.strptime(timestr,‘%d/%b/%Y:%H:%M:%S %z’),
# ‘request’: lambda request:dict(zip([‘method’,’url’,’protocol’],request.split())),
‘status’: int,
‘size’: int
}
pattern = ”'(?P<remote>[\d.]{7,})\s-\s-\s\[(?P<datetime>[^\[\]]+)\]\s”(?P<method>\w+)\s(?P<url>\S+)\s(?P<procotol>[\w/\d.]+)”\s(?P<status>\d+)\s(?P<size>\d+).+\s”(?P<useragent>.+)””’
def extract(line:str):
matcher = regex.match(line)
if matcher:
return {k:ops.get(k,lambda x:x)(v)for k,v in matcher.groupdict().items()}
print(extract(line))
def openfile(path:str):
with open(str(p))as f:
for line in f:
fields = extract(line)
if fields:
yield fields
else:
continue
def load(*paths):
for item in paths:
p = Path(item)
if not p.exists():
continue
if p.is_dir():
for file in p.iterdir():
if file.is_file():
pass
elif p.is_file():
yield from openfile(str(p))
#数据处理
def source(second=1):
“””
生成数据
:param second:
:return:
“””
while True:
yield {
‘datetime’:datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=8))),
‘value’: random.randint(1, 100)
}
time.sleep(second)
#滑动窗口函数
def window(iterator,handler,width:int,interval:int):
“””
窗口函数
:param iterator: 数据源,生成器,用来拿数据
:param handler: 数据处理函数
:param width: 时间窗口宽度,秒。
:param interval: 处理时间间隔,秒
:return:
“””
start = datetime.datetime.strptime(‘20170101 000000 +0800’,‘%Y%m%d %H%M%S %z’)
current = datetime.datetime.strptime(‘20170101 010000 +0800’,‘%Y%m%d %H%M%S %z’)
buffer = []#窗口中的待计算数据
delta = datetime.timedelta(seconds=width-interval)
while True:
#从数据源获取数据
data = next(iterator)
if data:
buffer.append(data)
current = data[‘datetime’]
#每隔interval计算buffer中的数据一次。
if (current – start).total_seconds() >= interval:
ret = handler(buffer)
print(‘{:.2f}’.format(ret))
start = current
#清除超出width数据
buffer = [x for x in buffer if x[‘datetime’]>current – delta]
# 处理函数
#随机数平均数测试函数
def handler(iterable):
return sum(map(lambda item: item[‘value’], iterable)) / len(iterable)
def donothing_handler(iterable):
return iterable
def status_hanler(iterable):
#时间窗口内的一批函数
status = {}
for item in iterable:
key = item[‘status’]
status[key] = status.get(key,0)+1
total = len(iterable)
return {k:status[k]/total for k,v in status.items()}
#浏览器分析
allbrowers = {}
def browser_handler(iterable):
browers = {}
for item in iterable:
us = item[‘useragent’]
key = (ua.brower.family,ua.brower.version_string)
browers[key] = browers.get(key,0)+1
allbrowers[key] = allbrowers.get(key,0)+1
print(sorted(allbrowers.items(),key =lambda x:x[1],reversed=True))[:10]
return browers
def dispatcher(src):
#分发器中记录handler,同时保存各自的队列
handlers = []
queues = []
def reg(handler,width:int,interval:int):
“””
注册窗口处理函数
:param handler:注册的数据处理函数
:param width: 时间窗口宽度
:param interval: 时间间隔
:return:
“””
q = Queue()
queues.append(q)
h = threading.Thread(target=window,args=(q,handler,width,interval))
handlers.append(h)
def run():
for t in handlers:
t.start() #启动线程处理数据
for item in src: #将数据源提取到的数据分发到所有队列中。
for q in queues:
q.put(item)
return reg,run
# reg,run = dispatcher(source())
if __name__ == “__main__”
import sys
path = ‘test.log’
reg,run = dispatcher(load(path))
reg(donothing_handler,10,5)
run() #运行
reg(handler,10,5) #注册
run() #运行
本文来自投稿,不代表Linux运维部落立场,如若转载,请注明出处:http://www.178linux.com/97632