你的位置:首页 > 操作系统

[操作系统]基础监控


  基础监控的同比告警主要是针对服务器监控采集的指标,包括负载(load1/load5/load15)、平均CPU使用率、内存使用率、内外网流量、端口数量等,具体采集方法可参考《基础监控-服务器监控》。

一、告警原理

  多个指标每分钟1个数据,比较当前分钟的前10分钟的7天平均值,如果幅度超过100%并且绝对值相差达到M则算是一次异常,包括上升/下降异常,如果一个指标持续两次上升异常或者持续两次下降异常(不包括先上升后下降或者先下降后上升情况)则开始告警给机器的对应负责人(运维/开发)。例如当前是10号11:00,则比较的是10:59,10:58...10:50共10分钟的最近7天的平均值,例如10:59则是10号、9、8...4共7天的10:59这个点的数据的平均值。实际情况一般是第二分钟才解析了前一分钟的数据,相当于当前是11点,则解析的是10:59这个点和它之前的10分钟的7天平均值的比较。

二、数据来源

  基础监控-服务器监控每分钟会采集一份数据保存到Redis中,保存格式是 reportTime - hash,hash的格式是{ip1:{item1:value, item2:value2...}, ip2:{item1:value, item2:value2...}, ip3...},则每分钟一个redis hash,共7天 7*1440=10080个数据。实际情况保存的时候会多保留10分钟的数据,即7天+10分钟;由于reportTime是根据机器的实际时间来上报(这样画图才能保证是准确的),而某些机器没有NTP服务器或者其他原因导致时间不准,则reportTime则又会多种多样,所以导致的结果是redis的hash会变多一些,当然这并不影响我们的数据获取,因为整个同比告警就是根据画图来比较的,画图采用的是reportTime。采用hash保存到redis则不用每个ip读取一次redis,可以减少N次网络IO,大大提高程序速度。由于redis占据内存较多,大概10G左右,需要调整redis配置文件maxmemory的大小,不然redis会随机删除设置自动过期的key。

三、程序设计

1、DB设计

需要数据来源保存(redis)、异常展示表(mysql)、阈值配置表(mysql)、上次状态(redis)。异常展示表保存所有ip的异常描述信息、异常持续时间,可在页面展示;阈值配置表保存所有ip的阈值配置信息,每个ip的异常比率、各个指标的绝对值差、是否需要监控等;上次状态则用来判断是否需要告警的,持续2次同类异常则告警,使用redis保存即可。

mysql> show tables;+-----------------------------------+| Tables_in_machineMonitor_overLast |+-----------------------------------+| currentDisplay          || monitorConf            |+-----------------------------------+

2、导入测试数据

测试数据是使用mysql和redis将数据从mysql导入redis中,线上数据是等程序完成后修改"服务器监控"的上报CGI来导入的。测试导入的时候遇到的坑参考《python处理json和redis hash的坑》

def initRedis(client):  if client not in CONF.redisInfo:    raise RedisNotFound("can not found redis %s in config" % client)  try:    pool = redis.ConnectionPool(**CONF.redisInfo[client]) # 线程安全    red = redis.Redis(connection_pool=pool)    red.randomkey() # check  except Exception, e:    raise RedisException("connect redis %s failed: %s" % (client, str(e)))  return reddef initDb(client):  if client not in CONF.dbInfo:    raise MysqlNotFound("can not found mysql db %s in config" % client)  try:    db = opMysql.OP_MYSQL(**CONF.dbInfo[client])  except Exception, e:    raise MysqlException("connect mysql %s failed: %s" % (client, str(e)))  code, errMsg = db.connect()  if 0 != code:    raise MysqlException("connect mysql %s failed: %s" % (client, errMsg))  return db

3、告警接口

调用"告警平台"接口,项目配置后可发送RTX/SMS/Wechat,可在页面查看历史记录,轻松修改配置和临时屏蔽等。调用接口直接使用urllib/urllib2库即可

