# Copyright 2022 Cerebras Systems.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import random
import spacy
from tqdm import tqdm
from cerebras.modelzoo.data_preparation.nlp.tokenizers.Tokenization import (
FullTokenizer,
)
from cerebras.modelzoo.data_preparation.utils import (
convert_to_unicode,
text_to_tokenized_documents,
)
class PreprocessInstance:
"""
A single training (sentence-pair) instance.
:param list tokens: List of tokens for sentence pair
:param list segment_ids: List of segment ids for sentence pair
    :param bool is_random_next: Specifies whether the second element in the
        pair is random
"""
    def __init__(
self,
tokens,
segment_ids,
is_random_next,
):
self.tokens = tokens
self.segment_ids = segment_ids
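        # store the NSP label as an int: 1 if the second segment is random
        # (negative example), 0 if it is the actual next segment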
self.is_random_next = 1 if is_random_next else 0
self.dict = {
"tokens": self.tokens,
"segment_ids": self.segment_ids,
"is_random_next": self.is_random_next,
}
def __str__(self):
tokens = " ".join([convert_to_unicode(x) for x in self.tokens])
segment_ids = " ".join([str(x) for x in self.segment_ids])
s = ""
s += f"tokens: {tokens}\n"
s += f"segment_ids: {segment_ids}\n"
s += f"is_random_next: {self.is_random_next}\n"
s += "\n"
return s
def to_dict(self):
return self.dict
def __repr__(self):
return self.__str__()
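# Illustrative sketch (not executed here; the token strings below are made up)
# of how a PreprocessInstance is built and what `to_dict` returns:
#
#   instance = PreprocessInstance(
#       tokens=["[CLS]", "the", "cat", "sat", "[SEP]", "it", "slept", "[SEP]"],
#       segment_ids=[0, 0, 0, 0, 0, 1, 1, 1],
#       is_random_next=False,
#   )
#   instance.to_dict()
#   # -> {"tokens": [...], "segment_ids": [...], "is_random_next": 0}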
def data_generator(
metadata_files,
vocab_file,
do_lower,
split_num,
max_seq_length,
short_seq_prob,
mask_whole_word,
max_predictions_per_seq,
masked_lm_prob,
dupe_factor,
output_type_shapes,
min_short_seq_length=None,
multiple_docs_in_single_file=False,
multiple_docs_separator="\n",
single_sentence_per_line=False,
inverted_mask=False,
seed=None,
spacy_model="en_core_web_sm",
input_files_prefix="",
sop_labels=False,
):
"""
    Generator function used to create the input dataset
    for the MLM + NSP task.
    1. Generate raw examples by concatenating two parts,
    'tokens-a' and 'tokens-b', as follows:
        [CLS] <tokens-a> [SEP] <tokens-b> [SEP]
    where:
        tokens-a: list of tokens taken from the
        current document, of random length (less than msl).
        tokens-b: list of tokens chosen based on the
        randomly set "next_sentence_labels", of length
        msl - len(<tokens-a>) - 3 (to account for the [CLS]
        and the two [SEP] tokens).
    If "next_sentence_labels" is 1 (set to 1 with probability 0.5),
        tokens-b is a list of tokens from sentences chosen randomly
        from a different document;
    else,
        tokens-b is a list of tokens taken from the same document,
        continuing tokens-a within that document.
    The number of raw tokens also depends on "short_seq_prob".
    :param str or list[str] metadata_files: A path or a list of paths to
        metadata files. A metadata file contains file paths for flat text
        cleaned documents, one file path per line.
:param str vocab_file: Vocabulary file, to build tokenization from
:param bool do_lower: Boolean value indicating if words should be
converted to lowercase or not
:param int split_num: Number of input files to read at a given
time for processing.
:param int max_seq_length: Maximum length of the sequence to generate
    :param float short_seq_prob: Probability of creating a short sequence.
        Sometimes we want to use shorter sequences to minimize the mismatch
        between pre-training and fine-tuning.
:param bool mask_whole_word: If True, all subtokens corresponding to a word
will be masked.
    :param int max_predictions_per_seq: Maximum number of masked tokens
        in a sequence
:param float masked_lm_prob: Proportion of tokens to be masked
:param int dupe_factor: Number of times to duplicate the dataset
with different static masks
    :param int min_short_seq_length: When short_seq_prob > 0, this number
        indicates the least number of tokens that each example should have,
        i.e. the num_tokens (excluding pad) will be in the range
        [min_short_seq_length, MSL]
:param dict output_type_shapes: Dictionary indicating the shapes of
different outputs
:param bool multiple_docs_in_single_file: True, when a single text file
contains multiple documents separated by <multiple_docs_separator>
:param str multiple_docs_separator: String which separates
multiple documents in a single text file.
    :param bool single_sentence_per_line: True, when the document is already
        split into sentences, with one sentence per line, and there is
        no requirement for further sentence segmentation of a document
:param bool inverted_mask: If set to False, has 0's on padded positions and
1's elsewhere. Otherwise, "inverts" the mask, so that 1's are on padded
positions and 0's elsewhere.
:param int seed: Random seed.
    :param str spacy_model: spaCy model to load, i.e. shortcut
link, package name or path. Used to segment text into sentences.
    :param str input_files_prefix: Prefix to be added to paths of the input files.
:param bool sop_labels: If true, negative examples of the dataset will be two
consecutive sentences in reversed order. Otherwise, uses regular (NSP)
labels (where negative examples are from different documents).
    :returns: yields PreprocessInstance training examples, where
        "is_random_next" holds the next-sentence-prediction label
"""
if min_short_seq_length is None:
min_short_seq_length = 2
elif (min_short_seq_length < 2) or (
min_short_seq_length > max_seq_length - 3
):
raise ValueError(
f"The min_short_seq_len param {min_short_seq_length} is invalid.\n"
f"Allowed values are [2, {max_seq_length - 3})"
)
# define tokenizer
vocab_file = os.path.abspath(vocab_file)
tokenizer = FullTokenizer(vocab_file, do_lower)
vocab_words = tokenizer.get_vocab_words()
rng = random.Random(seed)
# get all text files by reading metadata files
if isinstance(metadata_files, str):
metadata_files = [metadata_files]
input_files = []
for _file in metadata_files:
with open(_file, "r") as _fin:
input_files.extend(_fin.readlines())
input_files = [x.strip() for x in input_files if x]
rng.shuffle(input_files)
split_num = len(input_files) if split_num <= 0 else split_num
# for better performance load spacy model once here
nlp = spacy.load(spacy_model)
for i in range(0, len(input_files), split_num):
current_input_files = input_files[i : i + split_num]
all_documents = []
for _file in tqdm(current_input_files):
_fin_path = os.path.abspath(os.path.join(input_files_prefix, _file))
with open(_fin_path, "r") as _fin:
_fin_data = _fin.read()
processed_doc, _ = text_to_tokenized_documents(
_fin_data,
tokenizer,
multiple_docs_in_single_file,
multiple_docs_separator,
single_sentence_per_line,
nlp,
)
all_documents.extend(processed_doc)
rng.shuffle(all_documents)
        # create a set of instances to process further
        # repeat this process `dupe_factor` times
        # get a list of PreprocessInstance objects
instances = []
for _ in range(dupe_factor):
for document_index in range(len(all_documents)):
instances.extend(
_create_sentence_instances_from_document(
all_documents,
document_index,
vocab_words,
max_seq_length,
short_seq_prob,
min_short_seq_length,
mask_whole_word,
max_predictions_per_seq,
masked_lm_prob,
rng,
sop_labels,
)
)
rng.shuffle(instances)
for instance in instances:
yield instance
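# Illustrative usage sketch (file names and hyperparameter values below are
# placeholders, not defaults taken from this module):
#
#   gen = data_generator(
#       metadata_files="metadata.txt",
#       vocab_file="vocab.txt",
#       do_lower=True,
#       split_num=0,  # <= 0 processes all input files in a single chunk
#       max_seq_length=128,
#       short_seq_prob=0.1,
#       mask_whole_word=False,
#       max_predictions_per_seq=20,
#       masked_lm_prob=0.15,
#       dupe_factor=1,
#       output_type_shapes={},
#       seed=0,
#   )
#   for instance in gen:
#       ...  # each yielded item is a PreprocessInstance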
def _create_sentence_instances_from_document(
all_documents,
document_index,
vocab_words,
max_seq_length,
short_seq_prob,
min_short_seq_length,
mask_whole_word,
max_predictions_per_seq,
masked_lm_prob,
rng,
sop_labels=False,
):
"""
Create instances from documents.
    :param list all_documents: List of lists which contain the tokenized
        sentences of each document
    :param int document_index: Index of the document currently being processed
:param list vocab_words: List of all words present in the vocabulary
:param bool sop_labels: If true, negative examples of the dataset will be two
consecutive sentences in reversed order. Otherwise, uses regular (NSP)
labels (where negative examples are from different documents).
    :returns: List of PreprocessInstance objects
"""
# get document with document_index
# Example:
# [
# [line1], [line2], [line3]
# ]
# where each line = [tokens]
document = all_documents[document_index]
# account for [CLS], [SEP], [SEP]
max_num_tokens = max_seq_length - 3
# We usually want to fill up the entire sequence since we are padding
# to `max_seq_length` anyways, so short sequences are generally not
# needed. However, we sometimes (i.e., short_seq_prob = 0.1 == 10% of
# the time) want to use shorter sequences to minimize the mismatch
# between pre-training and fine-tuning. The `target_seq_len` is just a
# rough target however, whereas `max_seq_length` is a hard limit
target_seq_len = max_num_tokens
if rng.random() < short_seq_prob:
target_seq_len = rng.randint(min_short_seq_length, max_num_tokens)
# We don't just concatenate all of the tokens from a line into a long
# sequence and choose an arbitrary split point because this would make
# the NSP task too easy. Instead, we split the input into segments
# `A` and `B` based on the actual "sentences" provided by the user
# input
instances = []
current_chunk = []
current_length = 0
i = 0
# lambda function for fast internal calls. called multiple times
flatten = lambda l: [item for sublist in l for item in sublist]
while i < len(document):
        # a line is a list of tokens (words / punctuation /
        # special characters / wordpieces)
        # we initially read an entire line, but also ensure that if we
        # meet the target seq_len with the current line, we cut it off;
        # unused `segments` are put back in circulation for
        # later input creation
line = document[i]
current_chunk.append(line)
current_length += len(line)
if i == len(document) - 1 or current_length >= target_seq_len:
if current_chunk:
# generate a sentence pair instance for NSP loss
# `a_end` is how many segments from `current_chunk` go into
# `A` (first sentence)
a_end = 1
if len(current_chunk) >= 2:
a_end = rng.randint(1, len(current_chunk) - 1)
tokens_a = []
tokens_a.extend(flatten(current_chunk[0:a_end]))
tokens_b = []
# Random next
is_random_next = False
if len(current_chunk) == 1 or (
not sop_labels and rng.random() < 0.5
):
is_random_next = True
target_b_length = target_seq_len - len(tokens_a)
# this should rarely go for more than one iteration
# for large corpora. However, just to be careful, we
# try to make sure that the random document is
# not the same as the document we are processing
for _ in range(10):
random_document_index = rng.randint(
0, len(all_documents) - 1
)
if random_document_index != document_index:
break
random_document = all_documents[random_document_index]
random_start = rng.randint(0, len(random_document) - 1)
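                    # copy sentences from a random start point in the random
                    # document until segment B reaches its target length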
for j in range(random_start, len(random_document)):
tokens_b.extend(random_document[j])
if len(tokens_b) >= target_b_length:
break
                    # We don't actually use these segments (pieces of a line),
                    # so we "put them back" so they are not wasted in
                    # later computations
num_unused_segments = len(current_chunk) - a_end
i -= num_unused_segments
elif sop_labels and rng.random() < 0.5:
# From https://github.com/google-research/albert/blob/a36e095d3066934a30c7e2a816b2eeb3480e9b87/create_pretraining_data.py#L338
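                    # SOP-style negative example: keep both segments from the
                    # same document, but swap A and B below so the "negative"
                    # pair is two consecutive spans in reversed order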
is_random_next = True
for j in range(a_end, len(current_chunk)):
tokens_b.extend(current_chunk[j])
tokens_a, tokens_b = tokens_b, tokens_a
else:
# Actual next
is_random_next = False
tokens_b.extend(
flatten(current_chunk[a_end : len(current_chunk)])
)
# truncate seq pair tokens to max_num_tokens
_truncate_seq_pair(tokens_a, tokens_b, max_num_tokens, rng)
assert len(tokens_a) >= 1
assert len(tokens_b) >= 1
# create actual input instance
tokens = []
segment_ids = []
# add special token for input start
tokens.append("[CLS]")
segment_ids.append(0)
# append input `A`
extend_list = [0] * len(tokens_a)
segment_ids.extend(extend_list)
tokens.extend(tokens_a)
# add special token for input separation
tokens.append("[SEP]")
segment_ids.append(0)
# append input `B`
extend_list = [1] * len(tokens_b)
segment_ids.extend(extend_list)
tokens.extend(tokens_b)
# add special token for input separation
tokens.append("[SEP]")
segment_ids.append(1)
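                # resulting layout:
                #   tokens:      [CLS] <tokens_a> [SEP] <tokens_b> [SEP]
                #   segment_ids:   0     0 ... 0    0     1 ... 1     1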
instance = PreprocessInstance(
tokens=tokens,
segment_ids=segment_ids,
is_random_next=is_random_next,
)
instances.append(instance)
# reset buffers
current_chunk = []
current_length = 0
# move on to next segment
i += 1
return instances
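# Sketch of the three sentence-pair cases produced above (contents illustrative):
#   actual next (is_random_next=0): A = sentences from the document,
#       B = the sentences that immediately follow A in the same document
#   random next (is_random_next=1): A as above,
#       B = sentences taken from a randomly chosen different document
#   SOP negative (is_random_next=1, only when sop_labels=True): A and B come
#       from the same document but their order is swapped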
def _truncate_seq_pair(tokens_a, tokens_b, max_num_tokens, rng):
"""
    Truncate a pair of token lists so that their total length does not exceed
    the defined maximum number of tokens
    :param list tokens_a: First list of tokens in the sequence pair
    :param list tokens_b: Second list of tokens in the sequence pair
    :param int max_num_tokens: Maximum combined number of tokens allowed
        for the sequence pair
"""
total_length = len(tokens_a) + len(tokens_b)
while total_length > max_num_tokens:
        # find the correct list to truncate in this iteration of the loop
trunc_tokens = tokens_a if len(tokens_a) > len(tokens_b) else tokens_b
        assert len(trunc_tokens) >= 1
# check whether to remove from front or rear
if rng.random() < 0.5:
del trunc_tokens[0]
else:
trunc_tokens.pop()
        # recompute the total length after deleting a token
total_length = len(tokens_a) + len(tokens_b)
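# Example of the truncation behaviour (lengths illustrative): with
# max_num_tokens = 8, len(tokens_a) = 6 and len(tokens_b) = 5, the longer list
# loses one token per iteration (randomly from its front or back) until the
# combined length is <= 8, ending here at lengths 4 and 4.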