PyTorch News Classification Task (Study Notes)

![](https://img.tnblog.net/arcimg/hb/21f086c80c5d4afda1bc1029dadd8f3a.png)

>#PyTorch News Classification Task (Study Notes)

[TOC]

## Directory Structure

![](https://img.tnblog.net/arcimg/hb/101911228e54423ab6c42ec21d341b41.png)

### The models folder

tn2>This folder contains the network architectures. It holds two files, `TextCNN.py` and `TextRNN.py`, implementing a CNN and an RNN respectively; in these notes we use the RNN.

### The THUCNews folder

tn2>This folder has three subfolders: `data`, `log`, and `saved_dict`.

![](https://img.tnblog.net/arcimg/hb/969988b3533b4ce797c2a5a520e916a9.png)

tn2>`data` holds the datasets. The task in this experiment is news classification. Let's first look at the contents of the training file `train.txt`.

![](https://img.tnblog.net/arcimg/hb/b7c5728f0abf4b8b9a920438187f7dbb.png)

tn2>Each training sample has the format: text + tab + class id. There are 10 classes in total, listed in `class.txt`, and the training set contains about 180,000 lines. In addition, `dev.txt` and `test.txt` are the validation set and the test set, respectively.

#### How do we make the computer understand the text?

tn2>The usual approach is to clean the text first (for example, removing meaningless markers such as `(图)`), then split it into words or characters and map each token to an index id through a vocabulary.

![](https://img.tnblog.net/arcimg/hb/75825c6f86a84883b9502d6cf5efd76b.png)

tn2>But raw index ids mean **nothing** to the model, or at best are understood in a very rigid way. An `Embedding` lookup table solves this: trained by large companies on huge corpora (it cannot be trained well without massive data), it maps each word or character to a dense vector. After the lookup, a batch of text becomes a tensor of shape `batch` (number of texts per batch) × `max_len` (text length) × `E` (embedding dimension).

tn>The vocabulary used here is the `vocab.pkl` file, and two pretrained `embedding` tables are provided: one from Tencent and one from Sogou.
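tn2>A minimal sketch of that lookup step (the tiny vocabulary and sentence below are made up for illustration; the real vocabulary comes from `vocab.pkl` and the real table from the Sogou/Tencent files):

```python
import torch
import torch.nn as nn

# Toy character-level vocabulary (the real one has ~4762 entries)
vocab = {'中': 0, '国': 1, '足': 2, '球': 3, '<UNK>': 4, '<PAD>': 5}

# One sample, padded to max_len = 8
text = '中国足球'
max_len = 8
ids = [vocab.get(ch, vocab['<UNK>']) for ch in text]
ids += [vocab['<PAD>']] * (max_len - len(ids))

# Embedding table: one E-dimensional vector per vocabulary entry (E = 300 here)
embedding = nn.Embedding(num_embeddings=len(vocab), embedding_dim=300)

batch = torch.LongTensor([ids])   # shape [batch=1, max_len=8]
vectors = embedding(batch)        # shape [1, 8, 300], i.e. batch * max_len * E
print(batch.shape, vectors.shape)
```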
## Recurrent Neural Network (RNN)

tn2>An RNN is a model built for sequential data such as time series and text, as shown below:

![](https://img.tnblog.net/arcimg/hb/a350452c3fc84c1c9e34681e0efd0cf1.png)

tn2>As the figure shows, the only change from an ordinary neural network is that the hidden layer re-processes the features at every step, so each step learns from both the new input and the features carried over from earlier steps. For example, given data at time t1 and data at time t2, an ordinary network would feed both into the same input layer, and the temporal relationship between them would not be captured. With an RNN, the hidden layer first processes t1, and when t2 arrives it combines the hidden state from t1 with t2 to produce new features.

![](https://img.tnblog.net/arcimg/hb/ad1118e4de97428f961b53b815c5fc24.png)

tn2>When there are many time steps, we usually keep only the output of the last step and feed it into the final fully connected layer, because it already carries information from all the earlier steps. The intermediate outputs along the way are discarded; only the last one is kept.

## Code Walkthrough

tn2>The entry point is `run.py`.

```python
import time
import torch
import numpy as np
from train_eval import train, init_network
from importlib import import_module
import argparse
from tensorboardX import SummaryWriter

parser = argparse.ArgumentParser(description='Chinese Text Classification')
parser.add_argument('--model', type=str, required=True, help='choose a model: TextCNN, TextRNN, FastText, TextRCNN, TextRNN_Att, DPCNN, Transformer')
parser.add_argument('--embedding', default='pre_trained', type=str, help='random or pre_trained')
parser.add_argument('--word', default=False, type=bool, help='True for word, False for char')
args = parser.parse_args()


if __name__ == '__main__':
    dataset = 'THUCNews'  # dataset root folder

    # Choose the embedding table
    # Sogou News: embedding_SougouNews.npz, Tencent: embedding_Tencent.npz, random initialization: random
    embedding = 'embedding_SougouNews.npz'
    if args.embedding == 'random':
        embedding = 'random'
    model_name = args.model  # the --model argument picks the model, here TextRNN (TextCNN etc. are also available)
    if model_name == 'FastText':
        from utils_fasttext import build_dataset, build_iterator, get_time_dif
        embedding = 'random'
    else:
        from utils import build_dataset, build_iterator, get_time_dif  # utils provides dataset loading and word/char tokenization helpers

    # import the module models.<model_name>, e.g. models.TextRNN
    x = import_module('models.' + model_name)
    # instantiate that module's Config class to set up the hyperparameters
    config = x.Config(dataset, embedding)
    # fix the random seeds so every run draws the same "random" numbers; the argument is the seed value
    np.random.seed(1)
    torch.manual_seed(1)
    torch.cuda.manual_seed_all(1)
    torch.backends.cudnn.deterministic = True  # make results reproducible across runs

    # record the start time
    start_time = time.time()
    print("Loading data...")
    # build the datasets
    vocab, train_data, dev_data, test_data = build_dataset(config, args.word)
    train_iter = build_iterator(train_data, config)
    dev_iter = build_iterator(dev_data, config)
    test_iter = build_iterator(test_data, config)
    time_dif = get_time_dif(start_time)
    print("Time usage:", time_dif)

    # train
    config.n_vocab = len(vocab)
    # instantiate the model
    model = x.Model(config).to(config.device)
    writer = SummaryWriter(log_dir=config.log_path + '/' + time.strftime('%m-%d_%H.%M', time.localtime()))
    # initialize the network weights
    if model_name != 'Transformer':
        init_network(model)
    print(model.parameters)
    # train the model
    train(config, model, train_iter, dev_iter, test_iter, writer)
```

tn2>`TextRNN.py`

```python
# coding: UTF-8
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np


class Config(object):
    """Configuration parameters"""
    def __init__(self, dataset, embedding):
        # dataset: name of the dataset root folder
        # embedding: file name of the pretrained embedding table
        self.model_name = 'TextRNN'
        self.train_path = dataset + '/data/train.txt'        # training set
        self.dev_path = dataset + '/data/dev.txt'            # validation set
        self.test_path = dataset + '/data/test.txt'          # test set
        self.class_list = [x.strip() for x in open(
            dataset + '/data/class.txt').readlines()]        # class names
        self.vocab_path = dataset + '/data/vocab.pkl'        # vocabulary
        self.save_path = dataset + '/saved_dict/' + self.model_name + '.ckpt'  # where the trained model is saved
        self.log_path = dataset + '/log/' + self.model_name
        self.embedding_pretrained = torch.tensor(
            np.load(dataset + '/data/' + embedding)["embeddings"].astype('float32'))\
            if embedding != 'random' else None               # pretrained word vectors
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')  # device

        self.dropout = 0.5                        # dropout: randomly drop 50% of the neurons (tunable)
        self.require_improvement = 1000           # stop early if no improvement after 1000 batches (early-stopping tolerance, tunable)
        self.num_classes = len(self.class_list)   # number of classes
        self.n_vocab = 0                          # vocabulary size, assigned at runtime
        self.num_epochs = 10                      # number of epochs (tunable)
        self.batch_size = 128                     # mini-batch size (tunable)
        self.pad_size = 32                        # every sentence is padded/truncated to this length
        self.learning_rate = 1e-3                 # learning rate
        self.embed = self.embedding_pretrained.size(1)\
            if self.embedding_pretrained is not None else 300  # embedding dimension; follows the pretrained vectors if they are used
        self.hidden_size = 128                    # LSTM hidden size
        self.num_layers = 2                       # number of LSTM layers


'''Recurrent Neural Network for Text Classification with Multi-Task Learning'''


class Model(nn.Module):
    def __init__(self, config):
        super(Model, self).__init__()
        # embedding layer: use the pretrained vectors if available
        if config.embedding_pretrained is not None:
            self.embedding = nn.Embedding.from_pretrained(config.embedding_pretrained, freeze=False)
        else:
            self.embedding = nn.Embedding(config.n_vocab, config.embed, padding_idx=config.n_vocab - 1)
        # config.embed: the dimension each token is mapped to
        # bidirectional=True: read left-to-right and right-to-left, doubling the output dimension to 256
        self.lstm = nn.LSTM(config.embed, config.hidden_size, config.num_layers,
                            bidirectional=True, batch_first=True, dropout=config.dropout)
        # final fully connected layer: 128 * 2 -> number of classes
        self.fc = nn.Linear(config.hidden_size * 2, config.num_classes)

    def forward(self, x):
        x, _ = x
        out = self.embedding(x)       # embedding lookup: [batch_size, seq_len, embed] = [128, 32, 300]
        out, _ = self.lstm(out)       # run the bidirectional LSTM
        out = self.fc(out[:, -1, :])  # hidden state of the last time step -> fully connected layer
        return out
```
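tn2>Not part of the project code, just a minimal shape check: the sketch below reproduces the embedding → bidirectional LSTM → fully connected pipeline on random token ids, using the default sizes from `Config`, so you can see how `[batch, seq_len]` becomes `[batch, num_classes]`:

```python
import torch
import torch.nn as nn

batch_size, seq_len, n_vocab = 4, 32, 4762
embed, hidden, num_layers, num_classes = 300, 128, 2, 10

embedding = nn.Embedding(n_vocab, embed, padding_idx=n_vocab - 1)
lstm = nn.LSTM(embed, hidden, num_layers, bidirectional=True, batch_first=True, dropout=0.5)
fc = nn.Linear(hidden * 2, num_classes)

x = torch.randint(0, n_vocab, (batch_size, seq_len))  # token ids: [4, 32]
out = embedding(x)                                    # [4, 32, 300]
out, _ = lstm(out)                                    # [4, 32, 256]  (2 directions * 128)
logits = fc(out[:, -1, :])                            # last time step -> [4, 10]
print(logits.shape)                                   # torch.Size([4, 10])
```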
tn2>`train_eval.py`

```python
# coding: UTF-8
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from sklearn import metrics
import time
from utils import get_time_dif
from tensorboardX import SummaryWriter


# weight initialization, xavier by default
def init_network(model, method='xavier', exclude='embedding', seed=123):
    for name, w in model.named_parameters():
        if exclude not in name:
            if 'weight' in name:
                if method == 'xavier':
                    nn.init.xavier_normal_(w)
                elif method == 'kaiming':
                    nn.init.kaiming_normal_(w)
                else:
                    nn.init.normal_(w)
            elif 'bias' in name:
                nn.init.constant_(w, 0)
            else:
                pass


def train(config, model, train_iter, dev_iter, test_iter, writer):
    start_time = time.time()
    # switch to training mode
    model.train()
    # Adam optimizer; lr sets the learning rate
    optimizer = torch.optim.Adam(model.parameters(), lr=config.learning_rate)

    # exponential learning-rate decay: every epoch, lr = gamma * lr
    # scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.9)
    total_batch = 0               # number of batches processed so far
    dev_best_loss = float('inf')
    last_improve = 0              # batch count at the last improvement of the validation loss
    flag = False                  # whether training has gone too long without improvement
    # writer = SummaryWriter(log_dir=config.log_path + '/' + time.strftime('%m-%d_%H.%M', time.localtime()))
    for epoch in range(config.num_epochs):
        print('Epoch [{}/{}]'.format(epoch + 1, config.num_epochs))
        # scheduler.step()  # learning-rate decay
        for i, (trains, labels) in enumerate(train_iter):
            # print(trains[0].shape)
            # forward pass
            outputs = model(trains)
            # zero the gradients
            model.zero_grad()
            # loss
            loss = F.cross_entropy(outputs, labels)
            # backward pass and parameter update
            loss.backward()
            optimizer.step()
            # every 100 batches, report the metrics on the training batch and the validation set
            if total_batch % 100 == 0:
                # ground-truth labels
                true = labels.data.cpu()
                # predictions
                predic = torch.max(outputs.data, 1)[1].cpu()
                # training accuracy
                train_acc = metrics.accuracy_score(true, predic)
                dev_acc, dev_loss = evaluate(config, model, dev_iter)
                # save the model whenever the validation loss improves
                if dev_loss < dev_best_loss:
                    dev_best_loss = dev_loss
                    torch.save(model.state_dict(), config.save_path)
                    improve = '*'
                    last_improve = total_batch
                else:
                    improve = ''
                time_dif = get_time_dif(start_time)
                # print the progress
                msg = 'Iter: {0:>6}, Train Loss: {1:>5.2}, Train Acc: {2:>6.2%}, Val Loss: {3:>5.2}, Val Acc: {4:>6.2%}, Time: {5} {6}'
                print(msg.format(total_batch, loss.item(), train_acc, dev_loss, dev_acc, time_dif, improve))
                writer.add_scalar("loss/train", loss.item(), total_batch)
                writer.add_scalar("loss/dev", dev_loss, total_batch)
                writer.add_scalar("acc/train", train_acc, total_batch)
                writer.add_scalar("acc/dev", dev_acc, total_batch)
                # switch back to training mode
                model.train()
            total_batch += 1
            # stop if the validation loss has not improved within the tolerance of 1000 batches
            if total_batch - last_improve > config.require_improvement:
                print("No optimization for a long time, auto-stopping...")
                flag = True
                break
        if flag:
            break
    writer.close()
    # evaluate the best checkpoint on the test set
    test(config, model, test_iter)


def test(config, model, test_iter):
    # test
    model.load_state_dict(torch.load(config.save_path))
    model.eval()
    start_time = time.time()
    test_acc, test_loss, test_report, test_confusion = evaluate(config, model, test_iter, test=True)
    msg = 'Test Loss: {0:>5.2}, Test Acc: {1:>6.2%}'
    print(msg.format(test_loss, test_acc))
    print("Precision, Recall and F1-Score...")
    print(test_report)
    print("Confusion Matrix...")
    print(test_confusion)
    time_dif = get_time_dif(start_time)
    print("Time usage:", time_dif)


def evaluate(config, model, data_iter, test=False):
    # switch to evaluation mode
    model.eval()
    loss_total = 0
    predict_all = np.array([], dtype=int)  # predicted labels
    labels_all = np.array([], dtype=int)   # ground-truth labels
    with torch.no_grad():
        for texts, labels in data_iter:
            outputs = model(texts)
            # loss
            loss = F.cross_entropy(outputs, labels)
            loss_total += loss
            labels = labels.data.cpu().numpy()
            predic = torch.max(outputs.data, 1)[1].cpu().numpy()
            # collect the ground truth and predictions over the whole set
            labels_all = np.append(labels_all, labels)
            predict_all = np.append(predict_all, predic)
    # accuracy
    acc = metrics.accuracy_score(labels_all, predict_all)
    # on the test set, also return the classification report and confusion matrix
    if test:
        report = metrics.classification_report(labels_all, predict_all, target_names=config.class_list, digits=4)
        confusion = metrics.confusion_matrix(labels_all, predict_all)
        return acc, loss_total / len(data_iter), report, confusion
    return acc, loss_total / len(data_iter)
```
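tn2>One detail worth noting: `Model.forward` returns raw scores (logits) without a softmax, because `F.cross_entropy` applies `log_softmax` internally before computing the negative log-likelihood. A tiny stand-alone illustration (the numbers are made up):

```python
import torch
import torch.nn.functional as F

logits = torch.tensor([[2.0, 0.5, -1.0]])  # raw model outputs for one sample over 3 classes
label = torch.tensor([0])                  # index of the true class

# cross_entropy = log_softmax + negative log-likelihood in a single call
loss = F.cross_entropy(logits, label)
manual = -F.log_softmax(logits, dim=1)[0, label]
print(loss.item(), manual.item())          # the two values match
```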
tn2>`utils.py`

```python
# coding: UTF-8
import os
import torch
import numpy as np
import pickle as pkl
from tqdm import tqdm
import time
from datetime import timedelta


MAX_VOCAB_SIZE = 10000       # vocabulary size limit
UNK, PAD = '<UNK>', '<PAD>'  # unknown token and padding token


def build_vocab(file_path, tokenizer, max_size, min_freq):
    vocab_dic = {}
    with open(file_path, 'r', encoding='UTF-8') as f:
        for line in tqdm(f):
            lin = line.strip()
            if not lin:
                continue
            content = lin.split('\t')[0]
            for word in tokenizer(content):
                vocab_dic[word] = vocab_dic.get(word, 0) + 1
        vocab_list = sorted([_ for _ in vocab_dic.items() if _[1] >= min_freq], key=lambda x: x[1], reverse=True)[:max_size]
        vocab_dic = {word_count[0]: idx for idx, word_count in enumerate(vocab_list)}
        vocab_dic.update({UNK: len(vocab_dic), PAD: len(vocab_dic) + 1})
    return vocab_dic


def build_dataset(config, ues_word):
    if ues_word:
        tokenizer = lambda x: x.split(' ')    # word-level tokenizer: tokens separated by spaces
    else:
        tokenizer = lambda x: [y for y in x]  # char-level tokenizer: split into single characters
    # load the vocabulary (build one from the training set if it does not exist yet)
    if os.path.exists(config.vocab_path):
        vocab = pkl.load(open(config.vocab_path, 'rb'))
    else:
        vocab = build_vocab(config.train_path, tokenizer=tokenizer, max_size=MAX_VOCAB_SIZE, min_freq=1)
        pkl.dump(vocab, open(config.vocab_path, 'wb'))
    print(f"Vocab size: {len(vocab)}")  # 4762 entries here

    def load_dataset(path, pad_size=32):
        contents = []
        # read the text file
        with open(path, 'r', encoding='UTF-8') as f:
            # iterate over the lines
            for line in tqdm(f):
                # strip the trailing newline
                lin = line.strip()
                if not lin:
                    continue
                # split into content and label
                content, label = lin.split('\t')
                words_line = []
                # tokenize
                token = tokenizer(content)
                # original sequence length
                seq_len = len(token)
                # pad short sequences with <PAD> up to pad_size, truncate longer ones
                if pad_size:
                    if len(token) < pad_size:
                        token.extend([PAD] * (pad_size - len(token)))
                    else:
                        token = token[:pad_size]
                        seq_len = pad_size
                # word to id
                for word in token:
                    # map each token to its vocabulary id; unknown tokens fall back to <UNK>
                    words_line.append(vocab.get(word, vocab.get(UNK)))
                contents.append((words_line, int(label), seq_len))
        # return (token ids, label, length) tuples
        return contents  # [([...], 0), ([...], 1), ...]

    # load the training, validation and test sets; arguments: path and pad length
    train = load_dataset(config.train_path, config.pad_size)
    dev = load_dataset(config.dev_path, config.pad_size)
    test = load_dataset(config.test_path, config.pad_size)
    # return the vocabulary together with the three datasets
    return vocab, train, dev, test


class DatasetIterater(object):
    def __init__(self, batches, batch_size, device):
        self.batch_size = batch_size
        self.batches = batches
        self.n_batches = len(batches) // batch_size
        self.residue = False  # whether there is a leftover batch smaller than batch_size
        # check whether the data divides evenly into batches
        if len(batches) % self.n_batches != 0:
            self.residue = True
        self.index = 0
        # device to run on
        self.device = device

    def _to_tensor(self, datas):
        x = torch.LongTensor([_[0] for _ in datas]).to(self.device)
        y = torch.LongTensor([_[1] for _ in datas]).to(self.device)

        # lengths before padding (sequences longer than pad_size are counted as pad_size)
        seq_len = torch.LongTensor([_[2] for _ in datas]).to(self.device)
        return (x, seq_len), y

    def __next__(self):
        if self.residue and self.index == self.n_batches:
            batches = self.batches[self.index * self.batch_size: len(self.batches)]
            self.index += 1
            batches = self._to_tensor(batches)
            return batches
        elif self.index > self.n_batches:
            self.index = 0
            raise StopIteration
        else:
            batches = self.batches[self.index * self.batch_size: (self.index + 1) * self.batch_size]
            self.index += 1
            batches = self._to_tensor(batches)
            return batches

    def __iter__(self):
        return self

    def __len__(self):
        if self.residue:
            return self.n_batches + 1
        else:
            return self.n_batches


def build_iterator(dataset, config):
    iter = DatasetIterater(dataset, config.batch_size, config.device)
    return iter


def get_time_dif(start_time):
    """Elapsed time since start_time."""
    end_time = time.time()
    time_dif = end_time - start_time
    return timedelta(seconds=int(round(time_dif)))


if __name__ == "__main__":
    '''Extract the pretrained word vectors'''
    # adjust the directories and file names below as needed
    train_dir = "./THUCNews/data/train.txt"
    vocab_dir = "./THUCNews/data/vocab.pkl"
    pretrain_dir = "./THUCNews/data/sgns.sogou.char"
    emb_dim = 300
    filename_trimmed_dir = "./THUCNews/data/embedding_SougouNews"
    if os.path.exists(vocab_dir):
        word_to_id = pkl.load(open(vocab_dir, 'rb'))
    else:
        # tokenizer = lambda x: x.split(' ')  # build a word-level vocabulary (tokens separated by spaces)
        tokenizer = lambda x: [y for y in x]  # build a char-level vocabulary
        word_to_id = build_vocab(train_dir, tokenizer=tokenizer, max_size=MAX_VOCAB_SIZE, min_freq=1)
        pkl.dump(word_to_id, open(vocab_dir, 'wb'))

    embeddings = np.random.rand(len(word_to_id), emb_dim)
    f = open(pretrain_dir, "r", encoding='UTF-8')
    for i, line in enumerate(f.readlines()):
        # if i == 0:  # skip the first line if it is a header
        #     continue
        lin = line.strip().split(" ")
        if lin[0] in word_to_id:
            idx = word_to_id[lin[0]]
            emb = [float(x) for x in lin[1:301]]
            embeddings[idx] = np.asarray(emb, dtype='float32')
    f.close()
    np.savez_compressed(filename_trimmed_dir, embeddings=embeddings)
```
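tn2>To see what `load_dataset` actually produces, here is a small stand-alone repeat of its char-level tokenize → pad → id-lookup steps, using a made-up mini vocabulary (the real one is loaded from `vocab.pkl`):

```python
UNK, PAD = '<UNK>', '<PAD>'
vocab = {'体': 0, '育': 1, '新': 2, '闻': 3, UNK: 4, PAD: 5}

line = '体育新闻速递\t0'                  # content + tab + label, as in train.txt
content, label = line.split('\t')

tokenizer = lambda x: [y for y in x]      # char-level tokenizer
token = tokenizer(content)
seq_len = len(token)

pad_size = 8
if len(token) < pad_size:
    token.extend([PAD] * (pad_size - len(token)))
else:
    token = token[:pad_size]
    seq_len = pad_size

words_line = [vocab.get(w, vocab.get(UNK)) for w in token]
print(words_line, int(label), seq_len)
# [0, 1, 2, 3, 4, 4, 5, 5] 0 6  -- out-of-vocabulary chars map to <UNK>, padding to <PAD>
```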
tn>Note: when running the project from VS Code, the `--model` argument has to be added to `launch.json`.

```json
{
    "name": "Python: 当前文件",
    "type": "python",
    "request": "launch",
    "program": "${file}",
    "console": "integratedTerminal",
    "justMyCode": true,
    "args": [
        "--model", "TextRNN"
    ]
},
```

![](https://img.tnblog.net/arcimg/hb/89749b16a53743b99581af929cb79f30.png)

tn2><a href="https://download.tnblog.net/resource/index/83298d1dd8e14856b0bf5f1dc3aaaf05">Code download</a>
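tn>When running from a terminal instead of the VS Code debugger, the equivalent invocation should simply be `python run.py --model TextRNN` (optionally adding `--embedding random`, per the argparse definition in `run.py`).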