Named Entity Recognition in Practice: HMM
Code repository: https://github.com/daiyizheng/NER-Set/tree/master/hmm
Theory background (in Chinese): https://blog.csdn.net/weixin_42486623/article/details/118122786
Datasets
The experiments use four Chinese NER corpora; the tag set of each is listed below.
- MSRA: O B-PER I-PER B-LOC I-LOC B-ORG I-ORG
- ontonotes5: O B-CARDINAL I-CARDINAL B-DATE I-DATE B-EVENT I-EVENT B-FAC I-FAC B-GPE I-GPE B-LANGUAGE I-LANGUAGE B-LAW I-LAW B-LOC I-LOC B-MONEY I-MONEY B-NORP I-NORP B-ORDINAL I-ORDINAL B-ORG I-ORG B-PERCENT I-PERCENT B-PERSON I-PERSON B-PRODUCT I-PRODUCT B-QUANTITY I-QUANTITY B-TIME I-TIME B-WORK_OF_ART I-WORK_OF_ART
- resume: O B-NAME I-NAME B-RACE I-RACE B-CONT I-CONT B-LOC I-LOC B-PRO I-PRO B-EDU I-EDU B-TITLE I-TITLE B-ORG I-ORG
- weibo: O B-PER I-PER B-LOC I-LOC B-ORG I-ORG B-GPE I-GPE
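All four corpora pair each character with a tag. A common on-disk layout for such data is CoNLL style: one `character tag` pair per line, with blank lines separating sentences. The reader below is a minimal sketch under that assumption; the actual file format and loaders in the repository may differ.

```python
def read_bio_file(path):
    """Read a CoNLL-style file: one 'char tag' pair per line,
    sentences separated by blank lines. (Assumed format, not from the repo.)"""
    sentences, tags = [], []
    chars, labels = [], []
    with open(path, encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:  # blank line marks a sentence boundary
                if chars:
                    sentences.append(chars)
                    tags.append(labels)
                    chars, labels = [], []
                continue
            char, tag = line.split()
            chars.append(char)
            labels.append(tag)
    if chars:  # flush a final sentence without a trailing blank line
        sentences.append(chars)
        tags.append(labels)
    return sentences, tags
```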
Model
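The training data pairs every observation sequence (characters) with its gold state sequence (tags), so the HMM parameters can be estimated by maximum likelihood, i.e. by normalized counts. Writing $a_{ij}$ for the transition matrix (`hmm_A`), $b_i(o)$ for the emission matrix (`hmm_B`) and $\pi_i$ for the initial distribution (`hmm_pi`), the estimates that `train()` accumulates below are:

$$\hat a_{ij} = \frac{\operatorname{Count}(i \to j)}{\sum_k \operatorname{Count}(i \to k)}, \qquad
\hat b_i(o) = \frac{\operatorname{Count}(i, o)}{\sum_{o'} \operatorname{Count}(i, o')}, \qquad
\hat \pi_i = \frac{\operatorname{Count}(q_1 = i)}{\sum_k \operatorname{Count}(q_1 = k)}$$

where $\operatorname{Count}(i \to j)$ counts how often tag $j$ immediately follows tag $i$, $\operatorname{Count}(i, o)$ counts how often character $o$ carries tag $i$, and $\operatorname{Count}(q_1 = i)$ counts sentences that start with tag $i$. Zero counts are replaced by $10^{-10}$ before normalizing so the later log-space Viterbi computation stays defined.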
```python
import time

import torch
from tqdm import tqdm


class HMM(object):
    def __init__(self, hidden_status, logger):
        """
        :param hidden_status: int, number of hidden states (i.e. number of distinct tags)
        """
        self.hmm_N = hidden_status
        # Transition probability matrix: A[i][j] is the probability of moving from state i to state j
        self.hmm_A = torch.zeros(self.hmm_N, self.hmm_N)
        # Initial state probabilities: Pi[i] is the probability that the first state is i
        self.hmm_pi = torch.zeros(self.hmm_N)
        # Logger
        self.logger = logger
    def _build_corpus_map(self, sentences_list):
        """
        Build a vocabulary mapping each token to an integer id.
        :param sentences_list: list of sentences, each a list of characters
        """
        char2id = {}
        for sentence in sentences_list:
            for word in sentence:
                if word not in char2id:
                    char2id[word] = len(char2id)
        return char2id
    def _init_emission(self):
        """
        Initialize the emission (observation) matrix.
        """
        self.hmm_M = len(self.word2id)
        # Emission probability matrix: B[i][j] is the probability of state i emitting observation j
        self.hmm_B = torch.zeros(self.hmm_N, self.hmm_M)
    def train(self, sentences_list, tags_list):
        """Train the HMM, i.e. estimate the model parameters from the training corpus.
        Since we observe both the observation sequences and their state sequences,
        the parameters can be estimated by maximum likelihood (normalized counts).

        :param sentences_list: list whose elements are lists of characters, e.g. ['担','任','科','员']
        :param tags_list: list whose elements are the corresponding tag lists, e.g. ['O','O','B-TITLE','I-TITLE']
        """
        start_time = time.time()
        logger = self.logger
        # The number of sentences must equal the number of tag sequences
        assert len(sentences_list) == len(tags_list), "the length of tags_list is not equal to sentences_list"
        logger.info("Building token dictionaries...")
        self.word2id = self._build_corpus_map(sentences_list)
        self.tag2id = self._build_corpus_map(tags_list)
        self.id2tag = dict((id_, tag) for tag, id_ in self.tag2id.items())
        logger.info('Number of training sentences: {}'.format(len(sentences_list)))
        logger.info('Vocabulary size: {}'.format(len(self.word2id)))
        logger.info('Number of tags: {}'.format(len(self.tag2id)))
        assert self.hmm_N == len(self.tag2id), "hidden_status is {}, but total tag is {}".format(self.hmm_N, len(self.tag2id))
        self._init_emission()
        logger.info('Dictionaries built in {:>.4f}s'.format(time.time() - start_time))
        logger.info('Estimating the transition probability matrix...')
        # Estimate the transition probability matrix
        for tags in tqdm(tags_list):
            seq_len = len(tags)
            for i in range(seq_len - 1):
                current_tagid = self.tag2id[tags[i]]   # current tag id
                next_tagid = self.tag2id[tags[i + 1]]  # next tag id
                self.hmm_A[current_tagid][next_tagid] += 1
        # Problem: a transition that never occurs has count 0, which breaks the
        # later log-space computation (log 0 is undefined).
        # Fix: replace zero counts with a very small number before normalizing.
        self.hmm_A[self.hmm_A == 0.] = 1e-10
        self.hmm_A = self.hmm_A / self.hmm_A.sum(dim=1, keepdim=True)  # normalize rows to probabilities
        logger.info('Transition matrix done. {:>.4f}s'.format(time.time() - start_time))
        logger.info('Estimating the emission probability matrix...')
        # Estimate the emission probability matrix
        for tags, sentence in tqdm(zip(tags_list, sentences_list), total=len(tags_list)):
            assert len(tags) == len(sentence), "the length of the tag list is not equal to the word list"
            for tag, word in zip(tags, sentence):
                tag_id = self.tag2id[tag]     # current tag
                word_id = self.word2id[word]  # current character
                self.hmm_B[tag_id][word_id] += 1
        self.hmm_B[self.hmm_B == 0.] = 1e-10
        self.hmm_B = self.hmm_B / self.hmm_B.sum(dim=1, keepdim=True)
        logger.info('Emission matrix done. {:>.4f}s'.format(time.time() - start_time))
        logger.info('Estimating initial state probabilities...')
        # Estimate the initial state probabilities
        for tags in tqdm(tags_list):
            init_tagid = self.tag2id[tags[0]]
            self.hmm_pi[init_tagid] += 1
        self.hmm_pi[self.hmm_pi == 0.] = 1e-10
        self.hmm_pi = self.hmm_pi / self.hmm_pi.sum()
        logger.info('Initial state probabilities done. {:>.4f}s'.format(time.time() - start_time))
    def predict(self, sentences_list):
        """
        :param sentences_list: list whose elements are lists of characters, e.g. [['担','任','科','员']]
        """
        logger = self.logger
        logger.info('Running HMM decoding...')
        logger.info('Number of sentences to predict: {}'.format(len(sentences_list)))
        pred_tag_lists = []
        for sentence in tqdm(sentences_list):
            pre_tag_list = self.decoding(sentence)  # Viterbi decoding
            pred_tag_lists.append(pre_tag_list)
        return pred_tag_lists
    def decoding(self, word_list):
        """
        :param word_list: list of characters, e.g. ['担','任','科','员']
        Use the Viterbi algorithm to find the state sequence for a given observation
        sequence, i.e. the tag sequence for a sequence of characters.
        Viterbi is dynamic programming applied to the HMM decoding problem: it finds
        the single path of maximum probability, and that path corresponds to a state sequence.
        """
        # Work in log space to avoid floating-point underflow
        A = torch.log(self.hmm_A)
        B = torch.log(self.hmm_B)
        Pi = torch.log(self.hmm_pi)
        # Initialize the Viterbi matrix, with shape [number of states, sequence length]
        seq_len = len(word_list)
        viterbi = torch.zeros(self.hmm_N, seq_len)
        # backpointer records the best previous state at each step, for backtracking the optimal path
        backpointer = torch.zeros(self.hmm_N, seq_len).long()
        start_wordid = self.word2id.get(word_list[0], None)
        Bt = B.t()
        if start_wordid is None:
            # If the character is not in the vocabulary, assume a uniform distribution over states
            bt = torch.log(torch.ones(self.hmm_N) / self.hmm_N)
        else:
            bt = Bt[start_wordid]
        viterbi[:, 0] = Pi + bt
        backpointer[:, 0] = -1
        for step in range(1, seq_len):
            wordid = self.word2id.get(word_list[step], None)
            # Handle out-of-vocabulary characters:
            # bt is the emission log-probability column for the character at time t
            if wordid is None:
                # If the character is not in the vocabulary, assume a uniform distribution over states
                bt = torch.log(torch.ones(self.hmm_N) / self.hmm_N)
            else:
                bt = Bt[wordid]  # otherwise take bt from the emission matrix
            for tag_id in range(len(self.tag2id)):
                max_prob, max_id = torch.max(
                    viterbi[:, step - 1] + A[:, tag_id],
                    dim=0
                )
                viterbi[tag_id, step] = max_prob + bt[tag_id]
                backpointer[tag_id, step] = max_id
        # Termination: the maximum of viterbi[:, seq_len - 1] is the log-probability of the optimal path
        best_path_prob, best_path_pointer = torch.max(
            viterbi[:, seq_len - 1], dim=0
        )
        # Backtrack to recover the optimal path
        best_path_pointer = best_path_pointer.item()
        best_path = [best_path_pointer]
        for back_step in range(seq_len - 1, 0, -1):
            best_path_pointer = backpointer[best_path_pointer, back_step]
            best_path_pointer = best_path_pointer.item()
            best_path.append(best_path_pointer)
        # Convert the sequence of tag ids back to tags
        assert len(best_path) == len(word_list)
        tag_list = [self.id2tag[id_] for id_ in reversed(best_path)]
        return tag_list
```
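In symbols, `decoding()` implements the standard Viterbi recursion in log space, with $\delta$ stored in `viterbi` and the argmax $\psi$ in `backpointer`:

$$\delta_1(i) = \log \pi_i + \log b_i(o_1), \qquad
\delta_t(j) = \max_i \big[ \delta_{t-1}(i) + \log a_{ij} \big] + \log b_j(o_t), \qquad
\psi_t(j) = \operatorname*{arg\,max}_i \big[ \delta_{t-1}(i) + \log a_{ij} \big]$$

A minimal end-to-end sketch of using the class follows; the logger setup and the toy two-sentence corpus are illustrative, not taken from the repository:

```python
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("hmm-ner")

# Toy corpus: characters paired with BIO tags; 5 distinct tags in total
train_sents = [['张', '三', '在', '北', '京'], ['李', '四', '住', '上', '海']]
train_tags = [['B-PER', 'I-PER', 'O', 'B-LOC', 'I-LOC'],
              ['B-PER', 'I-PER', 'O', 'B-LOC', 'I-LOC']]

model = HMM(hidden_status=5, logger=logger)  # hidden_status must equal the number of distinct tags
model.train(train_sents, train_tags)

# Decode a new sentence; characters unseen in training fall back to a uniform emission
print(model.predict([['张', '三', '在', '上', '海']]))
```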
Evaluation metrics
```python
import time

# Assuming scikit-learn metrics here: they operate on flat label lists,
# which matches the docstring below.
from sklearn.metrics import accuracy_score, classification_report, f1_score


def model_metrics(true_labels, pre_labels, logger):
    """
    :param true_labels: gold tags, e.g. ['O', 'O', 'B-ORG', 'I-ORG']
    :param pre_labels: predicted tags, e.g. ['O', 'O', 'B-ORG', 'I-ORG']
    :param logger: logger instance
    """
    start_time = time.time()
    acc = accuracy_score(true_labels, pre_labels)
    f1score = f1_score(true_labels, pre_labels, average='macro')
    report = classification_report(true_labels, pre_labels, digits=4)
    msg = '\nTest Acc: {0:>6.2%}, Test f1: {1:>6.2%}'
    logger.info(msg.format(acc, f1score))
    logger.info("\nPrecision, Recall and F1-Score...")
    logger.info("\n{}".format(report))
    time_dif = time.time() - start_time
    logger.info("Time usage: {0:>.6}s".format(time_dif))
```
F1 scores (%) of the HMM on each dataset:

dataset | hmm f1score |
---|---|
weibo | 68.73 |
msra | 56.72 |
resume | 71.57 |
ontonotes5 | 53.45 |