Preface
These notes just record my learning process; questions and discussion are welcome.
Both input and output are variable-length sequences (seq2seq).
Autoregressive language model:
- x is str[start : end]; y is str[start+1 : end+1]. Multiple characters are trained at the same time, with cross-entropy computed character by character, as in the sketch below.
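A minimal toy sketch of this shift-by-one setup, with random logits standing in for a real model's output (vocabulary size 5 is made up):

```python
import torch
import torch.nn as nn

# Toy sequence of token ids (hypothetical vocabulary of size 5)
seq = torch.LongTensor([1, 2, 3, 4, 0])
x = seq[:-1]  # input:  tokens [start : end]
y = seq[1:]   # target: tokens [start+1 : end+1], shifted by one position

# Stand-in logits, one distribution per input position: (seq_len, vocab_size)
logits = torch.randn(len(x), 5)

# One cross-entropy term per position, averaged: every character in the
# window is a next-token prediction problem, so they all train at once
loss = nn.functional.cross_entropy(logits, y)
```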
Encoder-decoder structure:
- The Encoder converts the input into a vector or matrix that captures the information in the input
- The Decoder converts the Encoder's output into the final output (interface sketched below)
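A minimal sketch of that split, assuming a toy GRU encoder/decoder pair with hypothetical sizes (the actual code below uses BERT; this only shows the interface):

```python
import torch
import torch.nn as nn

class TinyEncoder(nn.Module):
    def __init__(self, vocab=100, dim=32):
        super().__init__()
        self.emb, self.rnn = nn.Embedding(vocab, dim), nn.GRU(dim, dim, batch_first=True)

    def forward(self, x):
        _, h = self.rnn(self.emb(x))
        return h  # (1, batch, dim): a vector summarizing the whole input

class TinyDecoder(nn.Module):
    def __init__(self, vocab=100, dim=32):
        super().__init__()
        self.emb, self.rnn = nn.Embedding(vocab, dim), nn.GRU(dim, dim, batch_first=True)
        self.out = nn.Linear(dim, vocab)

    def forward(self, y_prev, h):
        o, h = self.rnn(self.emb(y_prev), h)  # generation conditioned on the encoder state
        return self.out(o), h

enc, dec = TinyEncoder(), TinyDecoder()
h = enc(torch.randint(0, 100, (2, 7)))             # encode 2 inputs of length 7
logits, _ = dec(torch.randint(0, 100, (2, 5)), h)  # decode into output logits
```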
Attention mechanism
- The output should be strongly tied to the key parts of the input, so attention puts weights on the input (the weight vector therefore has the same size as the input sequence); see the sketch below
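A minimal sketch of dot-product attention weights over encoder outputs (sizes are hypothetical):

```python
import torch

enc_outputs = torch.randn(7, 32)  # 7 input positions, hidden size 32
dec_state = torch.randn(32)       # current decoder state (the query)

scores = enc_outputs @ dec_state         # (7,): one score per input position
weights = torch.softmax(scores, dim=-1)  # (7,): same size as the input, sums to 1
context = weights @ enc_outputs          # (32,): weighted sum of the input
```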
Teacher forcing
- Use the ground-truth token as the next input (autoregressive language model training is teacher forcing); the toy loop below contrasts it with free-running decoding
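A toy contrast between the two feeding modes (hypothetical GRUCell-based model; the comment marks what free-running would change):

```python
import torch
import torch.nn as nn

emb, rnn, out = nn.Embedding(50, 16), nn.GRUCell(16, 16), nn.Linear(16, 50)

y_true = [3, 7, 2, 9]                # ground-truth target token ids
h = torch.zeros(1, 16)
inp = torch.LongTensor([y_true[0]])
for t in range(1, len(y_true)):
    h = rnn(emb(inp), h)
    pred = out(h).argmax(dim=-1)     # the model's own prediction
    # Teacher forcing: feed the true token as the next input
    inp = torch.LongTensor([y_true[t]])  # free-running would use: inp = pred
```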
Transformer architecture
- In cross-attention, the Query comes from the Decoder, while K and V come from the Encoder
Masked attention is used so that when computing the output at a position, the model cannot access all the information; each position only uses the output information up to that position. (In an autoregressive model, mask first, then softmax; see the sketch below.)
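A minimal sketch of the mask-then-softmax order on raw attention scores (toy 4x4 example):

```python
import torch

L = 4
scores = torch.randn(L, L)                    # raw attention scores (toy values)
causal = torch.tril(torch.ones(L, L)).bool()  # lower-triangular: position i sees 0..i
scores = scores.masked_fill(~causal, float("-inf"))  # mask first ...
weights = torch.softmax(scores, dim=-1)              # ... then softmax
# Row i now has non-zero weights only on columns 0..i
```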
Evaluation metric:
- BLEU: a series of calculations over the output tokens (n-gram matching with a brevity penalty) to score similarity against the reference; a simplified version is sketched below
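A simplified toy BLEU of my own (clipped n-gram precision plus brevity penalty, no smoothing; real evaluations should use a library implementation):

```python
from collections import Counter
import math

def ngrams(tokens, n):
    return [tuple(tokens[i:i + n]) for i in range(len(tokens) - n + 1)]

def toy_bleu(candidate, reference, max_n=4):
    precisions = []
    for n in range(1, max_n + 1):
        cand, ref = Counter(ngrams(candidate, n)), Counter(ngrams(reference, n))
        overlap = sum(min(c, ref[g]) for g, c in cand.items())  # clipped counts
        precisions.append(max(overlap, 1e-9) / max(sum(cand.values()), 1))
    # Brevity penalty: punish candidates shorter than the reference
    bp = 1.0 if len(candidate) >= len(reference) else math.exp(1 - len(reference) / len(candidate))
    return bp * math.exp(sum(math.log(p) for p in precisions) / max_n)

print(toy_bleu(list("今天天氣很好"), list("今天天氣不錯")))
```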
Sampling (all three strategies are sketched in code after this list):
- Beam search: keep the n highest-probability paths (n is the beam size)
- Temperature sampling: generate the next token from the probability distribution scaled by a temperature T; the larger T is, the more uniform the distribution and the more random the result
- Top-p / top-k: top-p sorts tokens by probability in descending order and samples from the smallest set whose cumulative probability stays within p; top-k samples the next token from the k most probable tokens
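A minimal sketch of temperature, top-k, and top-p on one toy logits vector (beam search, which instead tracks the n best partial sequences during decoding, is omitted):

```python
import torch

logits = torch.tensor([2.0, 1.0, 0.5, 0.1, -1.0])  # toy next-token logits

# Temperature: divide logits by T before softmax; larger T flattens the
# distribution (more random), smaller T sharpens it (closer to greedy)
T = 1.5
probs = torch.softmax(logits / T, dim=-1)

# Top-k: keep the k most probable tokens, renormalize, sample
k = 3
topk_probs, topk_idx = probs.topk(k)
next_id = topk_idx[torch.multinomial(topk_probs / topk_probs.sum(), 1)]

# Top-p (nucleus): sort descending, keep the smallest prefix whose
# cumulative probability covers p, renormalize, sample from that set
p = 0.9
sorted_probs, sorted_idx = probs.sort(descending=True)
keep = torch.cumsum(sorted_probs, dim=-1) - sorted_probs < p  # always keeps the top token
nucleus = sorted_probs[keep] / sorted_probs[keep].sum()
next_id = sorted_idx[keep][torch.multinomial(nucleus, 1)]
```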
Code
Implement autoregressive training on top of BERT by adding a lower-triangular (causal) attention mask:
```python
# coding:utf8
import torch
import torch.nn as nn
import numpy as np
import math
import random
import os
import re

from transformers import BertModel, BertTokenizer

"""
BERT-based autoregressive language model (PyTorch)
"""


class LanguageModel(nn.Module):
    def __init__(self, input_dim, vocab_size):
        super(LanguageModel, self).__init__()
        # self.embedding = nn.Embedding(len(vocab), input_dim)
        # self.layer = nn.LSTM(input_dim, input_dim, num_layers=1, batch_first=True)
        self.bert = BertModel.from_pretrained(r"D:\NLP\video\第六周\bert-base-chinese", return_dict=False)
        self.classify = nn.Linear(input_dim, vocab_size)
        # self.dropout = nn.Dropout(0.1)
        self.loss = nn.functional.cross_entropy

    # With gold labels, return the loss; without them, return predictions
    def forward(self, x, y=None):
        # x = self.embedding(x)  # output shape: (batch_size, sen_len, input_dim)
        # Use a mask so the model cannot see future tokens in advance
        if y is not None:
            # Build a lower-triangular mask;
            # BERT's attention mask here is (batch_size, seq_len, seq_len), i.e. L*L per sample
            mask = torch.tril(torch.ones(x.shape[0], x.shape[1], x.shape[1]))
            if torch.cuda.is_available():
                mask = mask.cuda()  # keep the mask on the same device as the input
            # print(mask)
            x, _ = self.bert(x, attention_mask=mask)
            y_pred = self.classify(x)
            return self.loss(y_pred.view(-1, y_pred.shape[-1]), y.view(-1))
        else:
            x = self.bert(x)[0]
            y_pred = self.classify(x)
            return torch.softmax(y_pred, dim=-1)


# Load the character vocabulary
def build_vocab(vocab_path):
    vocab = {"<pad>": 0}
    with open(vocab_path, encoding="utf8") as f:
        for index, line in enumerate(f):
            char = line[:-1]         # strip the trailing newline
            vocab[char] = index + 1  # reserve index 0 for the pad token
    return vocab


# Load the corpus
def load_corpus(path):
    corpus = ""
    with open(path, encoding="utf8") as f:
        for line in f:
            corpus += line.strip()
    return corpus


# Randomly generate one sample:
# take a random window from the text; the window is the input,
# and the same window shifted right by one character is the target
def build_sample(tokenizer, window_size, corpus):
    start = random.randint(0, len(corpus) - 1 - window_size)
    end = start + window_size
    window = corpus[start:end]
    target = corpus[start + 1:end + 1]  # input and target are offset by one
    # print(window, target)
    # Convert the Chinese text to tokenizer ids
    input_ids_x = tokenizer.encode(window, add_special_tokens=False, padding='max_length',
                                   truncation=True, max_length=10)
    input_ids_y = tokenizer.encode(target, add_special_tokens=False, padding='max_length',
                                   truncation=True, max_length=10)
    return input_ids_x, input_ids_y


# Build the dataset
# sample_length: number of samples needed; generates as many as requested
# tokenizer: the BERT tokenizer (replaces the earlier vocab)
# window_size: sample length
# corpus: corpus string
def build_dataset(sample_length, tokenizer, window_size, corpus):
    dataset_x = []
    dataset_y = []
    for i in range(sample_length):
        x, y = build_sample(tokenizer, window_size, corpus)
        dataset_x.append(x)
        dataset_y.append(y)
    return torch.LongTensor(dataset_x), torch.LongTensor(dataset_y)


# Build the model
def build_model(vocab_size, char_dim):
    model = LanguageModel(char_dim, vocab_size)
    return model


# Text generation test code
def generate_sentence(openings, model, tokenizer, window_size):
    # reverse_vocab = dict((y, x) for x, y in vocab.items())
    model.eval()
    with torch.no_grad():
        pred_char = ""
        # Stop once the generated text exceeds 30 characters
        while len(openings) <= 30:
            openings += pred_char
            # Condition on the last window_size characters so generation follows
            # the newest context (truncation alone would keep only the prefix)
            x = tokenizer.encode(openings[-window_size:], add_special_tokens=False,
                                 padding='max_length', truncation=True, max_length=10)
            x = torch.LongTensor([x])
            if torch.cuda.is_available():
                x = x.cuda()
            # batch_size = 1; take the distribution of the last character
            y = model(x)[0][-1]
            index = sampling_strategy(y)
            # Decode back to Chinese (a single character)
            pred_char = tokenizer.decode(index)
    return openings


# Sampling strategy: greedy most of the time, random sampling 10% of the time
def sampling_strategy(prob_distribution):
    if random.random() > 0.1:
        strategy = "greedy"
    else:
        strategy = "sampling"
    if strategy == "greedy":
        return int(torch.argmax(prob_distribution))
    elif strategy == "sampling":
        prob_distribution = prob_distribution.cpu().numpy()
        return np.random.choice(list(range(len(prob_distribution))), p=prob_distribution)


# Compute the perplexity of a piece of text (requires a vocab containing <UNK>)
def calc_perplexity(sentence, model, vocab, window_size):
    prob = 0
    model.eval()
    with torch.no_grad():
        for i in range(1, len(sentence)):
            start = max(0, i - window_size)
            window = sentence[start:i]
            x = [vocab.get(char, vocab["<UNK>"]) for char in window]
            x = torch.LongTensor([x])
            target = sentence[i]
            target_index = vocab.get(target, vocab["<UNK>"])
            if torch.cuda.is_available():
                x = x.cuda()
            pred_prob_distribute = model(x)[0][-1]
            target_prob = pred_prob_distribute[target_index]
            prob += math.log(target_prob, 10)
    return 2 ** (prob * (-1 / len(sentence)))


def train(corpus_path, save_weight=True):
    epoch_num = 15        # number of training epochs
    batch_size = 64       # samples per batch
    train_sample = 10000  # total samples trained per epoch
    char_dim = 768        # dimension per character (BERT hidden size)
    window_size = 10      # sample text length
    # vocab = build_vocab(r"vocab.txt")  # build the character vocabulary
    tokenizer = BertTokenizer.from_pretrained(r"D:\NLP\video\第六周\bert-base-chinese")
    vocab_size = 21128
    corpus = load_corpus(corpus_path)          # load the corpus
    model = build_model(vocab_size, char_dim)  # build the model
    if torch.cuda.is_available():
        model = model.cuda()
    optim = torch.optim.Adam(model.parameters(), lr=0.001)  # optimizer
    print("Vocabulary and model loaded; starting training")
    for epoch in range(epoch_num):
        model.train()
        watch_loss = []
        for batch in range(int(train_sample / batch_size)):
            x, y = build_dataset(batch_size, tokenizer, window_size, corpus)  # build one batch
            if torch.cuda.is_available():
                x, y = x.cuda(), y.cuda()
            optim.zero_grad()   # reset gradients
            loss = model(x, y)  # compute the loss
            loss.backward()     # compute gradients
            optim.step()        # update weights
            watch_loss.append(loss.item())
        print("=========\nEpoch %d, average loss: %f" % (epoch + 1, np.mean(watch_loss)))
        print(generate_sentence("忽然一陣狂風(fēng)吹過,他直接", model, tokenizer, window_size))
        print(generate_sentence("天青色等煙雨,而我在", model, tokenizer, window_size))
    if not save_weight:
        return
    else:
        base_name = os.path.basename(corpus_path).replace("txt", "pth")
        model_path = os.path.join("model", base_name)
        torch.save(model.state_dict(), model_path)
        return


if __name__ == "__main__":
    train("corpus.txt", False)
    # mask = torch.tril(torch.ones(4, 4)).unsqueeze(0).unsqueeze(0)
    # print(mask)
```