
把监听日志(listener.log)和脚本(lsnrlog.py)放到同一个目录下,然后在该目录中运行脚本:
yuyuz-mac:analyze-listenerlog yuyuz$ pwd
/Users/yuyuz/python/analyze-listenerlog
yuyuz-mac:analyze-listenerlog yuyuz$ ls
listener.log lsnrlog.py
yuyuz-mac:analyze-listenerlog yuyuz$ python lsnrlog.py
提取到的成功时间戳数量: 1920
提取到的失败时间戳数量: 15081
提取到的总时间戳数量: 17001
HTML 文件生成成功
yuyuz-mac:analyze-listenerlog yuyuz$ ls
connection_analysis.html listener.log lsnrlog.py






# lsnrlog.py - summarize connection attempts found in an Oracle listener.log
# and render them as line charts in connection_analysis.html.
import re
from collections import Counter
from datetime import datetime

try:
    from pyecharts import options as opts
    from pyecharts.charts import Line, Page
except ImportError:
    # Parsing and counting do not need pyecharts; only chart creation does.
    # main() and the chart builders report the missing dependency explicitly.
    opts = Line = Page = None


# Either "01-JAN-2024 10:15:30" (group 1) or "2024-01-01T10:15:30" (group 2).
_TIMESTAMP_RE = re.compile(
    r'(\d{2}-[A-Z]{3}-\d{4} \d{2}:\d{2}:\d{2})'
    r'|(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})'
)
# Values run up to the closing parenthesis so names containing '.' or '-'
# (e.g. "orcl.example.com", "app-01") are captured in full.
_SERVICE_RE = re.compile(r'SERVICE_NAME=([^()\s]+)')
_HOST_RE = re.compile(r'HOST=([^()\s]+)')
_IP_RE = re.compile(r'HOST=(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})')

# strftime format that truncates a timestamp to the start of its bucket.
_BUCKET_FORMATS = {
    'H': '%Y-%m-%d %H:00:00',
    'T': '%Y-%m-%d %H:%M:00',
    'S': '%Y-%m-%d %H:%M:%S',
}


def _bucket_format(interval):
    """Return the strftime format for *interval*; raise ValueError if unknown."""
    try:
        return _BUCKET_FORMATS[interval]
    except KeyError:
        raise ValueError(
            f"unsupported interval {interval!r}; expected 'H', 'T' or 'S'"
        ) from None


def _is_success(line):
    """Return True when the last '*'-separated field of *line* is exactly '0'.

    Checking only the final character would also accept non-zero codes that
    happen to end in 0, such as 12170 or 12540.
    """
    return line.rsplit('*', 1)[-1].strip() == '0'


def _first_group(pattern, line):
    """Return group 1 of the first match of *pattern* in *line*, else None."""
    match = pattern.search(line)
    return match.group(1) if match else None


def parse_log(log_text):
    """Extract connection records from listener log text.

    Only lines containing ``CONNECT_DATA`` and a recognizable timestamp are
    considered. A record counts as successful when its trailing return-code
    field is ``0`` and as failed otherwise.

    :param log_text: full log text
    :return: tuple ``(success_timestamps, failure_timestamps,
        total_timestamps, services, hosts, ips)``; the last three are lists
        of ``(timestamp, value)`` pairs where value is None if not present
    """
    success_timestamps = []
    failure_timestamps = []
    total_timestamps = []
    services = []
    hosts = []
    ips = []

    for line in log_text.split('\n'):
        if 'CONNECT_DATA' not in line:
            continue
        match = _TIMESTAMP_RE.search(line)
        if match is None:
            continue

        legacy_stamp, iso_stamp = match.groups()
        try:
            if legacy_stamp is not None:
                timestamp = datetime.strptime(legacy_stamp, '%d-%b-%Y %H:%M:%S')
            else:
                timestamp = datetime.strptime(iso_stamp, '%Y-%m-%dT%H:%M:%S')
        except ValueError:
            # Shape matched but the value is not a real date/time.
            print(f"无法解析时间戳: {match.group(0)}")
            continue

        total_timestamps.append(timestamp)
        if _is_success(line):
            success_timestamps.append(timestamp)
        else:
            failure_timestamps.append(timestamp)

        services.append((timestamp, _first_group(_SERVICE_RE, line)))
        hosts.append((timestamp, _first_group(_HOST_RE, line)))
        ips.append((timestamp, _first_group(_IP_RE, line)))

    print(f"提取到的成功时间戳数量: {len(success_timestamps)}")
    print(f"提取到的失败时间戳数量: {len(failure_timestamps)}")
    print(f"提取到的总时间戳数量: {len(total_timestamps)}")
    return success_timestamps, failure_timestamps, total_timestamps, services, hosts, ips


