1、中文文本情感分类

文本情感分类是一种自然语言处理技术,它旨在自动识别一段文本中表达的情感,并将其分类为正面、负面或中性等不同的情感类别。文本情感分类的应用十分广泛,例如在社交媒体舆情分析、产品评论分析、用户满意度调查等领域都有重要的应用。通过文本情感分类,可以自动化地对大量文本数据进行分类和分析,为决策提供有用的信息和参考。

2、数据预处理

本文利用的数据:weibo_senti_100k.csv 下载地址:

https://github.com/SophonPlus/ChineseNlpCorpus/blob/master/datasets/weibo_senti_100k/intro.ipynb

数据展示

data_processing.py:使用结巴分词库统计词频

# 数据来源 https://github.com/SophonPlus/ChineseNlpCorpus/blob/master/datasets/weibo_senti_100k/intro.ipynb

# 数据概览: 10 万多条,带情感标注 新浪微博,正负向评论约各 5 万条

# 停用词字典 https://github.com/goto456/stopwords

import jieba # 导入中文分词的第三方库,jieba分词

data_path = "../sources/weibo_senti_100k.csv" # 数据路径

data_stop_path = "../sources/hit_stopwords.txt" # 停用词数据路径

data_list = open(data_path, encoding='UTF-8').readlines()[1:] # 读出数据并去掉第一行的介绍标签, 每一行为一个大字符串

stops_word = open(data_stop_path, encoding='UTF-8').readlines() # 读取停用词内容

stops_word = [line.strip() for line in stops_word] # 将每行换行符去掉(去掉换行符),并生成停用词列表

stops_word.append(" ") # 可以自己根据需要添加停用词

stops_word.append("\n")

voc_dict = {}

min_seq = 1 # 用于过滤词频数

top_n = 1000

UNK = ""

PAD = ""

print(data_list[0])

# 对data_list进行分词的处理

# for item in data_list[:100]: 使用前100条数据测试,100000条数据太多

for item in data_list:

label = item[0] # 字符串的第一个为标签

content = item[2:].strip() # 从第三项开始为文本内容, strip()去掉最后的换行符

seg_list = jieba.cut(content, cut_all=False) # 调用结巴分词对每一行文本内容进行分词

seg_res = []

# 打印分词结果

for seg_item in seg_list:

if seg_item in stops_word: # 如果分词字段在停用词列表里,则取出

continue

seg_res.append(seg_item) # 如果不在则加入分词结果中

if seg_item in voc_dict.keys(): # 使用字典统计词频seg_item in voc_dict.keys():

voc_dict[seg_item] += 1

else:

voc_dict[seg_item] = 1

# print(content) # 打印未分词前的句子

# print(seg_res)

# 对字典进行排序,取TOPK词,如果将所有词都要,将会导致字典过大。我们只关注一些高频的词

voc_list = sorted([_ for _ in voc_dict.items() if _[1] > min_seq],

key=lambda x: x[1], # key:指定一个参数的函数,该函数用于从每个列表元素中提取比较键

reverse=True)[:top_n] # 取排完序后的前top_n个词,

voc_dict = {word_count[0]: idx for idx, word_count in enumerate(voc_list)} # 根据排序后的字典重新字典

voc_dict.update({UNK: len(voc_dict), PAD: len(voc_dict) + 1}) # 将前top_n后面的归类为UNK

print(voc_dict) # '泪': 0, '嘻嘻': 1, '都': 2,

# 保存字典

ff = open("../sources/dict.txt", "w")

for item in voc_dict.keys():

ff.writelines("{},{}\n".format(item, voc_dict[item])) # '泪': 0, '嘻嘻': 1, '都': 2,

这段代码是为了对中文文本数据进行预处理,以便进行情感分析。代码使用中文分词库jieba对微博数据进行处理。代码读取微博数据的csv文件,去除第一行(其中包含每列的标签),然后循环处理每一行数据。对于每一行,提取内容(微博帖子的文本)并使用jieba进行分词。得到的分词结果被过滤以去除任何停用词(常见的不具有很多意义的词,例如“和”或“的”),并计算它们的频率在一个字典中。

