Source code for cerebras.modelzoo.trainer.trainer

# Copyright 2022 Cerebras Systems.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
The ModelZoo Trainer class is the main entry point for training models in ModelZoo.
It is responsible for setting up the training environment, running the training/validation loop,
and saving the checkpoint.
"""

from __future__ import annotations

import os
import uuid
from collections import Counter, OrderedDict
from contextlib import ExitStack, contextmanager, nullcontext
from copy import copy
from functools import wraps
from logging import Logger as PythonLogger
from pathlib import Path
from typing import Any, Callable, Dict, Generator, List, Optional, Union, final
from warnings import warn
from weakref import finalize

import torch

import cerebras.pytorch as cstorch
from cerebras.modelzoo.trainer.callbacks import (
    GLOBAL_CALLBACK_REGISTRY,
    ArtifactDirCallback,
    BackendCallback,
    Callback,
    Checkpoint,
    DataLoaderCallback,
    GradientAccumulationCallback,
    Logging,
    LoopCallback,
    ModelCallback,
    OptimizerCallback,
    Precision,
    Reproducibility,
    SchedulersCallback,
    SchedulersInput,
    SparsityCallback,
    TrainingLoop,
    ValidationCallback,
    ValidationLoop,
)
from cerebras.modelzoo.trainer.loggers import Logger
from cerebras.modelzoo.trainer.utils import convert_output_to_dict
from cerebras.pytorch.backend import Backend
from cerebras.pytorch.optim import Optimizer
from cerebras.pytorch.sparse import SparsityAlgorithm

UNSPECIFIED = object()


[docs]class Trainer: """The Trainer class is the main entry point for training models in ModelZoo.""" @final def __init__( self, device: Optional[str] = None, backend: Optional[Backend] = None, model_dir: str = UNSPECIFIED, model: Union[ Callable[[], torch.nn.Module], torch.nn.Module ] = UNSPECIFIED, optimizer: Union[ Optimizer, Callable[[torch.nn.Module], Optimizer], None, ] = None, schedulers: SchedulersInput = None, precision: Optional[Precision] = None, sparsity: Optional[SparsityAlgorithm] = None, # Training args loop: Optional[LoopCallback] = None, checkpoint: Optional[Checkpoint] = None, logging: Optional[Logging] = None, # Trainer args callbacks: Optional[List[Callback]] = None, loggers: Optional[List[Logger]] = None, seed: Optional[int] = None, ): """ Args: device: The device to train the model on. It must be one of "CSX", "CPU", or "GPU". backend: The backend used to train the model. This argument is mutually exclusive with `device`. model_dir: The directory where the model artifacts are saved. model: The model to train. It must be one of the following: - If a callable is passed, it is assumed to be a function that takes in no arguments returns a torch.nn.Module. - If a torch.nn.Module is passed, it is used as is. optimizer: The optimizer used to optimize the model. It must be one of the following: - If a :py:class:`~cerebras.pytorch.optim.Optimizer` is passed, it is used as is. - If a callable is passed, it is assumed to be a function that takes in a torch.nn.Module and returns a :py:class:`~cerebras.pytorch.optim.Optimizer`. - If not passed, then assume that only validation will be run. schedulers: The set of optimizer schedulers to be used. Common schedulers include LR schedulers. It must be a list of these items: - If a cstorch.optim.scheduler.Scheduler is passed, it is used as is. - A callable that is assumed to be a function that takes in a :py:class:`~cerebras.pytorch.optim.Optimizer` and returns a cstorch.optim.scheduler.Scheduler. - If None, there is no optimizer param group scheduling. precision: The Precision callback used during training sparsity: The sparsity algorithm used to sparsify weights during training/validation It must be one of the following: - If a callable is passed, it is assumed to be a function that takes in no arguments returns a :py:class:`~cerebras.pytorch.sparse.SparsityAlgorithm`. - If a :py:class:`~cerebras.pytorch.sparse.SparsityAlgorithm` is passed, it is used as is. loop: The loop callback to use for training. It must be an instance of LoopCallback. If not provided, the default loop is TrainingLoop(num_epochs=1). checkpoint: The checkpoint callback to use for saving/loading checkpoints. It must be an instance of Checkpoints. If not provided, then no checkpoints are saved. logging: The logging callback used to set up python logging. This callback also controls when logs are supposed to be logged. If not provided, the default logging settings ``Logging(log_steps=1, log_level="INFO")`` are used. callbacks: A list of callbacks to used by the trainer. The order in which the callbacks are provided is important as it determines the order in which the callback's hooks are executed. loggers: A list of loggers to use for logging. seed: Initial seed for the torch random number generator. """ super().__init__() if model_dir is UNSPECIFIED: raise ValueError("model_dir is a required argument") self.model_dir = Path(model_dir) if model is UNSPECIFIED: raise ValueError("model is a required argument") if not isinstance(model, torch.nn.Module) and isinstance( optimizer, cstorch.optim.Optimizer ): raise ValueError( f"Expected optimizer to be a callable that takes in a torch.nn.Module " f"and returns a cstorch.optim.Optimizer. Got: {type(optimizer)}" ) # Attributes set by core callbacks self.artifact_dir: Path self.summary_dir: Path self.backend: "cstorch.backend.Backend" self.model: torch.nn.Module self.compiled_model: Callable self.dataloader: cstorch.utils.data.DataLoader self.optimizer: Optional[cstorch.optim.Optimizer] self.schedulers: Optional[List[cstorch.optim.scheduler.Scheduler]] self.executor: cstorch.utils.data.DataExecutor self.global_step: int # Other attributes that callbacks may set self.activation_steps: Optional[int] = None if precision is not None and not isinstance(precision, Precision): raise TypeError( f"Expected precision to be an instance of Precision. " f"Got: {type(precision)}" ) if loop is None: loop = TrainingLoop(num_epochs=1) elif not isinstance(loop, LoopCallback): raise TypeError( f"Expected loop to be an instance of LoopCallback." f"Got: {type(loop)}" ) if checkpoint is None: checkpoint = Checkpoint() elif not isinstance(checkpoint, Checkpoint): raise TypeError( f"Expected checkpoint to be an instance of Checkpoint. " f"Got: {type(checkpoint)}" ) if logging is None: logging = Logging(log_steps=1, log_level="INFO") elif not isinstance(logging, Logging): raise TypeError( f"Expected logging to be an instance of Logging. " f"Got: {type(logging)}" ) # Order of the core callbacks is important and should not be changed self.callbacks = OrderedDict( { "artifact_dir": ArtifactDirCallback(), "reproducibility": Reproducibility(seed), "backend": BackendCallback(backend, device), "model": ModelCallback(model), "dataloader": DataLoaderCallback(), "optimizer": OptimizerCallback(optimizer), "schedulers": SchedulersCallback(schedulers), "precision": precision, "sparsity": SparsityCallback(sparsity), "loop": loop, "logging": logging, "grad_accum": GradientAccumulationCallback(), "checkpoint": checkpoint, } ) core_callback_types = set( type(callback) for callback in self.callbacks.values() if callback is not None ) | {Precision, LoopCallback} core_callback_types = tuple(core_callback_types) user_callbacks = callbacks or [] for callback in user_callbacks: if isinstance(callback, Logger): warn( f"Passed logger {type(callback)} as a `callback`. " f"It will not be used by trainer.log_metrics(). " f"To use for logging metrics, pass it as a `logger` " f"instead." ) self.loggers = loggers or [] for logger in self.loggers: if not isinstance(logger, Logger): raise TypeError( f"logger must be an instance of Logger. Got: {type(logger)}" ) counter = Counter(self.callbacks.keys()) def get_name(callback): name = type(callback).__name__ counter[name] += 1 if counter[name] > 1: return f"{name}_{counter[name]}" return name for callback in user_callbacks + self.loggers: if isinstance(callback, core_callback_types): raise ValueError( f"Callback {type(callback).__name__} is a core callback " f"and cannot be overridden" ) self.callbacks[get_name(callback)] = callback # Checkpoint should be the last callback as saving a checkpoint is slow self.callbacks.move_to_end("checkpoint") # Whitelist of non-standard hooks that callbacks can implement self.non_standard_hooks_whitelist = set() # ID map for validation dataloaders self._name_scope_stack = [] self._val_dataloader_id_map = {} self._val_dataloader_id = 0 self.call("pre_setup") self.call("setup") def callback_finalize(callbacks): for callback in callbacks: if callback is not None: callback.finalize() self._finalizer = finalize( self, callback_finalize, self.callbacks.values() ) @property def all_callbacks(self) -> Generator[Callback, None, None]: """Get all callback objects available to the trainer.""" yield from self.callbacks.values() yield from GLOBAL_CALLBACK_REGISTRY.values()
[docs] def get_callbacks( self, callback_type: type ) -> Generator[Callback, None, None]: """Get all callbacks of the given type.""" for callback in self.all_callbacks: if isinstance(callback, callback_type): yield callback
[docs] def get_callback(self, callback_type: type) -> Optional[Callback]: """Get the first callback of the given type.""" return next(self.get_callbacks(callback_type), None)
@property def validation_callbacks(self) -> List[ValidationCallback]: """Returns all validation callbacks in the Trainer's callback list.""" return list(self.get_callbacks(ValidationCallback))
[docs] def call(self, hook_name: str, *args, **kwargs): """ Call the hook with name hook_name for all callbacks in the Trainer's callback list as well as the callbacks in the global registry. The callback's method is passed in the trainer object itself as well as any args and kwargs that are passed into this method. e.g. .. code: python getattr(callback, hook_name)(self, *args, **kwargs) Args: hook_name: The name of the hook to call. It must be the name of a method in the Callback class. args: Other positional arguments to forward along to the called hook. kwargs: Other keyword arguments to forward along to the called hook. """ seen = set() for callback in self.all_callbacks: if callback is None: continue if callback in seen: warn(f"Duplicate callback found in the list: {callback}") else: seen.add(callback) hook = getattr(callback, hook_name, None) if hook: try: hook(self, *args, **kwargs) except Exception as e: # TODO(SW-128935): Add this to the exception notes once # we support python 3.11 raise RuntimeError( f"Encountered error when calling " f"{type(callback).__name__}.{hook_name}" ) from e elif hook_name not in self.non_standard_hooks_whitelist: raise AttributeError( f"Callback {type(callback)} does not implement {hook_name}" )
@property def precision(self) -> Optional[Precision]: """Returns the precision callback instance if it exists.""" return self.callbacks["precision"] @property def grad_accum(self) -> GradientAccumulationCallback: """Returns the gradient accumulation callback instance.""" return self.callbacks["grad_accum"] @property def should_run_optimizer_step(self) -> bool: """Returns True if we should run the optimizer step. The gradient accumulation callback may set this to False if we are accumulating gradients and have not reached the accumulation steps. Note, this only applies to CPU/GPU runs. """ return self.grad_accum.should_run_optimizer_step @property def loop(self) -> LoopCallback: """Returns the default loop settings.""" return self.callbacks["loop"] @property def checkpoint(self) -> Checkpoint: """Returns the checkpoint callback.""" return self.callbacks["checkpoint"] @property def logging(self) -> Checkpoint: """Returns the logging callback.""" return self.callbacks["logging"] @property def logger(self) -> PythonLogger: """Returns the Trainer's Python logger object.""" return self.logging.logger @property def is_log_step(self) -> bool: """Returns True if the current step is a log step.""" return self.logging.is_log_step(self) @property def is_first_iteration(self) -> bool: """Returns True if the executor is on its first iteration.""" return self.executor and self.executor.iteration == 0 @property def is_final_iteration(self) -> bool: """Returns True if the executor is on its final iteration.""" return self.executor and self.executor.on_final_iteration @final @property def is_tracing(self) -> bool: """Returns True if we are currently tracing the model.""" return self.backend.is_tracing
[docs] @final @cstorch.step_closure def log_metrics_in_step_closure(self, **kwargs): """Log the given kwargs inside a step closure.""" # This is a sanity check and should never assert assert not self.is_tracing self.log_metrics(**kwargs)
[docs] @final def log_metrics(self, **kwargs): """ Log the given kwargs to all loggers. Example usage: .. code:: python trainer.log_metrics(loss=loss.item()) Args: kwargs: The key-value pairs to log. """ if self.is_tracing: # If we are tracing, log the kwargs inside a step closure self.log_metrics_in_step_closure(**kwargs) else: # If we're not inside a logging step and an executor is not # active, don't send to loggers. The executor check is to # handle the case where a user wants to log something after # execution. if self.executor is not None and not self.is_log_step: return if not self.loggers: warn( "No loggers are attached to the trainer. " "Call to trainer.log_metrics() will be a no-op." ) from cerebras.modelzoo.trainer.loggers import TensorBoardLogger # Don't add prefixes to metric names if the tensorboard logger # is configured to output logs to legacy event directories if any( logger.legacy_event_dirs for logger in self.get_callbacks(TensorBoardLogger) ): prefix = "" else: prefix = self.name_scope_path # Otherwise, log the kwargs directly for logger in self.loggers: logger.log_metrics( {f"{prefix}{k}": v for k, v in kwargs.items()}, step=self.global_step, )
[docs] @final @contextmanager def name_scope(self, name: str): """Append name to the trainer's name scope stack whilst inside the context. Args: name: The name to append to the name scope stack. """ try: self._name_scope_stack.append(name) yield name finally: self._name_scope_stack.pop()
@final @property def name_scope_path(self) -> str: """Returns the current name scope path. This is the the name scope stack joined by '/'. """ return os.path.join(*self._name_scope_stack, "")
[docs] @final def get_val_dataloader_scope(self, val_dataloader): """Get the name scope for the given val dataloader""" if val_dataloader.id not in self._val_dataloader_id_map: self._val_dataloader_id_map[val_dataloader.id] = ( f"validate_{self._val_dataloader_id}" ) self._val_dataloader_id += 1 return self._val_dataloader_id_map[val_dataloader.id]
[docs] @final @cstorch.trace def training_step(self, batch) -> Dict[str, Any]: """Run a single training step on the given batch.. Note that if retrace is off, content of this method will only run on the first iteration. So any inputs to this method must either be non-changing or torch tensors. Args: batch: The batch of data to train on. batch_idx: The index of the batch in the dataloader. Returns: A dictionary containing the loss and any other outputs. """ outputs = self.forward(batch) self.backward(outputs) # Only run the optimizer step if an optimizer was defined # and we should run the optimizer step if self.optimizer and self.should_run_optimizer_step: self.optimizer_step() self.optimizer_zero_grad() self.schedulers_step() return outputs
[docs] @final def forward(self, batch) -> Dict[str, Any]: """ Run the forward pass on the given batch. Args: batch: The batch of data to run the forward pass on. Returns: A dictionary containing the loss and any other outputs. """ if self.precision: ctx = self.precision.autocast_context_manager() else: ctx = nullcontext() with ctx: args = [batch] kwargs = {} self.call("on_before_forward", self.model, batch, args, kwargs) output = self.compiled_model(*args, **kwargs) outputs = convert_output_to_dict(output) self.call("on_after_forward", self.model, outputs, batch) return outputs
[docs] @final def backward(self, outputs: dict): """ Run the backward pass on the given loss. Args: outputs: The outputs of the model. Expect key 'loss' to be present. """ self.call("on_before_backward", self.model, outputs) loss = outputs["loss"] if self.precision: self.precision.backward(loss) else: loss.backward() self.call("on_after_backward", self.model, outputs)
[docs] @final def optimizer_step(self): """Run the optimizer step.""" self.call("on_before_optimizer_step", self.model, self.optimizer) if self.precision: self.precision.clip_gradients(self.optimizer) self.precision.optimizer_step(self.optimizer) else: self.optimizer.step() self.call("on_after_optimizer_step", self.model, self.optimizer)
[docs] @final def optimizer_zero_grad(self): """Zero the gradients of the optimizer.""" self.call("on_before_optimizer_zero_grad", self.model, self.optimizer) self.optimizer.zero_grad() self.call("on_after_optimizer_zero_grad", self.model, self.optimizer)
[docs] @final def schedulers_step(self): """Step all the schedulers.""" if not self.schedulers: return for scheduler in self.schedulers: self.call( "on_before_scheduler_step", self.model, self.optimizer, scheduler, ) scheduler.step() self.call( "on_after_scheduler_step", self.model, self.optimizer, scheduler, )
[docs] @contextmanager def on_exception(self, hook): """Context manager to handle exceptions in the given hook. Args: hook: The hook to handle exceptions for. """ try: yield except Exception as e: try: self.call(f"on_{hook}_exception", e) except Exception as e2: raise e2 from e raise
[docs] @final def fit( self, train_dataloader: cstorch.utils.data.DataLoader, val_dataloader: Union[ cstorch.utils.data.DataLoader, List[cstorch.utils.data.DataLoader], None, ] = None, ckpt_path: Optional[str] = UNSPECIFIED, ): """Complete a full training run on the given train and validation dataloaders. Args: train_dataloader: The training dataloader. val_dataloader: The validation dataloader. If provided, validation is run every `eval_frequency` steps as defined in the loop callback. If not provided, only training is run. If a list of dataloaders is provided, then each dataloader is validated in sequence. ckpt_path: The path to the checkpoint to load before starting training. If not provided and `autoload_last_checkpoint` is True, then the latest checkpoint is loaded """ loop = self.loop if not isinstance(loop, TrainingLoop): raise TypeError( f"Expected loop to be an instance of TrainingLoop. " f"Got: {type(loop)}" ) with ExitStack() as fit_stack: self.call( "on_enter_fit", fit_stack, train_dataloader, val_dataloader, loop, ) self.load_checkpoint(ckpt_path) self.call("on_fit_start", train_dataloader, val_dataloader, loop) for loop_idx in range(loop.num_trains): self._run_train(train_dataloader, loop, loop_idx) if loop.eval_frequency is not None and loop.eval_frequency != 0: # Run upstream and downstream validation after each training iteration self._validate_all( val_dataloader, ckpt_paths=None, loop=None, # pylint: disable=cell-var-from-loop run_validation=lambda: self.call( "run_validation", loop_idx=loop_idx, is_last=loop_idx == loop.num_trains - 1, ), ) self.call("on_fit_end", loop)
@final def _run_train(self, train_dataloader, loop, loop_idx=0): if not isinstance(loop, TrainingLoop): raise TypeError( f"Expected loop to be an instance of TrainingLoop. " f"Got: {type(loop)}" ) with ExitStack() as stack: stack.enter_context(self.name_scope("train")) self.call("on_enter_train", stack, train_dataloader, loop, loop_idx) self.call( "on_train_start", self.model, train_dataloader, loop, loop_idx ) self.executor = cstorch.utils.data.DataExecutor( train_dataloader, num_steps=loop.train_steps, checkpoint_steps=loop.checkpoint_steps, activation_steps=self.activation_steps, profiler_activities=[], # Don't use data executor's profiler ) for batch_idx, batch in enumerate(self.executor): self.call("on_train_batch_start", self.model, batch, batch_idx) outputs = self.training_step(batch) self.call( "on_train_batch_end", self.model, outputs, batch, batch_idx ) self.call("on_train_end", self.model, loop, loop_idx) self.executor = None @final @cstorch.trace @torch.no_grad() def validation_step(self, batch: Any) -> Dict[str, Any]: """Run a single validation step on the given batch and batch index. Note that if retrace is off, content of this method will only run on the first iteration. So any inputs to this method must either be constant or torch tensors. Args: batch: The batch of data to validate on. Returns: A dictionary containing the loss and any other outputs. """ return self.forward(batch)
[docs] @final def validate( self, val_dataloader: Optional[cstorch.utils.data.DataLoader] = None, ckpt_path: Optional[str] = UNSPECIFIED, loop: Optional[ValidationLoop] = None, ): """Complete a full validation run on the validation dataloader. Args: val_dataloader: The validation dataloader. If a list of dataloaders is provided, then each dataloader is validated in sequence. ckpt_path: The path to the checkpoint to load before starting validation. If not provided and `autoload_last_checkpoint` is True, then the latest checkpoint is loaded. loop: The loop callback to use for validation. If not provided, the default loop is used. If provided, it must be an instance of ValidationLoop. Note, this should only be provided if the loop callback provided in the constructor is not sufficient. """ if not isinstance(val_dataloader, cstorch.utils.data.DataLoader): raise TypeError( f"val_dataloader must be a cstorch.utils.data.DataLoader. " f"Got {type(val_dataloader)}" ) if not loop: loop = self.loop if isinstance(loop, TrainingLoop): loop = loop.val_loop if not isinstance(loop, ValidationLoop): raise TypeError( f"Expected loop to be an instance of ValidationLoop. " f"Got: {type(loop)}" ) with ExitStack() as stack: stack.enter_context( self.name_scope(self.get_val_dataloader_scope(val_dataloader)) ) if loop not in self.all_callbacks: stack.enter_context(loop) self.call("on_enter_validate", stack, val_dataloader, loop) self.load_checkpoint(ckpt_path) self.call(loop.on_start_hook, self.model, val_dataloader, loop) # For every validation run we want to iterate the dataloader from # the scratch, so we make a shallow copy of the validation dataloader, # so the streamer will treat it as new dataloader. self.executor = cstorch.utils.data.DataExecutor( copy(val_dataloader), num_steps=loop.eval_steps, profiler_activities=[], # Don't use data executor's profiler ) for batch_idx, batch in enumerate(self.executor): self.call( loop.on_batch_start_hook, self.model, batch, batch_idx ) outputs = self.validation_step(batch) self.call( loop.on_batch_end_hook, self.model, outputs, batch, batch_idx, ) self.call(loop.on_end_hook, self.model, loop) self.executor = None
[docs] @final def validate_all( self, val_dataloaders: Union[ List[cstorch.utils.data.DataLoader], cstorch.utils.data.DataLoader, None, ] = None, ckpt_paths: Union[List[str], str, None] = UNSPECIFIED, loop: Optional[ValidationLoop] = None, ): """ Runs all upstream and downstream validation permutations. .. code:: python for ckpt_path in ckpt_paths: for val_dataloader in val_dataloaders: trainer.validate(val_dataloader, ckpt_path) # run downstream validation run_validation(...) Args: val_dataloaders: A list of validation dataloaders to run validation on. ckpt_paths: A list of checkpoint paths to run validation on. Each checkpoint path must be a path to a checkpoint file, or a glob pattern. loop: The validation loop to use for validation. If not provided, then the default loop is used. """ self._validate_all( val_dataloaders, ckpt_paths, loop, run_validation=lambda: self.call( "run_validation", loop_idx=None, is_last=True ), )
@final @wraps(validate_all) def _validate_all(self, val_dataloaders, ckpt_paths, loop, run_validation): if not callable(run_validation) and run_validation is not None: raise RuntimeError( f"Expected run_validation to be a callable or None. " f"Got: {run_validation}" ) if val_dataloaders is None: val_dataloaders = [] elif not isinstance(val_dataloaders, (list, tuple)): val_dataloaders = [val_dataloaders] for i, val_dataloader in enumerate(val_dataloaders): if not isinstance(val_dataloader, cstorch.utils.data.DataLoader): raise TypeError( f"Expected val_dataloader to be an instance of DataLoader. " f"Got {type(val_dataloader)} for element {i}" ) if ckpt_paths is UNSPECIFIED or ckpt_paths is None: ckpt_paths = [ckpt_paths] else: if not isinstance(ckpt_paths, (list, tuple)): ckpt_paths = [ckpt_paths] for ckpt_path in ckpt_paths: if not isinstance(ckpt_path, (str, Path)): raise ValueError( f"Expected ckpt_path to be a path to a checkpoint file, " f"or a glob pattern. Got: {ckpt_path}" ) ckpt_path = Path(ckpt_path) if not any(ckpt_path.parent.glob(ckpt_path.name)): raise FileNotFoundError( f"Checkpoint file(s) not found at: {ckpt_path}" ) # Flatten all ckpt paths into a list ckpt_paths = ( checkpoint_file for ckpt_path in map(Path, ckpt_paths) for checkpoint_file in ckpt_path.parent.glob(ckpt_path.name) ) with ExitStack() as stack: self.call("on_enter_validate_all", stack, val_dataloaders, loop) for ckpt_path in ckpt_paths: # Load the checkpoint self.load_checkpoint(ckpt_path) # Run upstream validation for val_dataloader in val_dataloaders: self.validate(val_dataloader, ckpt_path=None, loop=loop) # Run downstream validation if run_validation is not None: run_validation()
[docs] @final @cstorch.checkpoint_closure def save_checkpoint(self): """Save a checkpoint at the current global step. The checkpoint state dict is constructed by various callbacks that implement the `on_save_checkpoint` method. """ state_dict = {} self.call("on_save_checkpoint", state_dict) self.call("postprocess_checkpoint", state_dict) ckpt_path = self.checkpoint.get_checkpoint_path( self.model_dir, self.global_step ) # atomic checkpoint save by first writing to a temp file, then renaming tmp_ckpt_path = Path(f"{ckpt_path}.{str(uuid.uuid4())[:8]}.tmp") cstorch.save(state_dict, tmp_ckpt_path) tmp_ckpt_path.rename(ckpt_path) self.call("on_after_save_checkpoint", ckpt_path)
[docs] @final def load_checkpoint(self, ckpt_path: Optional[str] = None): """Load a checkpoint from the given path. The checkpoint state dict is loaded and processed by various callbacks that implement the `on_load_checkpoint` method. Args: ckpt_path: The path to the checkpoint to load If not provided and `autoload_last_checkpoint` is True, then the latest checkpoint is loaded """ # Don't load a checkpoint if compile/validate only if not self.backend.is_e2e_execution: return if ( ckpt_path is UNSPECIFIED and self.checkpoint.autoload_last_checkpoint ): ckpt_path = self.checkpoint.get_latest_checkpoint(self) if not ckpt_path or ckpt_path is UNSPECIFIED: self.call("on_before_load_checkpoint", None) return self.call("on_before_load_checkpoint", ckpt_path) state_dict = cstorch.load(ckpt_path) self.call("preprocess_checkpoint", state_dict) self.call("on_load_checkpoint", state_dict)