Source code for common.pytorch.summaries.tensor_summary

# Copyright 2022 Cerebras Systems.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Utilities for saving tensor summaries.
"""
import dataclasses
import json
import logging
import threading
import time
from datetime import datetime
from pathlib import Path
from typing import List, Union

import torch
from tensorboard.backend.event_processing.io_wrapper import (
    IsSummaryEventsFile as is_summary_events_file,
)
from torch.utils.tensorboard import SummaryWriter

from modelzoo.common.pytorch.summaries.cb_summary import (
    CBSummary,
    DeviceOutputs,
)

_LOCK = threading.Lock()
_METADATA = "__metadata__"
_VERSION_KEY = "__version__"
_VERSION = "1.0"


[docs]class TensorSummary(CBSummary): """A class for providing tensor summaries on CS/CPU/GPU devices. In constrast to other summaries, such as scalar summaries, Tensor summaries are not written to Tensorboard events files and are not viewable in Tensorboard. Instead, they are written to files and a convenience API class is provided to load the values (see `TensorSummaryReader` class below). This class still takes in a SummaryWriter, like other summary classes, but instead of using it to save values into events files, it identifies the events file path, creates a sibling directory with a similar naming scheme, and places summarized tensor data in that directory. Users are discouraged from inspecting this directory as the implementation might change. Instead, users are encouraged to use the API provided below for loading summarized tensors. """
[docs] def __init__(self, name: str): """Constructs a `TensorSummary` instance. Args: name: Name of the summary. This is the tag that appears in TensorBoard. """ if name == _METADATA: raise ValueError( f"{_METADATA} is a reserved name. Please use a different name " f"for summaries." ) super().__init__(name) self._log_provider = _LogProvider(name)
# pylint: disable=arguments-differ
[docs] def run_on_device(self, tensor: torch.Tensor) -> DeviceOutputs: """Define the portion of the summary computation that runs on device. Args: tensor: The tensor to be summarized. Returns: An instance of `DeviceOutputs`. """ if not isinstance(tensor, torch.Tensor): raise ValueError( f"Expected a torch Tensor for tensor summary {self.name}, " f"got {type(tensor)}" ) return super().run_on_device(tensor)
# pylint: disable=arguments-differ
[docs] def run_on_host(self, tensor: torch.Tensor) -> torch.Tensor: """Runs the host portion of the summary computation. Args: tensor: The tensor to be summarized. Returns: The summarized tensor. """ return tensor
[docs] def save_on_host( self, host_outputs: torch.Tensor, writer: SummaryWriter, step: int, ) -> None: """Saves the tensor summary to events file. Args: host_outputs: The summarized tensor to write to events file. writer: A writer for writing summaries to events files. step: The current global step. """ # pylint: disable=protected-access self._log_provider.update(writer.file_writer.event_writer._file_name) curr_time = time.time_ns() filename = f"{step}.{curr_time}" filepath = self._log_provider.logdir.joinpath(filename) # Write the tensor data to file torch.save( TensorDescriptor( step=int(step), ns_since_epoch=curr_time, tensor=host_outputs.detach(), ).to_dict(), str(filepath), )
[docs]def tensor_summary(name: str, tensor: torch.Tensor): """Convenience method for creating and running tensor summaries. This method searches registered summaries for the given name. If one is found, it uses it. Otherwise, it creates a new summary and runs the tensor through that summary. Args: name: Name of the summary. This is the tag that appears in TensorBoard. tensor: The tensor to be summarized. """ summary = TensorSummary(name) # Run the summary op summary(tensor)
[docs]@dataclasses.dataclass(frozen=True) class TensorDescriptor: """Descriptor for a summarized tensor. Args: step: Step at which the tensor was summarized. ns_since_epoch: Nanoseconds since "epoch" (e.g., UNIX time). tensor: The summarized tensor. """ step: int ns_since_epoch: int tensor: torch.Tensor @property def utctime(self) -> datetime: """Returns the UTC time when this tensor was saved.""" return datetime.utcfromtimestamp(float(self.ns_since_epoch) / 1e9)
[docs] def to_dict(self) -> dict: """Returns the descriptor converted to a dict.""" return dataclasses.asdict(self)
[docs] @staticmethod def from_dict(values) -> "TensorDescriptor": """Returns a descriptor from a dict of values.""" return TensorDescriptor(**values)
[docs]class TensorSummaryReader: """Class for reading summarized tensors. This class works in tandem with `TensorSummary` defined above. It provides general convenience APIs for inspecting tensor summaries produced by a run. Currently this class does not do any caching. So it can be used to inspect a live run. As more data becomes available, calling the APIs will reload the latest values. """
[docs] def __init__(self, path: str): """Constructs a `TensorSummaryReader` instance. Args: path: Path to a Tensorboard events file or a directory containing Tensorboard events files. Location of tensor summaries are inferred from these events files as there is a one-to-one mapping from Tensorboard events files and tensor summary directories. """ self._path = path self._summary_dirs: List[Path] = [] event_files = self._discover_event_files(self._path) cb_summaries = self._discover_cerebras_summary_dirs(event_files) self._summary_dirs = self._discover_tensor_summary_dirs(cb_summaries) if not self._summary_dirs: logging.warning( f"Could not find any tensor summaries in {self._path}" )
[docs] def load( self, name: str, step: int, latest_only: bool = True ) -> Union[TensorDescriptor, List[TensorDescriptor], None]: """Loads and returns tensor(s) with given name at the given step. Args: name: Name of the tensor. step: Step at which the tensor was summarized. latest_only: If False, return all if there are multiple tensors with the same name and step. If True, only return the latest value. Returns: A single tensor, multiple tensors, or no tensors matching the given name and step. """ descriptors = [] for summary_dir in self._summary_dirs: for path in summary_dir.joinpath(name).glob(f"{step}.*"): if path.exists(): content = torch.load(str(path)) descriptors.append(TensorDescriptor.from_dict(content)) if not descriptors: logging.warning( f"No tensor with name {name} has been summarized at step {step}" ) return None if not latest_only: return descriptors else: if len(descriptors) > 1: logging.warning( f"Multiple summarized tensors with name {name} found at " f"step {step}. Returning the latest one." ) descriptors.sort(key=lambda x: x.ns_since_epoch) return descriptors[-1]
[docs] def names(self) -> List[str]: """Returns a list of available tensor names.""" names = set() for summary_dir in self._summary_dirs: for subdir in summary_dir.glob("*"): if subdir == _METADATA: continue if subdir.is_dir(): names.add(subdir.name) return sorted(names)
@staticmethod def _discover_cerebras_summary_dirs(event_files: List[Path]) -> List[Path]: """Returns root cerebras summary dirs corresponding to given events. Args: event_files: List of event files to find matching tensor summary directories for. Returns: List of root cerebras summary directories. """ cb_summary_dirs = [] for events_file in event_files: root = _LogProvider.get_cb_summaries_root(events_file) add = True if not root.exists(): logging.warning( f"No Cerebras summary directories were found for events " f"file: {events_file}" ) add = False if not root.is_dir(): logging.warning( f"Expected {root} to be a directory containing Cerebras " f"summaries, but it is a file. Skipping as it has an " f"unknown format." ) add = False version = _read_version(root) if version is None: logging.warning( f"Could not detect version of Cerebras summaries at " f"directory {root}. This may lead to unexpected behavior." ) if version != _VERSION: logging.warning( f"Unknown version {version} for Cerebras summaries at " f"directory {root}. Skipping this directory." ) add = False if add: cb_summary_dirs.append(root) return cb_summary_dirs @staticmethod def _discover_tensor_summary_dirs( cb_summary_dirs: List[Path], ) -> List[Path]: """Returns tensor summary dirs corresponding to given Cerebras rootdirs. Args: cb_summary_dirs: List of root directories containing Cerebras specific summaries. Returns: List of tensor summary directories. """ tensor_summary_dirs = [] for cb_summary_dir in cb_summary_dirs: root = _LogProvider.get_tensor_summaries_root(cb_summary_dir) add = True if not root.exists(): logging.warning( f"No tensor summaries were found in directory: " f"{cb_summary_dir}" ) add = False if not root.is_dir(): logging.warning( f"Expected {root} to be a directory containing Tensor " f"summaries, but it is a file. Skipping as it has an " f"unknown format." ) add = False if add: tensor_summary_dirs.append(root) return tensor_summary_dirs @staticmethod def _discover_event_files(path: str) -> List[Path]: """Returns all events files in the given directory. Args: path: Path to an events file or directory containing events files. Returns: List of all events files in the given directory, or the path itself if it is an events file. """ event_files = [] path = Path(path) if path.is_file(): if not is_summary_events_file(str(path)): raise ValueError(f"Path {path} is not a summary events file") event_files.append(path) elif path.is_dir(): for subpath in path.glob("*"): if is_summary_events_file(str(subpath)): event_files.append(subpath) if not event_files: raise ValueError( f"No summary events files found in directory {path}" ) else: raise ValueError(f"Path {path} is neither a file nor a directory") return sorted(event_files)
class _LogProvider: """Class for providing log directories for saving tensor summary data.""" def __init__(self, name: str): """Constructs a `_LogProvider` instance. Args: name: The summary name. """ self._events_file: str = None self._logdir: Path = None self._rel_logdir: Path = None self._name = name @property def logdir(self) -> Path: """Returns absolute path to directory for writing tensor data.""" assert ( self._logdir ), "Log provider has not been tied to a SummaryWriter yet" return self._logdir @property def rel_logdir(self) -> Path: """Returns relative path to directory for writing tensor data. This is relative to the directory where the corresponding tensorboard events file resides. """ assert ( self._logdir ), "Log provider has not been tied to a SummaryWriter yet" return self._rel_logdir @staticmethod def get_cb_summaries_root(events_file: Union[str, Path]) -> Path: """Returns a root directory for placing tensor summaries.""" events_file = Path(events_file).resolve(strict=True) dirname = events_file.name.replace( "events.out.tfevents.", "events.out.cbevents." ) return events_file.parent.joinpath(dirname) @staticmethod def get_tensor_summaries_root(cb_summaries_root: Union[str, Path]) -> Path: """Returns a root directory for placing Cerebras-specific summaries.""" return cb_summaries_root.joinpath("tensors") def update(self, events_file: str) -> None: """Updates log paths based on the current tensorboard events file.""" if events_file == self._events_file: # Same events file as before, nothing to do update return self._events_file = events_file events_file = Path(events_file).resolve(strict=True) # First create the Cerebras summaries root directory rootdir = self.get_cb_summaries_root(events_file) rootdir.mkdir(parents=True, exist_ok=True) # Write metadata file with _LOCK: metadata = rootdir.joinpath(_METADATA) if not metadata.exists(): with metadata.open("w") as f: json.dump({_VERSION_KEY: _VERSION}, f, indent=True) # Now create the tensor summaries directory summaries_dir = self.get_tensor_summaries_root(rootdir) summaries_dir.mkdir(parents=True, exist_ok=True) # Now create a directory for this specific tensor summary self._logdir = summaries_dir.joinpath(self._name) self._logdir.mkdir(parents=True, exist_ok=True) self._rel_logdir = self._logdir.relative_to(Path(events_file).parent) def _read_version(rootdir: Path) -> Union[str, None]: """Reads the tensor summary version from summaries root dir. Args: rootdir: The root directory where tensor summaries are written to. Returns: The version string or None if it can't be found. """ version_file = rootdir.joinpath(_METADATA) if version_file.exists(): with version_file.open("r") as f: content = json.load(f) if _VERSION_KEY in content: return content[_VERSION_KEY] return None