"""Complete log-analysis script.

Pipeline: ``load`` yields parsed records from log files or directories,
``dispatcher`` fans every record out to one queue per registered handler, and
each handler runs in its own thread through the sliding-window function
``window``.
"""
import datetime
import random
import re
import threading
import time
from collections import Counter
from pathlib import Path
from queue import Queue
from typing import Optional

from user_agents import parse

# Data source: lines shaped like
#   1.2.3.4 - - [19/Feb/2013:10:23:29 +0800] "GET /path HTTP/1.1" 200 123 "referer" "user agent"
# Raw strings keep the regex escapes intact; the square brackets around the
# timestamp must be escaped (\[ ... \]) or the pattern fails to compile.
PATTERN = (
    r'(?P<remote>[\d.]{7,})\s-\s-\s\[(?P<datetime>[^\[\]]+)\]\s'
    r'"(?P<method>.*)\s(?P<url>.*)\s(?P<protocol>.*)"\s'
    r'(?P<status>\d{3})\s(?P<size>\d+)\s"[^"]+"\s"(?P<useragent>[^"]+)"'
)
regex = re.compile(PATTERN)  # compiled once, reused for every line

# Per-field converters applied to the raw regex groups; fields without an
# entry are kept as the matched string.
# NOTE(review): %b is locale-dependent; this assumes English month abbreviations.
ops = {
    'datetime': lambda datestr: datetime.datetime.strptime(datestr, '%d/%b/%Y:%H:%M:%S %z'),
    'status': int,
    'size': int,
    'useragent': lambda ua: parse(ua),
}


def _identity(value):
    """Return *value* unchanged (converter for fields not listed in ``ops``)."""
    return value


def extract(line: str) -> Optional[dict]:
    """Parse one log line into a dict of converted fields.

    :param line: a single raw log line
    :return: field name -> converted value, or None if the line does not match PATTERN
    """
    matcher = regex.match(line)
    if matcher is None:
        return None
    # Apply the converter to the captured text (not just look it up).
    return {name: ops.get(name, _identity)(data)
            for name, data in matcher.groupdict().items()}


# Loading files
def openfile(path: str):
    """Yield the parsed record for every line of *path* that matches PATTERN."""
    with open(path) as f:
        for line in f:
            fields = extract(line)
            if fields:
                yield fields
            # TODO: unparsable lines are dropped silently; consider logging them.


def load(*paths):
    """Yield parsed records from each path.

    A directory contributes its regular files in sorted name order, so that
    rotated logs are fed to the time windows in a deterministic order.
    Non-existent paths are skipped.
    """
    for item in paths:
        p = Path(item)
        if not p.exists():
            continue
        if p.is_dir():
            for file in sorted(p.iterdir()):
                if file.is_file():
                    yield from openfile(str(file))
        elif p.is_file():
            yield from openfile(str(p))


# Data processing
def source(second=1):
    """Yield synthetic test records forever, one every *second* seconds.

    Each record holds the current time in UTC+8 and a random int in [1, 100].
    """
    tz = datetime.timezone(datetime.timedelta(hours=8))
    while True:
        yield {
            'datetime': datetime.datetime.now(tz),
            'value': random.randint(1, 100),
        }
        time.sleep(second)


# Sentinel put on a window's queue to signal that the data source is exhausted.
_STOP = object()


# Sliding-window function
def window(src: Queue, handler, width: int, interval: int):
    """Run *handler* over a sliding time window of records read from *src*.

    :param src: data source; a buffering queue that records are taken from.
        Putting the module-level ``_STOP`` sentinel on it ends the loop.
        Falsy items are ignored.
    :param handler: processing function; called with the list of buffered records
    :param width: time window width, in seconds
    :param interval: how often to compute, in seconds of record time
    """
    start = None  # record time of the last computation (initially the first record)
    buffer = []  # records waiting to be computed
    # After each computation keep only records newer than current - (width - interval);
    # once another `interval` of data arrives, the buffer spans roughly `width`.
    delta = datetime.timedelta(seconds=width - interval)
    while True:
        data = src.get()
        if data is _STOP:
            break  # source exhausted; let the thread finish
        if not data:
            continue
        buffer.append(data)
        current = data['datetime']
        if start is None:
            start = current

        # Compute over the buffer once every `interval` seconds of record time.
        if (current - start).total_seconds() >= interval:
            print(handler(buffer))
            start = current
            # Drop data that falls outside the window width.
            buffer = [x for x in buffer if x['datetime'] > current - delta]


# Average of random values (test handler, pairs with source())
def handler(iterable):
    """Return the mean of the 'value' field of the records."""
    return sum(x['value'] for x in iterable) / len(iterable)


# Test handler
def donothing_handler(iterable):
    """Return the records unchanged (useful for inspecting windows)."""
    return iterable


# Share of each status code
def status_handler(iterable):
    """Return {status code: fraction of records in this window with that code}."""
    total = len(iterable)
    counts = Counter(item['status'] for item in iterable)
    return {status: n / total for status, n in counts.items()}


# Running totals across every browser_handler call:
# (browser family, version string) -> count.
# NOTE(review): when width > interval, records retained in overlapping windows are counted again.
allbrowsers = {}


# Browser analysis
def browser_handler(iterable):
    """Count (browser family, version) pairs in this window.

    Also updates ``allbrowsers`` and prints its ten most common entries.
    """
    browsers = {}
    for item in iterable:
        ua = item['useragent']
        key = (ua.browser.family, ua.browser.version_string)
        browsers[key] = browsers.get(key, 0) + 1
        allbrowsers[key] = allbrowsers.get(key, 0) + 1
    print(sorted(allbrowsers.items(), key=lambda x: x[1], reverse=True)[:10])
    return browsers


# Dispatcher
def dispatcher(src):
    """Create a fan-out dispatcher over the iterable *src*.

    :return: ``(reg, run)``. ``reg`` registers a window handler. ``run``
        starts every handler thread, copies each record of *src* into every
        handler's queue, then stops the threads and waits for them to finish.
    """
    # The dispatcher keeps the handler threads and each one's own queue.
    handlers = []
    queues = []

    def reg(handler, width: int, interval: int):
        """Register a window handler.

        :param handler: data processing function to register
        :param width: time window width, in seconds
        :param interval: computation interval, in seconds
        """
        q = Queue()
        queues.append(q)
        t = threading.Thread(target=window, args=(q, handler, width, interval))
        handlers.append(t)

    def run():
        for t in handlers:
            t.start()  # start the processing threads
        for item in src:
            # Send every record from the source to all queues.
            for q in queues:
                q.put(item)
        # Without a stop signal the non-daemon threads would block on get() forever
        # and the process would never exit.
        for q in queues:
            q.put(_STOP)
        for t in handlers:
            t.join()

    return reg, run


if __name__ == "__main__":
    import sys

    path = sys.argv[1] if len(sys.argv) > 1 else 'test.log'
    reg, run = dispatcher(load(path))
    reg(status_handler, 10, 5)  # register handlers
    reg(browser_handler, 5, 5)
    run()  # run
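# 本文来自投稿,不代表Linux运维部落立场,如若转载,请注明出处:http://www.178linux.com/97752
# (Contributed article; it does not represent the position of the Linux运维部落 site.
#  If you republish it, please credit the source: http://www.178linux.com/97752)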