一款基于中文分词的脱敏工具

一、工具需求

起因是某产品有几千条数据需要脱敏,主要包括公司名称、政府单位、学校医院、内部测试人员、个体商铺

不同类别有不同的脱敏方法,但核心是不能被一眼看出是哪个主体。

公司名称

主要脱敏内容为具有识别特征的品牌字段。

北京爱摸鱼技术有限公司需要脱敏为北京xxx技术有限公司,但也不能脱敏过度搞成北京xxxxxxx公司

政府机关

这一类需要脱敏的内容为地区特征,但也需要遵循脱敏最小行政单位的原则。

西安市雁塔区人民政府需要脱敏为西安市xx区人民政府,而不能是xx市雁塔区人民政府,这样的脱敏是无效的。

学校医院

学校医院一般来说经常会互相混搭,很多学校都有对应的附属医院,因此将这两个归为一类。脱敏的内容为视情况,主要为地区名称,部分为特征名称。

地区特征的如北京科技大学脱敏为xx科技大学

特征名称的如长安大学脱敏为xx大学

测试人员

人名只需要脱敏第二个字就可以了,如果遇到四个字的姓名脱敏前两个字。

个体商铺

这一类是最为复杂的,商铺的名称各异会对规则产生很多影响,没有什么固定的脱敏方式,需要根据具体名称进行判断。

二、中文分词

最开始我想使用正则表达式进行匹配,但是在匹配公司名称的时候,发现公司的后缀非常多,科技有限公司电子商务有限公司技术有限公司传媒公司…为了在脱敏的时候尽可能让文字多一些,如果匹配公司有限公司会导致过度脱敏,但如果为每一种公司都写一个正则,会非常难以维护。

使用中文分词公司名可以切分为:

1
2
"北京爱摸鱼技术股份有限公司" = ["北京", "爱摸鱼", "技术", "股份有限公司"]
"成都锦江区小蜜蜂传媒有限公司" = ["成都锦江区", "小蜜蜂", "传媒", "有限公司"]

这是一种理想的形式,切分为地区品牌行业公司后缀。很幸运,感谢 companynameparser 做了这些并开源,当然我们也尽量不要自己造轮子。

使用方法非常简单。

1
2
3
4
5
6
7
8
9
10
11
import companynameparser

company_strs = [
"武汉海明智业电子商务有限公司",
"泉州益念食品有限公司",
"常州途畅互联网科技有限公司合肥分公司",
"昆明享亚教育信息咨询有限公司",
]
for name in company_strs:
r = companynameparser.parse(name)
print(r)

resault

1
2
3
4
{'place': '武汉', 'brand': '海明智业', 'trade': '电子商务', 'suffix': '有限公司', 'symbol': ''}
{'place': '泉州', 'brand': '益念', 'trade': '食品', 'suffix': '有限公司', 'symbol': ''}
{'place': '常州,合肥', 'brand': '途畅', 'trade': '互联网科技', 'suffix': '有限公司,分公司', 'symbol': ''}
{'place': '昆明', 'brand': '享亚', 'trade': '教育信息咨询', 'suffix': '有限公司', 'symbol': ''}

虽然作者测试案例中的准确率非常不错,但是实际情况还是有意想不到的情况,分词失败和各种奇怪的名称,因此我们需要基于这个工具进行改进。

三、分类器

根据上面的需求,可以归纳出每个领域的脱敏逻辑,根据脱敏词大致分为四类,并构建一个分类器。我们需要通过分类器判断一个名称应该被怎么处理,从而执行对应的脱敏规则。根据上面的原则,将创建两个列表 KEY_FEATURE 和 KEY_PLACE 存放关键字,前者可以存放公司集团等关键字匹配公司名称,后者存放政府医院等关键字。

如果都没命中则被归为其他类,这里有两种情况,一种是属于上面的类别,但是没有被关键字命中;第二种是一些特殊名称无法归类,例如中华人民共和国国务院这种国家级单位。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def industry_classification(text):
text_type = 'other'
# 短名称单独处理
if len(text) <= 4:
text_type = 'short'
return text_type
# 特征脱敏
for _key in KEY_FEATURE:
if _key in text:
text_type = 'feature'
return text_type
# 地区脱敏
for _key in KEY_PLACE:
if _key in text:
text_type = 'place'
return text_type
return text_type

四、脱敏规则

短名称脱敏