处理完所有行之后,将结果字典按单词频率降序排序,并选择前n个单词(在这种情况下,n为1000)。任何出现频率低于给定阈值(在这种情况下为1)的词都将从字典中过滤掉。剩余的单词被分配唯一的索引,并保存到字典文件中,以便将来用于训练情感分析模型。

除了单词字典之外,代码定义了两个特殊标记:“”和“”。"“用于表示模型训练或推理过程中遇到的任何词汇外单词(OOV),而”"用于表示添加的填充标记,以确保模型的所有输入具有相同的长度。这些特殊标记被添加到字典的末尾,其索引比字典中的任何其他单词都要高。

3、加载数据集

dataset.py 使用PyTorch的提供的dataset的接口,根据项目重写自己的dataset和dataloader

定义dataset必须重写PyTorch中的dataset中的 init,len,__getitem__函数。

import numpy as np

import jieba

from torch.utils.data import Dataset, DataLoader

# 传入字典路径,将文件读入内存

def read_dict(voc_dict_path):

voc_dict = {}

dict_list = open(voc_dict_path).readlines()

print(dict_list[0]) # '泪,0'

for item in dict_list:

item = item.split(",") # ['泪', '0\n']

voc_dict[item[0]] = int(item[1].strip()) # item[0]值'泪' item[1].strip()值为'0'

# print(voc_dict)

return voc_dict

# 将数据集进行处理(分词,过滤...)

def load_data(data_path, data_stop_path):

data_list = open(data_path, encoding='utf-8').readlines()[1:]

stops_word = open(data_stop_path, encoding='utf-8').readlines()

stops_word = [line.strip() for line in stops_word]

stops_word.append(" ")

stops_word.append("\n")

voc_dict = {}

data = []

max_len_seq = 0 # 统计最长的句子长度

np.random.shuffle(data_list)

for item in data_list[:]:

label = item[0]

content = item[2:].strip()

seg_list = jieba.cut(content, cut_all=False)

seg_res = []

for seg_item in seg_list:

if seg_item in stops_word:

continue

seg_res.append(seg_item)

if seg_item in voc_dict.keys():

voc_dict[seg_item] = voc_dict[seg_item] + 1

else:

voc_dict[seg_item] = 1

if len(seg_res) > max_len_seq: # 以句子分词词语最长为标准

max_len_seq = len(seg_res)

data.append([label, seg_res]) # [标签,分词结果的列表]

# print(max_len_seq)

return data, max_len_seq # 句子分词后,词语最大长度

# 定义Dataset

class text_CLS(Dataset):

def __init__(self, voc_dict_path, data_path, data_stop_path):

self.data_path = data_path

self.data_stop_path = data_stop_path

self.voc_dict = read_dict(voc_dict_path) # 返回数据[[label,分词词语列表],......]

self.data, self.max_len_seq = load_data(self.data_path, self.data_stop_path)

np.random.shuffle(self.data) # 将数据的顺序打乱

def __len__(self): # 返回数据集长度

return len(self.data)

def __getitem__(self, item):

data = self.data[item]

label = int(data[0])

word_list = data[1] # 句子分词后的词语列表

input_idx = []

for word in word_list:

if word in self.voc_dict.keys(): # 如果词语在自己创建的字典中

input_idx.append(self.voc_dict[word]) # 将这个单词的词频数放进列表

else:

input_idx.append(self.voc_dict[""]) # 不在则统一归为其他类(词频太低的归为一类)

if len(input_idx) < self.max_len_seq: # 词语长度小于最长长度,则需要用PAD填充

input_idx += [self.voc_dict[""] for _ in range(self.max_len_seq - len(input_idx))]

# input_idx += [1001 for _ in range(self.max_len_seq - len(input_idx))]

data = np.array(input_idx) # 将得到的词频数列表,转化为numpy数据

return label, data

# 定义DataLoader

def data_loader(dataset, config):

return DataLoader(dataset, batch_size=config.batch_size, shuffle=config.is_shuffle)

# if __name__ == "__main__":

# data_path = "../sources/weibo_senti_100k.csv"

# data_stop_path = "../sources/hit_stopwords.txt"

# dict_path = "../sources/dict"

#

# train_dataLoader = data_loader(data_path, data_stop_path, dict_path)

# for i, batch in enumerate(train_dataLoader):

