Pytorch官方教程:含注意力的seq2seq2机器翻译
引入
数据下载:传送门
这个教程的任务目标是将输入的法文翻译成英文。因为翻译前后句子可能不等长,所以用之前的RNN、LSTM就不太合适,所以这里就是用的Encoder-Decoder结构。其结构如下图所示:
数据处理
所需要的库:
from __future__ import unicode_literals, print_function, division from io import open import unicodedata import string import re import random import torch import torch.nn as nn from torch import optim import torch.nn.functional as F device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
我们之后需要将每个单词对应唯一的索引作为神经网络的输入和目标.为了追踪这些索引我们使用一个帮助类 Lang
类中有 词 → 索引 (word2index
) 和 索引 → 词(index2word
) 的字典, 以及每个词word2count
用来替换稀疏词汇。
SOS_token = 0 EOS_token = 1 class Lang: def __init__(self, name): self.name = name self.word2index = {} self.word2count = {} self.index2word = {0: "SOS", 1: "EOS"} self.n_words = 2 # Count SOS and EOS def addSentence(self, sentence): for word in sentence.split(\' \'): self.addWord(word) def addWord(self, word): if word not in self.word2index: self.word2index[word] = self.n_words self.word2count[word] = 1 self.index2word[self.n_words] = word self.n_words += 1 else: self.word2count[word] += 1
这些文件全部采用Unicode编码,为了简化起见,我们将Unicode字符转换成ASCII编码、所有内容小写、并修剪大部分标点符号。
def unicodeToAscii(s): return \'\'.join( c for c in unicodedata.normalize(\'NFD\', s) if unicodedata.category(c) != \'Mn\' ) def normalizeString(s): s = unicodeToAscii(s.lower().strip()) s = re.sub(r"([.!?])", r" \1", s) s = re.sub(r"[^a-zA-Z.!?]+", r" ", s) return s
我们将按行分开并将每一行分成两列来读取文件。这些文件都是英语 -> 其他语言,所以如果我们想从其他语言翻译 -> 英语,添加reverse
标志来翻转词语对。
def readLangs(lang1, lang2, reverse=False): print("Reading lines...") # Read the file and split into lines lines = open(\'data/%s-%s.txt\' % (lang1, lang2), encoding=\'utf-8\').\ read().strip().split(\'\n\') # Split every line into pairs and normalize pairs = [[normalizeString(s) for s in l.split(\'\t\')] for l in lines] # Reverse pairs, make Lang instances if reverse: pairs = [list(reversed(p)) for p in pairs] input_lang = Lang(lang2) output_lang = Lang(lang1) else: input_lang = Lang(lang1) output_lang = Lang(lang2) return input_lang, output_lang, pairs
为方便训练,句子的最大长度是10个单词(包括标点符号),同时我们将那些翻译为“I am”或“he is”等形式的句子进行了修改(考虑到之前清除的标点符号——\’)
MAX_LENGTH = 10 eng_prefixes = ( "i am ", "i m ", "he is", "he s ", "she is", "she s ", "you are", "you re ", "we are", "we re ", "they are", "they re " ) def filterPair(p): return len(p[0].split(\' \')) < MAX_LENGTH and \ len(p[1].split(\' \')) < MAX_LENGTH and \ p[1].startswith(eng_prefixes) def filterPairs(pairs): return [pair for pair in pairs if filterPair(pair)]
完整的数据准备过程:
- 按行读取文本文件,将行拆分成对
- 规范文本,按长度和内容过滤
- 从句子中成对列出单词列表
def prepareData(lang1, lang2, reverse=False): input_lang, output_lang, pairs = readLangs(lang1, lang2, reverse) print("Read %s sentence pairs" % len(pairs)) pairs = filterPairs(pairs) print("Trimmed to %s sentence pairs" % len(pairs)) print("Counting words...") for pair in pairs: input_lang.addSentence(pair[0]) output_lang.addSentence(pair[1]) print("Counted words:") print(input_lang.name, input_lang.n_words) print(output_lang.name, output_lang.n_words) return input_lang, output_lang, pairs input_lang, output_lang, pairs = prepareData(\'eng\', \'fra\', True) print(random.choice(pairs))
输出:
Reading lines... Read 135842 sentence pairs Trimmed to 10599 sentence pairs Counting words... Counted words: fra 4345 eng 2803 [\'ils ne sont pas encore chez eux .\', \'they re not home yet .\']
seq2seq
编码器
input:torch.size([1])
hidden:torch.size([1,1,hidden_size])
输入input的seq_len为1,且batch_size也为1,所以input进行词嵌入后维度变为torch.size([1,hidden_size]),经过view(1,1,-1)变为torch.size([1,1,hidden_size]),之所以这么做,是因为后面要放入GRU训练。
output:torch.size([1,1,hidden_size])
hidden:torch.size([1,1,hidden_size])
class EncoderRNN(nn.Module): def __init__(self, input_size, hidden_size): super(EncoderRNN, self).__init__() self.hidden_size = hidden_size self.embedding = nn.Embedding(input_size, hidden_size) self.gru = nn.GRU(hidden_size, hidden_size) def forward(self, input, hidden): embedded = self.embedding(input).view(1, 1, -1) output = embedded output, hidden = self.gru(output, hidden) return output, hidden def initHidden(self): return torch.zeros(1, 1, self.hidden_size, device=device)
简单解码器
在最简单的seq2seq解码器中,我们只使用编码器的最后输出。这最后一个输出有时称为上下文向量因为它从整个序列中编码上下文。该上下文向量用作解码器的初始隐藏状态。
在解码的每一步,解码器都被赋予一个输入指令和隐藏状态. 初始输入指令字符串开始的<SOS>
指令,第一个隐藏状态是上下文向量(编码器的最后隐藏状态).
class DecoderRNN(nn.Module): def __init__(self, hidden_size, output_size): super(DecoderRNN, self).__init__() self.hidden_size = hidden_size self.embedding = nn.Embedding(output_size, hidden_size) self.gru = nn.GRU(hidden_size, hidden_size) self.out = nn.Linear(hidden_size, output_size) self.softmax = nn.LogSoftmax(dim=1) def forward(self, input, hidden): output = self.embedding(input).view(1, 1, -1) output = F.relu(output) output, hidden = self.gru(output, hidden) output = self.softmax(self.out(output[0])) return output, hidden def initHidden(self): return torch.zeros(1, 1, self.hidden_size, device=device)
带注意力机制的解码器
如果仅在编码器和解码器之间传递上下文向量,则该单个向量承担编码整个句子的负担.
注意力机制允许解码器网络针对解码器自身输出的每一步”聚焦”编码器输出的不同部分. 首先我们计算一组注意力权重. 这些将被乘以编码器输出矢量获得加权的组合. 结果(在代码中为attn_applied
) 应该包含关于输入序列的特定部分的信息, 从而帮助解码器选择正确的输出单词.
注意权值的计算是用另一个前馈层attn
进行的, 将解码器的输入和隐藏层状态作为输入. 由于训练数据中的输入序列(语句)长短不一,为了实际创建和训练此层, 我们必须选择最大长度的句子(输入长度,用于编码器输出),以适用于此层. 最大长度的句子将使用所有注意力权重,而较短的句子只使用前几个.
这里的input和encoder一样,seq_len = batch_size = 1,词嵌入后维度变为torch.size([1,hidden_size]),经过view(1,1,-1)变为torch.size([1,1,hidden_size])。
embedded[0]和之前的输出hidden[0]都是torch.size([1,hidden_size]),拼接在一起后变成torch.size([1,hidden_size*2]),之后经过attn这个全连接层,经过该层之后输出维度为max_length,因为句子的长度最大为max_length,在encoder中每个单词都有output,该输出经过softmax后就变成了对每个output的权重。
接下来权重乘以encoder每个单词输出的值,在这里encoder_outputs的维度为torch.size([max_length,hidden_size]),如果输出的句子的长度小于max_length,则多出来的那部分为0。得到attn_applied的维度为torch.size([1,1,hidden_size])。
接下来embedded[0]和attn_applied[0]拼接后经过一个全连接层。
class AttnDecoderRNN(nn.Module): def __init__(self, hidden_size, output_size, dropout_p=0.1, max_length=MAX_LENGTH): super(AttnDecoderRNN, self).__init__() self.hidden_size = hidden_size self.output_size = output_size self.dropout_p = dropout_p self.max_length = max_length self.embedding = nn.Embedding(self.output_size, self.hidden_size) self.attn = nn.Linear(self.hidden_size * 2, self.max_length) self.attn_combine = nn.Linear(self.hidden_size * 2, self.hidden_size) self.dropout = nn.Dropout(self.dropout_p) self.gru = nn.GRU(self.hidden_size, self.hidden_size) self.out = nn.Linear(self.hidden_size, self.output_size) def forward(self, input, hidden, encoder_outputs): embedded = self.embedding(input).view(1, 1, -1) embedded = self.dropout(embedded) attn_weights = F.softmax( self.attn(torch.cat((embedded[0], hidden[0]), 1)), dim=1) attn_applied = torch.bmm(attn_weights.unsqueeze(0), encoder_outputs.unsqueeze(0)) output = torch.cat((embedded[0], attn_applied[0]), 1) output = self.attn_combine(output).unsqueeze(0) output = F.relu(output) output, hidden = self.gru(output, hidden) output = F.log_softmax(self.out(output[0]), dim=1) return output, hidden, attn_weights def initHidden(self): return torch.zeros(1, 1, self.hidden_size, device=device)
模型训练
为了训练,对于每一对我们都需要输入张量(输入句子中的词的索引)和 目标张量(目标语句中的词的索引). 在创建这些向量时,我们会将EOS标记添加到两个序列中。
def indexesFromSentence(lang, sentence): return [lang.word2index[word] for word in sentence.split(\' \')] def tensorFromSentence(lang, sentence): indexes = indexesFromSentence(lang, sentence) indexes.append(EOS_token) return torch.tensor(indexes, dtype=torch.long, device=device).view(-1, 1) def tensorsFromPair(pair): input_tensor = tensorFromSentence(input_lang, pair[0]) target_tensor = tensorFromSentence(output_lang, pair[1]) return (input_tensor, target_tensor)
为了训练我们通过编码器运行输入序列,并跟踪每个输出和最新的隐藏状态. 然后解码器被赋予<SOS>
标志作为其第一个输入, 并将编码器的最后一个隐藏状态作为其第一个隐藏状态.
“Teacher forcing” 是将实际目标输出用作每个下一个输入的概念,而不是将解码器的 猜测用作下一个输入.使用“Teacher forcing” 会使其更快地收敛,但是当训练好的网络被利用时,它可能表现出不稳定性..
您可以观察“Teacher forcing”网络的输出,这些网络使用连贯的语法阅读,但远离正确的翻译 – 直觉上它已经学会表示输出语法,并且一旦老师告诉它前几个单词就可以“提取”意义,但是 它没有正确地学习如何从翻译中创建句子。
由于PyTorch的autograd给我们的自由,我们可以随意选择使用“Teacher forcing”或不使用简单的if语句. 调高teacher_forcing_ratio
来更好地使用它.
teacher_forcing_ratio = 0.5 def train(input_tensor, target_tensor, encoder, decoder, encoder_optimizer, decoder_optimizer, criterion, max_length=MAX_LENGTH): encoder_hidden = encoder.initHidden() encoder_optimizer.zero_grad() decoder_optimizer.zero_grad() input_length = input_tensor.size(0) target_length = target_tensor.size(0) encoder_outputs = torch.zeros(max_length, encoder.hidden_size, device=device) loss = 0 for ei in range(input_length): encoder_output, encoder_hidden = encoder( input_tensor[ei], encoder_hidden) encoder_outputs[ei] = encoder_output[0, 0] decoder_input = torch.tensor([[SOS_token]], device=device) decoder_hidden = encoder_hidden use_teacher_forcing = True if random.random() < teacher_forcing_ratio else False if use_teacher_forcing: # Teacher forcing: 将目标作为下一个输入 for di in range(target_length): decoder_output, decoder_hidden, decoder_attention = decoder( decoder_input, decoder_hidden, encoder_outputs) loss += criterion(decoder_output, target_tensor[di]) decoder_input = target_tensor[di] # Teacher forcing else: # 不适用 teacher forcing: 使用自己的预测作为下一个输入 for di in range(target_length): decoder_output, decoder_hidden, decoder_attention = decoder( decoder_input, decoder_hidden, encoder_outputs) topv, topi = decoder_output.topk(1) decoder_input = topi.squeeze().detach() # detach from history as input loss += criterion(decoder_output, target_tensor[di]) if decoder_input.item() == EOS_token: break loss.backward() encoder_optimizer.step() decoder_optimizer.step() return loss.item() / target_length
这是一个帮助函数,用于在给定当前时间和进度%的情况下打印经过的时间和估计的剩余时间。
import time import math def asMinutes(s): m = math.floor(s / 60) s -= m * 60 return \'%dm %ds\' % (m, s) def timeSince(since, percent): now = time.time() s = now - since es = s / (percent) rs = es - s return \'%s (- %s)\' % (asMinutes(s), asMinutes(rs))
整个训练过程如下所示:
- 启动计时器
- 初始化优化器和准则
- 创建一组训练队
- 为进行绘图启动空损失数组
之后我们多次调用train
函数,偶尔打印进度 (样本的百分比,到目前为止的时间,狙击的时间) 和平均损失。
def trainIters(encoder, decoder, n_iters, print_every=1000, plot_every=100, learning_rate=0.01): start = time.time() plot_losses = [] print_loss_total = 0 # Reset every print_every plot_loss_total = 0 # Reset every plot_every encoder_optimizer = optim.SGD(encoder.parameters(), lr=learning_rate) decoder_optimizer = optim.SGD(decoder.parameters(), lr=learning_rate) training_pairs = [tensorsFromPair(random.choice(pairs)) for i in range(n_iters)] criterion = nn.NLLLoss() for iter in range(1, n_iters + 1): training_pair = training_pairs[iter - 1] input_tensor = training_pair[0] target_tensor = training_pair[1] loss = train(input_tensor, target_tensor, encoder, decoder, encoder_optimizer, decoder_optimizer, criterion) print_loss_total += loss plot_loss_total += loss if iter % print_every == 0: print_loss_avg = print_loss_total / print_every print_loss_total = 0 print(\'%s (%d %d%%) %.4f\' % (timeSince(start, iter / n_iters), iter, iter / n_iters * 100, print_loss_avg)) if iter % plot_every == 0: plot_loss_avg = plot_loss_total / plot_every plot_losses.append(plot_loss_avg) plot_loss_total = 0 showPlot(plot_losses)
绘制结果
使用matplotlib完成绘图,使用plot_losses
保存训练时的数组。
import matplotlib.pyplot as plt plt.switch_backend(\'agg\') import matplotlib.ticker as ticker import numpy as np def showPlot(points): plt.figure() fig, ax = plt.subplots() # 该定时器用于定时记录时间 loc = ticker.MultipleLocator(base=0.2) ax.yaxis.set_major_locator(loc) plt.plot(points)
评估
评估与训练大部分相同,但没有目标,因此我们只是将解码器的每一步预测反馈给它自身. 每当它预测到一个单词时,我们就会将它添加到输出字符串中,并且如果它预测到我们在那里停止的EOS指令. 我们还存储解码器的注意力输出以供稍后显示.
def evaluate(encoder, decoder, sentence, max_length=MAX_LENGTH): with torch.no_grad(): input_tensor = tensorFromSentence(input_lang, sentence) input_length = input_tensor.size()[0] encoder_hidden = encoder.initHidden() encoder_outputs = torch.zeros(max_length, encoder.hidden_size, device=device) for ei in range(input_length): encoder_output, encoder_hidden = encoder(input_tensor[ei], encoder_hidden) encoder_outputs[ei] += encoder_output[0, 0] decoder_input = torch.tensor([[SOS_token]], device=device) # SOS decoder_hidden = encoder_hidden decoded_words = [] decoder_attentions = torch.zeros(max_length, max_length) for di in range(max_length): decoder_output, decoder_hidden, decoder_attention = decoder( decoder_input, decoder_hidden, encoder_outputs) decoder_attentions[di] = decoder_attention.data topv, topi = decoder_output.data.topk(1) if topi.item() == EOS_token: decoded_words.append(\'<EOS>\') break else: decoded_words.append(output_lang.index2word[topi.item()]) decoder_input = topi.squeeze().detach() return decoded_words, decoder_attentions[:di + 1]
我们可以从训练集中对随机句子进行评估,并打印出输入、目标和输出,从而做出一些主观的质量判断:
def evaluateRandomly(encoder, decoder, n=10): for i in range(n): pair = random.choice(pairs) print(\'>\', pair[0]) print(\'=\', pair[1]) output_words, attentions = evaluate(encoder, decoder, pair[0]) output_sentence = \' \'.join(output_words) print(\'<\', output_sentence) print(\'\')
训练和评估
有了所有这些帮助函数(它看起来像是额外的工作,但它使运行多个实验更容易),我们实际上可以初始化一个网络并开始训练。
请记住输入句子被严重过滤, 对于这个小数据集,我们可以使用包含256个隐藏节点 和单个GRU层的相对较小的网络.在MacBook CPU上约40分钟后,我们会得到一些合理的结果.
注
如果你运行这个笔记本,你可以训练,中断内核,评估,并在以后继续训练。 注释编码器和解码器初始化的行并再次运行 trainIters
.
hidden_size = 256 encoder1 = EncoderRNN(input_lang.n_words, hidden_size).to(device) attn_decoder1 = AttnDecoderRNN(hidden_size, output_lang.n_words, dropout_p=0.1).to(device) trainIters(encoder1, attn_decoder1, 75000, print_every=5000)
参考