Source code for cerebras.modelzoo.data.nlp.bert.BertTokenClassifierDataProcessor

# Copyright 2022 Cerebras Systems.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Processor for PyTorch BERT fine-tuning - Token classifier.
"""

import csv
import json
import os
from typing import List, Literal, Optional, Union

import numpy as np
import torch
from pydantic import PositiveInt, field_validator

import cerebras.pytorch.distributed as dist
from cerebras.modelzoo.common.input_utils import get_streaming_batch_size
from cerebras.modelzoo.config import DataConfig
from cerebras.modelzoo.config.types import AliasedPath
from cerebras.modelzoo.data.common.input_utils import (
    check_sharding_sanity,
    get_data_for_task,
    task_id,
)
from cerebras.modelzoo.data.nlp.bert.bert_utils import (
    build_vocab,
    get_meta_data,
    shard_and_shuffle_data,
)


class BertTokenClassifierDataProcessorConfig(DataConfig):
    data_processor: Literal["BertTokenClassifierDataProcessor"]

    data_dir: Union[str, List[str]] = ...
    "Path to the data files to use."

    batch_size: PositiveInt = ...
    "The batch size."

    vocab_file: AliasedPath = ...
    "Path to the vocabulary file."

    label_vocab_file: Optional[AliasedPath] = None
    "Path to the JSON file mapping class name to class index."

    mask_whole_word: bool = False
    "Whether to mask the entire word."

    do_lower: bool = False
    "Whether to lowercase the texts."

    max_sequence_length: int = ...
    "Maximum sequence length."

    include_padding_in_loss: bool = False
    "Whether padded positions should contribute to the loss."

    labels_pad_id: Optional[int] = None
    "Padding id for the labels. Defaults to the vocab's [PAD] id."

    input_pad_id: Optional[int] = None
    "Padding id for the input token ids. Defaults to the vocab's [PAD] id."

    attn_mask_pad_id: Optional[int] = None
    "Padding id for the attention mask. Defaults to the vocab's [PAD] id."

    shuffle: bool = True
    "Whether or not to shuffle the dataset."

    shuffle_seed: Optional[int] = None
    "The seed used for deterministic shuffling."

    shuffle_buffer: Optional[int] = None
    """
    Buffer size to shuffle samples across.
    If None and shuffle is enabled, 10*batch_size is used.
    """

    num_workers: int = 0
    "The number of PyTorch processes used in the dataloader."

    prefetch_factor: Optional[int] = 10
    "The number of batches to prefetch in the dataloader."

    persistent_workers: bool = True
    "Whether or not to keep workers persistent between epochs."

    drop_last: bool = True
    "Whether to drop the last batch of the epoch if it is incomplete."

    def post_init(self, context):
        super().post_init(context)

        if not self.num_workers:
            self.prefetch_factor = None  # the default value in DataLoader
            self.persistent_workers = False

        model_config = context.get("model", {}).get("config")
        if model_config is not None:
            self.include_padding_in_loss = model_config.include_padding_in_loss

            if model_config.label_vocab_file != self.label_vocab_file:
                raise ValueError(
                    f"Model and input label vocab files do not match."
                    f"\n\tmodel value: {model_config.label_vocab_file}"
                    f"\n\tinput value: {self.label_vocab_file}"
                )

    @field_validator("vocab_file", mode="after")
    @classmethod
    def get_vocab_file(cls, vocab_file):
        if not os.path.exists(vocab_file):
            raise FileNotFoundError(f"Vocab file does not exist: {vocab_file}")
        return os.path.abspath(vocab_file)

    @field_validator("label_vocab_file", mode="after")
    @classmethod
    def get_label_vocab_file(cls, label_vocab_file, info):
        if info.context:
            model_config = info.context.get("model", {}).get("config")
            if model_config is not None:
                if label_vocab_file is None:
                    label_vocab_file = model_config.label_vocab_file

                if label_vocab_file != model_config.label_vocab_file:
                    raise ValueError(
                        f"Label vocab file from model and input do not match."
                        f"\n\tmodel vocab file: {model_config.label_vocab_file}"
                        f"\n\tinput vocab file: {label_vocab_file}"
                    )

        if label_vocab_file is None:
            raise ValueError("Label vocab file must be provided.")

        if not os.path.exists(label_vocab_file):
            raise ValueError(
                f"Label vocab file does not exist: {label_vocab_file}"
            )

        return os.path.abspath(label_vocab_file)
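
# Illustrative config sketch (not part of the original module). The field
# names come from the class above; the paths and values are hypothetical
# placeholders. Note that the `vocab_file` / `label_vocab_file` validators
# require the files to exist on disk, so this is shown as a comment only:
#
#     config = BertTokenClassifierDataProcessorConfig(
#         data_processor="BertTokenClassifierDataProcessor",
#         data_dir="/path/to/ner_csv_dir",
#         batch_size=32,
#         vocab_file="/path/to/vocab.txt",
#         label_vocab_file="/path/to/label_vocab.json",
#         max_sequence_length=128,
#         shuffle=True,
#         num_workers=2,
#     )
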
class BertTokenClassifierDataProcessor(torch.utils.data.IterableDataset):
    """
    Reads a CSV file containing the input token ids and label ids.
    Creates attention_masks and segment_ids on the fly.
    """

    def __init__(self, config: BertTokenClassifierDataProcessorConfig):
        super().__init__()

        # Input params.
        self.meta_data = get_meta_data(config.data_dir)

        self.meta_data_values = list(self.meta_data.values())
        self.meta_data_filenames = list(self.meta_data.keys())
        # Please note the appending of [0]
        self.meta_data_values_cum_sum = np.cumsum([0] + self.meta_data_values)

        self.num_examples = sum(map(int, self.meta_data.values()))
        self.batch_size = get_streaming_batch_size(config.batch_size)

        self.num_batches = self.num_examples // self.batch_size
        assert (
            self.num_batches > 0
        ), "Dataset does not contain enough samples for one batch. Please choose a smaller batch size."

        self.num_tasks = dist.num_streamers() if dist.is_streamer() else 1
        self.num_batch_per_task = self.num_batches // self.num_tasks
        assert (
            self.num_batch_per_task > 0
        ), "Dataset cannot be evenly distributed across the given tasks. Please choose fewer tasks to run with."

        self.num_examples_per_task = self.num_batch_per_task * self.batch_size
        self.files_in_task = get_data_for_task(
            task_id(),
            self.meta_data_values_cum_sum,
            self.num_examples_per_task,
            self.meta_data_values,
            self.meta_data_filenames,
        )

        self.shuffle = config.shuffle
        self.shuffle_seed = config.shuffle_seed
        if config.shuffle_buffer is None:
            self.shuffle_buffer = 10 * self.batch_size
        else:
            self.shuffle_buffer = config.shuffle_buffer

        self.mask_whole_word = config.mask_whole_word
        self.do_lower = config.do_lower

        # Multi-processing params.
        self.num_workers = config.num_workers
        self.drop_last = config.drop_last
        self.prefetch_factor = config.prefetch_factor
        self.persistent_workers = config.persistent_workers

        # Check that our sharding will produce at least one batch
        check_sharding_sanity(
            [num_examples for _, num_examples, _ in self.files_in_task],
            self.batch_size,
            self.num_workers,
            self.drop_last,
        )

        self.special_tokens = {
            "oov_token": "[UNK]",
            "class_token": "[CLS]",
            "pad_token": "[PAD]",
            "document_separator_token": "[SEP]",
        }
        if self.do_lower:
            self.special_tokens = {
                key: value.lower()
                for key, value in self.special_tokens.items()
            }

        # Get vocab file and size.
        self.vocab_file = config.vocab_file
        self.vocab, self.vocab_size = build_vocab(
            self.vocab_file, self.do_lower, self.special_tokens["oov_token"]
        )
        self.label_vocab_file = config.label_vocab_file
        with open(self.label_vocab_file, "r") as labelmap_fid:
            self.label_map = json.load(labelmap_fid)

        # Init tokenizer.
        self.tokenize = self.vocab.forward

        # Getting indices for special tokens.
        self.special_tokens_indices = {
            key: self.tokenize([value])[0]
            for key, value in self.special_tokens.items()
        }

        # Padding indices.
        # See https://huggingface.co/transformers/glossary.html#labels.
        self.labels_pad_id = (
            config.labels_pad_id
            if config.labels_pad_id is not None
            else self.special_tokens_indices["pad_token"]
        )
        self.input_pad_id = (
            config.input_pad_id
            if config.input_pad_id is not None
            else self.special_tokens_indices["pad_token"]
        )
        self.attn_mask_pad_id = (
            config.attn_mask_pad_id
            if config.attn_mask_pad_id is not None
            else self.special_tokens_indices["pad_token"]
        )
        assert all(
            pad >= 0
            for pad in [
                self.labels_pad_id,
                self.input_pad_id,
                self.attn_mask_pad_id,
            ]
        ), (
            f"All padding must be non-negative, got"
            f" `labels_pad_id` = {self.labels_pad_id}, `input_pad_id` = {self.input_pad_id},"
            f" `attn_mask_pad_id` = {self.attn_mask_pad_id}."
        )

        self.max_sequence_length = config.max_sequence_length
        self.include_padding_in_loss = config.include_padding_in_loss

        # Store params.
        self.data_buffer = []
        self.csv_files_per_task_per_worker = []
        self.processed_buffers = 0

    def create_dataloader(self):
        """
        Method to create the dataloader object.
        """
        if self.num_workers:
            dataloader = torch.utils.data.DataLoader(
                self,
                batch_size=self.batch_size,
                num_workers=self.num_workers,
                drop_last=self.drop_last,
                prefetch_factor=self.prefetch_factor,
                persistent_workers=self.persistent_workers,
            )
        else:
            dataloader = torch.utils.data.DataLoader(
                self,
                batch_size=self.batch_size,
                drop_last=self.drop_last,
            )
        return dataloader

    def load_buffer(self):
        """
        Generator to read the data in chunks of size of `data_buffer`.

        :returns: Yields the data stored in the `data_buffer`.
        """
        self.data_buffer = []

        while self.processed_buffers < len(self.csv_files_per_task_per_worker):
            (
                current_file_path,
                num_examples,
                start_id,
            ) = self.csv_files_per_task_per_worker[self.processed_buffers]

            with open(current_file_path, "r", newline="") as fid:
                data_reader = csv.DictReader(
                    fid, delimiter=",", quoting=csv.QUOTE_MINIMAL
                )

                for row_id, row in enumerate(data_reader):
                    if start_id <= row_id < start_id + num_examples:
                        self.data_buffer.append(row)
                    else:
                        continue

                    if len(self.data_buffer) == self.shuffle_buffer:
                        if self.shuffle:
                            self.rng.shuffle(self.data_buffer)

                        for ind in range(len(self.data_buffer)):
                            yield self.data_buffer[ind]
                        self.data_buffer = []

            self.processed_buffers += 1

        if self.shuffle:
            self.rng.shuffle(self.data_buffer)

        for ind in range(len(self.data_buffer)):
            yield self.data_buffer[ind]
        self.data_buffer = []

    def __len__(self):
        """
        Returns the length of the dataset on the current task process.
        """
        return self.num_examples_per_task

    def __iter__(self):
        """
        Iterator over the data to construct input features.

        :return: Yields a dict of training features:
            * np.array[int32] input_ids: Numpy array with input token indices.
                Shape: (`max_sequence_length`).
            * np.array[int32] labels: Numpy array with labels.
                Shape: (`max_sequence_length`).
            * np.array[int32] attention_mask:
                Shape: (`max_sequence_length`).
            * np.array[int32] token_type_ids: Numpy array with segment indices.
                Shape: (`max_sequence_length`).
        """
        (
            self.processed_buffers,
            self.csv_files_per_task_per_worker,
            self.shuffle_seed,
            self.rng,
        ) = shard_and_shuffle_data(
            self.files_in_task,
            self.shuffle,
            self.shuffle_seed,
        )

        # Iterate over the data rows to create input features.
        for data_row in self.load_buffer():
            # `data_row` is a dict with keys: ["tokens", "labels"].
            tokens_list, labels_list = parse_ner_row(data_row, self.do_lower)
            features = create_ner_features(
                tokens_list,
                labels_list,
                self.label_map,
                self.max_sequence_length,
                self.input_pad_id,
                self.attn_mask_pad_id,
                self.labels_pad_id,
                self.include_padding_in_loss,
                self.tokenize,
            )

            yield features
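
# Illustrative usage sketch (not part of the original module): once a valid
# config is available (see the commented example after the config class),
# the processor shards the CSV files across workers and `create_dataloader`
# returns a standard `torch.utils.data.DataLoader` over batches of the
# feature dicts produced by `create_ner_features`:
#
#     processor = BertTokenClassifierDataProcessor(config)
#     dataloader = processor.create_dataloader()
#     for batch in dataloader:
#         # batch["input_ids"], batch["labels"], etc. have shape
#         # (batch_size, max_sequence_length)
#         break
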
def parse_ner_row(data_row, do_lower=False):
    """
    Postprocessing of a row in the CSV file.

    :param dict data_row: Dictionary with the input text tokens and labels.
    :param bool do_lower: Whether to lowercase the tokens.

    :return: Tuple of (list of parsed tokens, list of labels).
    """
    tokens = data_row["tokens"].split()
    tokens_list = (
        list(map(lambda token: token.lower(), tokens)) if do_lower else tokens
    )
    labels_list = data_row["labels"].split()

    return tokens_list, labels_list

def create_ner_features(
    tokens_list,
    labels_list,
    label_map,
    max_sequence_length,
    input_pad_id,
    attn_mask_pad_id,
    labels_pad_id,
    include_padding_in_loss,
    tokenize,
):
    """
    Creates the features dict for the token classifier model.

    :param list tokens_list: Tokens to process.
    :param list labels_list: Labels to process.
    :param dict label_map: Dictionary mapping label to int.
    :param int max_sequence_length: Maximum sequence length.
    :param int input_pad_id: Input sequence padding id.
    :param int attn_mask_pad_id: Attention mask padding id.
    :param int labels_pad_id: Labels padding id.
    :param bool include_padding_in_loss: Flag to generate loss mask.
    :param callable tokenize: Method to tokenize the input sequence.

    :returns: dict of features which includes keys:
        * 'input_ids': Numpy array with input token indices.
            Shape: (`max_sequence_length`), dtype: int32.
        * 'attention_mask': Numpy array with attention mask.
            Shape: (`max_sequence_length`), dtype: int32.
        * 'loss_mask': Numpy array equal to attention mask if
            `include_padding_in_loss` is False, else all ones.
            Shape: (`max_sequence_length`), dtype: int32.
        * 'token_type_ids': Numpy array with segment ids.
            Shape: (`max_sequence_length`), dtype: int32.
        * 'labels': Numpy array with labels.
            Shape: (`max_sequence_length`), dtype: int32.
    """
    input_ids = np.ones((max_sequence_length,), dtype=np.int32) * input_pad_id
    attention_mask = (
        np.ones((max_sequence_length,), dtype=np.int32) * attn_mask_pad_id
    )
    loss_mask = np.ones((max_sequence_length,), dtype=np.int32)

    # Convert tokens to integer ids.
    token_ids = tokenize(tokens_list)
    input_ids[0 : len(token_ids)] = token_ids
    attention_mask[0 : len(token_ids)] = 1

    label_ids = np.ones((max_sequence_length,), dtype=np.int32) * labels_pad_id
    labels = [label_map[label] for label in labels_list]
    label_ids[0 : len(token_ids)] = labels

    # Only one segment, so segment ids are all zeros.
    segment_ids = np.zeros((max_sequence_length,), dtype=np.int32)

    # If padding should not contribute to the loss, reuse the attention mask
    # as the loss mask; otherwise keep the all-ones loss mask.
    if not include_padding_in_loss:
        loss_mask = attention_mask.copy()

    features = {
        "input_ids": input_ids,
        "attention_mask": attention_mask,
        "loss_mask": loss_mask,
        "token_type_ids": segment_ids,
        "labels": label_ids,
    }

    return features
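
if __name__ == "__main__":
    # Illustrative sketch only (not part of the original module): exercises
    # `parse_ner_row` and `create_ner_features` with a toy whitespace-based
    # vocabulary and a made-up label map standing in for the real vocab-backed
    # `tokenize` callable used by the data processor above.
    toy_vocab = {
        "[PAD]": 0,
        "[UNK]": 1,
        "John": 2,
        "lives": 3,
        "in": 4,
        "Paris": 5,
    }

    def toy_tokenize(tokens):
        # Map each token to its id, falling back to the OOV id.
        return [toy_vocab.get(token, toy_vocab["[UNK]"]) for token in tokens]

    toy_label_map = {"O": 0, "B-PER": 1, "B-LOC": 2}

    # A row shaped like those produced by csv.DictReader in `load_buffer`.
    data_row = {"tokens": "John lives in Paris", "labels": "B-PER O O B-LOC"}
    tokens_list, labels_list = parse_ner_row(data_row, do_lower=False)

    features = create_ner_features(
        tokens_list,
        labels_list,
        toy_label_map,
        max_sequence_length=8,
        input_pad_id=0,
        attn_mask_pad_id=0,
        labels_pad_id=0,
        include_padding_in_loss=False,
        tokenize=toy_tokenize,
    )
    for name, array in features.items():
        print(name, array.tolist())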