Source code for cerebras.modelzoo.data.nlp.bert.BertClassifierDataProcessor

# Copyright 2022 Cerebras Systems.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Data loaders for the SST-2 and MNLI GLUE tasks.
"""

import abc
import csv
import os

import numpy as np
import torch

from cerebras.modelzoo.common.input_utils import get_streaming_batch_size
from cerebras.modelzoo.common.registry import registry
from cerebras.modelzoo.data.common.input_utils import ShardedSampler
from cerebras.modelzoo.data.nlp.bert.bert_utils import build_vocab
from cerebras.modelzoo.data_preparation.nlp.tokenizers.Tokenization import (
    FullTokenizer,
)

# Maps the MNLI `gold_label` strings read from the TSV files to the integer
# class ids emitted as `labels` by `MNLIDataset.__getitem__`.
MNLI_LABEL_IDX = {"entailment": 0, "neutral": 1, "contradiction": 2}


class ClassifierDataset(torch.utils.data.Dataset):
    """
    Base class for datasets that load their raw data from TSV files.
    Child classes must provide read_tsv.

    Args:
        params (dict): List of training input parameters for creating dataset.
        is_training (bool): Indicator for training or validation dataset.
    """

    def __init__(self, params, is_training):
        self.batch_size = get_streaming_batch_size(params["batch_size"])
        self.data_dir = params["data_dir"]
        self.is_training = is_training
        self.vocab_file = params["vocab_file"]
        self.do_lower = params.get("do_lower", False)

        # Get special tokens
        self.special_tokens = {
            "oov_token": "[UNK]",
            "class_token": "[CLS]",
            "document_separator_token": "[SEP]",
        }
        if self.do_lower:
            self.special_tokens = {
                key: value.lower() for key, value in self.special_tokens.items()
            }

        self.tokenizer = FullTokenizer(self.vocab_file, self.do_lower)
        self.vocab, self.vocab_size = build_vocab(
            self.vocab_file, self.do_lower, self.special_tokens["oov_token"]
        )

        # Init tokens_to_id converter.
        self.tokens_to_id = self.vocab.forward

        # Getting indices for special tokens.
        self.special_tokens_indices = {
            key: self.tokens_to_id([value])[0]
            for key, value in self.special_tokens.items()
        }

        # Padding indices.
        self.labels_pad_id = params.get("labels_pad_id", 0)
        self.input_pad_id = params.get("input_pad_id", 0)
        self.attn_mask_pad_id = params.get("attn_mask_pad_id", 0)

        self.max_sequence_length = params["max_sequence_length"]

    def encode_sequence(self, text1, text2=None):
        """
        Tokenizes a single text (if text2 is None) or a pair of texts.
        Truncates and adds special tokens as needed.

        The encoded layout is `[CLS] text1 [SEP]` for a single text and
        `[CLS] text1 [SEP] text2 [SEP]` for a pair, right-padded to
        `max_sequence_length`.

        Args:
            text1 (str): First text to encode.
            text2 (str): Second text to encode or `None`. An empty string is
                treated the same as `None`.

        Returns:
            A tuple of `input_ids`, `segment_ids` and `attention_mask`.

            - input_ids (list[int]): Input token indices, padded with
              `input_pad_id`. Length: `max_sequence_length`.
            - segment_ids (np.array[int32]): Segment indices: 0 for
              `[CLS] text1 [SEP]` and padding, 1 for `text2 [SEP]`.
              Shape: (`max_sequence_length`).
            - attention_mask (np.array[int32]): 1 for real tokens,
              `attn_mask_pad_id` for padding.
              Shape: (`max_sequence_length`).
        """
        special_tokens_count = 3 if text2 else 2  # [CLS], [SEP], Optional[SEP]
        max_num_tokens = self.max_sequence_length - special_tokens_count

        # Tokenize and truncate
        tokenized_text1 = self.tokenizer.tokenize(text1.strip())
        tokenized_text2 = (
            self.tokenizer.tokenize(text2.strip()) if text2 else None
        )

        if text2:
            # Truncate the tokens one at a time from the end, always
            # shortening the currently longer text.
            total_len = len(tokenized_text1) + len(tokenized_text2)
            while total_len > max_num_tokens:
                if len(tokenized_text1) > len(tokenized_text2):
                    tokenized_text1.pop()
                else:
                    tokenized_text2.pop()
                total_len -= 1
        else:
            tokenized_text1 = tokenized_text1[:max_num_tokens]

        # convert to ids
        cls_token_id = self.special_tokens_indices["class_token"]
        sep_token_id = self.special_tokens_indices["document_separator_token"]

        token_ids1 = self.tokens_to_id(tokenized_text1)
        input_ids = [cls_token_id] + token_ids1 + [sep_token_id]
        if text2:
            token_ids2 = self.tokens_to_id(tokenized_text2)
            input_ids = input_ids + token_ids2 + [sep_token_id]

        meaningful_tokens_count = len(input_ids)
        pad_count = self.max_sequence_length - meaningful_tokens_count
        input_ids = input_ids + [self.input_pad_id] * pad_count

        # attention mask
        attention_mask = (
            np.ones((self.max_sequence_length,), dtype=np.int32)
            * self.attn_mask_pad_id
        )
        attention_mask[:meaningful_tokens_count] = 1

        # segment ids
        segment_ids = np.zeros((self.max_sequence_length,), dtype=np.int32)
        if text2:
            # text2 begins after [CLS], the text1 tokens and the first [SEP],
            # hence the offset of 2 on top of len(token_ids1).
            text2_start = len(token_ids1) + 2
            # The closing [SEP] belongs to the second segment as well.
            text2_end = text2_start + len(token_ids2) + 1
            segment_ids[text2_start:text2_end] = 1

        return input_ids, segment_ids, attention_mask

    def read_tsv(self):
        """Load the raw examples from TSV; must be provided by subclasses."""
        raise NotImplementedError
class SST2Dataset(ClassifierDataset):
    """
    SST2 dataset processor for sentiment analysis.

    Args:
        params (dict): List of training input parameters for creating dataset.
        is_training (bool): Indicator for training or validation dataset.
    """

    def __init__(self, params, is_training):
        super().__init__(params, is_training)

        self.raw_data = np.array(self.read_tsv())
        self.num_examples = len(self.raw_data)
        self.num_batches = self.num_examples // self.batch_size
        assert self.num_batches > 0, (
            "Dataset does not contain enough samples for one batch, please "
            "choose a smaller batch size."
        )

    def read_tsv(self):
        """
        Reads `train.tsv` (training) or `dev.tsv` (validation) from
        `data_dir`.

        Returns:
            A list of `[sentence, label]` pairs, both kept as strings.
        """
        fname = "train" if self.is_training else "dev"
        tsv_file = os.path.join(self.data_dir, f"{fname}.tsv")

        # Decode explicitly as UTF-8 so the result does not depend on the
        # platform's default locale; `newline=""` is what the csv module
        # asks for when handing it a file object.
        with open(tsv_file, "r", encoding="utf-8", newline="") as fid:
            csv_reader = csv.DictReader(
                fid, delimiter="\t", quoting=csv.QUOTE_NONE
            )
            return [[row["sentence"], row["label"]] for row in csv_reader]

    def __getitem__(self, idx):
        """
        For each text, raw_label sample in the data do:
            1. Tokenize and truncate
            2. Add special tokens
            3. Convert tokens to ids
            4. Create attention mask
            5. Create a feature dict with:
                - input_ids: np.array[int32] input tokens indices
                    shape: (max_sequence_length, )
                - attention_mask: np.array[int32] attention masks
                    shape: (max_sequence_length, )
                - token_type_ids: np.array[int32] segment ids
                    shape: (max_sequence_length, )
                - labels: int32 scalar indicating the sentiment

        Returns:
            A dict with features.
        """
        text, raw_label = self.raw_data[idx]
        input_ids, segment_ids, attention_mask = self.encode_sequence(text)

        return {
            "input_ids": np.array(input_ids, dtype=np.int32),
            "attention_mask": attention_mask,
            "token_type_ids": segment_ids,
            # The label is stored as text in the TSV; convert it to int.
            "labels": np.array(int(raw_label), dtype=np.int32),
        }

    def __len__(self):
        """Number of examples read from the TSV file."""
        return self.num_examples
class MNLIDataset(ClassifierDataset):
    """
    MNLI dataset processor for natural language inference
    (entailment / neutral / contradiction over sentence pairs).

    Args:
        params (dict): List of training input parameters for creating dataset.
        is_training (bool): Indicator for training or validation dataset.
    """

    def __init__(self, params, is_training):
        super().__init__(params, is_training)

        self.raw_data = np.array(self.read_tsv())
        self.num_examples = len(self.raw_data)
        self.num_batches = self.num_examples // self.batch_size
        assert self.num_batches > 0, (
            "Dataset does not contain enough samples for one batch, please "
            "choose a smaller batch size."
        )

    def read_tsv(self):
        """
        Reads `train.tsv` (training) or both `dev_matched.tsv` and
        `dev_mismatched.tsv` (validation) from `data_dir`.

        Returns:
            A list of `[sentence1, sentence2, gold_label, is_matched]`
            entries, where `is_matched` is 0 for rows coming from the
            mismatched file and 1 otherwise.
        """
        fnames = ["train"]
        if not self.is_training:
            # MNLI has two validation sets:
            #   - the matched set comes from the same domains as training set
            #   - the mismatched set comes from different domains
            fnames = ["dev_matched", "dev_mismatched"]

        data = []
        for fname in fnames:
            tsv_file = os.path.join(self.data_dir, f"{fname}.tsv")

            # During eval we concatenate the two validation sets. Before
            # doing so, we give each example an "is_matched" label so
            # that during eval we can measure matched and mismatched
            # accuracies separately.
            is_matched = 0 if "mismatched" in fname else 1

            # Decode explicitly as UTF-8 so the result does not depend on
            # the platform's default locale; `newline=""` is what the csv
            # module asks for when handing it a file object.
            with open(tsv_file, "r", encoding="utf-8", newline="") as fid:
                csv_reader = csv.DictReader(
                    fid, delimiter="\t", quoting=csv.QUOTE_NONE
                )
                data.extend(
                    [
                        row["sentence1"],
                        row["sentence2"],
                        row["gold_label"],
                        is_matched,
                    ]
                    for row in csv_reader
                )

        return data

    def __getitem__(self, idx):
        """
        For each text1, text2, raw_label sample in the data do:
            1. Tokenize sentence a and sentence b, truncate
            2. Add special tokens
            3. Convert tokens to ids
            4. Create attention mask
            5. Create a feature dict with:
                - input_ids: np.array[int32] input tokens indices
                    shape: (max_sequence_length, )
                - attention_mask: np.array[int32] attention masks
                    shape: (max_sequence_length, )
                - token_type_ids: np.array[int32] segment ids
                    shape: (max_sequence_length, )
                - labels: int32 scalar with the class id from
                    `MNLI_LABEL_IDX` for the example's gold label
                - is_matched / is_mismatched: int32 scalars, only present
                    on the validation set

        Returns:
            A dict with features.
        """
        text1, text2, raw_label, is_matched = self.raw_data[idx]
        input_ids, segment_ids, attention_mask = self.encode_sequence(
            text1, text2
        )

        features = {
            "input_ids": np.array(input_ids, dtype=np.int32),
            "attention_mask": attention_mask,
            "token_type_ids": segment_ids,
            "labels": np.array(MNLI_LABEL_IDX[raw_label], dtype=np.int32),
        }

        # Add a field for is_matched on validation set
        if not self.is_training:
            features["is_matched"] = np.array(is_matched, dtype=np.int32)
            is_mismatched = 1 - np.array(is_matched, dtype=np.int32)
            features["is_mismatched"] = is_mismatched.astype(np.int32)

        return features

    def __len__(self):
        """Number of examples read from the TSV file(s)."""
        return self.num_examples
class DataProcessor(abc.ABC):
    """
    Base class for processors that wrap a dataset in a
    `torch.utils.data.DataLoader`.
    Child classes must provide create_dataset, which has to assign the
    dataset to `self.dataset`.

    Args:
        data_params (dict): Input parameters for creating dataset.
            Expects the following fields:

            - "vocab_file" (str): Path to the vocab file.
            - "data_dir" (str): Path to directory containing the data files.
            - "batch_size" (int): Batch size.
            - "max_sequence_length" (int): Maximum length of the sequence.
            - "shuffle" (bool): Flag to enable data shuffling.
              Defaults to True.
            - "shuffle_seed" (int): Shuffle seed. Defaults to None.
            - "shuffle_buffer" (int): Shuffle buffer size (not read by
              this class).
            - "do_lower" (bool): Flag to lower case the texts.
            - "num_workers" (int): How many subprocesses to use for data
              loading. Defaults to 0.
            - "drop_last" (bool): If True and the dataset size is not
              divisible by the batch size, the last incomplete batch will
              be dropped. Defaults to True.
            - "prefetch_factor" (int): Number of samples loaded in advance
              by each worker. Defaults to 10; only used when
              `num_workers > 0`.
            - "persistent_workers" (bool): If True, the data loader will
              not shutdown the worker processes after a dataset has been
              consumed once. Defaults to True; only used when
              `num_workers > 0`.
        model_params (dict, optional): Model parameters for creating the
            dataset, unused.
    """

    def __init__(self, data_params, model_params) -> None:
        self.data_params = data_params
        self.model_params = model_params
        self.batch_size = get_streaming_batch_size(data_params["batch_size"])
        self.shuffle = data_params.get("shuffle", True)
        self.shuffle_seed = data_params.get("shuffle_seed", None)
        self.num_workers = data_params.get("num_workers", 0)
        self.drop_last = data_params.get("drop_last", True)
        self.prefetch_factor = data_params.get("prefetch_factor", 10)
        self.persistent_workers = data_params.get("persistent_workers", True)
        # Populated by `create_dataset`.
        self.dataset = None

    @abc.abstractmethod
    def create_dataset(self, is_training):
        """
        Build the dataset and store it on `self.dataset`.

        Args:
            is_training (bool): Indicator for training or validation dataset.
        """
        raise NotImplementedError(
            "Please override this method in the base class to create the dataset."
        )

    def create_dataloader(self, is_training=True):
        """
        Create the dataset and wrap it in a `torch.utils.data.DataLoader`.

        Args:
            is_training (bool): Indicator for training or validation dataset.

        Returns:
            A `torch.utils.data.DataLoader` over `self.dataset` that draws
            indices from a `ShardedSampler`.
        """
        self.create_dataset(is_training=is_training)
        assert self.dataset, "Unexpected error, dataset is None."

        sharded_sampler = ShardedSampler(
            self.dataset,
            self.shuffle,
            self.shuffle_seed,
            self.drop_last,
        )

        # The sampler carries the shuffle / seed / sharding settings, so it
        # must be handed to the loader whether or not workers are used.
        loader_kwargs = {
            "batch_size": self.batch_size,
            "sampler": sharded_sampler,
            "drop_last": self.drop_last,
        }
        if self.num_workers:
            # prefetch factor only allowed with `num_workers > 0`
            loader_kwargs.update(
                num_workers=self.num_workers,
                prefetch_factor=self.prefetch_factor,
                persistent_workers=self.persistent_workers,
            )

        return torch.utils.data.DataLoader(self.dataset, **loader_kwargs)
@registry.register_datasetprocessor("SST2DataProcessor")
class SST2DataProcessor(DataProcessor):
    """
    Data processor that builds the dataloader for the SST2 task.
    """

    def __init__(self, data_params, model_params) -> None:
        # No SST2-specific state; all configuration is handled by the parent.
        super().__init__(data_params, model_params)

    def create_dataset(self, is_training=True):
        """Instantiate the SST2 dataset and keep it on `self.dataset`."""
        sst2_dataset = SST2Dataset(self.data_params, is_training)
        self.dataset = sst2_dataset
@registry.register_datasetprocessor("MNLIDataProcessor")
class MNLIDataProcessor(DataProcessor):
    """
    Data processor that builds the dataloader for the MNLI task.
    """

    def __init__(self, data_params, model_params) -> None:
        # No MNLI-specific state; all configuration is handled by the parent.
        super().__init__(data_params, model_params)

    def create_dataset(self, is_training=True):
        """Instantiate the MNLI dataset and keep it on `self.dataset`."""
        mnli_dataset = MNLIDataset(self.data_params, is_training)
        self.dataset = mnli_dataset