Source code for cerebras.modelzoo.common.utils.input.utils

# Copyright 2022 Cerebras Systems.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Specifies `SamplesSaver` and `SamplesViewer` classes for writing and reading numpy array datasets."""
import logging
import os
import shutil
import sys
from typing import List, Optional

import numpy as np


[docs]class SamplesViewer: """Handles iteraing over data samples of numpy arrays from .npy files."""
[docs] def __init__(self, samples_file_list: List[str]): if not samples_file_list: raise RuntimeError( "No samples files to load. Please provide a list of " "valid paths to .npy files to load data samples from " "when initializing this class." ) self._samples_file_list = samples_file_list
def __iter__(self): for samples_file in self._samples_file_list: try: with open(samples_file, 'rb') as f: samples = np.load(f) except Exception as e: raise RuntimeError( f"Failed to read requests file: {samples_file}" ) from e for sample in samples: yield sample
[docs]class SamplesSaver: """Manages data samples chunking and saving for numpy arrays."""
[docs] def __init__( self, data_dir: str, max_file_size: int, filename_prefix: Optional[str] = None, ): """ Args: data_dir: Path to mounted dir where the samples are dumped max_file_size: Maximum file size (in bytes) for the .npy samples file(s) filename_prefix: (Optional) filename prefix for the .npy file(s) """ self.data_dir = data_dir os.makedirs(data_dir, exist_ok=True) self.max_file_size = max_file_size self.filename_prefix = ( filename_prefix if filename_prefix is not None else "samples" ) self._data_samples = [] self._samples_file_list = [] self._samples_len = 0 self._chunk_idx = 0 self._samples_sizeb = 0
@property def dataset_size(self) -> int: """Returns the total numer of data samples.""" return self._samples_len @property def samples_files(self) -> List[str]: """Returns the list of `.npy` file(s).""" return self._samples_file_list
[docs] def add_sample(self, data_sample: np.array) -> None: """Adds the np array to internally maintained list of data samples and dumps these to file if the total size exceeds `max_file_size` threshold. Args: data_sample: np array data sample """ self._samples_sizeb += sys.getsizeof(data_sample) # If total size of samples (in bytes) exceeds threshold, # write the chunk to file to prevent possible OOM issues # with holding a very large dataset if self._samples_sizeb >= self.max_file_size: self._dump_samples_file() self._data_samples.append(data_sample) self._samples_len += 1
def _dump_samples_file(self) -> None: """Saves the data samples in the current chunk as a `.npy` file and resets attributes for writing the next chunk. """ if not self._data_samples: # Skip dumping if previously chunked return samples_file_path = os.path.join( self.data_dir, f"{self.filename_prefix}_{self._chunk_idx}.npy" ) try: with open(samples_file_path, 'wb') as f: np.save(f, np.array(self._data_samples)) except Exception as e: raise RuntimeError( f"Failed to save samples in file {samples_file_path} " f"due to error: {e}" ) from e # Add filename to list of files self._samples_file_list.append(samples_file_path) # Increment chunk no. and reset samples' list and size attrs self._chunk_idx += 1 self._data_samples = [] self._samples_sizeb = 0
[docs] def flush(self): """Dumps any remaining data samples not yet written to file.""" if self._data_samples: self._dump_samples_file()
[docs] def delete_data_dumps(self) -> None: """Cleans up by deleting all dumped data.""" try: shutil.rmtree(self.data_dir) except Exception as e: logging.error( f"Failed to delete samples data directory due to: {e}" )