前言
3月18日,笔者写了一篇文,前前后后修改发布折腾近十次,终于通过了系统敏感词检测,得以发布。审查制度不透明的情况下,永远都不知道哪里越界了,并且最终解释权归有关部门。一怒之下,笔者决定手撕敏感词检测。
首先笔者想到找一些敏感词库、违禁词表之类的文件,网络上一搜,有不少。但多数都不够全面,也不够新。一个敏感词检测工具的精确率,取决于敏感词库的丰富程度。对于媒体单位,经过长年累月的积累和碰撞,它们的词库是丰富且与时俱进的,笔者想搞这么一份词库,也能搞到。
今次为了演示,暂且用一份网上烂大街的敏感词表,让我们重点关注下应该实现过程。
数据准备
我们遇到了一个悖论,假如笔者拿真的敏感词来演示,恐怕会触发平台审计规则,那换一组正常的词假设它们是敏感词,读者恐怕又觉得没什么卵用。总之,这就像《1984》里提到的那样,新话代替旧话,假如新话了去掉了旧话里所谓的敏感词,那不就不存在敏感词了吗?为了演示,咱还是随便弄一组词吧。准备了这么一个文件,文件里敏感词一行一个。
0x01. 敏感词集合遍历过滤
为了简化问题,假设这个敏感词典不大,只有几千条,可以将这个词典加载到一个集合里。如果敏感词典很庞大,可以利用前面讲的布隆过滤器结构来存放。总之,有这么一个敏感词集合:
sensetive_keywords = set(['金日成', '金正日', '金正恩', '金三胖'])
def filter(message, repl='*', callout=None):global sensetive_keywordsif callout:for kw in sensetive_keywords:message = message.replace(kw, f'<i>{kw}</i>')else:for kw in sensetive_keywords:message = message.replace(kw, repl*len(kw))return message
通过这么一个过滤函数处理,我们测试了一句话,得到的效果是:
替换模式下,敏感词被替换成特殊符号
朝鲜国大统帅金正恩外号金三胖。
=> 朝鲜国大统帅***外号***。
标注模式下,匹配到的敏感词被标注出来
朝鲜国大统帅金正恩外号金三胖。
=> 朝鲜国大统帅<i>金正恩</i>外号<i>金三胖</i>。
此种处理方式简单粗暴,直接循环遍历全部敏感词,效率会随着敏感词库的增大而降低。
0x02. 顺序映射表过滤
基于敏感词典的顺序映射表,过滤文本中的敏感词,能显著减少敏感词过滤次数,代码如下:
from collections import defaultdictimport reclass BSFilter:def __init__(self):self.keywords = []self.kwsets = set()# 排序字典, 元素值为集合self.bsdict = defaultdict(set)# 英文单词匹配self.pat_en = re.compile(r'^[0-9a-zA-Z]+$')def add(self, keyword):keyword = keyword.lower()if keyword not in self.kwsets:self.keywords.append(keyword)self.kwsets.add(keyword)index = len(self.keywords) - 1for word in keyword.split():if self.pat_en.search(word):self.bsdict[word].add(index)else:for char in word:self.bsdict[char].add(index)def parse(self, path):with open(path, 'r') as f:for keyword in f.readlines():self.add(keyword.strip())def filter(self, message, repl='*'):message = message.lower()for word in message.split():if self.pat_en.search(word):for index in self.bsdict[word]:message = message.replace(self.keywords[index], repl)else:for char in word:for index in self.bsdict[char]:message = message.replace(self.keywords[index], repl)return messageif __name__ == '__main__':filter = BSFilter()filter.parse('keywords.txt')print(filter.filter('朝鲜国大统帅金正恩外号金三胖,是金家第三代', '*'))
0x03. DFA算法过滤
DFA(Deterministic Finite automation)确定性的有穷状态自动机: 从一个状态输入一个字符集合能到达下一个确定的状态。
#!/usr/local/bin/python3class DFAFilter():def __init__(self):self.keyword_chains = {}# 定界符self.delimit = '\x00'def add(self, keyword):keyword = keyword.lower()chars = keyword.strip()if not chars:returnlevel = self.keyword_chainsfor i in range(len(chars)):if chars[i] in level:level = level[chars[i]]else:if not isinstance(level, dict):breakfor j in range(i, len(chars)):level[chars[j]] = {}last_level, last_char = level, chars[j]level = level[chars[j]]last_level[last_char] = {self.delimit: 0}breakif i == len(chars) - 1:level[self.delimit] = 0def parse(self, path):with open(path) as f:for keyword in f.readlines():self.add(keyword.strip())def filter(self, message, repl='*'):message = message.lower()ret = []start = 0while start < len(message):level = self.keyword_chainsstep_ins = 0for char in message[start:]:if char in level:step_ins += 1if self.delimit not in level[char]:level = level[char]else:ret.append(repl * step_ins)start += step_ins - 1breakelse:ret.append(message[start])breakelse:ret.append(message[start])start += 1return ''.join(ret)if __name__ == '__main__':filter = DFAFilter()filter.parse('keywords.txt')# print(filter.keyword_chains)print(filter.filter('朝鲜国大统帅金正恩外号金三胖,是金家第三代', '*'))
根据已有的敏感词表,得到用字典表示的状态机:
{'金': {'正': {'日': {'\x00': 0}, '恩': {'\x00': 0}}, '三': {'胖': {'\x00': 0}}, '日': {'成': {'\x00': 0}}}}
总结
本文介绍了三种敏感词过滤方法,其中敏感词遍历过滤简单但随着敏感词表的增大效率会降低,顺序映射表由于构造了单字符顺序映射,则相对能减少过滤次数。而DFA算法,基于有穷状态机,能保持恒定的过滤效率。实际工程环境,推荐使用第三种方法。这里只是实现了最简单的敏感词过滤,而实际工程应用中,还要考虑到各种噪音,比如同音字、异形字、拆分字、特殊符号隔断等,这又是另一个话题了。另外,3月18那篇文最终被用户投诉下架。