def count_connections(timestamps, interval):
    """Count timestamps per time bucket.

    :param timestamps: iterable of datetime objects
    :param interval: 'H' (hour), 'T' (minute) or 'S' (second)
    :return: list of ``(bucket_label, count)`` sorted by label
    :raises ValueError: if *interval* is not one of 'H', 'T', 'S'
    """
    fmt = _bucket_format(interval)
    counts = Counter(timestamp.strftime(fmt) for timestamp in timestamps)
    return sorted(counts.items())


def count_by_attribute(data, interval, attribute):
    """Count records per time bucket, grouped by attribute value.

    :param data: iterable of ``(timestamp, value)`` pairs; pairs whose value
        is None are skipped
    :param interval: 'H' (hour), 'T' (minute) or 'S' (second)
    :param attribute: attribute name (not used in the computation; kept for
        interface compatibility)
    :return: dict mapping each value to a sorted list of
        ``(bucket_label, count)``
    :raises ValueError: if *interval* is not one of 'H', 'T', 'S'
    """
    fmt = _bucket_format(interval)
    counts = {}
    for timestamp, attr in data:
        if attr is None:
            continue
        counts.setdefault(attr, Counter())[timestamp.strftime(fmt)] += 1
    return {attr: sorted(buckets.items()) for attr, buckets in counts.items()}


def _require_pyecharts():
    """Raise ImportError if pyecharts could not be imported."""
    if Line is None:
        raise ImportError("pyecharts is required to build charts")


def _first_value_by_key(pairs):
    """Map each key to the value of its first occurrence in *pairs*."""
    table = {}
    for item in pairs:
        table.setdefault(item[0], item[1])
    return table


def _apply_common_opts(chart, title, interval):
    """Apply the title, axes, tooltip and legend settings shared by all charts."""
    chart.set_global_opts(
        title_opts=opts.TitleOpts(
            title=title,
            title_textstyle_opts=opts.TextStyleOpts(font_size=20, color="#333"),
        ),
        toolbox_opts=opts.ToolboxOpts(is_show=True),
        xaxis_opts=opts.AxisOpts(
            name=interval,
            axislabel_opts=opts.LabelOpts(rotate=45, font_size=12, color="#666"),
            name_textstyle_opts=opts.TextStyleOpts(font_size=14, color="#333"),
        ),
        yaxis_opts=opts.AxisOpts(
            name="连接数",
            axislabel_opts=opts.LabelOpts(font_size=12, color="#666"),
            name_textstyle_opts=opts.TextStyleOpts(font_size=14, color="#333"),
        ),
        tooltip_opts=opts.TooltipOpts(trigger="axis"),
        legend_opts=opts.LegendOpts(
            type_="scroll",
            orient="vertical",
            pos_left="right",
            pos_top="middle",
            textstyle_opts=opts.TextStyleOpts(font_size=8, color="#333"),
        ),
    )
    return chart


def create_line_chart(success_data, failure_data, total_data, interval):
    """Build a line chart of successful, failed and total connections.

    :param success_data: list of ``(bucket_label, count)`` for successes
    :param failure_data: list of ``(bucket_label, count)`` for failures
    :param total_data: list of ``(bucket_label, count)`` for all connections
    :param interval: display name of the interval, e.g. '小时', '分钟', '秒'
    :return: pyecharts Line object
    :raises ImportError: if pyecharts is not installed
    """
    _require_pyecharts()
    x_data = sorted({item[0] for item in success_data + failure_data + total_data})

    series = [
        (f"成功连接总数/{interval}", success_data, "#228B22"),
        (f"失败连接总数/{interval}", failure_data, "#FF0000"),
        (f"总连接数/{interval}", total_data, "#FF6347"),
    ]

    line = Line(init_opts=opts.InitOpts(bg_color="#f5f5f5", width="100%"))
    line.add_xaxis(x_data)
    for series_name, data, color in series:
        # Dict lookup instead of scanning the list once per x value.
        lookup = _first_value_by_key(data)
        line.add_yaxis(
            series_name=series_name,
            y_axis=[lookup.get(x, 0) for x in x_data],
            label_opts=opts.LabelOpts(is_show=False),
            linestyle_opts=opts.LineStyleOpts(width=2, color=color),
            symbol="circle",
            symbol_size=6,
            itemstyle_opts=opts.ItemStyleOpts(color=color),
        )
    return _apply_common_opts(line, f"连接情况统计/{interval}", interval)