# print(batch[0], batch[1].size())

# print(batch[0], batch[1])

这段代码实现了一个文本分类任务的数据预处理部分,包括读取字典文件和数据文件、分词、过滤停用词、将文本转化为词频数列表、以及构建Dataset和DataLoader。

具体实现包括以下几个函数和类: read_dict(voc_dict_path):读取字典文件,将文件内容读入内存,并返回一个字典,其中字典的键为词语,值为对应的词频数。 load_data(data_path, data_stop_path):读取数据文件,对每个文本进行分词、过滤停用词,并将文本转化为词频数列表,返回一个数据列表和词语最大长度。 text_CLS(Dataset):定义了一个Dataset类,用于在PyTorch中加载数据集。在初始化函数中,调用read_dict和load_data函数读取数据和字典,然后打乱数据顺序。在__getitem__函数中,将文本数据转化为词频数列表,并返回标签和列表。__len__函数返回数据集长度。 data_loader(dataset, config):定义了一个DataLoader函数,用于在PyTorch中批量加载数据。将dataset和config作为参数传入,返回一个DataLoader对象。 这段代码实现了将文本数据转化为数值化的词频数列表,并将其打包成一个PyTorch的Dataset和DataLoader对象,方便进行后续的模型训练。

4、搭建模型结构

models.py

import torch

import torch.nn as nn

import torch.nn.functional as F

import numpy as np

# padding_idx:padding_idx (python:int, optional) – 填充id,比如,输入长度为100,但是每次的句子长度并不一样,后面就需要用统一的数字填充,

# 而这里就是指定这个数字,这样,网络在遇到填充id时,就不会计算其与其它符号的相关性。(初始化为0)

class Model(nn.Module):

def __init__(self, config):

super(Model, self).__init__()

# 词嵌入层

self.embeding = nn.Embedding(config.n_vocab, # 字典大小:congif.n_vocab,表示词典中词的数量

embedding_dim=config.embed_size, # 词嵌入的输出大小,就是每个词经过embedding后用多少位向量表示。表示每个词对应的向量维度

padding_idx=config.n_vocab - 1) # padding_idx ,pad

# lstm层

self.lstm = nn.LSTM(input_size=config.embed_size, # 输入大小,即每个词的维度

hidden_size=config.hidden_size, # 隐藏层输出大小

num_layers=config.num_layers, # lstm的层数

bidirectional=True, # 双向lstm层

batch_first=True, # 数据结构:[batch_size,seq_len,input_size]

dropout=config.dropout) # 防止过拟合

# 卷积层

self.maxpooling = nn.MaxPool1d(config.pad_size) # 一维卷积。积核长度:

# 全连接层

self.fc = nn.Linear(config.hidden_size * 2 + config.embed_size, # 因为是双向LSTM,所以要*2,

config.num_classes) # 第二个为预测的类别数

self.softmax = nn.Softmax(dim=1)

def forward(self, x):

embed = self.embeding(x) # 输出为[batchsize, seqlen, embed_size] 标准RNN网络的输入

# print("embed.size:",embed.size())

out, _ = self.lstm(embed) # out的shape:[batch_size, seq_len, hidden_size*2]

# print("lstm层的输出size:",out.size())

# torch.cat((x,y),dim) 在dim上拼接,x,y

out = torch.cat((embed, out), 2) # 这里解析全连接层输入大小为 config.hidden_size * 2 + cinfig.embed_size。

# [batch_size,seg_len,config.hidden_size * 2 + cinfig.embed_size]

# print("cat后的size:",out.size())

out = F.relu(out) # 经过relu层,增加非线性表达能力

# print("relu层的out.size:",out.size())

out = out.permute(0, 2, 1) # 交换维度

# print("交换维度后的out.size:",out.size())

out = self.maxpooling(out).reshape(out.size()[0], -1) # 转化为2维tensor

# print("MaxPooling后的out.size:",out.size())

out = self.fc(out)

# print("全连接层的out.size:",out.size())

out = self.softmax(out)

# print("softmax后的out.size:",out.size())

return out

# 测试网络是否正确

if __name__ == '__main__':

cfg = Config()

cfg.pad_size = 640

model_textcls = Model(config=cfg)

