日志分析项目

日志分析

123456

# 日志分析完整代码

import random
import datetime
import time
from queue import Queue
import threading
import re
from pathlib import Path

# 数据源
PATTERN = '''(?P<remote>[\d\.]{7,})\s-\s-\s-[(?P<datetime>[^\[\]]+)]\s\
"(?P<method>.*)\s(?P<url>.*)\s(?P<protocol>.*)"\s\
(?P<status>\d{3})\s(?P<size>\d+)\s"[^"]+"\s"(?P<useragent>[^"]+)"'''

regex = re.compile(PATTERN)  # 编译
from user_agents import parse

ops = {
    'datetime': lambda datestr: datetime.datetime.strptime(datestr , '%d/%b/%Y:%H:%M:%S %z') ,
    'status': int ,
    'size': int ,
    'useragent': lambda ua: parse(ua)
}


def extract(line: str) -> dict:
    matcher = regex.match(line)
    if matcher:
        return {name: ops.get(name , lambda x: x) for name , data in matcher.groupdict().items()}


# 装载文件
def openfile(path: str):
    with open(path) as f:
        for line in f:
            fields = extract(line)
            if fields:
                yield fields
            else:
                continue  # TODO 解析失败则抛弃或记录日志


def load(*paths):
    for item in paths:
        p = Path(item)
        if not p.exists():
            continue
        if p.is_dir():
            for file in p.iterdir():
                if file.is_file():
                    yield from openfile(str(file))
                elif p.is_file():
                    yield from openfile(str(p))


# 数据处理
def source(second=1):
    """生成数据"""
    while True:
        yield {
            'datetime': datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=8))) ,
            'value': random.randint(1 , 100)
        }
        time.sleep(second)


# 滑动窗口函数
def window(src: Queue , handler , width: int , interval: int):
    """
    窗口函数

    :param src: 数据源,缓存队列,用来拿数据
    :param handler: 数据处理函数
    :param width: 时间窗口宽度,秒
    :param interval: 处理时间间隔,秒
    """

    start = datetime.datetime.strptime('20170101 000000 +0800' , '%Y%m%d %H%M%S %z')
    current = datetime.datetime.strptime('20170101 010000 +0800' , '%Y%m%d %H%M%S %z')
    buffer = []  # 窗口中的待计算数据
    delta = datetime.timedelta(seconds=width - interval)

    while True:
        # 从数据源获取数据
        data = src.get()
        if data:
            buffer.append(data)  # 存入临时缓冲等待计算
            current = data['datetime']

        # 每隔interval计算buffer中的数据一次
        if (current - start).total_seconds() >= interval:
            ret = handler(buffer)
            print('{}'.format(ret))
            start = current

            # 清除超出width的数据
            buffer = [x for x in buffer if x['datetime'] > current - delta]


# 随机数平均数测试函数
def handler(iterable):
    return sum(map(lambda x: x['value'] , iterable)) / len(iterable)


# 测试函数
def donothing_handler(iterable):
    return iterable


# 状态码占比
def status_handler(iterable):
    # 时间窗口内的一批数据
    status = {}
    for item in iterable:
        key = item['status']
        status[key] = status.get(key , 0) + 1
    # total = sum(status.values())
    total = len(iterable)
    return {k: status[k] / total for k , v in status.items()}


allbrowsers = {}


# 浏览器分析
def browser_handler(iterable):
    browsers = {}
    for item in iterable:
        ua = item['useragent']

        key = (ua.browser.family , ua.browser.version_string)
        browsers[key] = browsers.get(key , 0) + 1
        allbrowsers[key] = allbrowsers.get(key , 0) + 1

    print(sorted(allbrowsers.items() , key=lambda x: x[1] , reverse=True)[:10])
    return browsers


# 分发器
def dispatcher(src):
    # 分发器中记录handler, 同时保存各自的队列
    handlers = []
    queues = []

    def reg(handler , width: int , interval: int):
        """
        注册窗口处理函数

        :param handler: 注册的数据处理函数
        :param width: 时间窗口宽度
        :param interval: 时间间隔
        """
        q = Queue()
        queues.append(q)

        h = threading.Thread(target=window , args=(q , handler , width , interval))
        handlers.append(h)

    def run():
        for t in handlers:
            t.start()  # 启动线程处理数据

        for item in src:  # 数据源取到的数据分发到所有队列中
            for q in queues:
                q.put(item)

    return reg , run


if __name__ == "__main__":
    import sys

    # path = sys.argv[1]
    path = 'test.log'

    reg , run = dispatcher(load(path))
    reg(status_handler , 10 , 5)  # 注册
    reg(browser_handler , 5 , 5)
    run()  # 运行

本文来自投稿,不代表Linux运维部落立场,如若转载,请注明出处:http://www.178linux.com/97752

(2)
JacoJaco
上一篇 2018-05-07
下一篇 2018-05-07

相关推荐

  • 递归函数

    递归函数 def foo(b,b1=3):print(“foo1 called “,b,b1)def foo2(c):foo3(c)print(“foo2 called”,c)def foo3(d):print(“foo3 called”)def mian():print(“…

    2018-04-16
  • 封装与解构 集合

    封装和解构 封装:将多个值进行分割,结合在一起,本质上返回元组,只是省掉了小括号 ‘==‘意思为内容一致,‘=’意思为内存空间一致 解构:把线性结构的元素解开,并顺序的赋值给其他变量,左边接纳的变量数要和左边解开的元素数量一致 集合不是非线性 解构中使用*变量名接收,但不能单独使用,被*变量名收集后组成一个列表 第一个下划线为9,结果被第二个下划线重新赋值为…

    Python笔记 2018-04-01
  • 函数

    函数 数学定义:y=f(x),y是x的函数,x是自变量 Python函数 有若干个语句块,函数名称,参数列表构成,它是组织代码的最小单元 完成一定作用 函数的作用 结构化编程对代码的最基本的封装,一般按照功能组织一段代码 封装的目的为了复用,减少了冗余代码 代码更加简洁美观,更加易读 函数的分类 内建函数,如max(),reversed()等 库函数,如ma…

    2018-04-16
  • python学习第七周个人总结

    LEGB、递归函数、匿名函数、生成器函数、生成器应用、高阶函数、柯里化、装饰器、类型注解、functools.个人总结,加深印象。

    2018-04-22
  • Python 部分知识点总结(二)

    此篇博客只是记录第四周未掌握或不熟悉的知识点,用来加深印象。

    Python笔记 2018-03-30