def create_attribute_line_chart(counts, interval, attribute):
    """Build a line chart with one series per attribute value.

    :param counts: dict mapping attribute value to a list of
        ``(bucket_label, count)``
    :param interval: display name of the interval, e.g. '小时', '分钟', '秒'
    :param attribute: attribute name shown in the chart title
    :return: pyecharts Line object
    :raises ImportError: if pyecharts is not installed
    """
    _require_pyecharts()
    x_data = sorted({x for data in counts.values() for x, _ in data})

    line = Line(init_opts=opts.InitOpts(bg_color="#f5f5f5", width="100%"))
    line.add_xaxis(x_data)
    for attr, data in counts.items():
        lookup = _first_value_by_key(data)
        line.add_yaxis(
            series_name=f"{attr}/{interval}",
            y_axis=[lookup.get(x, 0) for x in x_data],
            label_opts=opts.LabelOpts(is_show=False),
            linestyle_opts=opts.LineStyleOpts(width=2),
            symbol="circle",
            symbol_size=6,
        )
    return _apply_common_opts(line, f"{attribute} 连接情况统计/{interval}", interval)


def _insert_footer(html_path, footer_html):
    """Insert *footer_html* before the last ``</body>`` of the file.

    Appending after ``</html>`` would produce an invalid document. If no
    ``</body>`` tag is found the footer is appended to the end instead.
    """
    with open(html_path, 'r', encoding='utf-8') as f:
        html = f.read()
    head, closing_tag, tail = html.rpartition('</body>')
    if closing_tag:
        html = head + footer_html + closing_tag + tail
    else:
        html += footer_html
    with open(html_path, 'w', encoding='utf-8') as f:
        f.write(html)


def main():
    """Read listener.log and write connection_analysis.html with all charts."""
    if Page is None:
        print("未安装 pyecharts,请先执行: pip install pyecharts")
        return

    try:
        # errors='replace': a stray non-UTF-8 byte should not abort the run.
        with open('listener.log', 'r', encoding='utf-8', errors='replace') as file:
            log_text = file.read()

        (success_timestamps, failure_timestamps, total_timestamps,
         services, hosts, ips) = parse_log(log_text)

        # Success / failure / total counts per hour, minute and second.
        intervals = ['H', 'T', 'S']
        interval_names = ['小时', '分钟', '秒']
        page = Page()
        for interval, interval_name in zip(intervals, interval_names):
            success_data = count_connections(success_timestamps, interval)
            failure_data = count_connections(failure_timestamps, interval)
            total_data = count_connections(total_timestamps, interval)
            page.add(create_line_chart(success_data, failure_data, total_data, interval_name))

        # Per-SERVICE, per-HOST and per-IP charts for each interval.
        attributes = [('SERVICE', services), ('HOST', hosts), ('IP', ips)]
        for attribute_name, attribute_data in attributes:
            for interval, interval_name in zip(intervals, interval_names):
                counts = count_by_attribute(attribute_data, interval, attribute_name)
                page.add(create_attribute_line_chart(counts, interval_name, attribute_name))

        page.render("connection_analysis.html")

        # Add the copyright notice to the rendered page.
        copyright_info = '<div style="text-align: center; padding: 20px; font-size: 12px;">声明:仅用作兴趣爱好和测试使用,请勿商用,不承担任何商业责任。--张宇--</div>'
        _insert_footer('connection_analysis.html', copyright_info)
        print("HTML 文件生成成功")
    except FileNotFoundError:
        print("未找到 listener.log 文件,请确保文件存在。")
    except Exception as e:  # top-level boundary of a CLI script
        print(f"发生错误: {e}")


if __name__ == "__main__":
    main()


文章转载自godba,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