postData = urllib.urlencode(values)req = urllib2.Request(CONF.amcUrl, postData)response = urllib2.urlopen(req, timeout=CONF.AMC_TIME_OUT)amcDict = json.loads(response.read())code = int(amcDict["returnCode"])errMsg = amcDict["errorMsg"]

4、解析来源的数据

将数据转成可识别字典,并做排错处理,将错误数据拒绝掉

5、使用numpy和panda求平均值

将来源数据解析后存入panda.DataFrame中,如果数据不存在则使用numpy.nan代替,使用panda.DataFrame().mean().round(2)求平均值并保留2位数。如果某一分钟的7天数据全部获取不到则拒绝解析当前值,如果是7天数据的部分数据获取不到则剔除该点并在平均值的除数相对减一,使用NAN代替该点则在mean中可以解决这种情况。增加自定义函数比较每个列是否满足幅度上升/下降100%(可配置)并且绝对值差达到M(可配置),是则返回True,否则返回False,判断所有返回值是否均为True或者均为False,是则符合异常场景。

初始化DataFrame

for item in vd:  if value is None or value[item] is None:    vd[item][lastDayKey] = numpy.nan  else:    vd[item][lastDayKey] = value[item]      vf = pandas.DataFrame(vd)      columns.append(vf.mean().round(2)) indexes.append(lastMinKey)  self.ipInfo[ip]["lastData"] = pandas.DataFrame(columns, index=indexes)

panda自定义函数比较并且判断是否需要告警

for item in curValue:  if curValue[item] is None: # error    continue  else:    curValue[item] = round(curValue[item], 2)      def overLastCompute(v2, absSub):      """      :param v2: float       :param absSub: absolute subtract       :return: high/low/null      """      v1 = curValue[item]      v2 = round(v2, 2)      if 0 == v2:        if v1 > absSub:          return "HIGH"        if v1 < -absSub:          return "LOW"        return "NULL"      subVal = abs(v1 - v2)      if subVal / v2 > CONF.RATIO and subVal > absSub:        if v1 > v2:          return "HIGH"        return "LOW"      return "NULL"    self.ipInfo[ip]["result"][item] = self.ipInfo[ip]["lastData"][item].apply(overLastCompute, absSub=self.monitorConf[ip][item])    res = self.ipInfo[ip]["result"][item] == "HIGH" # Series    if all(i for i in res):      resErr[item] = CONF.HIGH_ERR      if CONF.HIGH_ERR == self.lastCache[str(ip)][item]:        # will Alert if switch on         pass    else:      res = self.ipInfo[ip]["result"][item] == "LOW"      if all(i for i in res):        resErr[item] = CONF.LOW_ERR        if CONF.LOW_ERR == self.lastCache[str(ip)][item]:          # will Alert if switch on           pass

6、由于IP较多,并且主要逻辑在解析数据和panda的计算上,使用CPU比较多,则需要使用多进程,并且结合线程池将进程跑满,别浪费进程资源。

step = ipNums / multiprocessing.cpu_count()ipList = list()i = 0j = 1processList = list()for ip in self.ipInfo:  ipS = str(ip)  if ipS not in self.lastCache:    self.lastCache[ipS] = copy.deepcopy(self.value)  ipList.append(ip)  i += 1  if i == step * j or i == ipNums:    j += 1    def innerRun():      wm = Pool.ThreadPool(CONF.POOL_SIZE)      for myIp in ipList:        kw = dict(ip=myIp, handlerKey=myIp)        wm.addJob(self.handleOne, **kw)      wm.waitForComplete()      ipListNums = len(ipList)      for tmp in xrange(ipListNums):        res = wm.getResult()        if res:          handlerKey, code, handlerRet, errMsg = res          if 0 != code:            continue          self.lastCache[str(handlerKey)] = handlerRet    process = multiprocessing.Process(target=innerRun)    process.start()    processList.append(process)    ipList = list()for process in processList:  process.join()

四、优化

指标监控v2则是同比告警的升级版,数据量将会大好几倍,目前想到的优化如下

1、使用hbase替代redis

2、将程序做京广容灾,改成分布式运行,横向扩展多套程序并列运行