暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

代理池的搭建

老柴杂货铺 2025-03-15
128

使用代理IP的原因与作用

首先代理ip可以保护用户信息的安全。在如今的大数据互联网时代,每个人上网总会留下一点信息,很有可能被别人利用,而使用代理ip可以完美解决这个问题。高匿名代理ip可以隐藏用户的真实ip地址,保护用户的个人数据和信息安全,提高用户上网的安全性。

其次可以提高访问速度,有时出现过访问网页时出现卡顿的问题,通过代理ip一定程度上可以解决这个问题。通过代理ip访问的一些网站等信息会存留在代理服务器的缓冲区内,假如别人访问过的信息你再访问,则会直接在缓冲区内拉取数据,进一步提高访问速度。遇到对ip检测比较严格的网址也需要进行替换。

不同的代理类型

开放代理:是由全网扫描而来,就是别人搭建了代理服务器被扫到了拿来用,采用分布在全球各地的云服务器使用扫描软件,借助于nmap工具,7*24*365不间断全网扫描、验证。开放代理容易随时失效存在稳定性差、速度不稳定、安全性、可用率低等问题,目前市面上很多都是这种开放代理,价格低廉,我们不推荐这种代理,建议考虑私密代理。

私密代理:非扫描而来的,而是ip提供商租用全国各地实体服务器或拨号服务器,采用自研的服务端代理程序和高可用的调度系统,并支持Http/Https/Scoks5等协议,具有高匿、高速、稳定的特点。

独享代理:是私密代理的一种,客户需要长期稳定的长效IP使用。

免费代理网址

