cerebras.modelzoo.data_preparation.data_preprocessing.data_preprocessor.DataPreprocessor#
- class cerebras.modelzoo.data_preparation.data_preprocessing.data_preprocessor.DataPreprocessor(params)[source]#
Bases: object
Initialize the class with the given parameters.
- Parameters
params (dict) – Configuration parameters.
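A minimal construction sketch is shown below. The import path and the process_dataset entry point come from this page; the layout of the params dict (the "setup" and "processing" keys and their fields) is an assumed example only, and the authoritative schema is defined by the preprocessing configs shipped with the Model Zoo.
```python
from cerebras.modelzoo.data_preparation.data_preprocessing.data_preprocessor import (
    DataPreprocessor,
)

# Assumed params layout, for illustration only; see the packaged example
# configs for the real schema.
params = {
    "setup": {
        "data": {"type": "local", "source": "./raw_data"},
        "output_dir": "./preprocessed_data",
        "processes": 4,
    },
    "processing": {
        "max_seq_length": 2048,
        "huggingface_tokenizer": "gpt2",
    },
}

preprocessor = DataPreprocessor(params)
preprocessor.process_dataset()  # dispatches to file split or task split mode
```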
Methods
Appends each sequence in a dataframe to a different hdf5 file.
average_chars_and_bytes
Calculate the total number of chunks based on the given total size and the predefined max chunk size.
Calculate the total size of all input files, taking compression factors into consideration.
Check for any unused parameters and log them as warnings.
Estimates an optimal queue size based on the max_chunk_size and a fraction of available system memory.
Process the dataset by splitting files across multiple processes.
Retrieve the output directory path.
Retrieve the path to the JSON parameters file.
Get the tokenizer vocabulary size.
Handle input files based on provided configuration.
Convert a size in bytes to a human-readable format (e.g., KB, MB, GB).
Initialize custom tokenizer.
Initialize GPT-2 tokenizer.
Initialize Hugging Face tokenizer.
Initialize miscellaneous attributes.
Initialize Neox tokenizer.
Initialize tokenizer based on the provided tokenizer_type parameter.
Loads a dataset from a specified source and saves it in a specified format in the given directory, potentially within a subdirectory denoted by a 'split'.
load_format_hook_fn
Process the dataset either through file split or task split methods.
Process dataset-specific parameters.
Process the given files, tokenize the data chunks, and save to HDF5 format.
Process parameters by calling various initialization methods.
Process the processing parameters and initialize relevant class attributes.
Set up the number of processes based on the provided configuration.
This function reads the checkpoint args from the created checkpoint file.
Reads data from input files and distributes them to the tokenizer queues.
save_buffer_to_hdf5
Set up the output directory based on provided configuration.
shuffle_single_file
This function divides the output hdf5 files into different processes and prepares them for the second pass of shuffling.
This function collates the stats obtained from the different writer processes into the combined final stats.
Split the dataset processing tasks across multiple processes.
Tokenizes data and forwards the tokenized data to the writer queue.
update_checkpoint
This function writes the prefix remaining after processing LMData when pack_sequences is set to true.
Process that writes tokenized data to HDF5 format.
- load_dataset(input_data_params)[source]#
Loads a dataset from a specified source and saves it in a specified format in the given directory, potentially within a subdirectory denoted by a ‘split’.
- Parameters
input_data_params (Dict[str, Optional[str]]) – Parameters for dataset loading, including ‘source’, ‘split’ (optional), and ‘format’.
- Returns
The directory where the dataset has been saved.
- Return type
str
- Raises
ValueError – If the specified format is not supported.
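A hedged usage sketch follows, continuing from the preprocessor instance constructed above. The key names ('source', 'split', 'format') are taken from the docstring; the concrete values are illustrative placeholders.
```python
# Keys follow the docstring above; the values are placeholders.
input_data_params = {
    "source": "stas/openwebtext-10k",  # e.g. a Hugging Face dataset identifier
    "split": "train",                  # optional
    "format": "parquet",               # an unsupported format raises ValueError
}

saved_dir = preprocessor.load_dataset(input_data_params)
print(f"Dataset saved under: {saved_dir}")
```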
- estimate_queue_size(fraction_of_memory=0.5)[source]#
Estimates an optimal queue size based on the max_chunk_size and a fraction of available system memory.
- Parameters
fraction_of_memory – Fraction of available system memory to be used for queues.
- Returns
An integer representing the optimal queue size.
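The estimate the docstring describes can be pictured as below. This is an illustrative re-derivation (using psutil to read available memory), not the method's actual implementation.
```python
import math

import psutil  # assumed here only to read available system memory


def estimate_queue_size_sketch(max_chunk_size_bytes, fraction_of_memory=0.5):
    """How many max-sized chunks fit into the chosen fraction of free memory."""
    budget = psutil.virtual_memory().available * fraction_of_memory
    return max(1, math.floor(budget / max_chunk_size_bytes))
```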
- process_processing_params()[source]#
Process the processing parameters and initialize relevant class attributes.
- initialize_tokenizer(processing_params)[source]#
Initialize tokenizer based on the provided tokenizer_type parameter.
- Parameters
processing_params (Dict[str, Any]) – Dictionary of processing parameters.
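A hedged configuration sketch: the tokenizer_type key is named in the docstring and the dispatch targets (GPT-2, NeoX, Hugging Face, custom) correspond to the initializer methods below, but the accepted string values and the key carrying the Hugging Face tokenizer name are assumptions.
```python
# Illustrative only: key names other than "tokenizer_type" and the accepted
# string values are assumptions, not confirmed by this page.
processing_params = {
    "tokenizer_type": "HuggingFaceTokenizer",  # or a GPT-2 / NeoX / custom variant
    "huggingface_tokenizer": "gpt2",           # assumed key for the hf_tokenizer name
}

preprocessor.initialize_tokenizer(processing_params)
```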
- initialize_gpt2tokenizer(tokenizer_params)[source]#
Initialize GPT-2 tokenizer.
- Parameters
tokenizer_params (Dict[str, Any]) – Dictionary of tokenizer parameters.
- initialize_neoxtokenizer(tokenizer_params)[source]#
Initialize Neox tokenizer.
- Parameters
tokenizer_params (Dict[str, Any]) – Dictionary of tokenizer parameters.
- initialize_huggingfacetokenizer(hf_tokenizer, tokenizer_params)[source]#
Initialize Hugging Face tokenizer.
- Parameters
hf_tokenizer (str) – HuggingFace tokenizer name.
tokenizer_params (Dict[str, Any]) – Dictionary of tokenizer parameters.
- initialize_customtokenizer(custom_tokenizer, tokenizer_params)[source]#
Initialize custom tokenizer.
- Parameters
custom_tokenizer (str) – Path to the implementation of the custom tokenizer.
tokenizer_params (Dict[str, Any]) – Dictionary of tokenizer parameters.
- get_params_file()[source]#
Retrieve the path to the JSON parameters file.
- Returns
Path to the JSON parameters file.
- Return type
str
- get_output_dir()[source]#
Retrieve the output directory path.
- Returns
Path to the output directory.
- Return type
str
- calculate_total_size()[source]#
Calculate the total size of all input files, taking compression factors into consideration.
- Returns
The total size of all input files in bytes.
- Return type
int
- human_readable_size(size, decimal_places=2)[source]#
Convert a size in bytes to a human-readable format (e.g., KB, MB, GB).
- Parameters
size (int) – Size in bytes.
decimal_places (int) – Number of decimal places for rounding.
- Returns
Formatted size string.
- Return type
str
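For reference, a stand-alone equivalent of this conversion might look like the following; the actual method may differ in unit boundaries or formatting details.
```python
def human_readable_size_sketch(size, decimal_places=2):
    # Walk up the units until the value drops below 1024.
    for unit in ("B", "KB", "MB", "GB", "TB", "PB"):
        if size < 1024 or unit == "PB":
            return f"{size:.{decimal_places}f} {unit}"
        size /= 1024


print(human_readable_size_sketch(123_456_789))  # -> "117.74 MB"
```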
- calculate_total_chunks(total_size)[source]#
Calculate the total number of chunks based on the given total size and the predefined max chunk size.
- Parameters
total_size (int) – The total size of the data in bytes.
- Returns
Total number of chunks.
- Return type
int
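The relationship is simply the total input size divided by the configured maximum chunk size, rounded up. The 1 MiB chunk size below is an assumed value for illustration.
```python
import math

# Assumed 1 MiB max chunk size, purely for illustration.
max_chunk_size = 1024 * 1024
total_size = 10 * 1024 * 1024 + 512   # e.g. the value from calculate_total_size()
total_chunks = math.ceil(total_size / max_chunk_size)   # -> 11
```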
- read_checkpoint(num_writers)[source]#
This function reads the checkpoint args from the created checkpoint file.
- Parameters
num_writers – The number of writer processes.
- write_remaining_prefix(chunk_locks, pid)[source]#
This function writes the prefix remaining after processing LMData when pack_sequences is set to true.
- Parameters
chunk_locks – List of locks for appending to hdf5 files during shuffling.
pid – Process id of the current process.
- split_shuffle_second_pass()[source]#
This function divides the output hdf5 files into different processes and prepares them for the second pass of shuffling.
- stats_collation(num_writer_processes)[source]#
This function collates the stats obtained from the different writer processes into the combined final stats.
- Parameters
num_writer_processes – Number of writer processes.
- process_files(file_paths, process_idx, checkpoint_args, progress_counter, chunk_locks)[source]#
Process the given files, tokenize the data chunks, and save to HDF5 format.
- Parameters
file_paths – List of input file paths.
process_idx – Index of the current process among all processes spawned for the file split.
checkpoint_args (Tuple[int, int, int]) – File index, doc start index, and hdf5 index.
progress_counter (Value[int]) – Shared counter tracking the number of processed chunks.
chunk_locks – List of locks for appending to hdf5 files during shuffling.
- file_split_process_dataset()[source]#
Process the dataset by splitting files across multiple processes.
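The general idea of the file split mode can be pictured as below; this round-robin partitioning is an illustrative stand-in, not the class's actual scheduling logic.
```python
# Illustrative partitioning only; the class handles this internally.
def split_files_round_robin(file_paths, num_processes):
    buckets = [[] for _ in range(num_processes)]
    for i, path in enumerate(file_paths):
        buckets[i % num_processes].append(path)
    return buckets

# Each bucket would then be handled by one worker via process_files().
```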
- reader_process(process_checkpoints)[source]#
Reads data from input files and distributes them to the tokenizer queues.
- Parameters
process_checkpoints (List[Tuple[int, int, int, int, int]]) – List of tuples of (file index, doc start index, start_chunk_number, num_chunks_written, num_sequences_written).
- tokenizer_process(idx)[source]#
Tokenizes data and forwards the tokenized data to the writer queue.
- Parameters
idx (int) – Queue ID to forward tokenized chunks of data.
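tokenizer_process sits between reader_process above and writer_process below in a queue-based hand-off. The sketch below is a conceptual outline only: the queue counts, the sentinel convention, and the tokenizer.encode call are all assumptions, and the class creates and wires its queues internally.
```python
import multiprocessing as mp

# Conceptual outline only; counts and APIs are assumed.
tokenizer_queues = [mp.Queue() for _ in range(4)]  # one per tokenizer process
writer_queues = [mp.Queue() for _ in range(2)]     # one per writer process


def tokenizer_loop(idx, tokenizer):
    while True:
        chunk = tokenizer_queues[idx].get()
        if chunk is None:                 # sentinel: no more data from the reader
            writer_queues[idx % len(writer_queues)].put(None)
            break
        tokens = tokenizer.encode(chunk)  # tokenizer API assumed
        writer_queues[idx % len(writer_queues)].put(tokens)
```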
- writer_process(progress_counter, num_sentinels, writer_idx, chunk_locks, process_checkpoints)[source]#
Process that writes tokenized data to HDF5 format.
- Parameters
progress_counter (Value[int]) – Shared counter tracking number of processed chunks.
num_sentinels (int) – Number of sentinels to be received for the current writer process.
writer_idx (int) – The index of the current writer process.
chunk_locks (List[multiprocessing.context.BaseContext.Lock]) – List of locks for appending to hdf5 files during shuffling.
process_checkpoints (Tuple) – Checkpoint for the current process, used when resuming from a checkpoint.
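To make the writer's job concrete, here is a minimal, illustrative h5py append of a batch of tokenized sequences. The real process consumes chunks from a queue, honors chunk_locks, and records checkpoints; the dataset name and dtype below are assumptions.
```python
import h5py
import numpy as np


def append_tokens_to_hdf5(path, token_batch):
    """Append a (num_sequences, seq_len) block of token ids to an HDF5 file."""
    data = np.asarray(token_batch, dtype=np.int32)
    with h5py.File(path, "a") as f:
        if "data" in f:
            ds = f["data"]
            ds.resize(ds.shape[0] + data.shape[0], axis=0)
            ds[-data.shape[0]:] = data
        else:
            f.create_dataset(
                "data", data=data, maxshape=(None, data.shape[1]), dtype=np.int32
            )
```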