cerebras.pytorch package#

Performance/Debug Flags#

cerebras.pytorch.backends controls the behavior of specific backends that Cerebras PyTorch API supports.

Currently, the only backend with configurable options is:

  • cerebras.pytorch.backends.csx

cerebras.pytorch.backends.csx#

CSX-related performance and debug flags.

cerebras.pytorch.backends.csx.precision.optimization_level: int#

The precision optimization level (POL) to use when compiling the model. The POL determines the level of precision to use for the model’s weights and activations and can thus affect the model’s accuracy and performance.

The value must be an integer in range [0, 3). Default: 1

cerebras.pytorch.backends.csx.performance.micro_batch_size: Union[None, int, Literal['auto', 'explore'], Dict[str, Dict[str, int]]]#

The micro-batch size to use when compiling the model. The micro-batch size can affect the model’s performance and memory usage.

Valid values include:

  • “auto”: Automatically choose an optimal micro batch size.

  • “explore”: Search for an optimal micro batch size and return.

  • A dict of the following form: Search for an optimal micro batch size within the min and max bounds and return.

    {
        "explore": {
            "min": Optional[<positive_int>],
            "max": Optional[<positive_int>]
        }
    }

  • Positive int: Use this micro batch size.

  • None: Disable micro batch tiling.

The default value is "auto".
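The accepted forms listed above can be summarized as a small validation routine. The following is a minimal sketch in plain Python; `is_valid_micro_batch_size` is a hypothetical helper written for illustration and is not part of cerebras.pytorch, and the library's own validation may differ (for example, in how it treats an empty bounds dictionary).

```python
from typing import Any


def _is_positive_int(value: Any) -> bool:
    # bool is a subclass of int in Python, so exclude it explicitly.
    return isinstance(value, int) and not isinstance(value, bool) and value > 0


def is_valid_micro_batch_size(value: Any) -> bool:
    """Return True if `value` matches one of the documented forms."""
    if value is None:
        return True
    if isinstance(value, str):
        return value in ("auto", "explore")
    if _is_positive_int(value):
        return True
    if isinstance(value, dict):
        # Expect exactly {"explore": {...}} with optional "min"/"max" bounds.
        if set(value) != {"explore"} or not isinstance(value["explore"], dict):
            return False
        bounds = value["explore"]
        if not set(bounds) <= {"min", "max"}:
            return False
        return all(b is None or _is_positive_int(b) for b in bounds.values())
    return False
```

Running a check like this before compilation is one way to catch a mistyped string or a non-positive bound early.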

cerebras.pytorch.backends.csx.performance.transfer_processes: int#

The number of processes to use for transferring data to and from the Wafer Scale Cluster.

The default value is 5.

cerebras.pytorch.backends.csx.performance.use_speculative_optimizers: bool#

Whether to enable speculative optimizers. Speculative optimizers are a performance enhancement feature that speeds up slow weight update iterations. However, they may increase memory usage, causing out-of-memory errors for some models. Disabling speculative optimizers may help with memory issues. Default: True.

cerebras.pytorch.backends.csx.debug.retrace_every_iteration: bool#

Whether to retrace the training/validation graph every iteration. Default: False.

cerebras.pytorch.backends.csx.debug.lazy_initialization: bool#

Whether to use lazy weight initialization. Default: True.

cerebras.pytorch.backends.csx.debug.debug_args: DebugArgs#

Arguments to pass to the cluster for cluster debugging purposes only. Default: None.

cerebras.pytorch.backends.csx.debug.ini: DebugArgs#

INI configuration flags for cluster debugging purposes only. Default: None.

cerebras.pytorch.backends.csx.debug.compile_crd_memory_gi: Optional[int]#

The memory limit for the compile coordinator.

cerebras.pytorch.backends.csx.debug.execute_crd_memory_gi: Optional[int]#

The memory limit for the execute coordinator.

cerebras.pytorch.backends.csx.debug.wrk_memory_gi: Optional[int]#

The memory limit for the workers.

cerebras.pytorch.backends.csx.debug.act_memory_gi: Optional[int]#

The memory limit for the activation hosts.

cerebras.pytorch.backends.csx.debug.cmd_memory_gi: Optional[int]#

The memory limit for the command hosts.

cerebras.pytorch.backends.csx.debug.wgt_memory_gi: Optional[int]#

The memory limit for the weight hosts.

Creation Ops#

These functions lazily initialize tensors with a known shape, dtype, and value, so that the tensors do not take up memory unnecessarily.

full#

cerebras.pytorch.full(shape, value, dtype=None)[source]#

Returns a lazily initialized tensor filled with the provided value.

Parameters
  • shape – The shape of the tensor.

  • value (float) – The value to fill the tensor with.

  • dtype – The dtype of the tensor.

full_like#

cerebras.pytorch.full_like(other, value, dtype=None)[source]#

Returns a lazily initialized full tensor with the same properties as the provided tensor.

Parameters
  • other (torch.Tensor) – The tensor to copy the properties from

  • value (float) – The value to fill the tensor with

  • dtype – The dtype of the tensor. If not provided, the dtype of the other tensor is used

ones#

cerebras.pytorch.ones(shape, dtype=None)[source]#

Returns a lazily initialized tensor filled with ones.

Parameters
  • shape – The shape of the tensor

  • dtype – The dtype of the tensor

ones_like#

cerebras.pytorch.ones_like(other, dtype=None)[source]#

Returns a lazily initialized tensor full of ones with the same properties as the provided tensor.

Parameters
  • other (torch.Tensor) – The tensor to copy the properties from

  • dtype – The dtype of the tensor. If not provided, the dtype of the other tensor is used

zeros#

cerebras.pytorch.zeros(shape, dtype=None)[source]#

Returns a lazily initialized tensor filled with zeros.

Parameters
  • shape – The shape of the tensor

  • dtype – The dtype of the tensor

zeros_like#

cerebras.pytorch.zeros_like(other, dtype=None)[source]#

Returns a lazily initialized tensor full of zeros with the same properties as the provided tensor.

Parameters
  • other (torch.Tensor) – The tensor to copy the properties from

  • dtype – The dtype of the tensor. If not provided, the dtype of the other tensor is used

Checkpoint Saving/Loading utilities#

cerebras.pytorch.save(obj, checkpoint_file)[source]#

Save a PyTorch state dict to the given file.

Parameters
  • obj (dict) – The object to save.

  • checkpoint_file (str) – The path to save the object to.

cerebras.pytorch.load(checkpoint_file, map_location=None, **kwargs)#

Load a PyTorch checkpoint from a file.

Parameters
  • checkpoint_file (Union[cerebras.appliance.utils.file.StrPath, IO]) – The path to the checkpoint to load.

  • map_location (Optional[Union[str, torch.device, Callable, dict]]) – A mapping of where to load the checkpoint content to. If the map_location is None, then the tensors will be lazily loaded from the checkpoint file every single time the tensor is accessed. If the map_location is “cache”, then the tensors will be cached once they are lazily loaded from the checkpoint file. If the map location is “cpu”, then the tensors will be eagerly loaded into memory from the checkpoint file.

  • **kwargs – Additional keyword arguments to pass to the vanilla torch checkpoint loader. These are ignored if the checkpoint is a Cerebras HDF5 checkpoint.

Returns

The loaded checkpoint file.

Raises

RuntimeError – If the checkpoint file does not exist or checkpoint is not a valid HDF5 or vanilla torch checkpoint.

Return type

Any
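The three map_location modes described above differ in when the data is read. The toy class below mimics that access pattern in plain Python by counting reads from a pretend checkpoint. `LazyValue`, its `mode` argument, and `read_from_file` are hypothetical and only illustrate the behavior; they say nothing about how cerebras.pytorch implements it.

```python
from typing import Callable, Dict, Optional


class LazyValue:
    """Toy model of one checkpoint entry under the three loading modes."""

    def __init__(self, read: Callable[[], list], mode: Optional[str] = None):
        self._read = read
        self._mode = mode
        # "cpu": read eagerly at load time.
        self._cached = read() if mode == "cpu" else None

    def get(self) -> list:
        if self._cached is not None:
            return self._cached
        value = self._read()
        if self._mode == "cache":
            # "cache": keep the value after the first lazy read.
            self._cached = value
        return value


reads = {"n": 0}


def read_from_file() -> list:
    # Pretend file read; the counter records how often it happens.
    reads["n"] += 1
    return [1.0, 2.0]


counts: Dict[Optional[str], int] = {}
for mode in (None, "cache", "cpu"):
    reads["n"] = 0
    entry = LazyValue(read_from_file, mode)
    entry.get()
    entry.get()
    counts[mode] = reads["n"]
```

In this toy model, only the None mode reads the file on every access, which trades repeated I/O for a smaller memory footprint.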

Data Utilities#

utils.data.DataLoader#

class cerebras.pytorch.utils.data.DataLoader(*args, **kwargs)#

Wrapper around torch.utils.data.DataLoader that facilitates moving data generated by the dataloader to a Cerebras system.

Parameters
  • input_fn (Callable[[...], Union[torch.utils.data.DataLoader, Iterable]]) – A callable that returns a torch.utils.data.DataLoader instance or an iterable that returns a structure containing torch tensors.

  • *args, **kwargs – Any other positional or keyword arguments are passed into the input_fn when each worker instantiates its respective dataloader.

state_dict()#
load_state_dict()#

Each worker will call this input function to construct their own dataloader object. This means that some data sharding scheme is required if the intent is for each worker to stream in a unique set of data.
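Because every worker calls the same input function, one simple way to give each worker a disjoint slice of the data is to stride over the sample indices by worker. A minimal sketch in plain Python follows. `shard_indices`, `worker_id`, and `num_workers` are hypothetical names, and how a worker discovers its own ID is outside the scope of this sketch.

```python
from typing import List


def shard_indices(num_samples: int, worker_id: int, num_workers: int) -> List[int]:
    """Return the sample indices assigned to one worker (strided sharding)."""
    if not 0 <= worker_id < num_workers:
        raise ValueError("worker_id must be in [0, num_workers)")
    # Worker k takes samples k, k + num_workers, k + 2 * num_workers, ...
    return list(range(worker_id, num_samples, num_workers))


# Three workers splitting ten samples between them.
shards = [shard_indices(10, w, 3) for w in range(3)]
```

Strided sharding keeps the shards balanced to within one sample; contiguous blocks per worker would work equally well for the purpose described above.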

utils.data.SyntheticDataset#

class cerebras.pytorch.utils.data.SyntheticDataset(*args, **kwargs)[source]#

A synthetic dataset that generates samples from a SampleSpec.

Constructs a SyntheticDataset instance.

A synthetic dataset can be used to generate samples on the fly with an expected dtype/shape but without needing to create a full-blown dataset. This is especially useful for compile validation.

Parameters
  • sample_spec (Union[torch.Tensor, Callable[[int], torch.Tensor], List[SampleSpecT], Tuple[SampleSpecT, ...], Dict[str, SampleSpecT], OrderedDict[str, SampleSpecT], NamedTuple]) –

    Specification of the samples to generate. This can be a nested structure of one of the following types:

    • torch.Tensor: A tensor to be cloned.

    • Callable: A callable that takes the sample index and returns a tensor.

    Supported data structures for holding the above leaf nodes are list, tuple, dict, OrderedDict, and NamedTuple.

  • num_samples (Optional[int]) – Total size of the dataset. If None, the dataset will generate samples indefinitely.
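One way a nested sample_spec could be resolved into a single sample is sketched below. This is not the SyntheticDataset implementation: `materialize` is a hypothetical helper, and `copy.deepcopy` stands in for cloning a tensor so that the sketch runs without torch.

```python
import copy
from typing import Any


def materialize(spec: Any, index: int) -> Any:
    """Build the sample at `index` from a nested spec."""
    if callable(spec):
        # Callable leaf: generate the value from the sample index.
        return spec(index)
    if isinstance(spec, dict):
        # Covers dict and OrderedDict; the mapping type is preserved.
        return type(spec)((k, materialize(v, index)) for k, v in spec.items())
    if isinstance(spec, tuple) and hasattr(spec, "_fields"):
        # NamedTuple: rebuild it from its positional fields.
        return type(spec)(*(materialize(v, index) for v in spec))
    if isinstance(spec, (list, tuple)):
        return type(spec)(materialize(v, index) for v in spec)
    # Fixed leaf (a tensor in the real API): return an independent copy.
    return copy.deepcopy(spec)


spec = {"input_ids": [0, 0, 0], "label": lambda i: i % 2}
sample = materialize(spec, 5)
```

Copying the fixed leaves means that modifying one generated sample cannot change the spec or any other sample.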

utils.data.DataExecutor#

class cerebras.pytorch.utils.data.DataExecutor(*args, **kwargs)#

Defines a single execution run on a Cerebras wafer scale cluster.

Parameters
  • dataloader (cerebras.appliance.log.named_class_logger) – the dataloader to use for the run

  • num_steps (Optional[int]) – the number of steps to run. Defaults to 1 if the backend was configured for compile or validate only

  • checkpoint_steps (Optional[Union[int, cerebras.pytorch.utils.data.utils.Schedule]]) –

    Checkpoint cadence. This can be:

    • None: To disable checkpointing. This is the default.

    • int: To take checkpoints at this frequency and at num_steps.

    • Schedule: To take checkpoints at the given arbitrary schedule. Note that when using a schedule object, the intervals must be zero-indexed.

  • activation_steps (Optional[int]) – the interval at which to schedule fetching activations from the cluster

  • profiler_activities (Optional[List[Type[cerebras.pytorch.utils.profiler.Activity]]]) – The list of activities to profile. By default the total samples, the client side rate and global rate are tracked and accessible via the profiler attribute.

Note

As of Cerebras Release 2.0, we don’t officially support multiple CS runs in a single process. This means that the above executor can only be run/iterated once. Any runs with different configurations must be run in separate processes.
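For an integer checkpoint_steps, the parameter description above implies a checkpoint at every multiple of the cadence plus one at the final step. A minimal sketch of that arithmetic follows, assuming steps are counted from 1. The 1-based counting is an assumption of this sketch, and `checkpoint_steps_for` is a hypothetical helper, not a library function.

```python
from typing import List, Optional


def checkpoint_steps_for(num_steps: int, cadence: Optional[int]) -> List[int]:
    """List the (1-based) steps at which a checkpoint would be taken."""
    if cadence is None:
        # None disables checkpointing.
        return []
    if cadence <= 0:
        raise ValueError("this sketch only handles a positive cadence")
    steps = list(range(cadence, num_steps + 1, cadence))
    # A checkpoint is also taken at the last step of the run.
    if num_steps > 0 and (not steps or steps[-1] != num_steps):
        steps.append(num_steps)
    return steps
```

For example, under these assumptions a 10-step run with a cadence of 4 would checkpoint at steps 4, 8, and 10.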

utils.data.RestartableDataLoader#

class cerebras.pytorch.utils.data.RestartableDataLoader(*args, **kwargs)[source]#

Defines interface for the restartable dataloader protocol.

state_dict()[source]#

Use this method to specify what state information should be saved by each CSX Worker.

Returns

dict holding state information for the CSX Worker

Return type

Dict[str, Any]

In order to access Cerebras internal data checkpoint info per CSX Worker at some checkpoint step, follow the steps in the example below. Cerebras internal data checkpoint format is recorded in the DataLoaderCheckpoint dataclass.

Usage:

from typing import Any, Dict

import cerebras.pytorch as cstorch
...
def state_dict(self) -> Dict[str, Any]:
    worker_state = cstorch.distributed.get_worker_state()
    state_dict = {}
    if worker_state:
        state_dict["worker_step"] = worker_state.worker_step
        state_dict["worker_id"] = worker_state.global_worker_id
    return state_dict

Note

The call to get_worker_state is well-defined only inside of the state_dict method; using this anywhere else will result in a RuntimeError exception. See linked docs for more details.

load_state_dict(state_dict, strict=True)[source]#

Use this method to load CSX Worker state for the dataloader instance, as captured from a previous run.

Parameters
  • state_dict (Dict[str, Any]) – dict holding worker state info, specified in deaggregate_state_dict

  • strict (bool) – Whether to enforce strict matching of the incoming state_dict. It is up to the implementation to decide what “strict matching” is.

Usage:

def load_state_dict(self, state_dict, strict=True):
    wrk_state_dict = state_dict.get("worker_0", {})

    worker_step = wrk_state_dict.get("worker_step", 0)
    worker_id = wrk_state_dict.get("worker_id")

    print(f"WRK {worker_id} loaded step: {worker_step}")

aggregate_state_dict(worker_states)[source]#

Use this method to specify how to combine the list of CSX Worker state dicts. Each CSX Worker state in the worker_states list is to be specified in state_dict.

Returns

The consolidated state dict that will be saved in a checkpoint.

Return type

Dict[str, Any]

Usage:

def aggregate_state_dict(self, worker_states):
    return {
        "worker_0": worker_states[0],
        "worker_1": worker_states[1]
    }

deaggregate_state_dict(aggregated_state_dict, strict=True)[source]#

Use this method to specify how to load an individual CSX Worker state given a consolidated list of state dicts, as specified in aggregate_state_dict.

Parameters
  • aggregated_state_dict (Dict[str, Any]) – The aggregated state dict to deaggregate.

  • strict (bool) – Whether to enforce strict matching of the incoming state_dict. It is up to the implementation to decide what “strict matching” is.

Returns

The state dict will be passed to the above-defined load_state_dict method.

Return type

Dict[str, Any]

Usage:

def deaggregate_state_dict(self, aggregated_state_dict, strict=True):
    return {
        "worker_0": aggregated_state_dict.get("worker_0", {})
    }
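
Taken together, the four methods form a save and restore round trip: each worker reports its state, the states are aggregated into one checkpoint entry, and on restart the entry is deaggregated and handed to load_state_dict. The sketch below walks through that flow in plain Python. `ToyRestartableLoader` and `FakeWorkerState` are hypothetical; the fake state stands in for the value returned by get_worker_state, which is only available inside state_dict during a real run. Keying the aggregate by worker ID, rather than hard-coding two workers, is a design choice of this sketch.

```python
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass
class FakeWorkerState:
    # Stand-in for the two DataLoaderCheckpoint fields used below.
    global_worker_id: int
    worker_step: int


class ToyRestartableLoader:
    def __init__(self, worker_state: FakeWorkerState):
        self._worker_state = worker_state
        self.resume_step = 0

    def state_dict(self) -> Dict[str, Any]:
        ws = self._worker_state
        return {"worker_id": ws.global_worker_id, "worker_step": ws.worker_step}

    def aggregate_state_dict(self, worker_states: List[Dict[str, Any]]) -> Dict[str, Any]:
        # Key each worker's state by its ID so any worker count is handled.
        return {f"worker_{s['worker_id']}": s for s in worker_states}

    def deaggregate_state_dict(self, aggregated: Dict[str, Any], strict: bool = True) -> Dict[str, Any]:
        key = f"worker_{self._worker_state.global_worker_id}"
        if strict and key not in aggregated:
            raise KeyError(f"no saved state for {key}")
        return aggregated.get(key, {})

    def load_state_dict(self, state_dict: Dict[str, Any], strict: bool = True) -> None:
        self.resume_step = state_dict.get("worker_step", 0)


# Save: two workers report their state, which is then aggregated.
loaders = [ToyRestartableLoader(FakeWorkerState(i, 10 + i)) for i in range(2)]
checkpoint = loaders[0].aggregate_state_dict([ld.state_dict() for ld in loaders])

# Restore: a restarted worker 1 picks out its own entry.
restored = ToyRestartableLoader(FakeWorkerState(1, 0))
restored.load_state_dict(restored.deaggregate_state_dict(checkpoint))
```

After the restore, `restored.resume_step` holds the step that worker 1 had reached when the checkpoint was taken, which a real dataloader could use to skip already-consumed samples.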

utils.data.DataLoaderCheckpoint#

class cerebras.pytorch.utils.data.DataLoaderCheckpoint#

Dataclass representing the Cerebras internal dataloader checkpoint format. Each CSX Worker captures its state information via this class at a checkpoint step.

Parameters
  • global_worker_id (int) – ID of this worker amongst all other workers across all boxes

  • local_worker_id (int) – ID of this worker amongst all other workers across the same box

  • total_num_workers (int) – The total number of workers for the run across all boxes

  • num_workers_per_csx (int) – The total number of workers per box for the run

  • num_csx (int) – The total number of CSXs (boxes) for the run

  • wse_id (int) – ID of the Wafer-Scale Engine (CSX) to which this worker streams data

  • appliance_step (int) – The appliance step at which this checkpoint state info is captured

  • worker_step (int) – The worker step at which this state info is captured. Note that this is simply equal to appliance_step if num_workers_per_csx = 1; for the multi-worker scenario, the appliance step is distributed across workers on a single box in a round-robin fashion based on the local worker id

  • samples_streamed (int) – The total number of samples streamed by this worker at checkpoint step. This is simply worker_step * per_box_batch_size

Note

appliance_step, worker_step and samples_streamed are the attributes that vary across different steps; whereas the other attributes provide constant state information for the current run.
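
The relationship between appliance_step, worker_step, and samples_streamed can be made concrete with a little arithmetic. The sketch below assumes the round-robin assignment starts at local worker 0, so that step 1 goes to worker 0, step 2 to worker 1, and so on. That starting point, the function names, and passing `per_box_batch_size` as an explicit argument are assumptions of this sketch.

```python
def worker_step_for(appliance_step: int, local_worker_id: int, num_workers_per_csx: int) -> int:
    """Number of appliance steps served by one worker under round-robin."""
    full_rounds, remainder = divmod(appliance_step, num_workers_per_csx)
    # Workers with an ID below the remainder have served one extra step.
    return full_rounds + (1 if local_worker_id < remainder else 0)


def samples_streamed_for(worker_step: int, per_box_batch_size: int) -> int:
    """Samples streamed by a worker: worker_step * per_box_batch_size."""
    return worker_step * per_box_batch_size


# Seven appliance steps shared by three workers on one box.
steps = [worker_step_for(7, w, 3) for w in range(3)]
```

With a single worker per box the remainder is always zero, so worker_step equals appliance_step, matching the description of worker_step above.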

get_worker_state#

cerebras.pytorch.distributed.get_worker_state()[source]#

API exposing internal state info captured by each CSX Worker for the current run at a checkpoint step. This state info is represented in the DataLoaderCheckpoint dataclass format.

Returns

DataLoaderCheckpoint instance holding worker state information at the checkpoint step

Note

  • This method may only be called inside of a custom implementation of state_dict for dataloaders conforming to the RestartableDataLoader protocol, since state_dict is well-defined only at a checkpoint step.

  • Use this method to save any of the aforementioned state info recorded by each worker when defining state_dict for custom implementations of restartable dataloaders.

  • This state info captured by each worker is for the current run only, i.e. if you pause and restart a run, the counters gathering information returned by this function will be reset.

numpy utilities#

from_numpy#

cerebras.pytorch.from_numpy(array)[source]#

Converts a numpy array to a torch tensor.

to_numpy#

cerebras.pytorch.to_numpy(tensor)[source]#

Converts a torch tensor to a numpy array.

Tensorboard utilities#

Note

Scalars summarized using this API are only visible in Tensorboard if a SummaryWriter was passed to the DataExecutor object.

Note

Tensors summarized using this API are only visible if a SummaryWriter was passed to the DataExecutor object.

class cerebras.pytorch.utils.tensorboard.SummaryWriter(*args, **kwargs)#

Thin wrapper around torch.utils.tensorboard.SummaryWriter.

Additional features include the ability to add a tensor summary.

Parameters
  • base_step (int) – The base step to use in summarize_{scalar,tensor} functions

  • *args, **kwargs – Any other positional and keyword arguments are forwarded directly to the base class.

add_tensor()#

class cerebras.pytorch.utils.tensorboard.SummaryReader(*args, **kwargs)#

Class for reading summaries saved using the SummaryWriter.

Parameters
  • log_dirs (str) – The directories at which the event files can be found

  • filter (Optional[str]) – If provided, keep only scalar/tensor names that belong to groups that begin with the filter string. In other words, only names matching filter*/* are kept.

  • kwargs – The remaining keyword arguments are forwarded to the internal EventMultiplexer object.

reload()#
read_scalar()#
Scalars()#
read_tensor()#
scalar_names()#
scalar_groups()#
tensor_names()#
text_summary_names()#
read_text_summary()#
Tags()#
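
The filter rule described for SummaryReader (keep only names matching filter*/*) behaves like a shell-style glob. A minimal sketch using Python's fnmatch module follows. `filter_names` is a hypothetical helper, and the sketch assumes the filter string itself contains no glob metacharacters.

```python
from fnmatch import fnmatchcase
from typing import Iterable, List, Optional


def filter_names(names: Iterable[str], filter: Optional[str] = None) -> List[str]:
    """Keep names whose group (the part before "/") starts with `filter`."""
    if filter is None:
        return list(names)
    pattern = f"{filter}*/*"
    return [n for n in names if fnmatchcase(n, pattern)]


names = ["loss/train", "loss/eval", "lr/value", "grad_norm"]
kept = filter_names(names, "lo")
```

Note that a name without a group separator, such as `grad_norm` here, never matches the pattern, whatever the filter string is.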

Dataloader benchmark utilities#

cerebras.pytorch.utils.benchmark.benchmark_dataloader(input_fn, num_epochs=None, steps_per_epoch=None, sampling_frequency=None, profile_activities=None, print_metrics=True)[source]#

Utility to benchmark a dataloader.

Parameters
  • input_fn (Callable[[...], Iterable]) – Function that creates and returns a dataloader.

  • num_epochs (Optional[int]) – Number of epochs to iterate over the dataloader. If None, the dataloader is only iterated for one epoch.

  • steps_per_epoch (Optional[int]) – Number of steps to iterate over the dataloader in each epoch. If None, the dataloader is iterated in its entirety.

  • sampling_frequency (Optional[int]) – Frequency at which to sample metrics. If None, a default value of 100 (i.e. every 100 steps) is used. First step of each epoch is always sampled.

  • profile_activities (Optional[List[str]]) – List of optional activities to profile. If None, no extra activities are profiled. Note that these may incur additional overhead and could affect overall performance of the dataloader, especially if the sampling frequency is high.

  • print_metrics (bool) – Whether to pretty print the final metrics to console.

Returns

Metrics for the dataloader experiment.

Return type

cerebras.pytorch.utils.benchmark.utils.dataloader.Metrics

class cerebras.pytorch.utils.benchmark.utils.dataloader.Metrics(dataloader_build_time=<factory>, epoch_metrics=<factory>, batch_specs=<factory>, total_time=<factory>, global_rate=0.0, is_partial=True, start_time_ns=<factory>, end_time_ns=0)[source]#

Metrics for a single dataloader experiment.

Parameters
  • dataloader_build_time (numpy.timedelta64) – Time to build the dataloader.

  • epoch_metrics (List[cerebras.pytorch.utils.benchmark.utils.dataloader.EpochMetrics]) – List of metrics for each epoch.

  • batch_specs (Dict[cerebras.pytorch.utils.benchmark.utils.dataloader.BatchSpec, cerebras.pytorch.utils.benchmark.utils.dataloader.BatchSpecOccurence]) – Mapping between unique batch specs found and their occurrences.

  • total_time (numpy.timedelta64) – Total time to iterate through all epochs.

  • global_rate (float) – Overall global rate in steps/second.

  • is_partial (bool) – Whether the metrics are partial. This can happen if the benchmark is interrupted in the middle of execution.

  • start_time_ns (int) – Time at which the experiment started.

  • end_time_ns (int) – Time at which the experiment ended.

dataloader_build_time: numpy.timedelta64#
epoch_metrics: List[cerebras.pytorch.utils.benchmark.utils.dataloader.EpochMetrics]#
batch_specs: Dict[cerebras.pytorch.utils.benchmark.utils.dataloader.BatchSpec, cerebras.pytorch.utils.benchmark.utils.dataloader.BatchSpecOccurence]#
total_time: numpy.timedelta64#
global_rate: float = 0.0#
is_partial: bool = True#
start_time_ns: int#
end_time_ns: int = 0#
property total_steps: int#

Returns the total number of steps across all epochs.

property global_sample_rate: Optional[float]#

Returns the overall global rate in samples/second.

Note that this value only exists if all batches have the exact same structure, dtypes, and shapes. Otherwise, this value is None.
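
The note on global_sample_rate can be illustrated as follows: a samples/second figure is only meaningful when every batch has the same spec, in which case it is the steps/second rate scaled by the batch size. This sketch uses a plain (batch_size, shape) tuple as a stand-in for BatchSpec and an integer count as a stand-in for BatchSpecOccurence; both simplifications and the helper name are assumptions.

```python
from typing import Dict, Optional, Tuple

# Stand-in for BatchSpec: (batch_size, per-sample shape).
Spec = Tuple[int, Tuple[int, ...]]


def sample_rate_from(global_rate: float, batch_specs: Dict[Spec, int]) -> Optional[float]:
    """Convert steps/second to samples/second when all batches are uniform."""
    if len(batch_specs) != 1:
        # Mixed (or no) batch specs: a single samples/second value is undefined.
        return None
    batch_size, _shape = next(iter(batch_specs))
    return global_rate * batch_size


uniform = sample_rate_from(2.5, {(32, (128,)): 100})
mixed = sample_rate_from(2.5, {(32, (128,)): 99, (16, (128,)): 1})
```

A single short final batch is enough to make the specs non-uniform, which is one common reason the value would be None.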

class cerebras.pytorch.utils.benchmark.utils.dataloader.EpochMetrics(iterator_creation=<factory>, iteration_time=<factory>, total_steps=0, batch_metrics=<factory>, start_time_ns=<factory>, end_time_ns=0)[source]#

Metrics for a single epoch of a dataloader experiment.

Parameters
  • iterator_creation (numpy.timedelta64) – Time to create the dataloader iterator.

  • iteration_time (numpy.timedelta64) – Time to iterate the entire epoch excluding the creation of the iterator.

  • total_steps (int) – Total number of steps in the epoch.

  • batch_metrics (List[cerebras.pytorch.utils.benchmark.utils.dataloader.BatchMetrics]) – List of metrics for batches generated in the epoch.

  • start_time_ns (int) – Time at which the epoch started.

  • end_time_ns (int) – Time at which the epoch ended.

iterator_creation: numpy.timedelta64#
iteration_time: numpy.timedelta64#
total_steps: int = 0#
batch_metrics: List[cerebras.pytorch.utils.benchmark.utils.dataloader.BatchMetrics]#
start_time_ns: int#
end_time_ns: int = 0#
property total_time: numpy.timedelta64#

Returns the total time to create and iterate the epoch.

class cerebras.pytorch.utils.benchmark.utils.dataloader.BatchMetrics(epoch_step, global_step, local_rate, global_rate, profile_activities=<factory>, sampling_time_ns=<factory>)[source]#

Metrics for a single batch of a dataloader experiment.

Parameters
  • epoch_step (int) – Epoch step at which the batch was generated.

  • global_step (int) – Global step at which the batch was generated.

  • local_rate (float) – Local rate (in steps/second) at the sampling frequency. This is the instantaneous rate (relative to previous batch) at which the batch was generated.

  • global_rate (float) – Global rate (in steps/second) at the sampling frequency. This is the rate measured since the start of iterating over the epochs.

  • profile_activities (Dict[str, Any]) – Dictionary of profile activities and their values.

  • sampling_time_ns (int) – Time at which the batch was sampled.

epoch_step: int#
global_step: int#
local_rate: float#
global_rate: float#
profile_activities: Dict[str, Any]#
sampling_time_ns: int#
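
The difference between local_rate and global_rate can be sketched from timestamps alone. The helper below is hypothetical and takes times in seconds for readability (the metrics classes record nanoseconds); it is not the benchmark's implementation.

```python
from typing import Tuple


def rates(step: int, now: float, prev_step: int, prev_time: float, start_time: float) -> Tuple[float, float]:
    """Return (local_rate, global_rate) in steps/second at a sampling point."""
    # Local: progress since the previous sample only.
    local_rate = (step - prev_step) / (now - prev_time)
    # Global: all progress since iteration started.
    global_rate = step / (now - start_time)
    return local_rate, global_rate


# 100 steps in the last 10 s, 200 steps in 30 s overall.
local, overall = rates(step=200, now=30.0, prev_step=100, prev_time=20.0, start_time=0.0)
```

A local rate well above the global rate, as in this example, suggests that earlier steps were slower, for instance because of warm-up or cache filling.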