Source code for cerebras.modelzoo.data.nlp.transformer.TransformerDynamicDataProcessor
# Copyright 2022 Cerebras Systems.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Processor for PyTorch Transformer training.
"""
import csv
import os
import numpy as np
from cerebras.modelzoo.common.registry import registry
from cerebras.modelzoo.data.nlp.t5.t5_utils import (
    create_transformer_input_features,
    parse_text,
)
from cerebras.modelzoo.data.nlp.t5.T5DynamicDataProcessor import (
    T5DynamicDataProcessor,
)


@registry.register_datasetprocessor("TransformerDynamicDataProcessor")
class TransformerDynamicDataProcessor(T5DynamicDataProcessor):
    """
    Reads text files containing the input text tokens.

    :param str src_vocab_file: Path to file containing tokens of vocabulary,
        one token per line.
    :param str src_data_dir: Path to directory containing the output of
        preprocess.sh, with all the files of tokenized data.
    :param int batch_size: Number of sequences per batch. Note that this may
        differ between systems.
    :param bool shuffle, optional: If True, the data will be shuffled before
        being passed into the model. Recommended for training. Can be set to
        False for debugging.
    :param int shuffle_seed, optional: Sets the random seed for the order of
        data shuffling. Allows for reproducibility while still shuffling data.
    :param int shuffle_buffer: Size of the buffer used to store data before
        shuffling.
    :param int extra_ids, optional: Number of sentinel tokens for the T5
        objective.
    :param int src_max_sequence_length, optional: Largest possible sequence
        length for the input. Longer sequences are truncated; all other
        sequences are padded to this length.
    :param int tgt_max_sequence_length, optional: Largest possible sequence
        length for the labels. Longer sequences are truncated; all other
        sequences are padded to this length.
    :param int num_workers, optional: Number of processes that move data to
        the accelerator system, so that the system doesn't process data faster
        than it receives it.
    :param bool drop_last, optional: If the last batch is not the full size,
        i.e. the dataset could not be divided evenly by the batch size, do not
        use the last batch.
    :param int prefetch_factor, optional: Number of batches loaded in advance
        by each worker.
    :param bool persistent_workers, optional: If set, workers will not be
        shut down after going through the dataset once.
    :param bool do_lower, optional: If set, will lowercase all tokens in the
        vocabulary. T5's vocabulary is cased, so this is not recommended.
    :param list buckets, optional: A list of boundaries for sequence lengths
        to bucket together in order to speed up VTS/VSL.
    :param bool dynamic_loss_weight, optional: If set, will divide the loss
        for a token by the length of the sequence that the token comes from.
    :param bool pack_sequences, optional: If set, will concatenate sequences
        so that computation is performed on real data rather than padding.
    :param int num_documents_to_concatenate, optional: Specifies how many
        documents to pack together.
    :param str oov_token, optional: Token for out-of-vocabulary words/sub-words.
    :param str sos_token, optional: Token for start-of-sequence.
    :param str eos_token, optional: Token for end-of-sequence.
    :param str pad_token, optional: Token for padding.
    :param int labels_pad_id, optional: Can set specific padding for labels.
    :param int input_pad_id, optional: Can set specific padding for inputs.
    :param bool mixed_precision, optional: If set, will use float16 rather
        than float32 when possible.
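
    Example (illustrative only; the key names mirror the parameters documented
    above, while the paths and values are placeholders rather than a tested
    configuration)::

        params = {
            "src_vocab_file": "/path/to/vocab.txt",
            "src_data_dir": "/path/to/preprocessed_data/",
            "batch_size": 256,
            "shuffle": True,
            "shuffle_seed": 1,
            "src_max_sequence_length": 512,
            "tgt_max_sequence_length": 512,
            "num_workers": 8,
            "drop_last": True,
        }
        data_processor = TransformerDynamicDataProcessor(params)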
"""

    def __init__(self, params):
        super(TransformerDynamicDataProcessor, self).__init__(params)

    def get_meta_data(self, data_dir):
        """
        Read data from meta files.

        :param str data_dir: Path to the input directory.
        :return: Processed meta data.
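
        Each line of ``meta.dat`` is expected to hold a source file path, a
        target file path, and an integer count, separated by whitespace, for
        example (the paths and counts below are placeholders only)::

            train_src_0.txt train_tgt_0.txt 10000
            train_src_1.txt train_tgt_1.txt 9500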
"""
if not isinstance(data_dir, list):
data_dir = [data_dir]
meta_data = {}
for file_name in data_dir:
meta_file = os.path.join(file_name, "meta.dat")
assert os.path.exists(
meta_file
), f"Meta file is missing in the input directory: {data_dir}."
with open(meta_file, "r") as fin:
for line in fin.readlines():
line = line.strip().split()
# Format: `str_file_path tgt_file_path src_file_size`
meta_data[(line[0], line[1])] = int(line[2])
return meta_data

    def load_buffer(self):
        """
        Generator to read the data in chunks of the size of `data_buffer`.

        We read data from both the source and target input datasets to
        prepare features for the side-by-side translation task.

        :returns: Yields the data stored in the `data_buffer`.
        """
        self.processed_buffers = 0
        self.data_buffer = []

        while self.processed_buffers < len(self.text_files_per_task_per_worker):
            (
                current_file_path,
                num_examples,
                start_id,
            ) = self.text_files_per_task_per_worker[self.processed_buffers]
            src_file_path, tgt_file_path = current_file_path

            with open(src_file_path, "r", newline="") as src_fin:
                src_data_reader = csv.reader(src_fin, delimiter="\n")
                with open(tgt_file_path, "r", newline="") as tgt_fin:
                    tgt_data_reader = csv.reader(tgt_fin, delimiter="\n")

                    for row_id, (src_row, tgt_row) in enumerate(
                        zip(src_data_reader, tgt_data_reader)
                    ):
                        if start_id <= row_id < start_id + num_examples:
                            # Adding both source and target input sequences aligned.
                            self.data_buffer.append((src_row, tgt_row))
                        else:
                            continue
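
                        # Once the buffer holds `shuffle_buffer` examples, it
                        # is (optionally) shuffled and drained below, keeping
                        # memory usage bounded by the buffer size.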
                        if len(self.data_buffer) == self.shuffle_buffer:
                            if self.shuffle:
                                self.rng.shuffle(self.data_buffer)

                            for ind in range(len(self.data_buffer)):
                                yield self.data_buffer[ind]
                            self.data_buffer = []

            self.processed_buffers += 1

        if self.shuffle:
            self.rng.shuffle(self.data_buffer)

        for ind in range(len(self.data_buffer)):
            yield self.data_buffer[ind]
        self.data_buffer = []

    def get_single_item(self):
        """
        Iterates over the data to construct input features.

        :return: A dict with training features:
            * np.array[int32] input_ids: Numpy array with encoder input token indices.
              Shape: (`src_max_sequence_length`).
            * np.array[int32] decoder_input_ids: Numpy array with decoder input token indices.
              Shape: (`tgt_max_sequence_length`).
            * np.array[int32] attention_mask: Numpy array with attention mask for encoder.
              Shape: (`src_max_sequence_length`).
            * np.array[int32] decoder_attention_mask: Numpy array with attention mask for decoder.
              Shape: (`tgt_max_sequence_length`).
            * np.array[int32] labels: Numpy array with labels for teacher forcing mode.
              Shape: (`tgt_max_sequence_length`).
        """
        # Iterate over the data rows to create input features.
        for data_row in self.load_buffer():
            # `data_row` is a tuple with source and target
            # input sequences.
            src_tokens = parse_text(
                "".join(data_row[0]), do_lower=self.do_lower
            )
            tgt_tokens = parse_text(
                "".join(data_row[1]), do_lower=self.do_lower
            )
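
            # `create_transformer_input_features` (from t5_utils) is expected
            # to map the parsed tokens to ids, add the start/end-of-sequence
            # tokens, pad or truncate to the configured maximum lengths, and
            # build the attention masks described in the docstring above.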
            features = create_transformer_input_features(
                src_tokens,
                tgt_tokens,
                self.src_max_sequence_length,
                self.tgt_max_sequence_length,
                self.input_pad_id,
                self.attn_mask_pad_id,
                self.labels_pad_id,
                self.src_tokenize,
                self.special_tokens_indices["sos_token"],
                self.special_tokens_indices["eos_token"],
            )

            # This example might be filtered out based on the sequence length.
            # See `create_transformer_input_features` for more details.
            if features:
                yield features

    def element_length_fn(self, features):
        """
        Takes a single sample and returns the sequence length of that sample
        to be used for VTS bucketing.
        """
        return np.maximum(
            np.sum(features["attention_mask"]),
            np.sum(features["decoder_attention_mask"]),
        )

    def __len__(self):
        """
        Since samples are filtered by max_length, this will return an upper
        limit. See: transformers/pytorch/t5/input/utils.py
        create_transformer_input_features(...)
        """
        return self.num_examples