Python
2020-03-24 10:00:00 +0800
项目背景
- 采集厂商网络硬件设备基础指标,封装OpenFalcon数据结构
- 参与开发人数:1人
监控系统动态监控
- 需要实现的目标:
- 数据库监控表项动态管理监控项数据处理、封装接口
- 新增监控项由对于处理规则处理,项目的新监控项开发只需要编写规则函数和录入监控项字段即可
1. API 处理和报文解析
- 项目启动,导入文件后,解释器会执行 RouteRegister.registerAPI()
- 文件路经
monitor/monitor-server/monitor/api/metric_api/dynamic_router_api.py
- 从数据库获取动态监控表项的字段
- 根据获取回来的字段生成对于HTTP接口和类
- 此文件的执行入口是
RouteRegister.registerAPI()
,需要关注- 使用了项目统一实现的
@get_mapping
装饰器,实现了RESTfulAPI的路经(路由)注册 - 因为python的type底层为c结构体,此处使用type函数,实现type的new,call,init方法,返回一个类的引用,且该引用会在装饰器中存储在全局的变量中,内存地址在可分配地址段,“堆”;
- 从数据库获取回来的数据保存在每个类的静态变量中
- tornado API 接收socket调用后,会执行process函数
- process函数实现原始数据处理的对象,并将从数据库表项获取的字段做初始化
- 调用DynamicMetricDataHandler的对象报文处理函数(此处http接收报文的载荷部分为json format)
- 使用了项目统一实现的
- cpython source _typeobject
https://github.com/python/cpython/blob/master/Objects/typeobject.c
typedef struct _typeobject { int ob_refcnt; //引用计数 struct _typeobject *ob_type; // 类型对象 int ob_size; //变长对象的长度,如len(list), len(str), len(dict),int类型没有该属性 const char *tp_name; /* For printing, in format "<module>.<name>" */ // 变量的类型名字 如<class 'int'> <class 'type'> Py_ssize_t tp_basicsize, tp_itemsize; /* For allocation */ // ....... }
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2019-10-30 15:32
# @Author : NAYAN
# @Site :
# @File : snmp_metric_api_v2.py
# @Software : PyCharm
"""
DES:
监控项目数据处理入口
@API
1. 从数据库t_snmp_task中全量拉取监控项
2. 根据拉取得数据生成对应的数据上报API
3. API关联到相应的数据处理函数
"""
MONITOR_ITEM_TB = "t_monitor_item"
from tornado import escape
from monitor.core.handlers import get_mapping, BaseHandler
from monitor.pkg.utils.logger import LOG
from monitor.core.metric.dynamic.dynamic import DynamicMetricDataHandler
from monitor.core.handlers.agent_error_handler import MetricErrorHandler
from monitor.db import api
class RouteRegister(object):
metric_taks = {
}
@staticmethod
def bindRouteToMetircDataHandler(__route, __des, monitor_item):
"""
:param __route:
:param __des:
:param monitor_item:
:return:
"""
@get_mapping(__route, __des)
class MonitorMetricAPI(BaseHandler):
_metric = ""
def process(self):
"""
:return:
"""
try:
body = self.request.body
content_type = self.request.headers.get("Content-Type", "")
if content_type.startswith('application/json'):
data = escape.json_decode(body)
try:
dynamic= DynamicMetricDataHandler(monitor_item=MonitorMetricAPI._metric)
did_send_metric = dynamic.handler_distributor(metric_raw=data)
if did_send_metric:
return 1, 'success', {}
else:
return 0, 'failed', {}
except Exception as e:
LOG.error("Dynamic metric data handler bind filed! error=%s" % (e))
return 0, 'failed', {}
except Exception as e:
return 0, 'failed', {"Message": str(e)}
if not monitor_item:
return None
MonitorMetricAPI._metric = monitor_item
new_class_name = "".join([v[0].upper() + v[1:] for v in __route.split("/")[-1].split("_")])
return type(new_class_name, (MonitorMetricAPI,), {})
@staticmethod
def registerAPI():
raw_tasks = RouteRegister.loadAllAPIFromInventory()
LOG.info("[Register Monitor Item API From DB...]")
for raw in raw_tasks:
if raw and raw.get('OpenFalcon_upload'): #
__route = str(raw["upload_api"])
__des = str(raw["desc"])
LOG.info("[Upload API]: %s" % __route)
RouteRegister.bindRouteToMetircDataHandler(__route, __des, raw)
@staticmethod
def loadAllAPIFromInventory():
data = api.pg_query(MONITOR_ITEM_TB)
LOG.info("[ITEM DATA] %s" % data)
return data
@get_mapping('/monitor_upload/metric_error_handler.do', '')
class ErrorHandler(BaseHandler):
def process(self):
try:
body = self.request.body
content_type = self.request.headers.get("Content-Type", "")
if content_type.startswith('application/json'):
data = escape.json_decode(body)
MetricErrorHandler.process_metric_error(data)
return 1, 'success', {}
except Exception as e:
return 0, 'failed', {"Message": str(e)}
RouteRegister.registerAPI()
2. 规则映射数据处理
- 实现了对不同类型规则的处理逻辑;
- 单OID处理:一个OID为采集对象,采集结果为单一类型数据,例如整型、浮点类型、字符、字符串、数组
- 多OID处理:一个OID为采集对象,采集结果为多行数据,数据类型一致,例如带索引的分组交换机接口或硬件端口
- 多OID多处理:多个OID为采集对象,采集结果可能为每个OID一个单一结果,或者每个OID多个结果,具体规则函数会对这样的报文做处理;
动态监控项表的字段结构
+-------------+---------------------+------+-----+---------+---------------------------------+
| Field | Type | Null | Key | description |
+-------------+---------------------+------+-----+---------+---------------------------------+
| id | varchar | NO | PRI | 主键,唯一监控项名称,包含小写字母和下划线 |
| vendor | varchar | NO | | 厂商 |
| type | varchar | NO | | 设备类型 (链路交换机,路由器,防火墙) |
| model | jsonb | YES | | 设备型号 厂商定义 |
| series | jsonb | YES | | 设备系列 厂商定义 |
| metric | varchar | NO | | 采集数据名称,元数据描述 |
| metric_type | int4 | NO | | 采集数据类型(处理) 0表示普通数据,1表示增量数据 |
| upload_api | varchar | NO | | 原始数据上报RESTful接口 |
| task | jsonb | NO | | agent端执行的采集任务定义 |
| desc | varchar | YES | | 描述 |
| OpenFalcon_upload| bool | NO | | 是否将数据发送到 OpenFalcon 平台 |
| update_time | timestamp | NO | | 记录更新时间 |
| interval | int4 | YES | | agent执行采集的时间间隔 |
+-------------+---------------------+------+-----+---------+---------------------------------+
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2019-10-31 14:57
# @Author : NAYAN
# @Site :
# @File : dynamic.py
# @Software : PyCharm
"""
动态监控项处理
1. 从数据库t_monitor_item表拉取监控项
2. 由监控项生成API和监控处理对象(class)
3. 监控处理对象将调用相应的动态处理函数
4. 动态处理函数会根据 task_operation 判断数据处理函数的类型,包括单数据处理,批量数据处理,表达式处理三种
5. 监控数据部分会有两个处理函数,第一个是tags的配置,第二个是value的处理
"""
from monitor.core.handlers.device_info_handler import DeviceInfo
from monitor.pkg.utils.import_util import LOG
from monitor.core.model.monitor_data_model import MonitorItemDataModel
from monitor.core.model import monitor_data_model as DModle
from monitor.core.metric.dynamic.dynamic_rules import ValueRules
from monitor.core.metric.dynamic.dynamic_rules import TagRules
from monitor.core.metric.dynamic.dynamic_rules import OperationRules
from monitor.core.metric.publisher import Falcon
LogPrefix = "[Dynamic Monitor]"
DefaultValue = -1
Metric_Value_Type = {
1: "int",
0: "float",
-1: "counter"
}
Metric_Type = {
1: "GAUGE",
0: "GAUGE",
-1: "COUNTER"
}
class DynamicMetricDataHandler(object):
"""
@metric_item
监控项数据结构,用来生成监控项上报API和处理函数
@metric_raw
Agent上报的原始监控数据
"""
def __init__(self, monitor_item=None):
"""
:param monitor_item:
"""
mon_object = MonitorItemDataModel(**monitor_item)
self._mon_object = mon_object # @监控对象
self._vendor = mon_object.vendor # @设备厂商
self._type = mon_object.type # @设备类型
self._model = mon_object.model # @设备型号
self._series = mon_object.series # @设备型号所属系列
self._metric = mon_object.metric # @监控项名称m
self._metric_type = mon_object.metric_type # @是否计数器
self._upload_api = mon_object.upload_api # @上报接口
self._OpenFalcon_upload = mon_object.OpenFalcon_upload # @是否上报OpenFalcon
self._des = mon_object.description # @监控项描述
self._task = mon_object.task # @监控项对应监控任务
self.manage_ip = DModle.DEFAULT_STRING
self.raw_data = DModle.DEFAULT_JSON_DIC
self.timestamp = DModle.DEFAULT_INTEGER
self.sysname = DModle.DEFAULT_STRING
self.step = DModle.DEFAULT_INTEGER
self.oid_data = {}
def _parse_metric_raw(self, metric_raw):
"""
@@@ Handle Agent upload original DATA
:param metric_raw:
@example:
{
"content": null,
"host": null,
"protocol": null,
"scheduler": null,
"timestamp": 1572574430,
"company": null,
"interval": null,
"id": null,
"response": {
"get": {
"1.3.6.1.4.1.2636.3.1.13.1.8.9.1.0.0": {
"oid_index": "",
"oid": "enterprises.2636.3.1.13.1.8.9.1.0.0",
"snmp_type": "GAUGE",
"value": "1"
}
}
}
}
@@ Agent采集数据结构
"content": @采集job内容
"host": @采集对象管理ip
"protocol": @采集使用的网络协议
"scheduler": @采集是否为周期interval || once
"timestamp": @采集数据的时间戳
"company": @采集设备的厂商
"interval": @采集的时间间隔
"id": @采集任务的唯一ID
:return:
"""
try:
if not self._metric: ##TODO Value Check and LOG IT!!!!!!!!!!!!!!!!!!!!!!!!
LOG.error("%s metric item data parse failed, error=" % LogPrefix)
return
if not metric_raw:
LOG.error("%s metric raw parse failed, error=" % LogPrefix)
return
self.manage_ip = metric_raw.get("host")
self.raw_data = metric_raw.get("response")
self.timestamp = metric_raw.get("timestamp")
self.step = int(metric_raw.get("interval"))
self.sysname = DeviceInfo.devs[self.manage_ip].get('sysname')
# self.sysname = "abcd"
if not self.sysname:
raise Exception("get sysname failed!")
return True
except Exception as e:
LOG.exception(self._error_handler(err_des="parse raw metric error", exp=e))
return False
def _error_handler(self, err_des="", exp=None):
"""
@@@ Error handler
:param e:
:return: Error msg
# LOG.error(self._error_handler(err_des="", exp=e))
"""
e_msg = '%s monitor item data process error, device=[ip=%s, sysname=%s, vendor=%s, type=%s, model=%s, series=%s, metric=%s], err_msg=%s, exp=%s' \
% (LogPrefix, self.manage_ip, self.sysname, self._vendor, self._type, self._model, self._series,
self._metric, err_des, exp)
return e_msg
def _is_value_valid(self, value):
"""
@@@ SNMP error handler
@TIMEOUT
@UNKNOWN
@NO_SUCH_INSTANCE
@UNDETERMINED_TYPE_ERROR
@Connection_Error
@Undetermined_Type_Error
:param value:
:return:
"""
if value or isinstance(value, (int, float)):
return True
elif isinstance(value, (str, unicode)):
return True
# TODO check snmp error value TIMEOUT UNKNOWN OBJECTID NO_SUCH_INSTANCE NO_SUCH_OBJECTID
# except EasySNMPTimeoutError:
# LOG.error("%s request snmp error TIMEOUT.oid=%s" % (self.hostname, oids))
# except EasySNMPUnknownObjectIDError:
# LOG.error("%s request snmp error UNKNOWN OBJECTID.oid=%s" % (self.hostname, oids))
#
# except (EasySNMPNoSuchObjectError, EasySNMPNoSuchInstanceError):
# LOG.error("%s request snmp error NO_SUCH_INSTANCE OR NO_SUCH_OBJECTID.oid=%s" % (self.hostname, oids))
#
# except EasySNMPUndeterminedTypeError:
# LOG.error("%s request snmp error UNDETERMINED_TYPE_ERROR.oid=%s" % (self.hostname, oids))
# except EasySNMPConnectionError:
# LOG.error("%s request snmp error Connection_Error.oid=%s" % (self.hostname, oids))
# except EasySNMPUndeterminedTypeError:
# LOG.error("%s request snmp error Undetermined_Type_Error.oid=%s" % (self.hostname, oids))
else:
return False
def _process_value_rule(self, rule_name, **kwargs):
"""
@@@ Matching the value process rule
:param rule_name:
:param kwargs:
:return:
"""
## ToDO NO SUCN rule
r = ValueRules()
func = getattr(r, rule_name)
return func(manage_ip=self.manage_ip, **kwargs)
def _process_tag_rule(self, rule_name, **kwargs):
"""
@@@ Tags handler
@ Tags get attr from raw content
:param tags:
:return:
"""
r = TagRules()
func = getattr(r, rule_name)
return func(manage_ip=self.manage_ip, **kwargs)
def _process_opera_rule(self, rule_name, **kwargs):
"""
@@@ Matching the operation process rule
:param rule_name:
:param kwargs:
:return:
"""
r = OperationRules()
func = getattr(r, rule_name)
return func(manage_ip=self.manage_ip, **kwargs)
def _raw_data_wrapper(self):
"""
:wrapper: wrap raw data to a dic, oid as key
:return:
{
"1.3.6.1.4.1.2636.3.1.13.1.8.9.1.0.0": [
{
"oid_index": "",
"oid": "enterprises.2636.3.1.13.1.8.9.1.0.0",
"snmp_type": "GAUGE",
"value": "1"
}...
],
""1.3.6.1.4.1.2636.3.1.13.1.8.9.1.0.1": [
{
"oid_index": "",
"oid": "enterprises.2636.3.1.13.1.8.9.1.0.1",
"snmp_type": "GAUGE",
"value": "1"
}...
]...
}
"""
oid_data = {}
for method, oid_dic in self.raw_data.items():
for oid, oid_value in oid_dic.items():
if not isinstance(oid_value, (list)):
oid_value = [oid_value]
oid_data.update({oid: oid_value})
self.oid_data = oid_data
def handler_distributor(self, metric_raw=None):
"""
@self._task
{
"snmp_task":{
"oid": {
'get': [],
'bulk': [],
'walk': [],
'getnext': []
},
"value_rule": "",
"tag_rule": "",
"opera_rule": ""
}
}
@@@ Rules
:rule:三种采集数据处理规则,规则都在task_content中
1. value_rule 值处理规则,对上报数值进行处理,比如从字符串中截取监控指标
2. tag_rule 标签处理规则,如对根据接口索引获取接口速率和名称并添加到上报tags中
3. opera_rule 运算操作规则,对多个oid取回值进行运算处理获取监控指标,比如对metric_raw中的值进行加减乘除
@@@ Handlers
:handler: 三种OID处理规则
1. snmp_single 单值处理,task中只有一个操作方法和一个oid,采集结果也只有单一值(例如:get操作单一oid)
2. snmp_batch 批量处理,task中只有一个操作方法和一个oid,采集结果会有索引(例如:walk操作单一oid)
3. snmp_multi 多值处理,task中有多个操作方法或多个oid,采集结果会有多种值或多种索引(例如:风别walk操作三个oid)
:param metric_raw:
:return:
@@True 数据处理成功发送OpenFalcon
@@False 数据处理失败写入日志
"""
try:
did_parse_data = self._parse_metric_raw(metric_raw)
if not did_parse_data:
return False
if not self._task:
LOG.error(self._error_handler(err_des="monitor item task empty"))
return False
# @ task handler caller
for task_type, task_content in self._task.iteritems():
# @SNMP task
if task_type == "snmp_task":
self._raw_data_wrapper()
if len(self.oid_data.values()):
# 多oid多值
if len(self.oid_data) > 1:
return self.snmp_multi_monitor_item_handler(task_content)
# 单oid多值
if len(self.oid_data.values()[0]) > 1:
return self.snmp_batch_monitor_item_handler(task_content)
else:
return self.snmp_single_monitor_item_handler(task_content)
# @Netconf task
elif task_type == "netconf_task":
pass
else:
LOG.error("%s monitor task type not supported!" % LogPrefix)
except Exception as e:
LOG.exception(self._error_handler(exp=e))
def snmp_single_monitor_item_handler(self, task_content):
"""
:param metric_raw:
@metric_raw["response"]
"response": {
"get": {
"1.3.6.1.4.1.2636.3.1.13.1.8.9.1.0.0": {
"oid_index": "",
"oid": "enterprises.2636.3.1.13.1.8.9.1.0.0",
"snmp_type": "GAUGE",
"value": "1"
}
}
}
:return: Bool
"""
task_value_rule = task_content.get("value_rule")
task_tag_rule = task_content.get("tag_rule")
try:
if len(self.raw_data) > 1 or len(self.raw_data.values()) > 1:
raise Exception("upload data is not single value")
else:
oid_dic = self.raw_data.values()[0]
oid_content_dic = oid_dic.values()[0]
if isinstance(oid_content_dic, dict):
value = oid_content_dic.get("value")
elif isinstance(oid_content_dic, list) and len(oid_content_dic) > 0:
value = oid_content_dic[0].get("value")
oid_content_dic = oid_content_dic[0]
else:
value = None
tags = None
if value and self._is_value_valid(value):
# process rules
# 1. value rule
if task_value_rule:
value = self._process_value_rule(task_value_rule, value=value, sysname=self.sysname)
# 2. tag rule
if task_tag_rule:
tags = self._process_tag_rule(task_tag_rule, oid_content=oid_content_dic)
return self.metrics_sender(value, tags)
else:
return False
except Exception as e:
LOG.exception(self._error_handler(err_des="single value process failed", exp=e))
def snmp_batch_monitor_item_handler(self, task_content):
"""
:rule:三个处理规则,规则都在task_content中
1. value_rule 值处理规则,对上报数值进行处理,比如从字符串中截取监控指标
2. tag_rule 标签处理规则,如对根据接口索引获取接口速率和名称并添加到上报tags中
3. operation_rule 运算操作规则,对多个oid取回值进行运算处理获取监控指标,比如对metric_raw中的值进行加减乘除
@@ 基于metric索引批量处理监控数据
:param metric_raw:
@example
@metric_raw["response"]
"response": {
"walk": {
"1.3.6.1.2.1.2.2.1.14": [
{
"oid_index": "1",
"oid": "ifInErrors",
"value": "0",
"snmp_type": "COUNTER"
},
{
"oid_index": "4",
"oid": "ifInErrors",
"value": "0",
"snmp_type": "COUNTER"
},
{
"oid_index": "5",
"oid": "ifInErrors",
"value": "0",
"snmp_type": "COUNTER"
}
]
}
}
:param task_content:
:return:
"""
task_value_rule = task_content.get("value_rule")
task_tag_rule = task_content.get("tag_rule")
try:
if len(self.raw_data) > 1:
raise Exception("upload data format error")
else:
oid_dic = self.raw_data.values()[0]
oid_content_list = oid_dic.values()[0]
ret_res = []
for oid_content_dic in oid_content_list:
if isinstance(oid_content_dic, dict):
value = oid_content_dic.get("value")
if value == "2147483647":
value = "0"
tags = "index={}".format(oid_content_dic.get("oid_index"))
if value and self._is_value_valid(value):
# process rules
# 1. value rule
if task_value_rule:
value = self._process_value_rule(task_value_rule, value=value, sysname=self.sysname)
# 2. tag rule
if task_tag_rule:
tags = self._process_tag_rule(task_tag_rule, oid_content=oid_content_dic)
if not tags:
continue
res = self.metrics_sender(value, tags)
ret_res.append(res)
if False in ret_res or len(ret_res) == 0:
return False
else:
return True
except Exception as e:
LOG.exception(self._error_handler(err_des="batch value process failed", exp=e))
def snmp_multi_monitor_item_handler(self, task_content):
"""
:param metric_raw:
@example
@metric_raw["response"]
"response": {
"walk": {
"1.3.6.1.2.1.2.2.1.14": [
{
"oid_index": "1",
"oid": "ifInErrors",
"value": "0",
"snmp_type": "COUNTER"
}...
],
""1.3.6.1.2.1.2.2.1.15": [
{
"oid_index": "1",
"oid": "",
"value": "0",
"snmp_type": "COUNTER"
}...
], ...
}
}
:param task_content:
:return:
[{"value": "", "tags": ""}, {"value": "", "tags": ""}, ...]
"""
task_opera_rule = task_content.get("opera_rule")
try:
if task_opera_rule:
result = self._process_opera_rule(task_opera_rule, raw_data=self.raw_data.values()[0])
#LOG.debug("multi task name=" + task_opera_rule + ",result metric length=" + str(len(result)))
did_send = False
for v in result:
did_send = self.metrics_sender(v["value"], v["tags"])
return did_send
return False
except Exception as e:
LOG.exception(self._error_handler(err_des="get multi value process failed", exp=e))
def metrics_sender(self, value, tags):
"""
@@Tags
1. 检查tag字段,如果有值则调用 _match_tags_handler
2. 如果为空,侧检查index_tag是否有值
3. 如果有值,侧使用tags="%s/%s" %(index_tag, index)
4. 如果为空,侧无tags
:param metric:
:param title:
:return:
"""
__OpenFalcon_metric_type = Metric_Type[self._metric_type]
__v_type = Metric_Value_Type[self._metric_type]
if __v_type == "float":
value = float(value)
else:
value = int(value)
try:
m = {
"endpoint": self.sysname,
"metric": self._metric,
"timestamp": self.timestamp,
"step": self.step,
"value": value,
"counterType": __OpenFalcon_metric_type,
"tags": tags
}
# LOG.info("[DEBUG Send] %s" % m)
Falcon.send([m])
except Exception as e:
LOG.exception("parser OpenFalcon metric failed, e=%s" % e)
return True
@staticmethod
def common_data_handler(func):
"""
:param func:
:return:
"""
def monitor_wrapper(*args, **kwargs):
"""
:param args:
:param kwargs:
:return:
"""
try:
value = func(*args, **kwargs)
data_dic = dict()
data_dic["type"] = args[1]
value = value if value else -1
data_dic["val"] = value if isinstance(value, (float, int)) else int(value)
return data_dic
except Exception as e:
LOG.exception('get error, e=%s' % e)
raise e
return monitor_wrapper
3. 执行任务和处理规则
- agent所要执行的任务定义和处理规则定义
MonitorTaskJsonDic = {
"snmp_task": { #@SNMP采集任务
"oid": { #@采集OID和执行方式-同步到job.content中并关联设备
'get': [],
'bulk': [],
'walk': [],
'getnext': []
},
"tag_rule": "", #@数标签处理规则
"opera_rule": "", #@多数据处理规则
"value_rule": "" #@采集值处理规则-如:从字符串中匹配整形并转为百分比
}
}
- 此处定义一个snmp_task,agent根据此task来执行采集和上报数据
- 规则处理函数会接收 agent采集返回报文的json对象的 response 部分
"response": { "walk": { "1.3.6.1.2.1.2.2.1.14": [ { "oid_index": "1", "oid": "ifInErrors", "value": "0", "snmp_type": "COUNTER" }, { "oid_index": "4", "oid": "ifInErrors", "value": "0", "snmp_type": "COUNTER" }, { "oid_index": "5", "oid": "ifInErrors", "value": "0", "snmp_type": "COUNTER" } ] } }
- 规则定义
- 在task中定义tag_rule, value_rule, opera_rule,处理函数会自动将数据和处理规则进行动态匹配,当有新报文段交付后,会触发对应的规则函数;
- 规则匹配顺序为:ValueRules, TagRules, OperationRules
- 规则选用建议
- 如果采集结果为单一结果,例如”value”: “35”,使用ValueRules,将该字符串进行处理,比如”35%”,或0.35;
- 如果采集结果携带索引,对其值的处理结果可以参考1,对索引值可以使用TagRules,将索引处理为特定的标签,标识一条唯一的数据,并且该标签还可以携带其他和数据相关的属性;
- 对于多种采集OID对象和多种结果的类型,只使用自定义的OperationRules
部分ValueRules,TagRules可以对数据做通用处理
class ValueRules(object): class TagRules(object): class OperationRules(object):
4. 新监控项开发
- 如何添加一个新的监控项 (测试环境)
- 添加数据记录到动态监控表,根据每个字段名称定义相应的字段;
- vendor,type,model,series 设备相关,用来定义那些厂商哪些类型哪些型号的设备要”被agent执行新增的监控项任务“
- id,metric,metric_type,upload_api,用来定义一个唯一的监控项,和监控项在dashboard和rrd数据库中的名称,且包含唯一一个为此监控项创建的上传RESTful接口
- task 定义采集任务,agent执行什么样的任务;定义处理规则,采集回来的数据要被映射到哪些自定义的规则函数上
- interval agent对设备多久进行一次采集,”UDP“报文段交互
- 数据都定义完成后,编写你的规则函数
- 函数名称要和task中定义的函数名称一致
- 函数所属要的类,要和task中给定的一致
- 函数入参固定
- ValueRules manage_ip=None, value=None, sysname=None
- TagRules manage_ip=None, oid_content=None
- OperationRules manage_ip=None, raw_data=None
- 处理逻辑,manage_ip为设备在各自AS的子网中的管理IP地址,采集回来的数据为其他参数
- value 采集唯一值
- sysname 设备名称
- oidcontent python列表类型,列表中包含多个采集结果字典,例如
[ { "oid_index": "1", "oid": "ifInErrors", "value": "0", "snmp_type": "COUNTER" }, { "oid_index": "4", "oid": "ifInErrors", "value": "0", "snmp_type": "COUNTER" }, { "oid_index": "5", "oid": "ifInErrors", "value": "0", "snmp_type": "COUNTER" } ]
- raw_data 报文中json对象的response部分的全部数据
- 处理规则的返回值
- ValueRules 返回处理后的value
- TagsRules 返回固定格式的Tags字符串,字符串中的标签字符用”=”和”,”分隔
- OperationRules 返回处理完成后的所有数据的列表,列表中为相同结构的字典 包含”value”和”tags”,例如
[{"value": , "tags"},{"value": , "tags"},{"value": , "tags"}]
其中tags格式依然固定,value可以是整型或者浮点类型
- 完成规则函数编写后,数据上报部分有独立线程来自动完成,无需关注;
5. 调试
- agent和server的调试,请移步单元测试文档