滑动窗口

数据载入

def load(path:str):
    with open(path) as f:
        for line in f:
            tmp = extract(line)
            if tmp:
                yield tmp
            else:
                # TODO 解析失败就抛弃,或者打印日志
                continue

时间窗口分析

概念

  • 很多数据,例如日志,都和时间相关的,都是按照时间顺序产生的。
  • 产生的数据分析的时候,要按照时间求值
  • interval 表示每一次求值的时间间隔
  • width 时间窗口宽度,指的一次求值的时间窗口宽度

当width > interval

59fc4cf3f13bb34ae2000000

  • 数据求值是会有重叠

当width = interval

59fc4d26f13bb34ae2000001

  • 数据求值没有重叠

当width < interval

  • 一般不采纳,因为这样会有数据流失

时序数据

  • 运行环境中,日志、监控等产生的数据都是与时间相关的数据,按照时间先后产生并记录下来的数据,所以一般按照时间对数据进行分析

时序数据分析的节本程序结构

  • 随机生成几个数,产生时间相关的数据,返回 时间 + 随机数
  • 每次取三个值,求平均值
    import random
    import datetime
    import time
    
    def f():
        while True:
            yield {'value':random.randrange(100), 'time':datetime.datetime.now()}
            time.sleep(1)
    
    src = f()
    items = [next(src) for _ in range(3)]
    
    def handler(iterable):
        vals = [x['value'] for x in iterable]
        return sum(vals) / len(vals)
    
    print(items)
    print(handler(items))
    

59fc5ab3f13bb34ae2000002


窗口函数实现

import random
import datetime
import time

# 数据源函数
def f():
    while True:
        yield {'value':random.randrange(100), 'time':datetime.datetime.now()}
        time.sleep(5)

def window(src, handler, width:int, interval:int):
    """
    窗口函数
    :param src: 数据源,生成器,用来拿数据
    :param handler: 数据处理函数
    :param width: 时间窗口宽度,秒
    :param interval: 处理时间间隔,秒
    """

    # 初始两个时间段
    start = datetime.datetime.strptime('20170101 00:00:00', '%Y%m%d %H:%M:%S')
    current = datetime.datetime.strptime('20170101 00:01:00', '%Y%m%d %H:%M:%S')

    buffer = [] # 窗口中待计算的数据
    delta = datetime.timedelta(seconds = width - interval)

    while True:
        # 从数据源获取数据
        data = next(src)

        # 存入临时缓冲等待计算
        if data: # 筛掉不符合的数据
            buffer.append(data)
            current = data['time']

        # 进入循环开始操作
        if (current - start).total_seconds() >= interval:
            ret = handler(buffer)
            print('{:.2f}'.format(ret))
            start = current

            # 处理重叠的数据
            buffer = [x for x in buffer if x['time'] > current - delta]


def handler(iterable):
    vals = [x['value'] for x in iterable]
    return sum(vals) / len(vals)
  • 第41行current – delta是因为现在的current还没有更新,而current的时间值到当前current时间值之间的数据正好是重叠的数据
  • widthinterval给一样的时候,那么delta为0,所以不会有重复数据

59fd5b032bd5a743d1000004
59fd5b2f2bd5a743d1000005

  • 相当于用给定的width往后滑动,一下走这么多interval
  • 比如这个,是时间宽为4往下走,两个两个的往后走,所以每次会有两个重复的数据

本文来自投稿,不代表Linux运维部落立场,如若转载,请注明出处:http://www.178linux.com/88218

(1)
nolannolan
上一篇 2017-11-04
下一篇 2017-11-05

相关推荐