Why Use Proxy IPs: Reasons and Benefits
First, a proxy IP helps protect user information. In today's big-data internet environment, every online session leaves traces that others may be able to exploit, and a proxy IP goes a long way toward mitigating this. A high-anonymity (elite) proxy hides the user's real IP address, protecting personal data and making browsing safer.
Second, a proxy can improve access speed. Pages that occasionally load slowly may load faster through a proxy, because content fetched via the proxy is kept in the proxy server's cache: if a resource has already been requested by someone else, your request is served directly from that cache. You also need to switch IPs when a target site enforces strict per-IP checks or rate limits.
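As a quick illustration of routing traffic through a proxy, here is a minimal sketch using the requests library. The proxy address 127.0.0.1:7890 is a placeholder you would replace with a real proxy; httpbin.org simply echoes the IP your request arrived from, which makes it easy to confirm the proxy is in effect.

import requests

# Placeholder proxy address: replace with a real HTTP proxy (ip:port).
proxies = {
    'http': 'http://127.0.0.1:7890',
    'https': 'http://127.0.0.1:7890',
}

# httpbin.org/ip returns the IP the request came from,
# so the output should show the proxy's IP, not your own.
response = requests.get('http://httpbin.org/ip', proxies=proxies, timeout=5)
print(response.text)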
Different Proxy Types
Open proxies: found by scanning the whole internet; in other words, proxy servers that someone else set up and that were discovered by a scanner. Providers run scanning software, typically built on nmap, on cloud servers spread around the world, scanning and validating addresses around the clock (24/7, 365 days a year). Open proxies can fail at any time and suffer from poor stability, inconsistent speed, questionable security, and low availability. Much of what is sold on the market today is this kind of open proxy at a very low price; we do not recommend it and suggest private proxies instead.
Private proxies: not obtained by scanning. The IP provider rents physical or dial-up servers across the country and runs its own server-side proxy software together with a highly available scheduling system. They support the HTTP, HTTPS, and SOCKS5 protocols (a client-side configuration sketch follows these definitions) and are high-anonymity, fast, and stable.
Dedicated proxies: a type of private proxy for customers who need stable, long-lived IPs over an extended period.
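On the client side, the protocol only changes how the proxy is declared. Below is a minimal sketch; the address 1.2.3.4 is a placeholder, and the SOCKS5 variant assumes requests has SOCKS support installed (pip install requests[socks]).

import requests

# HTTP/HTTPS proxy: the same endpoint usually handles both schemes.
http_proxies = {
    'http': 'http://1.2.3.4:8080',
    'https': 'http://1.2.3.4:8080',
}

# SOCKS5 proxy: requires the PySocks extra (pip install requests[socks]).
# Use socks5h:// instead if DNS should also resolve through the proxy.
socks_proxies = {
    'http': 'socks5://1.2.3.4:1080',
    'https': 'socks5://1.2.3.4:1080',
}

for name, proxies in [('HTTP', http_proxies), ('SOCKS5', socks_proxies)]:
    try:
        resp = requests.get('http://httpbin.org/ip', proxies=proxies, timeout=5)
        print(name, resp.text)
    except requests.RequestException as e:
        print(name, 'failed:', e)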
Free Proxy Sites
You can harvest IP addresses from the free proxy sites listed below; availability is not guaranteed:
● 西刺代理: http://www.xicidaili.com
● 快代理: https://www.kuaidaili.com
● 云代理: http://www.ip3366.net
● 无忧代理: http://www.data5u.com/
● 360代理: http://www.swei360.com
● 66ip代理: http://www.66ip.cn
● ip海代理: http://www.iphai.com
● 大象代理: http://www.daxiangdaili.com/
● 米扑代理: https://proxy.mimvp.com/freesecret.php
● 站大爷: http://ip.zdaye.com/
● 讯代理: http://www.xdaili.cn/
● 蚂蚁代理: http://www.mayidaili.com/free
● 89免费代理: http://www.89ip.cn/
● 全网代理: http://www.goubanjia.com/buy/high.html
● 开心代理: http://www.kxdaili.com/dailiip.html
● 猿人云: https://www.apeyun.com/
Most publicly listed free IPs are unusable; free proxies are not recommended for real crawling projects.
Free Proxy Collection Script
The following example script scrapes the 快代理 free-proxy list and tests each address:
import re
import json
import requests


class FreeAgent:
    def __init__(self):
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36'
        }

    def get_ip(self, page):
        """Scrape one page of the free-proxy list and yield ip/port pairs."""
        url = f'https://www.kuaidaili.com/free/inha/{page}/'
        response = requests.get(url, headers=self.headers).text
        # The proxy table is embedded in the page as a JavaScript constant (fpsList).
        data = re.findall(r'const fpsList = (.*?);', response)[0]
        ip_pattern = r'"ip": "(\d{1,3}(?:\.\d{1,3}){3})"'
        port_pattern = r'"port": "(\d{1,5})"'
        ips = re.findall(ip_pattern, data)
        ports = re.findall(port_pattern, data)
        for temp in zip(ips, ports):
            ip_dict = dict()
            ip_dict['ip'] = temp[0]
            ip_dict['port'] = temp[1]
            yield ip_dict

    def test_ip(self, max_page_num):
        """Test every collected proxy against httpbin and save the working ones."""
        for page_num in range(1, max_page_num + 1):
            for result in self.get_ip(page_num):
                proxies = {
                    'http': 'http://' + result['ip'] + ':' + result['port'],
                    'https': 'http://' + result['ip'] + ':' + result['port']
                }
                print('Proxy:', proxies)
                try:
                    response = requests.get('http://httpbin.org/ip',
                                            headers=self.headers, proxies=proxies, timeout=3)
                    if response.status_code == 200:
                        print(response.text)
                        # Append working proxies to a local file for later use.
                        with open('success_ip.txt', 'a', encoding='utf-8') as f:
                            f.write(json.dumps(proxies, ensure_ascii=False, indent=4) + '\n')
                except Exception as e:
                    print('Request failed or timed out:', e)


free_agent = FreeAgent()
free_agent.test_ip(10)
Paid Proxy Platforms
● https://proxy.ip3366.net/
● https://www.kuaidaili.com/
Pick whichever proxy platform you prefer and purchase access yourself; each platform publishes its own API documentation. You will need to register an account on the platform and complete real-name verification.
Kuaidaili (快代理) official usage documentation: https://www.kuaidaili.com/doc/dev/sdk_http/#requests
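Most paid platforms follow the same pattern: you call an extraction API that returns one or more ip:port entries (plain text or JSON), then pass them to requests, authenticating with the username and password of your proxy order. The endpoint, credentials, and response format below are placeholders only; consult your provider's API documentation for the real values.

import requests

# Placeholder values: replace with the extraction URL and credentials
# shown in your provider's dashboard / API documentation.
EXTRACT_API = 'https://example-provider.com/api/getips?num=1&format=text'
USERNAME = 'your_order_username'
PASSWORD = 'your_order_password'

def get_proxy():
    # Many providers return one "ip:port" per line as plain text.
    return requests.get(EXTRACT_API, timeout=5).text.strip()

proxy_ip = get_proxy()
proxies = {
    'http': f'http://{USERNAME}:{PASSWORD}@{proxy_ip}/',
    'https': f'http://{USERNAME}:{PASSWORD}@{proxy_ip}/',
}
response = requests.get('http://httpbin.org/ip', proxies=proxies, timeout=5)
print(response.text)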
Crawler Case Study: Collecting Product Data from Dangdang
This example ties the Kuaidaili extraction API to a multi-threaded producer/consumer pipeline: a background thread keeps the proxy queue topped up, worker threads fetch and parse listing pages, and a saver thread writes the items to MongoDB.

import os
import time
import pymongo
import requests
import threading
from lxml import etree
from loguru import logger
from retrying import retry
from queue import Queue, Empty
from fake_useragent import UserAgent


class DangDangShop:
    mongo_client = pymongo.MongoClient()
    collection = mongo_client['py_spider']['dangdang_shop']

    def __init__(self):
        self.url = 'https://search.dangdang.com/?key=python&act=input'
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36'
        }
        # Extraction API: returns one "ip:port" as plain text per call.
        self.ip_url = 'https://dps.kdlapi.com/api/getdps/?secret_id=orsnof8y6rxqtgnf2v33&signature=zugc8en4ct195aoish9iwma4dnvgeya5&num=1&pt=1&format=text&sep=1'
        self.ip_queue = Queue()        # proxy IP queue
        self.url_queue = Queue()       # listing-page URL queue
        self.response_queue = Queue()  # response queue
        self.detail_queue = Queue()    # parsed item queue

    def fetch_proxy(self, stop_event, min_count=5):
        """Keep the proxy queue topped up with at least min_count proxies."""
        while not stop_event.is_set():
            if self.ip_queue.qsize() < min_count:
                try:
                    proxy_ip = requests.get(self.ip_url).text.strip()
                    if proxy_ip:
                        self.ip_queue.put(proxy_ip)
                        logger.info(f"Fetched proxy IP: {proxy_ip}")
                except Exception as e:
                    logger.error(f"Failed to fetch proxy IP: {e}")
            else:
                time.sleep(5)  # check the pool level every 5 seconds

    def get_page_num(self):
        """Read the pagination bar and enqueue every listing-page URL."""
        try:
            response = requests.get(self.url, headers=self.headers)
            tree = etree.HTML(response.text)
            max_page = tree.xpath("//ul[@name='Fy']/li[last()-2]/a/text()")
            if max_page:
                max_page = int(max_page[0])
                for page in range(1, max_page + 1):
                    url = f'https://search.dangdang.com/?key=python&act=input&page_index={page}'
                    self.url_queue.put(url)
                    logger.info(f"Added URL: {url}")
            else:
                self.url_queue.put(self.url)
                logger.info(f"Added URL: {self.url}")
        except Exception as e:
            logger.error(f"Failed to get page count: {e}")

    @retry(stop_max_attempt_number=5)
    def get_goods_list(self, stop_event):
        """Worker: take a proxy and a URL from their queues and fetch the page."""
        logger.info(f"Thread {threading.current_thread().name} started")
        while not stop_event.is_set():
            try:
                proxy_ip = self.ip_queue.get(timeout=1)
                url = self.url_queue.get(timeout=1)
                username = "your_username"  # fill in the credentials of your proxy order
                password = "your_password"
                proxies = {
                    "http": f"http://{username}:{password}@{proxy_ip}/",
                    "https": f"http://{username}:{password}@{proxy_ip}/"
                }
                self.headers['User-Agent'] = UserAgent().random
                try:
                    response = requests.get(url, headers=self.headers, proxies=proxies, timeout=5)
                    logger.info(f"Requested {url} via proxy {proxy_ip}, status code: {response.status_code}")
                    if response.status_code == 200:
                        self.response_queue.put(response)
                        self.ip_queue.put(proxy_ip)  # put the working proxy back into the pool
                    else:
                        logger.warning(f'Unexpected status code: {response.status_code}')
                except Exception as e:
                    logger.exception("Request failed")
                finally:
                    self.url_queue.task_done()
                    self.ip_queue.task_done()
            except Empty:
                continue
        logger.info(f"Thread {threading.current_thread().name} finished")

    def parse_info(self, stop_event):
        """Worker: parse product name and price out of each listing-page response."""
        logger.info(f"Thread {threading.current_thread().name} started")
        while not stop_event.is_set():
            try:
                response = self.response_queue.get(timeout=10)
                logger.info(f"Parsing response: {response.url}")
                tree = etree.HTML(response.text)
                li_list = tree.xpath("//ul[@class='bigimg']/li")
                for li in li_list:
                    item = {}
                    goods_name = li.xpath("./a/@title")
                    goods_price = li.xpath("p[@class='price']/span[1]/text()")
                    item['goods_name'] = goods_name[0] if goods_name else 'N/A'
                    item['goods_price'] = goods_price[0] if goods_price else 'N/A'
                    self.detail_queue.put(item)
                self.response_queue.task_done()
                logger.info(f"Finished parsing response: {response.url}")
            except Empty:
                continue
            except Exception as e:
                logger.exception("Failed to parse product info")
        logger.info(f"Thread {threading.current_thread().name} finished")

    def save_info(self, stop_event):
        """Worker: write parsed items into MongoDB."""
        logger.info(f"Thread {threading.current_thread().name} started")
        while not stop_event.is_set():
            try:
                detail = self.detail_queue.get(timeout=10)
                logger.info(f"Saving item: {detail}")
                self.collection.insert_one(detail)
                self.detail_queue.task_done()
                logger.info(f"Saved item: {detail}")
            except Empty:
                continue
            except Exception as e:
                logger.exception("Failed to save item")
        logger.info(f"Thread {threading.current_thread().name} finished")

    def main(self):
        # Configure the logger: rotating file sink plus console output.
        if not os.path.exists('logs'):
            os.makedirs('logs')
        logger.add(
            sink="logs/app.log",
            level="DEBUG",
            format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {thread.name} | {message}",
            rotation="10 MB",
            compression="zip"
        )
        logger.add(
            sink=lambda msg: print(msg, end=''),
            level="DEBUG",
            format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {thread.name} | {message}"
        )

        stop_event = threading.Event()
        self.get_page_num()

        # Start the proxy-fetching thread.
        proxy_thread = threading.Thread(target=self.fetch_proxy, args=(stop_event,))
        proxy_thread.start()

        thread_list = []
        for _ in range(5):
            thread_obj = threading.Thread(target=self.get_goods_list, args=(stop_event,))
            thread_list.append(thread_obj)
        for _ in range(5):
            thread_obj = threading.Thread(target=self.parse_info, args=(stop_event,))
            thread_list.append(thread_obj)
        save_thread = threading.Thread(target=self.save_info, args=(stop_event,))
        thread_list.append(save_thread)
        for item in thread_list:
            item.start()

        # Wait until every queued task has been processed.
        self.url_queue.join()
        self.response_queue.join()
        self.detail_queue.join()

        # Signal all worker threads to stop.
        stop_event.set()

        # Wait for the worker threads to finish.
        for item in thread_list:
            item.join()

        # Wait for the proxy-fetching thread.
        proxy_thread.join()

        # Close the MongoDB connection.
        self.mongo_client.close()


if __name__ == '__main__':
    shop = DangDangShop()
    shop.main()
Collecting Dangdang Product Data with a Thread Pool
This version does not use a proxy pool. If you need one, declare a proxy list in the initializer and rotate through it when sending requests (see the sketch after the code below).
import pymongo
import requests
from lxml import etree
from fake_useragent import UserAgent
from concurrent.futures import ThreadPoolExecutor


class DangDangShop:
    def __init__(self):
        self.mongo_client = pymongo.MongoClient()
        self.collection = self.mongo_client['py_spider']['dangdang_shop']
        self.url = 'https://search.dangdang.com/?key=python&act=input'
        self.headers = {'User-Agent': ''}
        self.urls = []  # URLs waiting to be crawled

    def get_goods_index(self):
        """Fetch the first listing page so the pagination bar can be read."""
        self.headers['User-Agent'] = UserAgent().random
        response = requests.get(self.url, headers=self.headers)
        return response

    def get_page_num(self, response):
        """Read the pagination bar and build the list of listing-page URLs."""
        tree = etree.HTML(response.text)
        max_page = tree.xpath("//ul[@name='Fy']/li[last()-2]/a/text()")
        # Append URLs to a list instead of a queue.
        if max_page:
            for page in range(1, int(max_page[0]) + 1):
                url = f'https://search.dangdang.com/?key=python&act=input&page_index={page}'
                self.urls.append(url)
        else:
            self.urls.append(self.url)

    def get_goods_list(self, url):
        self.headers['User-Agent'] = UserAgent().random
        response = requests.get(url, headers=self.headers)
        self.parse_info(response)

    def save_info(self, detail):
        self.collection.insert_one(detail)

    def parse_info(self, response):
        tree = etree.HTML(response.text)
        li_list = tree.xpath("//ul[@class='bigimg']/li")
        for li in li_list:
            item = dict()
            goods_name = li.xpath("./a/@title")
            goods_price = li.xpath("p[@class='price']/span[1]/text()")
            item['goods_name'] = goods_name[0] if goods_name else 'N/A'
            item['goods_price'] = goods_price[0] if goods_price else 'N/A'
            print(item)
            self.save_info(item)

    def main(self):
        response = self.get_goods_index()
        self.get_page_num(response)
        # Use a thread pool to fetch the listing pages concurrently.
        with ThreadPoolExecutor(max_workers=10) as executor:
            # Submit every URL to the pool.
            for url in self.urls:
                executor.submit(self.get_goods_list, url)


if __name__ == '__main__':
    shop = DangDangShop()
    shop.main()
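As noted above, a proxy pool can be bolted onto this thread-pool version by keeping a list of proxies and cycling through it. Below is a minimal sketch; the proxy_list contents are hypothetical placeholders, and in practice you would fill the list (for example from the paid extraction API shown earlier) inside __init__ and call a helper like this from get_goods_list.

from itertools import cycle
import requests

# Hypothetical proxy list; in a real spider, populate it from your
# provider's extraction API in __init__ and store it on the instance.
proxy_list = ['1.2.3.4:8080', '5.6.7.8:8080']
proxy_cycle = cycle(proxy_list)  # round-robin iterator over the pool

def get_with_proxy(url, headers):
    # Pick the next proxy in the rotation for every request.
    proxy_ip = next(proxy_cycle)
    proxies = {
        'http': f'http://{proxy_ip}',
        'https': f'http://{proxy_ip}',
    }
    return requests.get(url, headers=headers, proxies=proxies, timeout=5)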




