暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

手把手带你应用 LRU 爬取知乎热榜

鸡仔说 2021-03-04
317

鸡仔说:上一节我们通过代码演示了 LRU 的算法实现,这次我们使用 python 的内置 lru_cache 模块实现一个基于知乎热榜的通用爬虫,废话少说,黑喂狗

首先我们了解一下内置 lru 的基本用法,代码如下所示

from functools import lru_cacheimport random@lru_cache(maxsize=3)def get_cloth(event):    event_cloth_map = {        "见鸡仔"f"背心+裤衩",        "求职"f"西服+衬衣",        "跳舞"f"卫衣+休闲裤",        "工作"f"外套+T恤+休闲裤",    }    if isinstance(event, int):        if event < 4:            return "羽绒服"        elif 4 <= event < 6:            return "外套"        elif 6 <= event < 8:            return "毛衣"        elif 8 <= event < 10:            return "大衣"        elif 10 <= event < 12:            return "马夹"        elif 12 <= event < 14:            return "卫衣"        elif 14 <= event < 16:            return "卫衣"        elif 16 < event < 20:            return "长袖"        else:            return "短袖"    elif isinstance(event, str) and event in event_cloth_map:        return event_cloth_map[event]    return "裸奔"if __name__ == '__main__':    random.seed(444)    for _ in range(10):        temperature = random.randint(030)        print(f"当前温度>>>{temperature},穿{get_cloth(temperature)}")    e = "见鸡仔"    print(f"如果{e},就穿{get_cloth(e)}")    e = "求职"    print(f"如果{e},就穿{get_cloth(e)}")    print(get_cloth.cache_info())
输出如下:
当前温度>>>9,穿大衣当前温度>>>9,穿大衣当前温度>>>0,穿羽绒服当前温度>>>9,穿大衣当前温度>>>15,穿卫衣当前温度>>>8,穿大衣当前温度>>>27,穿短袖当前温度>>>9,穿大衣当前温度>>>23,穿短袖当前温度>>>16,穿短袖如果见鸡仔,就穿背心+裤衩如果求职,就穿西服+衬衣CacheInfo(hits=2, misses=10, maxsize=3, currsize=3)
最后一句 get_cloth.cache_info() 可以将 get_cloth 的 lru 缓存信息显示出来,其中 hits 表示缓存命中了几次,这里看到我们命中了两次,分别命中了第二行和第四行。misses 表示我们没有执行了几次代码逻辑。maxsize 和 currsize 分别表示缓存容量和当前缓存里元素的个数。从 cache info 中我们可以看出。当前我们的缓存使用率太低,12 次发送,只有两次走了缓存。因此可以适当调大缓存大小,自己动手试验一下吧~
有了上面的使用基础,我们来做基于知乎热榜的爬虫就比较简单啦!

基础分析

进一步分析
需求很简单啊,三个函数就可以搞定。但如果我们立马动手开干,就会在将来发现有很多坑。我们应该停下来想想,未来的这个需求的拓展性。比如现在是爬取知乎,那如果我要加入 v2ex 的热榜呢,再来一个豆瓣的热榜呢?还有输出到终端,那如果我要导出成 excel 呢,持久化到 mongo、mysql 呢?基于此,我们来完善一下需求。

再进一步分析
哎,不对啊?你有没有发现,其实我们使用到 lru 啊。别急,这就来。我们发现不同平台的热榜都是有更新时间周期的。有些是天级别更新,有些是小时级别更新。那么,当用户频繁调用获取热榜数据的时候,我们是否有必要每次都向平台发起抓取呢?当然不需要,如果用户多次发起相同的请求,我们只需要看缓存中是否有数据存在。如果没有才重新发起请求,否则就直接返回缓存中的数据,继续完善需求,并且我们给缓存设置一个过期时间,如果过期了,才重新发起一次请求。

所有需求都明晰了,ok,放码过来吧

项目文件结构
├── parser.py        # 解析器├── run.py           # 启动文件├── saver.py         # 存储器└── spider.py        # 爬虫
spider.py
class BaseSpider:    passclass ZhihuSpider(BaseSpider):    passclass V2exSpider(BaseSpider):    pass
parser.py
class BaseParser:    passclass JsonFormatParser(BaseParser):        pass
saver.py:
class BaseSaver:    passclass MongoSaver(BaseSaver):    passclass MysqlSaver(BaseSaver):    passclass ExcelSaver(BaseSaver):    pass
run.py 文件,我们先不写代码,等基础功能完善了,再写不迟。

