日志分析项目

日志分析

123456

# 日志分析完整代码

import random
import datetime
import time
from queue import Queue
import threading
import re
from pathlib import Path

# 数据源
PATTERN = '''(?P<remote>[\d\.]{7,})\s-\s-\s-[(?P<datetime>[^\[\]]+)]\s\
"(?P<method>.*)\s(?P<url>.*)\s(?P<protocol>.*)"\s\
(?P<status>\d{3})\s(?P<size>\d+)\s"[^"]+"\s"(?P<useragent>[^"]+)"'''

regex = re.compile(PATTERN)  # 编译
from user_agents import parse

ops = {
    'datetime': lambda datestr: datetime.datetime.strptime(datestr , '%d/%b/%Y:%H:%M:%S %z') ,
    'status': int ,
    'size': int ,
    'useragent': lambda ua: parse(ua)
}


def extract(line: str) -> dict:
    matcher = regex.match(line)
    if matcher:
        return {name: ops.get(name , lambda x: x) for name , data in matcher.groupdict().items()}


# 装载文件
def openfile(path: str):
    with open(path) as f:
        for line in f:
            fields = extract(line)
            if fields:
                yield fields
            else:
                continue  # TODO 解析失败则抛弃或记录日志


def load(*paths):
    for item in paths:
        p = Path(item)
        if not p.exists():
            continue
        if p.is_dir():
            for file in p.iterdir():
                if file.is_file():
                    yield from openfile(str(file))
                elif p.is_file():
                    yield from openfile(str(p))


# 数据处理
def source(second=1):
    """生成数据"""
    while True:
        yield {
            'datetime': datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=8))) ,
            'value': random.randint(1 , 100)
        }
        time.sleep(second)


# 滑动窗口函数
def window(src: Queue , handler , width: int , interval: int):
    """
    窗口函数

    :param src: 数据源,缓存队列,用来拿数据
    :param handler: 数据处理函数
    :param width: 时间窗口宽度,秒
    :param interval: 处理时间间隔,秒
    """

    start = datetime.datetime.strptime('20170101 000000 +0800' , '%Y%m%d %H%M%S %z')
    current = datetime.datetime.strptime('20170101 010000 +0800' , '%Y%m%d %H%M%S %z')
    buffer = []  # 窗口中的待计算数据
    delta = datetime.timedelta(seconds=width - interval)

    while True:
        # 从数据源获取数据
        data = src.get()
        if data:
            buffer.append(data)  # 存入临时缓冲等待计算
            current = data['datetime']

        # 每隔interval计算buffer中的数据一次
        if (current - start).total_seconds() >= interval:
            ret = handler(buffer)
            print('{}'.format(ret))
            start = current

            # 清除超出width的数据
            buffer = [x for x in buffer if x['datetime'] > current - delta]


# 随机数平均数测试函数
def handler(iterable):
    return sum(map(lambda x: x['value'] , iterable)) / len(iterable)


# 测试函数
def donothing_handler(iterable):
    return iterable


# 状态码占比
def status_handler(iterable):
    # 时间窗口内的一批数据
    status = {}
    for item in iterable:
        key = item['status']
        status[key] = status.get(key , 0) + 1
    # total = sum(status.values())
    total = len(iterable)
    return {k: status[k] / total for k , v in status.items()}


allbrowsers = {}


# 浏览器分析
def browser_handler(iterable):
    browsers = {}
    for item in iterable:
        ua = item['useragent']

        key = (ua.browser.family , ua.browser.version_string)
        browsers[key] = browsers.get(key , 0) + 1
        allbrowsers[key] = allbrowsers.get(key , 0) + 1

    print(sorted(allbrowsers.items() , key=lambda x: x[1] , reverse=True)[:10])
    return browsers


# 分发器
def dispatcher(src):
    # 分发器中记录handler, 同时保存各自的队列
    handlers = []
    queues = []

    def reg(handler , width: int , interval: int):
        """
        注册窗口处理函数

        :param handler: 注册的数据处理函数
        :param width: 时间窗口宽度
        :param interval: 时间间隔
        """
        q = Queue()
        queues.append(q)

        h = threading.Thread(target=window , args=(q , handler , width , interval))
        handlers.append(h)

    def run():
        for t in handlers:
            t.start()  # 启动线程处理数据

        for item in src:  # 数据源取到的数据分发到所有队列中
            for q in queues:
                q.put(item)

    return reg , run


if __name__ == "__main__":
    import sys

    # path = sys.argv[1]
    path = 'test.log'

    reg , run = dispatcher(load(path))
    reg(status_handler , 10 , 5)  # 注册
    reg(browser_handler , 5 , 5)
    run()  # 运行

本文来自投稿,不代表Linux运维部落立场,如若转载,请注明出处:http://www.178linux.com/97752

(2)
JacoJaco
上一篇 2018-05-07
下一篇 2018-05-07

相关推荐

  • python– 简单的排序

    冒泡排序, 简单选择排序, 插入排序

    Python笔记 2018-04-15
  • ss

    sas

    Python笔记 2018-05-10
  • Python函数返回值、作用域以及树的概念

    Python函数返回值、作用域以及树的概念

    Python笔记 2018-04-15
  • Python 部分知识点总结(七)

    此篇博客只是记录第九周未掌握或不熟悉的知识点,用来加深印象。

    Python笔记 2018-05-06
  • 函数与生成器

    函数 由若干语句组成的语句块,函数名,参数列表构成,是组织代码的最小单元,完成一定的功能,结构化编程对代码的最基本的封装,封装的目的是为了复用 函数中没有return,隐式会返回一个none值 定义中的参数列表成为形式参数,只是一种形式表达,简称形参 调用时写的参数是实际参数,是实实在在传入的值,简称实参 函数的定义,只是声明了参数,不会执行,可以进行调用,…

    2018-04-16
  • 文件IO常用操作

    io模块 StringIO BytesIO 类
    pathlib模块 路径操作 3.4版本开始
    os模块 更改权限、显示信息 3.4版本之前路径操作
    shuril 模块(高级文件操作
    csv 文件
    configparser模块 ini文件处理

    2018-05-02