可以从提供的免费代理网址中采集ip地址,不保证可用:

    西刺代理: http://www.xicidaili.com
    快代理: https://www.kuaidaili.com
    云代理: http://www.ip3366.net
    无忧代理: http://www.data5u.com/
    360代理: http://www.swei360.com
    66ip代理: http://www.66ip.cn
    ip海代理: http://www.iphai.com
    大象代理: http://www.daxiangdaili.com/
    米扑代理: https://proxy.mimvp.com/freesecret.php
    站大爷: http://ip.zdaye.com/
    讯代理: http://www.xdaili.cn/
    蚂蚁代理: http://www.mayidaili.com/free
    89免费代理: http://www.89ip.cn/
    全网代理: http://www.goubanjia.com/buy/high.html
    开心代理: http://www.kxdaili.com/dailiip.html
    猿人云: https://www.apeyun.com/

    公开的免费ip多数是不可用的,不建议在爬虫项目中使用免费代理。

    免费代理采集脚本

    以下为免费代理采集脚本示例:

      import re
      import json
      import requests




      class FreeAgent:
          def __init__(self):
              self.headers = {
                  'User-Agent''Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36'
              }


          def get_ip(self, page):
              url = f'https://www.kuaidaili.com/free/inha/{page}/'
              response = requests.get(url, headers=self.headers).text
              data = re.findall(r'const fpsList = (.*?);', response)[0]
              ip_pattern = r'"ip": "(\d{1,3}(?:\.\d{1,3}){3})"'
              port_pattern = r'"port": "(\d{1,5})"'


              ips = re.findall(ip_pattern, data)
              ports = re.findall(port_pattern, data)
              for temp in zip(ips, ports):
                  ip_dict = dict()
                  ip_dict['ip'] = temp[0]
                  ip_dict['port'] = temp[1]
                  yield ip_dict


          def test_ip(self, max_page_num):
              for page_num in range(1, max_page_num + 1):
                  for result in self.get_ip(page_num):
                      proxies = {
                          'http''http://' + result['ip'] + ':' + result['port'],
                          'https''http://' + result['ip'] + ':' + result['port']
                      }


                      print('ip代理:', proxies)
                      try:
                          response = requests.get('http://httpbin.org/ip',
                                                  headers=self.headers, proxies=proxies, timeout=3)
                          if response.status_code == 200:
                              print(response.text)
                              with open('success_ip.txt''a', encoding='utf-8'as f:
                                  f.write(json.dumps(proxies, ensure_ascii=False, indent=4) + '\n')
                      except Exception as e:
                          print('请求超时:', e)




      free_agent = FreeAgent()
      free_agent.test_ip(10)


      付费代理的设置与搭建

      付费代理平台

      ● https://proxy.ip3366.net/

      ● https://www.kuaidaili.com/

      选择自己喜欢的代理平台自行购买:不同的代理平台都有对应的api文档。在代理平台中需要自己注册账号并实名认证。

      快代理官方使用文档:https://www.kuaidaili.com/doc/dev/sdk_http/#requests

      爬虫案例 - 当当网商品数据采集

        import os
        import time
        import pymongo
        import requests
        import threading
        from lxml import etree
        from loguru import logger
        from retrying import retry
        from queue import Queue, Empty
        from fake_useragent import UserAgent




        class DangDangShop:
            mongo_client = pymongo.MongoClient()
            collection = mongo_client['py_spider']['dangdang_shop']


            def __init__(self):
                self.url = 'https://search.dangdang.com/?key=python&act=input'
                self.headers = {
                    'User-Agent''Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36'
                }
                self.ip_url = 'https://dps.kdlapi.com/api/getdps/?secret_id=orsnof8y6rxqtgnf2v33&signature=zugc8en4ct195aoish9iwma4dnvgeya5&num=1&pt=1&format=text&sep=1'


                self.ip_queue = Queue()  # IP队列
                self.url_queue = Queue()  # URL队列
                self.response_queue = Queue()  # 响应队列
                self.detail_queue = Queue()  # 商品信息队列


            def fetch_proxy(self, stop_event, min_count=5):
                while not stop_event.is_set():
                    if self.ip_queue.qsize() < min_count:
                        try:
                            proxy_ip = requests.get(self.ip_url).text.strip()
                            if proxy_ip:
                                self.ip_queue.put(proxy_ip)
                                logger.info(f"获取代理IP: {proxy_ip}")
                        except Exception as e:
                            logger.error(f"获取代理IP失败: {e}")
                    else:
                        time.sleep(5)  # 每5秒检查一次


            def get_page_num(self):
                try:
                    response = requests.get(self.url, headers=self.headers)
                    tree = etree.HTML(response.text)
                    max_page = tree.xpath("//ul[@name='Fy']/li[last()-2]/a/text()")
                    if max_page:
                        max_page = int(max_page[0])
                        for page in range(1, max_page + 1):
                            url = f'https://search.dangdang.com/?key=python&act=input&page_index={page}'
                            self.url_queue.put(url)
                            logger.info(f"Added URL: {url}")
                    else:
                        self.url_queue.put(self.url)
                        logger.info(f"Added URL: {self.url}")
                except Exception as e:
                    logger.error(f"获取页面数量失败: {e}")


            @retry(stop_max_attempt_number=5)
            def get_goods_list(self, stop_event):
                logger.info(f"线程 {threading.current_thread().name} 开始运行")
                while not stop_event.is_set():
                    try:
                        proxy_ip = self.ip_queue.get(timeout=1)
                        url = self.url_queue.get(timeout=1)
                        username = "自行填写"
                        password = "自行填写"
                        proxies = {
                            "http"f"http://{username}:{password}@{proxy_ip}/",
                            "https"f"http://{username}:{password}@{proxy_ip}/"
                        }
                        self.headers['User-Agent'] = UserAgent().random
                        try:
                            response = requests.get(url, headers=self.headers, proxies=proxies, timeout=5)
                            logger.info(f"请求URL: {url} 使用代理: {proxy_ip} 状态码: {response.status_code}")
                            if response.status_code == 200:
                                self.response_queue.put(response)
                                self.ip_queue.put(proxy_ip)  # 将可用的代理IP重新放回ip队列
                            else:
                                logger.warning(f'状态码异常: {response.status_code}')
                        except Exception as e:
                            logger.exception("请求失败")
                        finally:
                            self.url_queue.task_done()
                            self.ip_queue.task_done()
                    except Empty:
                        continue
                logger.info(f"线程 {threading.current_thread().name} 结束运行")


            def parse_info(self, stop_event):
                logger.info(f"线程 {threading.current_thread().name} 开始运行")
                while not stop_event.is_set():
                    try:
                        response = self.response_queue.get(timeout=10)
                        logger.info(f"开始解析响应: {response.url}")
                        tree = etree.HTML(response.text)
                        li_list = tree.xpath("//ul[@class='bigimg']/li")
                        for li in li_list:
                            item = {}
                            goods_name = li.xpath("./a/@title")
                            goods_price = li.xpath("p[@class='price']/span[1]/text()")
                            item['goods_name'] = goods_name[0if goods_name else '空'
                            item['goods_price'] = goods_price[0if goods_price else '空'
                            self.detail_queue.put(item)
                        self.response_queue.task_done()
                        logger.info(f"完成解析响应: {response.url}")
                    except Empty:
                        continue
                    except Exception as e:
                        logger.exception("解析商品信息失败")
                logger.info(f"线程 {threading.current_thread().name} 结束运行")


            def save_info(self, stop_event):
                logger.info(f"线程 {threading.current_thread().name} 开始运行")
                while not stop_event.is_set():
                    try:
                        detail = self.detail_queue.get(timeout=10)
                        logger.info(f"开始保存数据: {detail}")
                        self.collection.insert_one(detail)
                        self.detail_queue.task_done()
                        logger.info(f"完成保存数据: {detail}")
                    except Empty:
                        continue
                    except Exception as e:
                        logger.exception("存储数据失败")
                logger.info(f"线程 {threading.current_thread().name} 结束运行")


            def main(self):
                # Configure logger
                if not os.path.exists('logs'):
                    os.makedirs('logs')
                logger.add(
                    sink="logs/app.log",
                    level="DEBUG",
                    format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {thread.name} | {message}",
                    rotation="10 MB",
                    compression="zip"
                )
                logger.add(
                    sink=lambda msg: print(msg, end=''),
                    level="DEBUG",
                    format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {thread.name} | {message}"
                )


                stop_event = threading.Event()
                self.get_page_num()


                # 启动代理IP获取线程
                proxy_thread = threading.Thread(target=self.fetch_proxy, args=(stop_event,))
                proxy_thread.start()


                thread_list = []
                for _ in range(5):
                    thread_obj = threading.Thread(target=self.get_goods_list, args=(stop_event,))
                    thread_list.append(thread_obj)


                for _ in range(5):
                    thread_obj = threading.Thread(target=self.parse_info, args=(stop_event,))
                    thread_list.append(thread_obj)


                save_thread = threading.Thread(target=self.save_info, args=(stop_event,))
                thread_list.append(save_thread)


                for item in thread_list:
                    item.start()


                # 等待所有队列任务完成
                self.url_queue.join()
                self.response_queue.join()
                self.detail_queue.join()


                # 设置停止事件
                stop_event.set()


                # 等待所有线程完成
                for item in thread_list:
                    item.join()


                # 结束代理IP获取线程
                proxy_thread.join()


                # 关闭MongoDB连接
                self.mongo_client.close()




        if __name__ == '__main__':
            shop = DangDangShop()
            shop.main()


        使用线程池的方式采集当当网商品信息

        当前案例未使用代理池,如需使用则在初始化函数中声明代理列表并迭代提取使用

          import pymongo
          import requests
          from lxml import etree
          from fake_useragent import UserAgent
          from concurrent.futures import ThreadPoolExecutor




          class DangDangShop:
              def __init__(self):
                  self.mongo_client = pymongo.MongoClient()
                  self.collection = self.mongo_client['py_spider']['dangdang_shop']
                  self.url = 'https://search.dangdang.com/?key=python&act=input'
                  self.headers = {
                      'User-Agent'''
                  }
                  self.urls = []  # 存储待抓取的urls


              def get_goods_index(self):
                  self.headers['User-Agent'] = UserAgent().random
                  response = requests.get(self.url, headers=self.headers)
                  return response


              def get_page_num(self, response):
                  tree = etree.HTML(response.text)
                  max_page = tree.xpath("//ul[@name='Fy']/li[last()-2]/a/text()")[0]
                  # 将url添加到列表中而不是队列
                  if max_page:
                      for page in range(1int(max_page) + 1):
                          url = f'https://search.dangdang.com/?key=python&act=input&page_index={page}'
                          self.urls.append(url)
                  else:
                      self.urls.append(self.url)


              def get_goods_list(self, url):
                  self.headers['User-Agent'] = UserAgent().random
                  response = requests.get(url, headers=self.headers)
                  self.parse_info(response)


              def save_info(self, detail):
                  self.collection.insert_one(detail)


              def parse_info(self, response):
                  tree = etree.HTML(response.text)
                  li_list = tree.xpath("//ul[@class='bigimg']/li")
                  for li in li_list:
                      item = dict()
                      goods_name = li.xpath("./a/@title")
                      goods_price = li.xpath("p[@class='price']/span[1]/text()")
                      item['goods_name'] = goods_name[0]
                      item['goods_price'] = goods_price[0if goods_price else '空'
                      print(item)
                      self.save_info(item)


              def main(self):
                  response = self.get_goods_index()
                  self.get_page_num(response)


                  # 使用线程池处理页面抓取
                  with ThreadPoolExecutor(max_workers=10as executor:
                      # 提交所有url到线程池进行处理
                      for url in self.urls:
                          executor.submit(self.get_goods_list, url)




          if __name__ == '__main__':
              shop = DangDangShop()
              shop.main()


          注意点:如果期望将以上代码中的MongoDB存储替换为MySQL存储则需要使用数据库连接池。

          文章转载自老柴杂货铺,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

          评论