在上文中已经介绍了 TextCNN 的原理,本文通过 TensorFlow 2.0 来做代码实践。数据集:iflytek_public(下文代码从 ./data/iflytek_public/ 目录读取)。
导库
import datetime
import json
import os
import re

import jieba
import numpy as np
import tensorflow as tf
from gensim.models.keyedvectors import KeyedVectors
from sklearn.model_selection import train_test_split
from tensorflow.keras.initializers import Constant
from tensorflow.keras.preprocessing.sequence import pad_sequences

# Random seed constant (not referenced by the code visible in this article).
random_seed = 100
数据预处理
设置数据路径
# Root directory of the dataset; the label map and every split are read from it.
Dir = './data/iflytek_public/'

label_json_path = os.path.join(Dir, 'labels.json')
train_json_path = os.path.join(Dir, 'train.json')
test_json_path = os.path.join(Dir, 'test.json')
dev_json_path = os.path.join(Dir, 'dev.json')
read_json: 定义 json 数据读取函数
ReplacePunct: 一个用正则去除标点符号的类
string2list: 解析读取到的 json 列表,并提取文字序列和分类标签
def read_json(path):
    """Read a JSON-Lines file and return its records as a list.

    Args:
        path: Path of a UTF-8 text file holding one JSON document per line.

    Returns:
        A list with one decoded object per non-blank line, in file order.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    with open(path, encoding='utf-8') as f:
        # Iterate the file lazily and skip blank lines (e.g. a trailing empty
        # line), which json.loads would otherwise reject.
        return [json.loads(line) for line in f if line.strip()]
class ReplacePunct:
    """Remove a fixed set of punctuation marks from text with a regex."""

    def __init__(self):
        # Compile once; the same pattern is reused for every sample.
        # NOTE(review): the extracted source listed several ASCII marks twice,
        # which suggests their full-width forms were normalised away; both the
        # ASCII and the full-width variants are listed here.
        self.pattern = re.compile(r"[!?',.:;~!?,:;~’、。「」○]")

    def replace(self, string):
        """Return `string` with every character matched by the pattern removed."""
        return self.pattern.sub("", string)


# Shared module-level instance used by string2list.
Replacer = ReplacePunct()
def string2list(data_json):
    """Split parsed samples into character sequences and integer labels.

    Args:
        data_json: List of sample dicts. Each one must provide a 'sentence'
            string and a 'label' value that int() accepts.

    Returns:
        A tuple (data_text, data_label):
            data_text: one list of single characters per sample, taken from
                the sentence after punctuation removal by `Replacer`.
            data_label: one int label per sample, in the same order.
    """
    data_text = [list(Replacer.replace(text['sentence'])) for text in data_json]
    data_label = [int(text['label']) for text in data_json]
    return data_text, data_label
读取数据,过滤标点符号,转为字符序列并提取标签。打印训练集、验证集的数量
# Load the label map and the train/dev splits.
label_json = read_json(label_json_path)
train_json = read_json(train_json_path)
dev_json = read_json(dev_json_path)

print('train:{} | dev:{}'.format(len(train_json), len(dev_json)))
# Output recorded in the original article:
# train:12133 | dev:2599

# Strip punctuation, split into characters and extract the labels.
train_text, train_label = string2list(train_json)
dev_text, dev_label = string2list(dev_json)
定义 tokenizer,并使用准备好的文本序列进行拟合
# Each training sample is already a list of single characters, so the
# tokenizer only has to assign an integer id to every distinct token.
tokenizer = tf.keras.preprocessing.text.Tokenizer(
    num_words=None,      # keep the whole vocabulary
    filters=' ',
    lower=True,
    split=' ',
    char_level=False,
    oov_token='UNKONW',  # placeholder for tokens not seen while fitting
    document_count=0,
)

# Build the vocabulary from the training split only.
tokenizer.fit_on_texts(train_text)
依次完成以下步骤:定义 batch_size 和序列最大长度;将字符序列转为整数序列;将序列按照最大长度填充;准备 label tensor;
准备 train_dataset、dev_dataset。
BATCH_SIZE = 64
MAX_LEN = 500
# Shuffle buffer as large as the training set, i.e. a full shuffle per epoch.
BUFFER_SIZE = tf.constant(len(train_text), dtype=tf.int64)

# text 2 lists of int
train_sequence = tokenizer.texts_to_sequences(train_text)
dev_sequence = tokenizer.texts_to_sequences(dev_text)

# padding sequence: pad at the end up to MAX_LEN
train_sequence_padded = pad_sequences(train_sequence, padding='post', maxlen=MAX_LEN)
dev_sequence_padded = pad_sequences(dev_sequence, padding='post', maxlen=MAX_LEN)

# cvt the label tensors
train_label_tensor = tf.convert_to_tensor(train_label, dtype=tf.float32)
dev_label_tensor = tf.convert_to_tensor(dev_label, dtype=tf.float32)

# create the dataset
# prefetch() counts batches, so passing the sample count asked tf.data to
# buffer far more batches than exist; let the runtime tune it instead.
train_dataset = (
    tf.data.Dataset.from_tensor_slices((train_sequence_padded, train_label_tensor))
    .shuffle(BUFFER_SIZE)
    .batch(BATCH_SIZE, drop_remainder=True)
    .prefetch(tf.data.experimental.AUTOTUNE)
)
# Keep the final partial batch so that every dev sample is evaluated.
dev_dataset = (
    tf.data.Dataset.from_tensor_slices((dev_sequence_padded, dev_label_tensor))
    .batch(BATCH_SIZE)
    .prefetch(tf.data.experimental.AUTOTUNE)
)
一个 batch 的 input、label 样例
# Pull one batch from the training dataset to inspect its shapes.
example_input, example_output = next(iter(train_dataset))
example_input.shape, example_output.shape
# Output recorded in the original article:
# (TensorShape([64, 500]), TensorShape([64]))
构建模型
定义常量
# Vocabulary size; +1 because the embedding matrix keeps an extra row 0 in
# addition to the tokenizer's indices.
VOCAB_SIZE = len(tokenizer.index_word) + 1
EMBEDDING_DIM = 300          # word-vector dimension
FILTERS = [3, 4, 5]          # convolution kernel sizes
FILTER_NUM = 256             # number of filters per convolution layer
CLASS_NUM = len(label_json)  # number of classes
# NOTE(review): tf.keras.layers.Dropout treats this as the fraction of units
# dropped, so 0.8 keeps only 20% of them; confirm it is not meant as a
# keep-probability.
DROPOUT_RATE = 0.8           # dropout rate
get_embeddings: 读取预训练词向量
PretrainedEmbedding: 构建加载预训练词向量且可 fine-tune 的 Embedding Layer
def get_embeddings():
    """Build the embedding matrix for the tokenizer vocabulary.

    Loads the pretrained vectors stored in word2vec text format at
    ./saved_model/sgns.baidubaike.bigram-char and copies the vector of every
    tokenizer word that has one into the row given by that word's index.
    Rows of words without a pretrained vector (and row 0) stay all-zero.

    Returns:
        A float32 numpy array of shape (VOCAB_SIZE, EMBEDDING_DIM).
    """
    pretrained_vec_path = "./saved_model/sgns.baidubaike.bigram-char"
    word_vectors = KeyedVectors.load_word2vec_format(pretrained_vec_path, binary=False)

    embeddings = np.zeros((VOCAB_SIZE, EMBEDDING_DIM), dtype=np.float32)
    for index, word in tokenizer.index_word.items():
        # Test membership on the KeyedVectors object itself: this works on
        # gensim 3.x and 4.x, whereas the `.vocab` attribute was removed in 4.0.
        if word in word_vectors:
            embeddings[index, :] = word_vectors.get_vector(word)
    return embeddings
class PretrainedEmbedding(tf.keras.layers.Layer):
    """Embedding lookup initialised from a pretrained matrix, followed by dropout.

    The embedding matrix is created with `add_weight`, so it is updated along
    with the rest of the model (fine-tuning).
    """

    def __init__(self, VOCAB_SIZE, EMBEDDING_DIM, embeddings, rate=0.1):
        """
        Args:
            VOCAB_SIZE: Number of rows of the embedding matrix.
            EMBEDDING_DIM: Number of columns of the embedding matrix.
            embeddings: Initial values for the embedding matrix.
            rate: Dropout rate applied to the looked-up vectors.
        """
        super(PretrainedEmbedding, self).__init__()
        self.VOCAB_SIZE = VOCAB_SIZE
        self.EMBEDDING_DIM = EMBEDDING_DIM
        self.embeddings_initializer = tf.constant_initializer(embeddings)
        self.dropout = tf.keras.layers.Dropout(rate)

    def build(self, input_shape):
        """Create the embedding weight from the pretrained initializer."""
        self.embeddings = self.add_weight(
            shape=(self.VOCAB_SIZE, self.EMBEDDING_DIM),
            initializer=self.embeddings_initializer,
            dtype=tf.float32,
        )

    def call(self, x, training=None, trainable=None):
        """Look up the embeddings of the ids in `x` and apply dropout.

        Args:
            x: Integer tensor of token ids.
            training: Standard Keras flag; dropout is active only when True.
                Using the conventional name lets Keras supply the mode.
            trainable: Alias of `training`, kept so that existing callers
                passing `trainable=` keep working. Takes precedence when it
                is not None.

        Returns:
            The embedded (and, in training mode, dropped-out) tensor.
        """
        if trainable is not None:
            training = trainable
        output = tf.nn.embedding_lookup(params=self.embeddings, ids=x)
        return self.dropout(output, training=training)
# Build the pretrained embedding matrix that is handed to the model below.
embeddings = get_embeddings()
构建模型
class TextCNN(tf.keras.Model):
    """TextCNN classifier.

    Pipeline: pretrained embedding -> one Conv1D per kernel size ->
    global max pooling over time -> concatenation -> dropout -> softmax.
    """

    def __init__(self, VOCAB_SIZE, EMBEDDING_DIM, FILTERS, FILTER_NUM, CLASS_NUM,
                 DROPOUT_RATE, embeddings):
        """
        Args:
            VOCAB_SIZE: Number of rows of the embedding matrix.
            EMBEDDING_DIM: Embedding dimension.
            FILTERS: Iterable of convolution kernel sizes, one branch each.
            FILTER_NUM: Number of filters of every convolution.
            CLASS_NUM: Number of output classes.
            DROPOUT_RATE: Dropout rate applied before the output layer.
            embeddings: Initial values for the embedding matrix.
        """
        super(TextCNN, self).__init__()
        self.VOCAB_SIZE = VOCAB_SIZE
        self.EMBEDDING_DIM = EMBEDDING_DIM
        self.FILTERS = FILTERS
        self.FILTER_NUM = FILTER_NUM
        self.CLASS_NUM = CLASS_NUM
        self.DROPOUT_RATE = DROPOUT_RATE

        self.embed = PretrainedEmbedding(self.VOCAB_SIZE, self.EMBEDDING_DIM, embeddings)

        self.convs = []
        self.max_pools = []
        for FILTER in self.FILTERS:
            conv = tf.keras.layers.Conv1D(self.FILTER_NUM, FILTER,
                                          padding='same', activation='relu', use_bias=True)
            # Max-over-time pooling, as the attribute name says; the previous
            # GlobalAveragePooling1D averaged over all positions instead.
            max_pool = tf.keras.layers.GlobalMaxPooling1D()
            self.convs.append(conv)
            self.max_pools.append(max_pool)

        self.dropout = tf.keras.layers.Dropout(self.DROPOUT_RATE)
        self.fc = tf.keras.layers.Dense(self.CLASS_NUM, activation='softmax')

    def call(self, x, training=None):
        """Return class probabilities for a batch of token-id sequences.

        Args:
            x: Integer tensor of token ids.
            training: Keras mode flag. Dropout (in the embedding layer and
                before the output layer) is active only when it is True.

        Returns:
            Tensor of softmax probabilities with CLASS_NUM entries per sample.
        """
        # Forward the mode instead of hard-coding True, so that embedding
        # dropout is switched off during evaluation.
        x = self.embed(x, trainable=training)
        conv_results = [max_pool(conv(x))
                        for conv, max_pool in zip(self.convs, self.max_pools)]
        x = tf.concat(conv_results, axis=1)
        x = self.dropout(x, training=training)
        return self.fc(x)
textcnn = TextCNN(VOCAB_SIZE, EMBEDDING_DIM, FILTERS, FILTER_NUM, CLASS_NUM,
                  DROPOUT_RATE, embeddings)
# Run the example batch through the model once (this also creates its weights).
out = textcnn(example_input)
定义损失函数、优化器
# The model outputs softmax probabilities, so the loss uses its default
# from_logits=False setting.
loss_object = tf.keras.losses.SparseCategoricalCrossentropy()
optimizer = tf.keras.optimizers.Adam(0.0005)

# Running metrics; they are reset at the start of every epoch.
train_loss = tf.keras.metrics.Mean(name='train_loss')
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='train_accuracy')
eval_loss = tf.keras.metrics.Mean(name='eval_loss')
eval_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='eval_accuracy')
定义单步训练、测试函数
@tf.function
def train_step(input_tensor, label_tensor):
    """Run one optimisation step on a batch and update the training metrics.

    Args:
        input_tensor: Batch of padded token-id sequences.
        label_tensor: Batch of class labels.
    """
    with tf.GradientTape() as tape:
        # Keras does not infer the mode in a custom training loop, so the
        # training flag is passed explicitly to enable dropout.
        prediction = textcnn(input_tensor, training=True)
        loss = loss_object(label_tensor, prediction)
    gradients = tape.gradient(loss, textcnn.trainable_variables)
    optimizer.apply_gradients(zip(gradients, textcnn.trainable_variables))

    train_loss(loss)
    train_accuracy(label_tensor, prediction)


@tf.function
def eval_step(input_tensor, label_tensor):
    """Evaluate one batch and update the evaluation metrics.

    Args:
        input_tensor: Batch of padded token-id sequences.
        label_tensor: Batch of class labels.
    """
    # Inference mode: dropout must be disabled while evaluating.
    prediction = textcnn(input_tensor, training=False)
    loss = loss_object(label_tensor, prediction)

    eval_loss(loss)
    eval_accuracy(label_tensor, prediction)
定义 writer,用于写入信息,供 TensorBoard 可视化观察使用。
# One timestamped log directory per run, with separate train/test sub-folders.
current_time = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
train_log_dir = 'logs/' + current_time + '/train'
test_log_dir = 'logs/' + current_time + '/test'

train_summary_writer = tf.summary.create_file_writer(train_log_dir)
test_summary_writer = tf.summary.create_file_writer(test_log_dir)
模型训练,保存权重
EPOCHS = 10

for epoch in range(EPOCHS):
    # Start every epoch with fresh running metrics.
    train_loss.reset_states()
    train_accuracy.reset_states()
    eval_loss.reset_states()
    eval_accuracy.reset_states()

    # Loop variables are named batch_* so that they do not overwrite the
    # module-level train_label / dev_label lists.
    for batch_input, batch_label in train_dataset:
        train_step(batch_input, batch_label)

    # Summaries are written once per epoch (step=epoch).
    with train_summary_writer.as_default():
        tf.summary.scalar('loss', train_loss.result(), step=epoch)
        tf.summary.scalar('accuracy', train_accuracy.result(), step=epoch)

    for batch_input, batch_label in dev_dataset:
        eval_step(batch_input, batch_label)

    with test_summary_writer.as_default():
        tf.summary.scalar('loss', eval_loss.result(), step=epoch)
        tf.summary.scalar('accuracy', eval_accuracy.result(), step=epoch)

    template = 'Epoch {}, Loss: {:.4f}, Accuracy: {:.4f}, Test Loss: {:.4f}, Test Accuracy: {:.4f}'
    print(template.format(epoch + 1,
                          train_loss.result().numpy(),
                          train_accuracy.result().numpy() * 100,
                          eval_loss.result().numpy(),
                          eval_accuracy.result().numpy() * 100))

    # Save the weights after every epoch; the file name uses the 0-based epoch.
    textcnn.save_weights('./saved_model/weights_{}.h5'.format(epoch))
Epoch 1, Loss: 3.7328, Accuracy: 22.9497, Test Loss: 3.2937, Test Accuracy: 28.2422
Epoch 2, Loss: 2.9424, Accuracy: 33.8790, Test Loss: 2.7973, Test Accuracy: 35.1953
Epoch 3, Loss: 2.5407, Accuracy: 40.1620, Test Loss: 2.5324, Test Accuracy: 41.0156
Epoch 4, Loss: 2.3023, Accuracy: 44.6759, Test Loss: 2.4003, Test Accuracy: 43.1641
Epoch 5, Loss: 2.1400, Accuracy: 47.5942, Test Loss: 2.2732, Test Accuracy: 45.2344
Epoch 6, Loss: 2.0264, Accuracy: 49.5784, Test Loss: 2.2155, Test Accuracy: 45.1172
Epoch 7, Loss: 1.9319, Accuracy: 51.7361, Test Loss: 2.1572, Test Accuracy: 48.2812
Epoch 8, Loss: 1.8622, Accuracy: 53.1415, Test Loss: 2.1201, Test Accuracy: 48.7109
Epoch 9, Loss: 1.7972, Accuracy: 54.2411, Test Loss: 2.0863, Test Accuracy: 49.1016
Epoch 10, Loss: 1.7470, Accuracy: 55.2331, Test Loss: 2.1074, Test Accuracy: 48.8281
可视化
tensorboard --logdir logs/
因篇幅问题不能全部显示,请点此查看更多更全内容