这一部分最为简单,短名称两个字和三个字的脱敏掉第二个字,四个字脱敏前两个字即可,对姓名和短公司名、学校名均适用,该部分不需要使用分词。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def short_replace(text):
if len(text) == 3:
# 如果长度为3,将中间字符替换为*
new_text = text[0] + MASK_SYMBOL + text[2]
elif len(text) == 4:
# 如果长度为4,将前面字符替换为*
new_text = MASK_SYMBOL * 2 + text[2] + text[3]
elif len(text) == 2:
# 如果长度为2,将最后一个字符替换为*
new_text = text[0] + MASK_SYMBOL
else:
# 如果长度不是2或3,不执行替换操作
new_text = text

return new_text

特征脱敏

首先提取 brand 品牌内容,如果存在则将品牌名称设置为敏感词。

1
2
split_words = companynameparser.parse(include_feature_name)
sensitive_words = split_words.get('brand')

如果不存在或者提取失败,就将行业名称进行切分,将第二个后的内容设置为敏感词,有些名称确实无法被切分,经测试这是一个不错的方法。

1
2
if len(sensitive_words) <= 1:
sensitive_words = split_words.get('trade')[:2]

其中有一种公司名称比较特别,如《开心日报》杂志社有限公司,我们将书名号内的内容标记为敏感词即可。

上面已经获取了对应的特征敏感词 brand 字段,但是不是每一次都是正确的,这里通过 feature_sensitive_detail 函数进行校验。

通过脱敏词长度进行判断,如果非常短只有一个字,说明切分失败,使用 split_short_words_plugin 处理过短的敏感词,如果太长说明可能没切分成功直接返回了所有的内容,就使用 split_long_words_plugin 插件处理过长敏感词。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def feature_sensitive_detail(include_feature_name, sensitive_word):
sensitive_words = []
sensitive_word = sensitive_word.split(',')[0]
# 脱敏词长度为0,说明分词失败
if len(sensitive_word) <= 1:
sensitive_word = plugins.split_short_words_plugin(include_feature_name)
sensitive_words.append(sensitive_word)
# 脱敏词过长,中间某部分词过长,需要切分
elif len(sensitive_word) > 5:
sensitive_word = plugins.split_long_words_plugin(sensitive_word)
sensitive_words.append(sensitive_word)
else:
sensitive_words.append(sensitive_word)
new_include_feature_name = clear_words(sensitive_words, include_feature_name)

return new_include_feature_name

地区名称

地区脱敏就需要提取包含地区的内容,省、市、区…内容。首先依靠分词获取 place 的内容,如果长度大于0,说明提取内容存在,通过 get_place_plugin 插件进行地区提取,如果提取失败就使用 get_city_name 插件查询省市列表进行提取。最后返回脱敏后的名称。

1
2
3
4
5
6
7
8
9
10
def clear_place_sensitive(include_place_name):
split_words = companynameparser.parse(include_place_name)
place_name = split_words.get('place')
if len(place_name) > 0:
sensitive_word = plugins.get_place_plugin(place_name)
# 有些存在地区名称,但是无法被分词提取
else:
sensitive_word = plugins.get_city_name(include_place_name)
new_include_place_name = clear_words(sensitive_word, include_place_name)
return new_include_place_name

其他脱敏

在这一部分的不一定不属于上面某个类,只是关键字没有命中(关键字不是万能的),因此先提取品牌特征,如果没有就提取地区特征。

1
2
3
4
5
6
7
8
9
10
def clear_other_sensitive(text):
split_words = companynameparser.parse(text)
brand_name = split_words.get('brand')
if len(brand_name) > 1:
sensitive_word = brand_name.split(',')[0]
# 未提取出 brand 过滤地理位置
else:
sensitive_word = plugins.get_place_plugin(text)
new_name = clear_words(sensitive_word, text)
return new_name

五、插件

脱敏插件

放入敏感词列表、原始名称和脱敏符号,会根据长度脱敏关键字,返回脱敏后文本。

1
2
3
4
5
6
7
8
def clear_words(sen_word_list, origin_name, mask):
# 防止出现 ['南', '南宁'] ,先屏蔽子集词导致的错误
sen_word_list.sort(key=len, reverse=True)
new_name = origin_name
for sen_word in sen_word_list:
sen_words_num = len(sen_word)
new_name = new_name.replace(sen_word, mask * sen_words_num)
return new_name

书名号脱敏

脱敏书名号内的文本。

1
2
3
4
5
def clear_book_mark_plugin(book_mark_name):
# 使用正则表达式替换《和》之间的字符为*
# 脱敏杂志社、报社这种
book_mark_name = re.sub(r'《(.*?)》', lambda x: '《' + '*' * len(x.group(1)) + '》', book_mark_name)
return book_mark_name

获取地区名称(查询)