input_tensor = torch.tensor([i for i in range(640)]).reshape([1, 640])

out_tensor = model_textcls.forward(input_tensor)

print(out_tensor.size())

print(out_tensor)

5、网络模型的配置

config.py

import torch

# 定义网路的配置类

class Config():

def __init__(self):

'''

self.embeding = nn.Embedding(config.n_vocab,

config.embed_size,

padding_idx=config.n_vocab - 1)

self.lstm = nn.LSTM(config.embed_size,

config.hidden_size,

config.num_layers,

bidirectional=True, batch_first=True,

dropout=config.dropout)

self.maxpool = nn.MaxPool1d(config.pad_size)

self.fc = nn.Linear(config.hidden_size * 2 + config.embed_size,

config.num_classes)

self.softmax = nn.Softmax(dim=1)

'''

self.n_vocab = 1002 # 字典长度

self.embed_size = 128 # 词嵌入表达的大小

self.hidden_size = 128 # 隐藏层输出大小

self.num_layers = 3 # lstm网络的层数

self.dropout = 0.8 #

self.num_classes = 2 # 二分类问题

self.pad_size = 32

self.batch_size = 128

self.is_shuffle = True

self.learn_rate = 0.001

self.num_epochs = 100

self.devices = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

6、训练脚本的搭建

run_train.py

import torch

import torch.nn as nn

from torch import optim

from models import Model

from datasets import data_loader, text_CLS

from configs import Config

cfg = Config()

data_path = "weibo_senti_100k.csv"

data_stop_path = "hit_stopwords.txt"

dict_path = "dict"

dataset = text_CLS(dict_path, data_path, data_stop_path)

train_dataloader = data_loader(dataset, cfg)

cfg.pad_size = dataset.max_len_seq #

model_text_cls = Model(cfg)

model_text_cls.to(cfg.devices)

loss_func = nn.CrossEntropyLoss() # 损失函数。交叉熵损失函数

optimizer = optim.Adam(model_text_cls.parameters(), lr=cfg.learn_rate) # 定义优化器

for epoch in range(cfg.num_epochs):

for i, batch in enumerate(train_dataloader):

label, data = batch

data = torch.tensor(data).to(cfg.devices)

label = torch.tensor(label).to(cfg.devices)

optimizer.zero_grad()

pred = model_text_cls.forward(data)

loss_val = loss_func(pred, label)

print("epoch is {},ite is {},val is {}".format(epoch, i, loss_val))

loss_val.backward() # 后向传播

optimizer.step() # 更新参数

if epoch % 10 == 0:

torch.save(model_text_cls.state_dict(), "../models/{}.pth".format(epoch))

7、训练脚本的搭建

为了简单测试,本文没有数据集拆分为训练集和测试集。 拆分训练集可以将CSV表格中打乱,然后根据需要拆分训练集和测试集。然后构建测试集的dataset和datalaoder。 test.py

import torch

import torch.nn as nn

from torch import optim

from models import Model

from datasets import data_loader, text_CLS

from configs import Config

cfg = Config()

#读取数据

data_path = "sources/weibo_senti_100k.csv"

data_stop_path = "sources/hit_stopword"

dict_path = "sources/dict"

dataset = text_CLS(dict_path, data_path, data_stop_path)

train_dataloader = data_loader(dataset, cfg)

cfg.pad_size = dataset.max_len_seq

model_text_cls = Model(cfg)

model_text_cls.to(cfg.devices)

#加载模型,保存好的模型

model_text_cls.load_state_dict(torch.load("models/10.pth"))

for i, batch in enumerate(train_dataloader):

label, data = batch

data = torch.tensor(data).to(cfg.devices)

label = torch.tensor(label,dtype=torch.int64).to(cfg.devices)

pred_softmax = model_text_cls.forward(data)

#print(pred_softmax)

print(label)

pred = torch.argmax(pred_softmax, dim=1)

print(pred)

#统计准确率

out = torch.eq(pred,label)

print(out.sum() * 1.0 / pred.size()[0])

可以上github把整个项目download下来

https://github.com/yingzhang123/Text_Sentiment_Classification

Time:2023.4.16 (周日) 如果上面代码对您有帮助,欢迎点个赞!!!

参考链接

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: