大数据日志分析统计

815℃

日志概述

日志来源于第三方CDK厂商,主要内容为推流和拉流的日志,包括rtmp推拉流、flv拉流、hls拉流,flv和hls拉流的协议为http,rtmp推拉流的协议为rtmp,大概的格式如下:

01

分析指标为在线人数、总时长、总流量(客户端到服务器的流量和服务器到客户端到流量之和)。需要用要到的数据也在上图中标红显示,初一乍看,distinct和sum就可以搞定,但实际上上述流日志在纵向方面显示较为一致,在横向方面上反差较大。本次统计的粒度要求精确到频道及分钟,频道在日志数据中不能直接获取。RTMP数据在一个会话中(具备同一个会话ID)大概有10来条,但是只有2条数据能间接得到;FLV数据一条数据代表一个会话,能较为轻松的获取到频道;HLS数据是客户端主动向服务器先获取一条Meta信息,比如有多少条data信息数据可以获取,然后不停的获取data数据。一个会话可能产生数以万计的日志记录。

日志分析特点:数据量大、数据内容格式不一、延时性(只能获取隔天的数据)、统计的粒度较细(后续可能5分钟、1小时、日、月等)。一天的数据量4000万左右,完全不均衡分布在大概250个域名上,1个月左右数据量翻倍(取决新增用户及转化率);在单位分钟内进行统计,向后扩展兼容其他粒度。

 

分析历程

阶段一:数据模型分析

02

从CDN厂商收集日志,解析,存储到MySQL,发起分析统计任务。该阶段为需求调研,到建立数据模型的阶段,并不涉及到数据量大小。

阶段二:数据清洗、MongoDB存储及MapReduce

03

在模型分析阶段,随着数据量的增大和对分析指标的聚合操作,MySQL存储在检索、聚合、及后续的再度聚合,已经满足不了系统需求,更换存储已经势在必行。MongoDB因其文档方式存储、API简易(JSON)、高性能、集群Sharding方便、内置MapReduce等功能,成为本系统的替代数据库,网易在MongoDB方面也具备较为资深的实践经验。

通过对存储数据的反复分析对比发现,Uri、Url相关的数据未产生任何价值,极大的增加了存储容量,故此对原始数据进行清洗,只保留客户端IP、日期、时间、时长、流量等对结果产生影响的字段,并且记录不符合原始日志分析规则的数据,以便后续进行数据订正。根据不同的协议规则(Rtmp除外),设置其频道ID,方便后续在频道的基础上进行分析。Rtmp数据保存在单独的表中,根据同一会话,同一频道,对rtmp数据进行订正。此时,日志数据都已经被矫正到同一平面,编写相关维度的map、reduce、finalize函数进行MapReduce计算。作业调度器(schduler)对任务执行单元(worker)进行协调管理,根据任务链判断是否派发下一任务,及任务状态的变更。

 

总体架构

即紧接上文的阶段三,在阶段二的基础上,针对系统的高可用性、分布式、容错等维度考虑,采用无中心化方案设计,依赖于RabbitMQ进行水平扩展,增加了对外接口层及任务的手动发布处理。

04

1、collector

日志数据采集器(定时),从CDN方获取各域名对应的数据包下载地址,把具备同样后缀的域名归类为一个整体,建立一个任务(Job)。根据下载地址下载日志包(原始数据包经过gzip压缩),该过程中为防止并发访问过高,遭受CDN方请求拒绝,进行熔断处理,即在指定的时间内,抑制访问次数。

获得数据包后,调用linux系统命令gzip命令进行解压;顺序读取日志文件,若单个日志文件较大,则进行分包处理,带上包头号;对该批次的数据进行gzip压缩,扔到Rabbitmq中。以上建立Job,一个Job最多对应4个域名,分别建立sub job。每次的下载,解压缩,数据发送,分别记录状态(起始时间、耗时、失败/成功),方便后续的失败任务检测和性能分析。

2、worker

05