这里使用脱机数据,按照行政区大小逐步获取,如果匹配则直接返回。

其中 areas_file 中,很多乡镇、县的名称只有一个字,会导致误报,因此两个字以上的名称才会进行匹配。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def get_city_name(text):
address = []
# 优先过滤更小的单位,市级以下暂时没遇到无法识别的目前不启用
provinces_file = 'plugins/files/provinces.json'
cities_file = 'plugins/files/cities.json'
areas_file = 'plugins/files/areas.json'
with open(areas_file, 'r') as json_file:
areas_data = json.load(json_file)
# 提取"name"字段 县区名
for area in areas_data:
area_name = area.get('name', None).replace('县', '').replace('区', '').replace('市', '')
if area_name in text and len(area_name) > 1:
address.append(area_name)
break
with open(cities_file, 'r') as json_file:
city_data = json.load(json_file)
# 提取"name"字段 城市名
for city in city_data:
city_name = city.get('name', None).replace('市', '')
if city_name in text:
address.append(city_name)
break
with open(provinces_file, 'r') as json_file:
provinces_data = json.load(json_file)
# 提取"name"字段 省份名称
for province in provinces_data:
province_name = province.get('name', None).replace('省', '').replace('市', '')
if province_name in text:
address.append(province_name)
break
# 如果没匹配到,为了不影响后续处理返回空
return address

获取地区名称(正则)

较查询更优先使用,也是遵照脱敏最小行政单位的原则,使用正则对行政单位关键字进行匹配,但遇到西安xx学院,这种没有特征的只能使用查询的方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
def get_place_plugin(place):
place_list = []
# 有时候分词会提取重复的内容
places = list(set(place.split(',')))
for place in places:
# 匹配`xx省xx市格式
province_city_match = re.search(r'(.+省)(.+市)', place)
if province_city_match:
sensitive_word = province_city_match.group(2).replace('市', '')
place_list.append(sensitive_word)
break

# 匹配`xx市xx区xx镇格式
town_district_match = re.search(r'(.+市)(.+区)(.+镇)', place)
if town_district_match:
sensitive_word = town_district_match.group(3).replace('镇', '')
place_list.append(sensitive_word)
break

# 匹配`xx市xx区格式
city_district_match = re.search(r'(.+市)(.+区)', place)
if city_district_match:
sensitive_word = city_district_match.group(2).replace('区', '')
place_list.append(sensitive_word)
break

# 匹配`xx省、`xx市、`xx县、`xx区格式
single_match = re.search(r'(.+省|.+市|.+县|.+区|.+镇)', place)
if single_match:
sensitive_word = (single_match.group().replace('省', '')
.replace('市', '').replace('县', '')
.replace('区', '').replace('镇', ''))
place_list.append(sensitive_word)
break
# 例如西安xx学院,place提取出了西安,但是并没有市、县等这种单位
if len(place_list) == 0:
place_list = get_city_name(place)
return place_list

切分失败长名称处理

遇到切分失败的长名称,可能已经无法切分了信息过于集中,首先使用 jieba 进行再分词,如果失败则标记长度的前一半内容为敏感词。

1
2
3
4
5
6
7
8
9
10
11
def split_long_words_plugin(text):
words = list(jieba.cut(text, cut_all=False))
if len(words) <= 2:
sensitive_word = max(words, key=len)
elif len(words) == 3:
sensitive_word = words[1]
else:
half_words = int(len(words) / 2)
sensitive_word = ''.join(words[half_words:])
return sensitive_word

切分失败短名称处理

短名称的信息过于少,提取地区为敏感词。

1
2
3
4
5
6
7
8
9
def split_short_words_plugin(text):
# 如果前面分不出来,jieba 分词也不好用,所以脱敏地区
split_words = companynameparser.parse(text)
sensitive_words = split_words.get('place')
if len(sensitive_words) >= 2:
area = sensitive_words
else:
area = get_city_name(text)
return area

六、测试

抽取200个对象,其中包含人名、公司名称、政府机关、社会组织、个体商铺等多种名称,最短名称为2个字符,最长为45个字符。

总数 脱敏错误 脱敏未成功 准确率
200 16 1 91.5%

由于之前拿正则处理过一次,有440个失败的样本,属于比较难处理的,测试一次。

总数 脱敏错误 脱敏未成功 准确率
440 58 4 85.9%

大部分处理失败的都是一些具有地区特征的文本,例如杜甫草堂就在成都、大明宫在咸阳,即使去掉地区名称也能定位;还有一些带有人名的学校、景区、医院等;以及带有国外名称的内容暂时无法处理。