暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

手写一个查 IP 备注的小工具

生有可恋 2024-01-26
249

在我们排查网络日志时,经常需要反查日志中的 IP 到底是什么服务器。如果有好几百台服务器,每次查询 IP 都要一个一个核实,是比较麻烦的。

最终想实现的效果是自动替换掉日志中的IP为 "IP [备注]"的形式,比如:

IP 备注信息通过文本文件输入至 Python Pickle 数据文件中作为长期存储,设计了一个 IP 字典用来存 IP 备注,如果备注信息有更新,则将新值写入字典中,旧值作为历史数据保存在集合中。

IP 字典可以用 key:value 的形式对 IP 进行索引,从而提高 IP 检索效率。历史 IP 数据存在集合中,使用集合的特性去重。IP 字典和历史数据集合最终通过 Pickle 实现数据持久化处理。重新运行命令行时不需要从纯文本文件中加载数据,因为数据量不大,最多6万多个IP,不需要上数据库。

命令行的界面为:

    $ ./ip_notes.py -h
    usage: ip_notes.py [-h] [--ip_file IP_FILE] [--data_file DATA_FILE]
    [--interactive] [--list] [--erase]


    IP 备注


    optional arguments:
    -h, --help show this help message and exit
    --ip_file IP_FILE, -i IP_FILE
    IP 文件路径,文件内容格式:IP 备注
    --data_file DATA_FILE, -d DATA_FILE
    数据文件路径,默认数据文件:ip.pkl
    --interactive, -a 读取管道中的内容,并进行IP替换
    --list, -l 显示IP字典中的内容
    --erase, -e 清空数据文件内容

    初次使用通过文本文件初始化数据文件,默认会使用 ip.pkl 作为数据文件存 IP 备注信息。

      # 数据初始化
      $ python3 ip_notes.py -i ip.txt




      # 显示格式化后的字典内容
      $ python3 ip_notes.py -l
      IP dict:
      ------------------------------
      192.168.10.23: ('院内网盘',)
      192.168.10.200: ('院内系统合集',)
      10.20.98.201: ('我的个人电脑',)


      IP history set:
      ==============================
      (empty)


      IP备注的原始数据文件是以 “IP 备注” 格式录入的纯文本文件,使用 txt 格式作为数据装载。数据装载的动作可以多次执行,当字典中 IP 已存在时会自动跳过。

      原始数据文件格式为:

      数据装载时会自动跳过空行及只有 IP 没有备注的行。注意文本文件没有行号,截图中的行号为编辑器自动添加的。在数据装载时要注意文本文件编码格式,默认使用的是 UTF-8。如果是在 Windows 上编辑需要处理文件编码的自动识别,晚点会把编码判断的功能加上。文件编码识别要用到 chardet 库,这里先埋个坑,chardet 编码识别示例代码为:

        import chardet


        def detect_file_encoding(file_path):
        with open(file_path, 'rb') as file:
        detector = chardet.universaldetector.UniversalDetector()


        for line in file:
        detector.feed(line)
        if detector.done:
        break


        detector.close()
        result = detector.result


        print(f"Detected encoding: {result['encoding']} with confidence {result['confidence']}")


        # 示例用法
        file_path = 'path/to/your/file.txt'
        detect_file_encoding(file_path)


        实际使用中可以先查看 IP 字典中已有的备注信息,在执行具体命令时通过管道将内容传递给命令行工具,激活管道解析使用的命令选项是 -a

        以上命令是替换掉 last 命令中的 IP,将备注信息打在 IP 后面,使用的是正则表达式替换,不会出现部分替换的情况。

        再给一个显示 ipset 的例子:

        也能处理一行中存在多个 IP 的情况:

        当字符串中的IP不在字典中时,不会做替换。同样只有一部分适配时也不会作替换,比如不会把 192.168.10.233 识别成 192.168.10.23

        当没有管道输入时,会停留在交互模式,可以手工输入 IP 查询备注:

        如果要重置数据文件,可以使用 -e 选项:

        重置后,数据文件为空。使用 -l 选项,IP 字典为空。后续可以重新使用 -i 选项装载数据。

        代码还不完善,初始版本将就可用,里面定义了一些多余的函数和类没有删除。

        代码如下:

          #!env python3


          import pickle
          import os
          import argparse
          import re
          import sys
          from pprint import pprint


          # IP 字典
          ip_dict = dict()


          # IP 备注历史
          ip_history = set()




          class MyIP:
          def __init__(self, value):
          self.value = value


          def __hash__(self):
          # 返回对象的哈希值
          return hash(self.value[0])


          def __eq__(self, other):
          # 比较两个对象是否相等
          if isinstance(other, MyIP):
          return self.value == other.value
          return False


          def __str__(self):
          return f"{self.value[0]} {' '.join(self.value[0:])}"


          def __repr__(self):
          return f"{self.value[0]}\t{self.value[0:]}"




          def foreach_set(myset):
          # 创建迭代器
          iterator = iter(myset)


          # 使用while循环和next函数遍历集合中的元素
          while True:
          try:
          element = next(iterator)
          pprint(element)
          except StopIteration:
          break




          def foreach_dict(mydict):
          for key, value in mydict.items():
          print(f"{key}: {value}")




          def load_data(file_path):
          global ip_dict, ip_history
          if os.path.exists(file_path):
          with open(file_path, 'rb') as file:
          loaded_data = pickle.load(file)
          ip_dict, ip_history = loaded_data[0], loaded_data[1]




          def save_data(file_path):
          global ip_dict, ip_history
          data = [ip_dict, ip_history]
          with open(file_path, 'wb') as file:
          pickle.dump(data, file)




          def insert_ip_note(file_path):
          global ip_dict, ip_history


          # 检查文件是否存在
          if not os.path.exists(file_path):
          print(f"The file at {file_path} does not exist.")




          with open(file_path, 'r', encoding='utf-8') as f:
          line = f.readline()
          while line:
          #pprint(line)


          ip_line = line.split()


          # 过滤空行及无备注的行
          if len(ip_line) <= 1:
          line = f.readline()
          continue
          ip_tmp = tuple(ip_line)
          k, v = ip_tmp[0], ip_tmp[1:]


          if k in ip_dict:
          if v != ip_dict[k]:
          #pprint(ip_dict[k])
          old_ip = ip_dict.pop(k)
          ip_dict.update({k:v})
          ip_history.add((k,)+old_ip)
          else:
          ip_dict.update({k:v})


          # pprint(line.split())
          line = f.readline()




          # 删历史IP
          def clean_ip_history():
              history_ip = '192.168.1.1'
          list_to_remove = []
          for item in ip_history:
          if item[0] == history_ip:
          list_to_remove.append(item)


          for item in list_to_remove:
          ip_history.discard(item)




          def regex(pattern, line):
          match = pattern.findall(line)
          if not match:
          return
          return(match)




          # 返回IP位置
          def regex_pos(pattern, line):
          match = pattern.search(line)
          if not match:
          return len(line)
          return(match.end())




          def search_ip_dict(s):
          global ip_dict
          if s in ip_dict:
          return s + ' [' + ' '.join(ip_dict[s]) +']'
          else:
          return s




          # 替换字符串中的IP为带备注的版本
          def replace_ip():
          pattern_ip = re.compile(r'((?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?))')
          f = sys.stdin
          line = f.readline()
          while line:
          ret = regex(pattern_ip, line)
          if ret:
          line_list = []
          ip_end = len(line)
          for i in ret:
          ip1 = search_ip_dict(i)


          ip_end = regex_pos(pattern_ip, line)
          line_changed = line[0:ip_end]


          line_list.append(line_changed.replace(i, ip1))
          line = line[ip_end:]
          line_list.append(line)
          print(''.join(line_list), end='', flush=True)
          line = f.readline()
          else:
          print(line, end='', flush=True)
          line = f.readline()



          # 显示 IP 字典内容
          def show():


          #pprint(ip_dict)
          print('IP dict:')
          print('-'*30)
          foreach_dict(ip_dict)
          print()
          print('IP history set:')
          print('='*30)
          foreach_set(ip_history)
          if not ip_history:
          print('(empty)')




          def erase(data_file):
          global ip_dict, ip_history
          while True:
          user_input = input("请确认操作 (yes/no): ").lower() # 将输入转换为小写,以便不区分大小写
          if user_input == 'yes':
          ip_dict = dict()
          ip_history = set()
          save_data(data_file)
          break
          elif user_input == 'no':
          print("取消操作。")
          break
          else:
          print("无效的输入,请输入 'yes' 或 'no'。")






          if __name__ == '__main__':



          # 创建 ArgumentParser 对象
          parser = argparse.ArgumentParser(description='IP 备注')


          # 添加命令行参数
          parser.add_argument('--ip_file', '-i', type=str, default='', help='IP 文件路径,文件内容格式:IP 备注')
          parser.add_argument('--data_file', '-d', type=str, default='ip.pkl', help='数据文件路径,默认数据文件:ip.pkl')
          parser.add_argument('--interactive', '-a', action='store_true', help='读取管道中的内容,并进行IP替换')
          parser.add_argument('--list', '-l', action='store_true', help='显示IP字典中的内容')
          parser.add_argument('--erase', '-e', action='store_true', help='清空数据文件内容')




          # 解析命令行参数
          args = parser.parse_args()


          ip_file = args.ip_file
          data_file = args.data_file
          interactive = args.interactive
          show_ip = args.list
          erase_data = args.erase


          load_data(data_file)


          if os.path.exists(ip_file):
          insert_ip_note(ip_file)

          if interactive:
          replace_ip()


          if show_ip:
          show()


          if erase_data:
          erase(data_file)


          # 如果有文件输入,则存盘
          if ip_file:
          save_data(data_file)


          全文完。

          如果转发本文,文末务必注明:“转自微信公众号:生有可恋”。

                 

          文章转载自生有可恋,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

          评论