Seq2Seq:让机器学会"同声传译"的魔法架构
一、当AI遇上国际会议:传统模型的三大困境
震撼案例:2016年某国际峰会,机器翻译出现致命错误:
原文:“The agreement is not legally binding” 错误翻译:“协议没有装订书皮” 正确翻译:“该协议不具备法律约束力”
这个真实事故背后暴露了传统模型的三大局限:
- 长度桎梏:定长输入 vs 动态议程
- 语境丢失:逐词翻译导致语义断裂
- 歧义困境:无法处理一词多义(如"apple"指水果公司)
Seq2Seq架构:编码器-解码器架构
1.1构建实验语料库和词汇表
# 构建语料库,每行包含中文、英文(解码器输入)和翻译成英文后的目标输出 3 个句子
sentences = [['咖哥 喜欢 小冰', '<sos> KaGe likes XiaoBing', 'KaGe likes XiaoBing <eos>'],['我 爱 学习 人工智能', '<sos> I love studying AI', 'I love studying AI <eos>'],['深度学习 改变 世界', '<sos> DL changed the world', 'DL changed the world <eos>'],['自然 语言 处理 很 强大', '<sos> NLP is so powerful', 'NLP is so powerful <eos>'],['神经网络 非常 复杂', '<sos> Neural-Nets are complex', 'Neural-Nets are complex <eos>']]
word_list_cn, word_list_en = [], [] # 初始化中英文词汇表
# 遍历每一个句子并将单词添加到词汇表中
for s in sentences:word_list_cn.extend(s[0].split())word_list_en.extend(s[1].split())word_list_en.extend(s[2].split())
# 去重,得到没有重复单词的词汇表
word_list_cn = list(set(word_list_cn))
word_list_en = list(set(word_list_en))
# 构建单词到索引的映射
word2idx_cn = {w: i for i, w in enumerate(word_list_cn)}
word2idx_en = {w: i for i, w in enumerate(word_list_en)}
# 构建索引到单词的映射
idx2word_cn = {i: w for i, w in enumerate(word_list_cn)}
idx2word_en = {i: w for i, w in enumerate(word_list_en)}
# 计算词汇表的大小
voc_size_cn = len(word_list_cn)
voc_size_en = len(word_list_en)
print(" 句子数量:", len(sentences)) # 打印句子数
print(" 中文词汇表大小:", voc_size_cn) # 打印中文词汇表大小
print(" 英文词汇表大小:", voc_size_en) # 打印英文词汇表大小
print(" 中文词汇到索引的字典:", word2idx_cn) # 打印中文词汇到索引的字典
print(" 英文词汇到索引的字典:", word2idx_en) # 打印英文词汇到索引的字典
1.2 生成seq2seq训练序列
import numpy as np # 导入 numpy
import torch # 导入 torch
import random # 导入 random 库
# 定义一个函数,随机选择一个句子和词汇表生成输入、输出和目标数据
def make_data(sentences):# 随机选择一个句子进行训练random_sentence = random.choice(sentences)# 将输入句子中的单词转换为对应的索引encoder_input = np.array([[word2idx_cn[n] for n in random_sentence[0].split()]])# 将输出句子中的单词转换为对应的索引decoder_input = np.array([[word2idx_en[n] for n in random_sentence[1].split()]])# 将目标句子中的单词转换为对应的索引target = np.array([[word2idx_en[n] for n in random_sentence[2].split()]])# 将输入、输出和目标批次转换为 LongTensorencoder_input = torch.LongTensor(encoder_input)decoder_input = torch.LongTensor(decoder_input)target = torch.LongTensor(target)return encoder_input, decoder_input, target
# 使用 make_data 函数生成输入、输出和目标张量
encoder_input, decoder_input, target = make_data(sentences)
for s in sentences: # 获取原始句子if all([word2idx_cn[w] in encoder_input[0] for w in s[0].split()]):original_sentence = sbreak
print(" 原始句子:", original_sentence) # 打印原始句子
print(" 编码器输入张量的形状:", encoder_input.shape) # 打印输入张量形状
print(" 解码器输入张量的形状:", decoder_input.shape) # 打印输出张量形状
print(" 目标张量的形状:", target.shape) # 打印目标张量形状
print(" 编码器输入张量:", encoder_input) # 打印输入张量
print(" 解码器输入张量:", decoder_input) # 打印输出张量
print(" 目标张量:", target) # 打印目标张量
生成数据:从一个多语言的语料库中随机挑选一个句子,并将其中的中文、英文输入和目标输出都转换成对应的数字索引张量。
为神经网络准备:这种数字化的数据是神经网络处理文本数据时必不可少的,确保模型能理解和训练每个单词。
1.3 定义编码器和解码器
import torch.nn as nn # 导入 torch.nn 库
# 定义编码器类,继承自 nn.Module
class Encoder(nn.Module):def __init__(self, input_size, hidden_size):super(Encoder, self).__init__() self.hidden_size = hidden_size # 设置隐藏层大小 self.embedding = nn.Embedding(input_size, hidden_size) # 创建词嵌入层 self.rnn = nn.RNN(hidden_size, hidden_size, batch_first=True) # 创建 RNN 层 def forward(self, inputs, hidden): # 前向传播函数embedded = self.embedding(inputs) # 将输入转换为嵌入向量 output, hidden = self.rnn(embedded, hidden) # 将嵌入向量输入 RNN 层并获取输出return output, hidden
# 定义解码器类,继承自 nn.Module
class Decoder(nn.Module):def __init__(self, hidden_size, output_size):super(Decoder, self).__init__() self.hidden_size = hidden_size # 设置隐藏层大小 self.embedding = nn.Embedding(output_size, hidden_size) # 创建词嵌入层self.rnn = nn.RNN(hidden_size, hidden_size, batch_first=True) # 创建 RNN 层 self.out = nn.Linear(hidden_size, output_size) # 创建线性输出层 def forward(self, inputs, hidden): # 前向传播函数 embedded = self.embedding(inputs) # 将输入转换为嵌入向量 output, hidden = self.rnn(embedded, hidden) # 将嵌入向量输入 RNN 层并获取输出 output = self.out(output) # 使用线性层生成最终输出return output, hidden
n_hidden = 128 # 设置隐藏层数量
# 创建编码器和解码器
encoder = Encoder(voc_size_cn, n_hidden)
decoder = Decoder(n_hidden, voc_size_en)
print(' 编码器结构:', encoder) # 打印编码器的结构
print(' 解码器结构:', decoder) # 打印解码器的结构
编码器 负责将中文句子转换为一个隐藏状态,这个隐藏状态包含了整个句子的“精华信息”。
解码器 接受这个隐藏状态,并利用自己的 RNN 层一步步生成英文翻译,直到输出完整的翻译句子。
整体流程 就像两个翻译大师默契合作:一个把中文理解得透彻,另一个根据理解生成英文表述。
1.4 定义seq2seq架构
class Seq2Seq(nn.Module):def __init__(self, encoder, decoder):super(Seq2Seq, self).__init__()# 初始化编码器和解码器self.encoder = encoderself.decoder = decoderdef forward(self, enc_input, hidden, dec_input): # 定义前向传播函数# 使输入序列通过编码器并获取输出和隐藏状态encoder_output, encoder_hidden = self.encoder(enc_input, hidden)# 将编码器的隐藏状态传递给解码器作为初始隐藏状态decoder_hidden = encoder_hidden# 使解码器输入(目标序列)通过解码器并获取输出decoder_output, _ = self.decoder(dec_input, decoder_hidden)return decoder_output
# 创建 Seq2Seq 架构
model = Seq2Seq(encoder, decoder)
print('S2S 模型结构:', model) # 打印模型的结构
整体流程:
- 初始化:将编码器和解码器组合成一个 Seq2Seq 模型。
- 编码过程:输入句子经过编码器处理,生成隐藏状态(记忆)。
- 解码过程:解码器以编码器的隐藏状态为起点,结合目标序列(通常带
<sos>
)逐步生成输出。 - 输出:最终返回解码器生成的输出,这通常用来计算损失或者进行实际翻译。
1.5 训练seq2seq架构
# 定义训练函数
def train_seq2seq(model, criterion, optimizer, epochs):for epoch in range(epochs):encoder_input, decoder_input, target = make_data(sentences) # 训练数据的创建hidden = torch.zeros(1, encoder_input.size(0), n_hidden) # 初始化隐藏状态 optimizer.zero_grad()# 梯度清零 output = model(encoder_input, hidden, decoder_input) # 获取模型输出 loss = criterion(output.view(-1, voc_size_en), target.view(-1)) # 计算损失 if (epoch + 1) % 40 == 0: # 打印损失print(f"Epoch: {epoch + 1:04d} cost = {loss:.6f}") loss.backward()# 反向传播 optimizer.step()# 更新参数
# 训练模型
epochs = 400 # 训练轮次
criterion = nn.CrossEntropyLoss() # 损失函数
optimizer = torch.optim.Adam(model.parameters(), lr=0.001) # 优化器
train_seq2seq(model, criterion, optimizer, epochs) # 调用函数训练模型
每一轮,模型从语料库中随机抽取一个句子,用编码器“消化”输入,再由解码器“烹饪”输出;计算损失后,反向传播让模型不断改进。
1.6 测试seq2seq架构
# 定义测试函数
def test_seq2seq(model, source_sentence):# 将输入的句子转换为索引encoder_input = np.array([[word2idx_cn[n] for n in source_sentence.split()]])# 构建输出的句子的索引,以 '<sos>' 开始,后面跟 '<eos>',长度与输入句子相同decoder_input = np.array([word2idx_en['<sos>']] + [word2idx_en['<eos>']]*(len(encoder_input[0])-1))# 转换为 LongTensor 类型encoder_input = torch.LongTensor(encoder_input)decoder_input = torch.LongTensor(decoder_input).unsqueeze(0) # 增加一维 hidden = torch.zeros(1, encoder_input.size(0), n_hidden) # 初始化隐藏状态 predict = model(encoder_input, hidden, decoder_input) # 获取模型输出 predict = predict.data.max(2, keepdim=True)[1] # 获取概率最大的索引# 打印输入的句子和预测的句子print(source_sentence, '->', [idx2word_en[n.item()] for n in predict.squeeze()])
# 测试模型
test_seq2seq(model, '咖哥 喜欢 小冰')
test_seq2seq(model, '自然 语言 处理 很 强大')
整体流程:
- 输入句子转换为数字索引。
- 为解码器构造一个“启动提示”,包含
<sos>
和<eos>
。 - 初始化必要的张量和隐藏状态。
- 使用训练好的 Seq2Seq 模型生成预测输出。
- 将预测结果转换回单词并打印出来。