自动化运维,你玩转了吗?
【摘要】
经过前面几个章节的学习,总算是初步上手了文本的解析功能,那么这一章节中我们就来完善巡检框架,让它具备解析能力,这样才是一个完整巡检模块。
【模版管理线上化】
Action 模型改造
上一章节中提到,考虑到程序设计的可扩展性,我们希望把解析模板做线上管理,这里采用的方式是把模板的内容保存到数据库中,以供我们在需要的时候查询检索。
巡检框架已存在的实体中,有一个 Action 实体,这个实体用来表示执行的巡检动作,它所对应的表结构如下:
class Action(db.Model):
__tablename__ = "action"
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
name = db.Column(db.String(64), nullable=False, comment="动作名称")
description = db.Column(db.String(256), comment="动作描述")
vendor = db.Column(db.String(64), comment="厂商")
model = db.Column(db.String(64), comment="型号")
cmd = db.Column(db.String(256), nullable=False, comment="命令行")
type = db.Column(db.String(8), comment="命令类型[show|config]")
parse_type = db.Column(db.String(8), comment="解析类型[regexp|textfsm]")
parse_content = db.Column(db.String(1024), comment="解析内容")上述 Action 的属性中有两个属性就是专门预留来做解析时用的,因为每一个巡检动作都会对应一条具体的命令,而解析功能必然是和命令挂钩的,所以解析的相关属性理所当然应该与 Action 存放在一起。
parse_content 属性就是用来保存模板内容(或者正则表达式)的,但之前的设计中,把这一列设置成了 Varchar(1024),那么通过上一章节的学习,大家应该会发现在 ntc-template 库中提供的解析模板会存在较长的情况,所以我们需要将该列在数据库中改为 Text,使其可以保存更多的字符(65535个字符)。
修改数据库列字段的 DDL 如下:
ALTER TABLE action MODIFY COLUMN parse_content TEXT COMMENT '解析内容';同时 Action Model 中的 parse_content 修改如下:
parse_content = db.Column(db.Text, comment="解析内容")
Action 录入
之前的章节中已经实现巡检动作的增删改查,所以我们可以通过接口来进行 Action 的操作,其中就可以包含对解析模板的管理。
比如增加一个巡检动作的请求如下:
POST http://127.0.0.1:5000/action/add
Content-Type: application/json
[
{
"name": "version_check",
"description": "版本检查",
"vendor": "cisco",
"model": "ios",
"cmd": "show version",
"type": "show",
"parse_type": "textfsm",
"parse_content": "{模版内容}"
}
或者对已存在的巡检动作来修改其解析模板,请求如下:
POST http://127.0.0.1:5000/action/update
Content-Type: application/json
[
{
"id": "1" ,
"parse_type": "textfsm",
"parse_content": "{模版内容}"
}
]
通过以上的方式就可以将模板与巡检动作+执行的命令进行绑定,统一进行管理,而不是通过额外的文件形式来做。
【执行+解析】
在之前章节中我们已经实现了基于SSH的执行器,并且可以在初始化执行器时,传入 ActionHandler 与 DeviceHandler 进行后续的创建连接和执行命令使用,其中用于执行命令的 execute() 方法代码如下:
class SSHExecutor:
...
def execute(self, action: Optional[Action] = None, action_condition: Optional[Dict] = None, read_timeout: int = 10) -> str:
action_condition.update({"vendor": self.device.vendor.lower(), "model": self.device.model.lower()})
if action is None:
action = self.fetch_action(action_condition)
if action.type == CommandType.Config:
output = self.conn.send_config_set(action.cmd, read_timeout=read_timeout)
else:
self.conn.enable()
output = self.conn.send_command(action.cmd, read_timeout=read_timeout)
# TODO parse_result = self.parse()
parse_result = ""
self.save(action.cmd, output, parse_result)
return output
上述代码中我们预留了一个 TODO 部分,用来添加解析逻辑,现在的目标就是将解析功能添加在这里。
解析参数
因为现在默认执行 execute 后是不进行解析的,所以需要在该方法中设置一个参数 parse,来表示是否在执行完命令后,对输出结果进行解析。
大家可以回想一下上一章节中介绍的 Netmiko 里自带的 send_command 函数也有解析的功能,并且还支持三种解析方式,分别是 TextFSM、TTP、Genie,只需传入对应的 use_textfsm、use_ttp、use_genie 即可;我们执行器中的 execute 也可以支持多种解析,目前支持 regexp 和 textfsm,如果后续想支持 ttp 或者其他也是可以的,但为什么只需要设置一个 parse 参数呢?
原因是 execute 方法中传入的 action 参数是一个巡检项模型,这个对象里就已经包含了其所对应的执行命令和解析方式以及解析模板,所以就不需要像 send_command 一样将解析的参数与执行的命令拆开传递,这也是大型项目的设计与工具包的设计之间的差异,大家可以下来仔细体会一下。
解析逻辑
ActionHandler 改造
这里的 parse 参数是一个 bool 类型的变量,需要解析时该变量为 True,所以在执行完命令之后需要加一段逻辑如下:
if parse:
# parse
现在需要考虑的就是解析的逻辑应该属于哪个对象?
既然解析类型和模板内容是属于 Action 对象的,那么解析的逻辑其实顺理成章的应该属于 ActionHandler 对象,因为这个对象就是用来处理 Action 相关的衍生逻辑的。
所以我们在 ActionHandler 类中增加解析的逻辑,代码如下:
from ntc-template import parse_output
from ..models.action import ParseTypeEnum
class ActionHandler(abc.ABC):
""" 该类为 Handler 的抽象类 """
...
@abc.abstractmethod
def parse(self, device_type: str, action: Any, output: str) -> List[Dict]:
""" 抽象方法,继承了 ActionHandler 的子类必须实现该方法 """
pass
class ActionJSONHandler(ActionHandler):
""" 基于 json 文件实现的 Handler 类 """
...
def parse(self, device_type: str, action: Dict, output: str) -> List[Dict]:
""" 使用 ntc-template 进行解析 """
try:
if action["parse_type"] == ParseTypeEnum.TextFSM.value:
return parse_output(platform=device_type, command=action["cmd"], data=output)
except Exception:
pass
return []
class ActionORMHandler(ActionHandler):
""" 基于 ORM 实现的 Handler 类 """
...
def parse(self, device_type: str, action: Action, output: str) -> List[Dict]:
""" 使用基于字符串的 TextFSM 进行解析 """
try:
if action.parse_type == ParseTypeEnum.TextFSM.value:
fsm = TextFSM(StringIO(action.parse_content))
return fsm.ParseTextToDicts(output)
except Exception:
pass
return []大家可以回想之前我们实现了两种 ActionHandler,第一种是基于 JSON 文件的,可以处理 action 不存在数据库的情况,第二种是基于 ORM 的,处理在数据库中保存 action 的情况,同时还定义了一个抽象的 ActionHandler,利用 abc.ABC 来实现对其子类方法的约束。
当需要增加 parse 逻辑时,应该先在抽象类 ActionHandler 中增加该抽象方法,以此来约束 ActionJSONHandler 和 ActionORMHandler 必须实现 parse 逻辑。
对于 ActionJSONHandler 来说,适用于小范围的场景,action 和 device 都通过文件形式管理,所以 parse 逻辑就恰好适合使用通过文件管理模板的 ntc-template。
对于 ActionORMHandler 来说,所有的数据都通过数据库进行存储,所以 parse 逻辑中通过基于字符串的 TextFSM 模板进行解析。
有一个小细节就是在解析的时候判断了一下 parse_type,判断的时候引用了 model.py 中定义的枚举类 ParseTypeEnum,关于为什么使用枚举类而不直接用 if parse_type == "textfsm" 来判断,之前的文章中也提到过;
在程序中尽量不使用字面量,其一是因为字面量不易读,其二是因为字面量扩展性差,不易维护。
Execute 改造
现在已经实现了 parse 逻辑,那么 execute 函数要如何补全逻辑呢?代码如下:
class SSHExecutor:
...
def execute(
self, action: Optional[Action] = None,
action_condition: Optional[Dict] = None,
read_timeout: int = 10, parse: bool = False) -> Union[List, str]:
...
parse_result = ""
if parse:
parse_result = self.action_handler.parse(self.device_type, action, output)
...
return parse_result if parse_result else output上述代码省略了 execute 方法中的其他逻辑,解析的部分实际上只增加了一行;在初始化 SSHExecutor 的时候已经传入了 ActionHandler,所以这里只需要调用 self 本身的 action_handler 中的 parse 方法,传入 device_type、action、output 参数即可完成解析。
该函数的返回值也做了一下处理,在可以正确解析的情况下返回 parse_result,否则直接返回 output。
【执行接口】
我们已经实现了通过接口来触发执行器,所以还需要改造一下路由函数,如下:
@executor_blueprint.route("/execute", methods=["POST"])
def execute():
data = request.get_json()
try:
device_condition = data.get("device_condition")
action_condition = data.get("action_condition")
action = None
if "action" in data:
action = Action.to_model(**data.get("action"))
device_handler = DeviceORMHandler(db.session())
action_handler = ActionORMHandler(db.session())
with SSHExecutor(
username=current_app.config.get("SSH_USERNAME"),
password=current_app.config.get("SSH_PASSWORD"),
secret=current_app.config.get("SSH_SECRET"),
device_condition=device_condition,
device_handler=device_handler,
action_handler=action_handler,
logger=current_app.logger) as ssh:
output = ssh.execute(
action=action,
action_condition=action_condition,
parse=data.get("parse", False)
)
return Success(data=output)
except Exception as e:
raise ExecutorError(message=str(e))
【健壮性】
执行器作为提供底层支持能力的模块,在健壮性方面必须足够强,所以现在还需要做一些额外的处理以应对异常状况。
重试
创建连接是最容易发生错误的地方,可能会由于网络抖动或者设备繁忙导致连接失败,所以重试是非常有必要的,代码如下:
class SSHExecutor:
...
self.retry_times = 3
self.conn = self.connect(self.retry_times)
def connect(self, retry: int) -> BaseConnection:
if retry == 0:
raise Exception("Retry to connect over maximum")
try:
conn = ConnUnify(host=self.host, port=self.port, username=self.username, password=self.password,
secret=self.secret, device_type=self.device_type, conn_timeout=self.conn_timeout,
auth_timeout=self.auth_timeout, banner_timeout=self.banner_timeout, session_log="netmiko.log",
session_log_file_mode="append")
msg = f"Netmiko connection successful to {self.host}:{self.port}"
self.logger.info(msg)
return conn
except Exception as e:
self.logger.error(str(e))
return self.connect(retry - 1)
上述代码中,定义了一个最大重试次数,并且在调用 ConnUnify 失败后重试次数减一再次调用自身,直至重试次数为 0 后抛出连接异常。
这里抛出异常是非常有必要的,因为如果没有成功创建连接,那么 self.conn 属性将会是 None,但其他实例方法使用 self.conn 时是默认该属性有值的。
所以我们要在连接异常的情况下,及时抛出异常,避免将异常“吞掉”而导致下游方法发生不可预知的错误。
懒加载
在 execute 方法中使用了创建好的连接,大家可以一种场景:初始化执行器的时候连接创建成功,但由于长时间未使用,或者网络质量不佳,导致连接丢失或被关闭,那此时 execute 执行命令就会出现异常。
所以最佳的做法是在 execute 中使用连接前获取连接即可,而不需要在初始化执行器的时候就去创建连接,代码如下:
class SSHExecutor:
...
self.conn = None
@property
def connection(self) -> BaseConnection:
if not self.conn or not self.conn.is_alive():
self.conn = self.connect(self.retry_times)
return self.conn
def execute(
self, action: Optional[Action] = None,
action_condition: Optional[Dict] = None,
read_timeout: int = 10, parse: bool = False) -> Union[List, str]:
action_condition.update({"vendor": self.device.vendor.lower(), "model": self.device.model.lower()})
if action is None:
action = self.fetch_action(action_condition)
if action.type == CommandType.Config:
output = self.connection.send_config_set(action.cmd, read_timeout=read_timeout)
else:
self.connection.enable()
output = self.connection.send_command(action.cmd, read_timeout=read_timeout)
parse_result = ""
if parse:
parse_result = self.action_handler.parse(self.device_type, action, output)
self.save(action.cmd, output, parse_result)
return parse_result if parse_result else output上述代码中用到了 Python 中的属性装饰器 @property,被该装饰器装饰的实例方法可以直接作为实例属性使用,因此在 execute 方法中将 self.conn 均改为 self.connection。
而 connection() 的逻辑是,判断此刻如果不存在 self.conn 或者 self.conn 不是可用状态,则发起连接获取 self.conn,最后将 self.conn 返回。
这种模式是在大型项目中非常常见的懒加载模式,有两点好处,一是可以避免在初始化对象时耗时过多或者产生当下的资源;二是在需要时获取可以保证资源的实时性。
【演示】
执行器到目前为止已经改造完成,我们发送请求测试一下:
POST http://127.0.0.1:5000/executor/execute
Content-Type: application/json
{
"device_condition": {
"ip": "192.168.31.149"
},
"action_condition": {
"name": "version_check",
"model": "cisco"
},
"parse": true
}
在 execute 请求中传入 parse 参数,代表在执行完命令后进行解析,返回结构化数据。
这里我们将模拟器关闭,测试连接失败的情况,返回内容如下:
{
"data": null,
"status_code": 520,
"message": "Retry to connect over maximum",
"request": "POST executor/execute"
}
模拟器正常运行时,再次发起请求,返回如下:
{
"data": [
{
"VERSION": "",
"ROMMON": "Bootstrap",
"HOSTNAME": "r1",
"UPTIME": "0 minutes",
"UPTIME_YEARS": "",
"UPTIME_WEEKS": "",
"UPTIME_DAYS": "",
"UPTIME_HOURS": "",
"UPTIME_MINUTES": "0",
"RELOAD_REASON": "Unknown reason",
"RUNNING_IMAGE": "/opt/unetlab/addons/iol/bin/i86bi-linux-l3-adventerprisek9-15.4",
"HARDWARE": [],
"SERIAL": [
"67108896"
],
"CONFIG_REGISTER": "0x0",
"MAC": [],
"RESTARTED": ""
}
],
"status_code": 200,
"message": "success",
"request": "POST executor/execute"
}
【总结】
到目前为止,我们已经完成了执行器的全部设计,但是对于巡检模块来说,还缺少一个重要的功能,那就是巡检项的结果校验以及导出,在后面的章节,我会带大家逐步进行这一功能的设计和实现。
【附录】

yuefeiyu1024
添加作者微信加入专属学习交流群,获取更多干货秘籍