任务执行单元。从rabbitmq处订阅获取日志包数据,如果为HTTP协议(上文中的hls、flv),则先对数据进行清洗,再根据频道维度进行java方式的map reduce计算,ip集合数据结构使用hashset进行ip去重,最后把结果存于mongodb中;若是RTMP协议则对数据进行更新(设置频道),运用mongodb对数据进行聚合,产生的数据格式与HTTP日志相同,方便后续的频道分天聚合运算。在聚合运算中涉及到按分钟粒度进行分析,这里运用了一个小技巧,把日期date和time时间合并增加一个字段ymdhm,该字段表示分,该字段数据极度离散,大大提高了map reduce的运行效率。

MongoDB中的MapReduce相当于关系数据库中的group by。使用MapReduce要实现两个函数Map和Reduce函数。Map函数调用emit(key,value),遍历Collection中所有的记录,将key与value传递给Reduce函数进行处理。本文中使用javascript代码编写map和reduce函数,主要处理了ip集合去重和流量的累加。

3、scheduler

worker执行调度器,负责对各个子任务的状态记录,判断子任务是否作业完成,如果执行完成,从作业链中获取下一子任务的队列,扔到rabbitmq中供worker消费。此外还包括失败任务检测,对当天的任务列表进行轮询判断任务是否执行完成,若是有任务执行失败,则从分析表中攫取数据扔到临时集合中,重新进行聚合分析,最后将结果归档。

4、mongodb

MongoDB的版本为3.0.1,对应的java客户端为mongo-java-driver3.0.1。高可用性方面使用官方推荐的主从复制和高可用方案Replicat Set,Replicat Set具有自动切换功能,当Primary挂掉之后,可以自动由Replica Set中的某一个Secondary来切换到Primary,以实现高可用的目的。比如配置了一个由3台服务器组成的mongo集群,1个primary和2个secondry,客户端在连接的时候需要把全部的IP都写上,并且设置读操作从副本集读取,达到主从分离,减少主节点的访问压力。

06

5、rabbitmq

为了确保消息不会丢失,RabbitMQ支持消息应答。消费者发送一个消息应答,告诉RabbitMQ这个消息已经接收并且处理完毕,RabbitMQ才可以删除它。如果一个消费者宕机没有发送应答,RabbitMQ会理解为这个消息没有处理完全,然后交给另一个消费者去重新处理。这样,依赖与RabbitMQ的所有节点都不会丢失消息,保证了整个分析过程的完整性。如果没有任何消息超时限制,那么只有当消费者宕机时,RabbitMQ才会重新投递,即便处理一条消息会花费很长的时间。

Mirror queue是RabbitMQ高可用的一种,queue中的消息每个节点都会存在一份copy,这个在单个节点失效的情况下,整个集群仍旧可以提供服务。但是由于数据需要在多个节点复制,在增加可用性的同时,系统的吞吐量会有所下降。在实现机制上,mirror queue内部实现了一套选举算法,有一个master和多个slave,queue中的消息以master为主。对于publish,可以选择任意一个节点进行连接,rabbitmq内部若该节点不是master,则转发给master,master向其他slave节点发送该消息,后进行消息本地化处理,并组播复制消息到其他节点存储;对于consumer,可以选择任意一个节点进行连接,消费的请求会转发给master,为保证消息的可靠性,consumer需要进行ack确认,master收到ack后,才会删除消息,ack消息会同步(默认异步)到其他各个节点,进行slave节点删除消息。若master节点失效,则mirror queue会自动选举出一个节点(slave中消息队列最长者)作为master,作为消息消费的基准参考;若slave节点失效,mirror queue集群中其他节点的状态无需改变。

Mirror queue使用较为简单,先把当前节点加入之前已经启动的RabbitMQ节点,再设置HA的策略,如下图为镜像节点的启动脚本:

07

镜像节点设置成功之后,可以看到整个集群状态。

08

 

运行状况

部署到线上一共使用了7台服务器,其中2台云主机,每一台云主机都是4核8G,部署schduler、collector和rabbitmq;3台云主机,每一台也是4核8G,部署mongodb集群Replica Set;2台云主机,每台8核32G,部署16个worker实例,4000万的数据,20分钟完成分析。