① 完善 spider.py
from abc import (ABC, abstractmethod)import requestsclass BaseSpider(ABC):    _headers = {        'accept''application/json, text/plain, */*',        "accept-encoding""gzip, deflate, br",        "cache-control""no-cache",        'accept-language''zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7',        'user-agent''Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Ch'                      'rome/84.0.4147.135 Safari/537.36',        'content-type''application/json;charset=UTF-8',        'sec-fetch-site''same-origin',        'sec-fetch-mode''cors',        'sec-fetch-dest''empty',        "pragma""no-cache",    }    @abstractmethod    def spider(self):        passclass ZhihuSpider(BaseSpider):    url = "https://www.zhihu.com/api/v3/feed/topstory/hot-lists/total?limit=50&desktop=true"    def spider(self):        resp = requests.get(self.url, headers=self._headers, timeout=30)        return resp.json()class V2exSpider(BaseSpider):    url = "https://www.v2ex.com/api/topics/hot.json"    def spider(self):        resp = requests.get(self.url, headers=self._headers, timeout=30)        return resp.json()class BilibiliSpider(BaseSpider):    url = "https://api.bilibili.com/x/web-interface/ranking/v2?rid=0&type=all"    def spider(self):        resp = requests.get(self.url, headers=self._headers, timeout=30)        return resp.json()
② 完善 parser.py
首先我们定义好最终的数据结构。创建 obj.py。定义一个 News 对象,存储我们想要的数据形式。这里多说一句,之所以采用这种形式,是因为 python 太灵活了,怎么传数据,传什么数据全部都没有限制,比如某个字段应该是 int 类型的 0 ,但我们传入了字符串 "0",python 字典形式并不会拦截,这就可能会引入无法预料的 bug,因此就非常有必要对最终结果数据进行类型限定。
obj.py
from attr import attrs, attrib, validators@attrsclass News:    _platform = attrib(type=str, validator=validators.instance_of(str))    title = attrib(type=str, validator=validators.instance_of(str))    url = attrib(type=str, validator=validators.instance_of(str))    reply_cnt = attrib(type=int, validator=validators.instance_of(int), default=-1)    summary = attrib(type=str, validator=validators.instance_of(str), default="")    publish_time = attrib(type=str, validator=validators.instance_of(str), default=""
parser.py
from loguru import loggerimport inspectfrom .obj import Newsfrom glom import glomfrom datetime import datetimeclass BaseParser:    passclass JsonFormatParser(BaseParser):    def __init__(self):        pass    def __get_all_method(self):        instance_members = inspect.getmembers(self, predicate=inspect.ismethod)        return map(lambda x: x[0], filter(lambda x: not x[0].startswith("_"), instance_members))    def parse(self, topic, result):        # topic 不可以命名成 parse        if topic in ["parse"]:            raise ValueError(f"topic 不可以命名成 parse")        if topic not in self.__get_all_method():            raise ValueError(f"没有找到可用的解析函数:topic={topic}\tall_method_name={all_method_name}")        return getattr(self, topic)(result)    def zhihu(self, result):        all_result = []        for per_result in result["data"]:            try:                n = News(                    platform="zhihu",                    title=glom(per_result, "target.title"),                    url=glom(per_result, "target.url"),                    reply_cnt=int(glom(per_result, "target.answer_count")),                    summary=(glom(per_result, "target.excerpt")),                )                all_result.append(n)            except Exception as e:                logger.error(f"解析 知乎 数据错误:")                logger.error(f"per_result={per_result}")                logger.exception(e)        return all_result    def v2ex(self, result):        all_result = []        for per_result in result:            try:                n = News(                    platform="v2ex",                    title=per_result.get("title"),                    url=per_result.get("url"),                    publish_time=str(datetime.fromtimestamp(per_result.get("created"))),                    summary=per_result.get("content"),                    reply_cnt=int(per_result.get("replies"-1))                )                all_result.append(n)            except Exception as e:                logger.error(f"解析 v2ex 数据错误:")                logger.error(f"per_result={per_result}")                logger.exception(e)        return all_result    def bilibili(self, result):        all_result = []        for per_result in glom(result, "data.list"):            try:                n = News(                    platform="bilibili",                    title=per_result.get("title"),                    url=f"https://www.bilibili.com/video/{per_result.get('bvid')}",                    summary=per_result.get("desc"),                    reply_cnt=int(glom(per_result, "stat.reply", default=-1)),                    publish_time=str(datetime.fromtimestamp(per_result.get("ctime")))                )                all_result.append(n)            except Exception as e:                logger.error(f"解析 b站 数据错误:")                logger.error(f"per_result={per_result}")                logger.exception(e)        return all_result
解析器部分通过 parse 作为统一入口,也相当于代理,真正的逻辑执行部分由真正的各维度的自定义解析器完成。解析器将各维度的信息进行打包并生成一组 News 对象,这样各维度数据就能用统一的格式使用。
③ 完善 saver.py
from loguru import loggerfrom abc import ABC, abstractmethodfrom cattr import unstructureclass BaseSaver(ABC):    # 解析打包的对象,本例子中是 News 对象    def _unpack_struct(self, data_obj):        return unstructure(data_obj)    @abstractmethod    def save(self, objs):        passclass MongoSaver(BaseSaver):    def save(self, objs):        unpack_data = list(map(self._unpack_struct, objs))        logger.info(f"save all data to mongo {unpack_data}")class MysqlSaver(BaseSaver):    def save(self, objs):        unpack_data = list(map(self._unpack_struct, objs))        logger.info(f"save all data to mysql {unpack_data}")class ExcelSaver(BaseSaver):    def save(self, objs):        unpack_data = list(map(self._unpack_struct, objs))        logger.info(f"save all data to excel {unpack_data}")
④ 实现 run.py
基础准备工作都做完后,我们就可以来实现 run.py 文件了
import syssys.path.append("..")from loguru import loggerfrom lru_lab.spider import (ZhihuSpider, V2exSpider, BilibiliSpider)from lru_lab.parser import JsonFormatParserfrom lru_lab.saver import (ExcelSaver, MongoSaver, MysqlSaver)from cattr import unstructurefrom functools import lru_cachefrom flask import (Flask, request)spider_map = {    "bilibili": BilibiliSpider().spider,    "v2ex": V2exSpider().spider,    "zhihu": ZhihuSpider().spider,}save_map = {    "excel":  ExcelSaver().save,    "mysql": MysqlSaver().save,    "mongo": MongoSaver().save,}@lru_cache(maxsize=3)def grab_hot(topic, save_method='excel'):    spider = spider_map.get(topic)    save = save_map.get(save_method)    spider_result = spider()    parse_result = JsonFormatParser().parse(topic, spider_result)    save(parse_result)    return list(map(unstructure, parse_result))app = Flask(__name__)# 让中文正常显示app.config["JSON_AS_ASCII"] = False@app.route("/", methods=["post"])def index():    res = request.json    topic = res.get("topic")    if not topic or topic not in spider_map:        logger.warning(f"topic error:{topic}")        return {"status"-1"message""未知主题"}    save_method = res.get("save"or "excel"    result = grab_hot(topic, save_method)    logger.info(grab_hot.cache_info())    return {"status"1"results": result}if __name__ == '__main__':    app.run(host="0.0.0.0", debug=True)
启动程序 run.py 定义 grab_hot 函数,接口 topic 和 save_method 分别指定主题和存储方式。spider_map 和 save_map 分别通过主题映射爬虫和通过存储参数映射存储方式。你注意到没?grab_hot 被 lru_cache 装饰。因此他如果接受了同一个参数,会返回缓存结果,而不是再重新走一遍爬虫和解析,我们来测试一下,先启动 run.py。
>>>python run.py  * Serving Flask app "run" (lazy loading) * Environment: production   WARNING: This is a development server. Do not use it in a production deployment.   Use a production WSGI server instead. * Debug mode: on * Running on http://0.0.0.0:5000/ (Press CTRL+C to quit) * Restarting with stat * Debugger is active! * Debugger PIN: 317-909-129
然后,我们通过命令行工具向接口发送一条请求,看下日志
curl -X POST -H 'content-type: application/json' -d '{"topic":"zhihu"}' http://127.0.0.1:5000
flask 的日志显示
* Debugger is active!* Debugger PIN: 317-909-1292021-03-01 09:09:24.212 | INFO     | lru_lab.saver:save:39 - save all data to excel [{'_platform''zhihu''title''如何看待山东拉面哥十几...2021-03-01 09:09:24.258 | INFO     | __main__:index:67 - CacheInfo(hits=0, misses=1, maxsize=3, currsize=1)127.0.0.1 - - [01/Mar/2021 09:09:24] "POST / HTTP/1.1" 200 -
我们再执行多次相同的请求命令,看看 cache_info
2021-03-01 09:09:24.258 | INFO     | __main__:index:67 - CacheInfo(hits=0, misses=1, maxsize=3, currsize=1)127.0.0.1 - - [01/Mar/2021 09:09:24"POST / HTTP/1.1" 200 -2021-03-01 09:12:14.628 | INFO     | __main__:index:67 - CacheInfo(hits=1, misses=1, maxsize=3, currsize=1)127.0.0.1 - - [01/Mar/2021 09:12:14"POST / HTTP/1.1" 200 -2021-03-01 09:12:21.598 | INFO     | __main__:index:67 - CacheInfo(hits=2, misses=1, maxsize=3, currsize=1)127.0.0.1 - - [01/Mar/2021 09:12:21"POST / HTTP/1.1" 200 -
从日志中我们可以看出,当 lru_cache 里面已经存在了我们之前爬取过的数据,就直接从缓存中返回,符合我们的预期。
⑤ 带过期时间的 lru_cache
以上就是...,等等,你发现没有,这里有一个严重的问题,就是如果我们一直访问知乎爬取。那么,只要 lru_cache 队列不满,他一直都返回之前爬取过的数据,但这显然和我们刚开始的需求不相符,我们希望爬取到的内容,只有一小时的生命周期,超时自动清除,重新爬取。那么我们就要实现一个带支持传入过期时间的 lru ,很简单,只需要把 lru 算法再包装一层装饰器,传入过期时间即可,开整~
class LruCache:    def __init__(self, maxsize=3, timeout=2):        self.maxsize = maxsize        self.timeout = timeout        self.last_time = int(time.time())    def __call__(self, func):        func = lru_cache(maxsize=self.maxsize)(func)        def wrapper(*args, **kwargs):            if int(time.time()) - self.last_time > self.timeout:                logger.debug(func.cache_info())                func.cache_clear()                self.last_time = int(time.time())            return func(*args, **kwargs)        return wrapper
装饰器写完,只需要在原来的 grab_hot 就可以改成如下形式,其中 timeout 为过期时间,这里设置成 2 s 仅是调试时候使用,实际如果是上线,可以根据自己的需要设置。
@LruCache(maxsize=3, timeout=2)def grab_hot(topic, save_method='excel'):
这时为了方便观察,我们在知乎维度的爬虫中增加 debug 日志。
def spider(self):        logger.debug("开始知乎爬虫...")        resp = requests.get(self.url, headers=self._headers, timeout=30)        return resp.json()
现在我们开启程序 run.py 。然后多调用几次知乎维度的爬取
curl -X POST -H 'content-type: application/json' -d '{"topic":"zhihu"}' http://127.0.0.1:5000curl -X POST -H 'content-type: application/json' -d '{"topic":"zhihu"}' http://127.0.0.1:5000curl -X POST -H 'content-type: application/json' -d '{"topic":"zhihu"}' http://127.0.0.1:5000curl -X POST -H 'content-type: application/json' -d '{"topic":"zhihu"}' http://127.0.0.1:5000curl -X POST -H 'content-type: application/json' -d '{"topic":"zhihu"}' http://127.0.0.1:5000...
日志如下:
2021-03-02 20:43:41.819 | DEBUG    | __main__:wrapper:50 - CacheInfo(hits=0, misses=0, maxsize=3, currsize=0)2021-03-02 20:43:41.820 | DEBUG    | lru_lab.spider:spider:35 - 开始知乎爬虫...2021-03-02 20:43:42.048 | INFO     | lru_lab.saver:save:39 - save all data to excel [{'_platform''zhihu''title''如何看待日本...127.0.0.1 - - [02/Mar/2021 20:43:42] "POST / HTTP/1.1" 200 -127.0.0.1 - - [02/Mar/2021 20:43:42] "POST / HTTP/1.1" 200 -127.0.0.1 - - [02/Mar/2021 20:43:43] "POST / HTTP/1.1" 200 -2021-03-02 20:43:44.588 | DEBUG    | __main__:wrapper:50 - CacheInfo(hits=2, misses=1, maxsize=3, currsize=1)2021-03-02 20:43:44.588 | DEBUG    | lru_lab.spider:spider:35 - 开始知乎爬虫...2021-03-02 20:43:44.764 | INFO     | lru_lab.saver:save:39 - save all data to excel [{'_platform': 'zhihu', 'title': '如何看待日本...127.0.0.1 - - [02/Mar/2021 20:43:44"POST / HTTP/1.1" 200 -127.0.0.1 - - [02/Mar/2021 20:43:45"POST / HTTP/1.1" 200 -127.0.0.1 - - [02/Mar/2021 20:43:45"POST / HTTP/1.1" 200 -127.0.0.1 - - [02/Mar/2021 20:43:46"POST / HTTP/1.1" 200 -127.0.0.1 - - [02/Mar/2021 20:43:46"POST / HTTP/1.1" 200 -2021-03-02 20:43:47.389 | DEBUG    | __main__:wrapper:50 - CacheInfo(hits=4, misses=1, maxsize=3, currsize=1)2021-03-02 20:43:47.389 | DEBUG    | lru_lab.spider:spider:35 - 开始知乎爬虫...2021-03-02 20:43:47.568 | INFO     | lru_lab.saver:save:39 - save all data to excel [{'_platform''zhihu''title''如何看待日本...127.0.0.1 - - [02/Mar/2021 20:43:47] "POST / HTTP/1.1" 200 -127.0.0.1 - - [02/Mar/2021 20:43:47] "POST / HTTP/1.1" 200 -127.0.0.1 - - [02/Mar/2021 20:43:48] "POST / HTTP/1.1" 200 -2021-03-02 20:44:18.904 | DEBUG    | __main__:wrapper:50 - CacheInfo(hits=2, misses=1, maxsize=3, currsize=1)2021-03-02 20:44:18.904 | DEBUG    | lru_lab.spider:spider:35 - 开始知乎爬虫...


可以发现,这次就完全符合我们之前的预期了。每 2s 清空一次 lru_cache 缓存,若未过期就直接使用缓存中的数据,过期就重新爬取知乎热榜数据。

附上代码 github 地址: https://github.com/hacksman/learn_lab/tree/master/lru_lab









以上,就是我们应用内置 lru_cache 作为缓存,实现了一个完整的可拓展热榜数据爬虫方案。我们首先通过小的 demo,带大家了解了内置的 lru_cache 是怎么用的。然后手把手带大家一起分析了一下,通用的热榜爬虫有哪几步。站在实战的的角度,分析了当我们拿到项目需求的时候,应该如何拆解需求,如何达到高内聚,低耦合。最后我们分别实现各个模块的代码,并在原有 lru_cache 基础上通过装饰器增加过期失效的功能。希望这个小实战能对大家有所帮助。码字不易,求点赞,求转发,求再看,求三连~


参考资料:

[1] user3586940 (2019). Why Use A Doubly Linked List and HashMap for a LRU Cache Instead of a Deque?

https://stackoverflow.com/questions/54730706/why-use-a-doubly-linked-list-and-hashmap-for-a-lru-cache-instead-of-a-deque

[2] Jose Alberto Torres Agüera(2019). Python and LRU Cache.

https://medium.com/lambda-automotive/python-and-lru-cache-f812bbdcbb51

[3] Santiago Valdarrama(2020). Caching in Python Using the LRU Cache Strategy.

https://realpython.com/lru-cache-python/#adding-cache-expiration

[4] Cameron MacLeod(2019). Easy Python speed wins with functools.lru_cache.

https://www.cameronmacleod.com/blog/python-lru-cache

[5] Cake Labs(2021). LRU Cache.

https://www.interviewcake.com/concept/java/lru-cache

[6] sakshiparikh23(2020). Python – LRU Cache

https://www.geeksforgeeks.org/python-lru-cache/


以上,如果觉得内容对你有所帮助,还请点个「在看」支持,谢谢各位dai佬!




好看的人都点了在看

文章转载自鸡仔说,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论