# Source code for cerebras.modelzoo.trainer.extensions.bigcode.bigcode_eval_harness

# Copyright 2022 Cerebras Systems.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""This module provides a callback class to run BigCode's Evaluation Harness."""

import inspect
import json
import os
from collections import defaultdict
from copy import deepcopy
from dataclasses import asdict, dataclass
from functools import cached_property
from math import ceil
from typing import Any, Dict, List, Optional, Tuple, Union
from warnings import warn

from bigcode_eval import tasks as bigcode_tasks
from bigcode_eval.base import Task
from bigcode_eval.evaluator import Evaluator
from bigcode_eval.utils import update_code_gens
from lm_eval.utils import pattern_match

import cerebras.pytorch as cstorch
from cerebras.appliance.environment import appliance_environ
from cerebras.appliance.log import ClassLogger, named_class_logger
from import RequestType
from cerebras.modelzoo.trainer import Trainer
from cerebras.modelzoo.trainer.callbacks import (
from cerebras.modelzoo.trainer.callbacks.flags import _ScopedFlags
from cerebras.modelzoo.trainer.extensions.eval_harness_adapter import (

[docs]@dataclass class BigCodeCLIArgs: r"""Captures BigCode EH's CLI arguments with defaults. Fields: prefix: Prefix to add to the prompt. For example InCoder needs prefix='<| file |>\n' do_sample: Sample from the language model's output distribution. temperature: Sampling temperature used for generation. top_k: Top-k parameter used for generation. top_p: Top-p parameter used for nucleus sampling. n_samples: Number of completions to generate for each sample. seed: Random seed used for evaluation. tasks: List of tasks to evaluate code evals instruction_tokens: A series of instruction tokens used for instruction-tuning benchamrks separated by comma e.g. <user_message>,<end_user_message>,<assistant_message> max_tokens: Maximum number of tokens to generate. limit: Number of samples to solve and evaluate from the benchmark limit_start: Optional offset to start from when limiting the number of samples save_every_k_tasks: Optional saving after every k tasks postprocess: Postprocess model outputs before execution, always on except during generation tests allow_code_execution: Allow code evaluation to execute external/untrusted Python code on your machine generation_only: Do code generation but no evaluation load_generations_path: Path of file with previously generated solutions, if provided generation is skipped and only evaluation is done load_data_path: Path of additional data to load for the tasks metric_output_path: Path to save the results save_generations: Whether to save code generations load_generations_intermediate_paths: List of paths for saving the intermediate code generations save_generations_path: Path for saving the code generations save_references: Whether to save reference solutions/tests save_references_path: Path for saving the references solutions/tests prompt: Prompt type to use for generation in HumanEvalPack tasks check_references: Don't run generation but benchmark groundtruth (useful for debugging) """ # BigCode `EvalArguments` dataclass injected 
as into CLI prefix: str = "" do_sample: bool = True temperature: Optional[float] = None top_k: Optional[int] = None top_p: Optional[float] = None n_samples: int = 1 seed: int = 0 # Other BigCode CLI arguments tasks: Optional[Union[str, List[str]]] = None instruction_tokens: Optional[str] = None max_tokens: Optional[int] = None limit: Optional[int] = None limit_start: int = 0 save_every_k_tasks: int = -1 postprocess: bool = True allow_code_execution: bool = False generation_only: bool = True # We only run this flow by default load_generations_path: Optional[str] = None load_data_path: Optional[str] = None metric_output_path: str = "evaluation_results.json" save_generations: bool = ( True # We always save for the separate code execution flow ) load_generations_intermediate_paths: Optional[List[str]] = None save_generations_path: str = "generations.json" save_references: bool = True save_references_path: str = "references.json" prompt: str = "prompt" check_references: bool = False
@named_class_logger("BigCodeEvalHarnessRunner")
class BigCodeEvalHarnessRunner(ClassLogger):
    """Util class for invoking BigCode's run script with CSX-specific components."""

    def __init__(
        self,
        bigcode_args: BigCodeCLIArgs,
    ):
        """Constructs a `BigCodeEvalHarnessRunner` instance.

        Args:
            bigcode_args: `BigCodeCLIArgs` dataclass object capturing BCEH's
                CLI args. A deep copy is stored, so later mutation of the
                caller's object does not affect this runner.

        Raises:
            ValueError: If no task was specified, or if the specified tasks
                match none of BigCode's registered tasks.
        """
        super().__init__()

        self.args = deepcopy(bigcode_args)

        # Validate user-specified tasks
        if not self.task_names:
            raise ValueError(
                f"Task not found: {self.args.tasks}.\n"
                f"Available tasks: {','.join(bigcode_tasks.TASK_REGISTRY.keys())}"
            )

    @cached_property
    def task_names(self) -> List[str]:
        """Returns the task names list for the specified tasks.

        `args.tasks` may be a comma-separated string or an already-split list
        of task patterns; both are matched against `bigcode_tasks.ALL_TASKS`.

        Raises:
            ValueError: If `args.tasks` is None.
        """
        requested = self.args.tasks
        if requested is None:
            raise ValueError(
                "Need to specify a bigcode task to evaluate.\n"
                f"Available tasks: {','.join(bigcode_tasks.TASK_REGISTRY.keys())}"
            )

        # The dataclass annotates `tasks` as `Union[str, List[str]]`; only a
        # string needs splitting.
        if isinstance(requested, str):
            patterns = requested.split(",")
        else:
            patterns = list(requested)
        return pattern_match(patterns, bigcode_tasks.ALL_TASKS)

    def evaluate(self, trainer: Trainer, evaluator: Evaluator) -> None:
        """Invokes the logic of BigCode's run script on the given evaluator.

        Only the generation-only flow is supported: for every selected task
        the evaluator generates code samples and the generations/references
        are dumped to JSON files suffixed with the task name and the
        trainer's global step.

        Args:
            trainer: Trainer object
            evaluator: The evaluator object (subclass of BigCode's Evaluator class)

        Raises:
            ValueError: If `load_generations_intermediate_paths` is given but
                its length differs from the number of selected tasks.
            RuntimeError: If a code-evaluation flow is requested (i.e.
                `load_generations_path` is set or `generation_only` is off).
        """
        self.logger.info(
            f"Starting BigCode evaluation harness on selected tasks: {self.task_names}"
        )

        load_generations_intermediate_paths = (
            self.args.load_generations_intermediate_paths
        )
        if load_generations_intermediate_paths and len(
            load_generations_intermediate_paths
        ) != len(self.task_names):
            raise ValueError(
                "If passing --load_generations_intermediate_paths, "
                "must pass equal number of files as number of tasks"
            )

        results = {}
        for idx, task in enumerate(self.task_names):
            if self.args.load_generations_path:
                raise RuntimeError(
                    "Code evaluation mode is not yet supported. "
                    "Please specify `--generation_only` flag to run "
                    "bigcode's generation flow on CSX."
                )
            elif self.args.generation_only:
                self.logger.info("Running with generation-only mode")

                intermediate_generations = None
                if load_generations_intermediate_paths:
                    with open(
                        load_generations_intermediate_paths[idx], "r"
                    ) as f_in:
                        # intermediate_generations: list[list[str | None]] of len n_tasks
                        # where list[i] = generated codes or empty
                        intermediate_generations = json.load(f_in)

                generations, references = evaluator.generate_text(
                    task, intermediate_generations=intermediate_generations
                )

                # Output files are "<stem>_<task>_<global_step>.json", where
                # <stem> is the configured path without its extension.
                generations_stem = os.path.splitext(
                    self.args.save_generations_path
                )[0]
                save_generations_path = (
                    f"{generations_stem}_{task}_{trainer.global_step}.json"
                )
                references_stem = os.path.splitext(
                    self.args.save_references_path
                )[0]
                save_references_path = (
                    f"{references_stem}_{task}_{trainer.global_step}.json"
                )

                evaluator.save_json_files(
                    generations,
                    references,
                    save_generations_path,
                    save_references_path,
                )
            else:
                raise RuntimeError(
                    "Code evaluation mode is not yet supported. "
                    "Please specify `--generation_only` flag to run "
                    "bigcode's generation flow on CSX."
                )

        # Save all args to config
        results["config"] = asdict(self.args)
        if not self.args.generation_only:
            dumped = json.dumps(results, indent=2)
            with open(self.args.metric_output_path, "w") as f:
                f.write(dumped)


class BigCodeEvaluator(CSEvalHarnessAdapter, Evaluator):
    """
    Subclasses BigCode's `Evaluator` base class, overriding the
    `generate_text` method.
    """

    def __init__(
        self,
        trainer,
        bigcode_args: BigCodeCLIArgs,
        dataloader_args: Dict[str, Any],
    ):
        """
        Args:
            trainer: Trainer object
            bigcode_args: `BigCodeCLIArgs` dataclass object capturing BCEH's CLI args
            dataloader_args: Dict of dataloader args.
        """
        # Annotation-only declarations; the attributes themselves are used
        # throughout this class as `self.args` / `self.dataloader_args`.
        self.args: BigCodeCLIArgs
        self.dataloader_args: Dict[str, Any]

        Evaluator.__init__(self, None, None, None, args=bigcode_args)
        CSEvalHarnessAdapter.__init__(
            self, trainer=trainer, dataloader_args=dataloader_args
        )

    def evaluate(
        self,
        task_name: str,
        intermediate_generations: Optional[
            List[Optional[List[Optional[str]]]]
        ] = None,
    ):
        """Override of the BCEH's Evaluator class' method.

        Note: Code evaluation flow is not yet supported.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError("Code evaluation flow is not yet supported.")

    def _construct_prompts(
        self,
        task: Any,
        dataset: Any,
        n_tasks: int,
        limit_start: int = 0,
        n_copies: int = 1,
        instruction_tokens: Optional[List[str]] = None,
    ) -> List[Tuple[str, List[str]]]:
        """Helper from BigCode's implementation to preprocess task dataset
        into a list of raw text samples.

        Args:
            task: BigCode task object providing `get_prompt` and `stop_words`.
            dataset: Indexable task dataset.
            n_tasks: Number of dataset samples to turn into prompts.
            limit_start: Index of the first dataset sample to use.
            n_copies: Unused here; kept for signature compatibility.
            instruction_tokens: Optional `[user, end, assistant]` tokens used
                to delimit instruction-tuning prompts.

        Returns:
            List of `(prompt_text, stop_words)` tuples, one per sample. Each
            tuple carries its own copy of the stop-word list.

        Raises:
            ValueError: If plain-completion prompts are mixed with
                infill/instruction prompts, if infilling is requested for an
                unsupported tokenizer, or if a prompt has an unsupported
                type or dict layout.
        """

        # NOTE: the helpers below are closures over `self` and
        # `instruction_tokens`; they deliberately take no `self` parameter
        # since they are called as plain functions.
        def _make_infill_prompt(prefix, suffix, preprefix=""):
            """Make a prompt for infilling.
            Currently supported only for official InCoder and SantaCoder
            implementations.
            """
            model_id = self.tokenizer.name_or_path
            if model_id in ["facebook/incoder-1B", "facebook/incoder-6B"]:
                self.tokenizer.add_special_tokens({"pad_token": "<pad>"})
                return f"{preprefix}{prefix}<|mask:0|>{suffix}<|mask:0|>"
            elif model_id in ["bigcode/santacoder"]:
                return f"<fim-prefix>{preprefix}{prefix}<fim-suffix>{suffix}<fim-middle>"
            elif model_id in ["bigcode/starcoder", "bigcode/starcoderbase"]:
                return f"<fim_prefix>{preprefix}{prefix}<fim_suffix>{suffix}<fim_middle>"
            else:
                raise ValueError(f"Infilling not yet supported for: {model_id}")

        def _make_instruction_prompt(instruction, context, prefix=""):
            """Make a prompt for instruction-tuning. Delimit instruction and
            context with specific tokens if provided.
            """
            if not instruction_tokens:
                warn(
                    "Instruction-tuning tokens are not provided for an "
                    "instruction-tuning task, we will leave them empty."
                )
                user_token, end_token, assistant_token = "", "", "\n"
            else:
                user_token, end_token, assistant_token = instruction_tokens
                if not user_token or not assistant_token or not end_token:
                    warn(
                        "Instruction-tuning tokens provided but one or more are empty. "
                        "Ignore warning if this was intended"
                    )
            return (
                prefix
                + user_token
                + instruction
                + end_token
                + assistant_token
                + context
            )

        # Extract stop words
        stopping_criteria = list(task.stop_words) if task.stop_words else []

        prompts = []
        # Track which prompt families were seen so that mixing them can be
        # rejected: plain string prompts vs. dict (infill/instruction) prompts.
        seen_dict_prompt = False
        seen_completion_prompt = False
        mixed_error_log = (
            "Mixing tasks with infill/instruction "
            "and completion prompts is not supported."
        )

        for sample in range(limit_start, limit_start + n_tasks):
            prompt_contents = task.get_prompt(dataset[sample])
            if isinstance(prompt_contents, str):
                # Normal code completion mode
                if seen_dict_prompt:
                    raise ValueError(mixed_error_log)
                seen_completion_prompt = True
                prompt = self.args.prefix + prompt_contents
            elif isinstance(prompt_contents, dict):
                if seen_completion_prompt:
                    raise ValueError(mixed_error_log)
                seen_dict_prompt = True
                keys = set(prompt_contents.keys())
                if keys == {"prefix", "suffix"}:
                    # Infilling mode (Currently supported only for official
                    # InCoder and SantaCoder implementations.)
                    prompt = _make_infill_prompt(
                        **prompt_contents, preprefix=self.args.prefix
                    )
                elif keys == {"instruction", "context"}:
                    # Instruction-tuning mode
                    prompt = _make_instruction_prompt(
                        **prompt_contents, prefix=self.args.prefix
                    )
                else:
                    raise ValueError(
                        f"Unsupported prompt dict keys: {sorted(keys)}"
                    )
            else:
                raise ValueError(
                    f"Unsupported prompt format: {type(prompt_contents)}"
                )
            prompts.append((prompt, deepcopy(stopping_criteria)))

        return prompts

    def generate_on_csx(
        self,
        task: Any,
        prompts: List[Tuple[str, List[str]]],
        gen_kwargs: Dict[str, Any],
        n_tasks: int,
        limit_start: int = 0,
        intermediate_generations: Optional[
            List[Optional[List[Optional[str]]]]
        ] = None,
        instruction_tokens: Optional[List[str]] = None,
    ) -> List[List[Optional[str]]]:
        """Generate code samples on CSX from the given prompts.

        Args:
            task: Code evaluation task object
            prompts: List of `(prompt, stop_words)` entries as produced by
                `_construct_prompts`
            gen_kwargs: Dict specifying settings for generative inference
            n_tasks: Number of data samples
            limit_start: Offset to limit the number of samples. Defaults to 0.
            intermediate_generations: List of previously loaded generations.
                Defaults to None. The list is copied, not modified in place.
            instruction_tokens: List of instruction tokens used for
                instruction-tuning benchmarks.

        Returns:
            List of generated code samples: the previously loaded generations
            (if any) followed by the newly generated ones.
        """
        (
            samples_file_list,
            dataset_size,
            metadata,
        ) = self.preprocess_dataset(
            prompts,
            request_type=RequestType.bigcode_eh,
            max_tokens=gen_kwargs.get("max_tokens"),
        )

        # keep track of the list of generated codes
        # where len(code_gens) = n_tasks and len(code_gens[0]) = number of generated code samples
        code_gens: List[List[Optional[str]]] = [[] for _ in range(n_tasks)]
        generations = (
            list(intermediate_generations) if intermediate_generations else []
        )

        # Generate tokens on appliance
        with GenerateTokens(metadata["requests"], gen_kwargs) as gen:
            # NOTE(review): the dataloader wrapper around these arguments is
            # assumed to be `cstorch.utils.data.DataLoader` passed as
            # `val_dataloader` -- confirm against the trainer API.
            self.trainer.validate(
                val_dataloader=cstorch.utils.data.DataLoader(
                    self.input_fn,
                    self.dataloader_args,
                    samples_file_list,
                    dataset_size,
                    RequestType.bigcode_eh.value,
                    **metadata["dataset_kwargs"],
                ),
                loop=BigCodeEvalHarnessLoop(),
                ckpt_path=None,
            )

        self.logger.debug(f"Output results: {gen.gen_token_dict}")

        code_gens = update_code_gens(
            task,
            self.tokenizer,
            limit_start,
            self.args.prefix,
            instruction_tokens,
            self.args.postprocess,
            code_gens,
            gen.gen_token_dict,
        )

        generations.extend(code_gens)
        return generations

    def generate_text(
        self,
        task_name: str,
        intermediate_generations: Optional[
            List[Optional[List[Optional[str]]]]
        ] = None,
    ) -> Tuple[List[List[str]], List[str]]:
        """Override of the BCEH's Evaluator class' method.

        Args:
            task_name: Name of the BigCode task to evaluate
            intermediate_generations: List of intermediate generations, if loaded

        Returns:
            Tuple of list of generated code samples and list of references

        Raises:
            RuntimeError: If the task defines a `max_length_multiplier`
                stopping criterion.
            ValueError: If `args.instruction_tokens` does not split into
                exactly three comma-separated tokens.
        """
        task: Task = bigcode_tasks.get_task(task_name, self.args)

        if (
            hasattr(task, "max_length_multiplier")
            and task.max_length_multiplier
        ):
            raise RuntimeError(
                f"BigCode task {task_name} specifies a max_length_multipler "
                f"stopping criterion, which is currently not supported. Please "
                f"choose a different task."
            )

        dataset = task.get_dataset()

        # if args.limit is None, use all samples
        # if args.limit is used, make sure args.limit_start + args.limit <= len(dataset)
        n_tasks = (
            min(self.args.limit, len(dataset) - self.args.limit_start)
            if self.args.limit
            else len(dataset)
        )
        # when args.limit is None
        # adjust n_tasks by args.limit_start to prevent out of bounds issues
        if not self.args.limit:
            n_tasks -= self.args.limit_start

        sample_range = range(
            self.args.limit_start, self.args.limit_start + n_tasks
        )
        references = [task.get_reference(dataset[i]) for i in sample_range]

        if self.args.check_references:
            # Benchmark the ground truth instead of generating anything.
            if (
                "get_solution"
                in inspect.signature(task.get_reference).parameters
            ):
                solutions = [
                    [task.get_reference(dataset[i], get_solution=True)]
                    for i in sample_range
                ]
            else:
                solutions = [[ref] for ref in references]
            return solutions, references

        curr_generations = []  # list[list[str | None] | None]
        if intermediate_generations:
            # Samples that already have generations are skipped.
            curr_generations = [gen for gen in intermediate_generations if gen]
            n_tasks -= len(curr_generations)
        curr_sample_idx = len(curr_generations)

        self.logger.info(f"Number of problems for this task is {n_tasks}")
        n_copies = ceil(self.args.n_samples / self.batch_size)
        limit_start = self.args.limit_start + curr_sample_idx

        if self.args.instruction_tokens:
            instruction_tokens = self.args.instruction_tokens.split(",")
            if len(instruction_tokens) != 3:
                raise ValueError(
                    "Instruction tokens should contain exactly 3 tokens "
                    "separated by a comma. If a token is empty, represent it as ''"
                )
            # Non-empty instruction tokens double as stop words.
            for token in instruction_tokens:
                if token.strip() != "":
                    task.stop_words.append(token)
        else:
            instruction_tokens = None

        # Set up generation settings
        gen_kwargs = {
            "do_sample": self.args.do_sample,
            "temperature": self.args.temperature,
            "top_p": self.args.top_p,
            "top_k": self.args.top_k,
            "max_tokens": self.args.max_tokens,
        }
        stopping_criteria = list(task.stop_words) if task.stop_words else []
        if stopping_criteria:
            gen_kwargs["stopping_criteria"] = stopping_criteria

        # Fetch list of prompts
        prompts = self._construct_prompts(
            task,
            dataset,
            n_tasks=n_tasks,
            limit_start=limit_start,
            n_copies=n_copies,
            instruction_tokens=instruction_tokens,
        )

        # Generate tokens on CSX for the given prompts data
        generations = self.generate_on_csx(
            task,
            prompts,
            gen_kwargs=gen_kwargs,
            intermediate_generations=curr_generations,
            n_tasks=n_tasks,
            limit_start=limit_start,
            instruction_tokens=instruction_tokens,
        )

        # Guard against an empty result before peeking at the first entry.
        if generations and len(generations[0]) > self.args.n_samples:
            generations = [g[: self.args.n_samples] for g in generations]
            warn(
                f"Number of tasks wasn't proportional to number of devices, we "
                f"removed extra predictions to only keep nsamples={self.args.n_samples}"
            )

        return generations, references


class BigCodeEvalHarnessLoop(ValidationLoop):
    """Subclass of `ValidationLoop` to run BigCode's Evaluation Harness."""

    def __init__(self):
        """Initializes the BigCodeEvalHarnessLoop object."""
        super().__init__(hook="bigcode_eval_harness")

    def on_bigcode_eval_harness_start(
        self, trainer, model, val_dataloader, loop
    ):
        """
        Run ValidationLoop's `on_validate_start` method to ensure that
        eval_steps is being computed correctly.
        """
        model.eval()
        self.on_validate_start(trainer, model, val_dataloader, loop)


class GenerateTokens(Callback):
    """
    Callback class to post-process model output tokens.
    """

    def __init__(
        self,
        metadata: List[Tuple[int, int]],
        gen_kwargs: Dict[str, Any],
    ):
        """
        Args:
            metadata: List of tuples of (sample idx, prompt encoding length)
                for each sample in the batch
            gen_kwargs: Dict specifying settings for generative inference.
        """
        self.metadata = metadata
        self.start_token = None
        self.sample_idx = 0
        # dict of list of generated tokens, keyed by sample index
        self.gen_token_dict = defaultdict(list)

        # Generation settings
        self.temperature = gen_kwargs.get("temperature")
        self.top_p = gen_kwargs.get("top_p")
        self.top_k = gen_kwargs.get("top_k")
        self.max_tokens = gen_kwargs.get("max_tokens")

        self.progress = EvalHarnessProgress("BigCode Generative Eval")

    def on_bigcode_eval_harness_start(
        self, trainer, model, val_dataloader, loop
    ):
        """Runs before the BigCode Evaluation Harness starts.

        Reads the model's start token and pushes any explicitly provided
        generation settings onto the model.

        Raises:
            RuntimeError: If the model has no `start_token`.
        """
        self.start_token = getattr(model, "start_token", None)

        if self.start_token is None:
            raise RuntimeError(
                "No start token specified under `model.start_token`. "
                "Please specify a start token for generative tasks."
            )

        # Only override model settings that were explicitly provided.
        if self.max_tokens is not None:
            model.max_tokens = self.max_tokens
        if self.temperature is not None:
            model.temperature = self.temperature
        if self.top_p is not None:
            model.top_p = self.top_p
        if self.top_k is not None:
            model.top_k = self.top_k

    def on_bigcode_eval_harness_batch_end(
        self, trainer, model, outputs, batch, batch_idx
    ):
        """Runs after every batch is processed."""
        self.progress.print(trainer, batch_idx)

    def on_before_forward(self, trainer, model, batch, args, kwargs):
        """Requests autoregressive mode for the forward pass."""
        kwargs["autoregressive"] = True

    def on_after_forward(self, trainer, model, outputs, batch):
        """Hands the model's output tokens to `post_process`."""
        self.post_process(predictions=outputs["output"])

    @cstorch.step_closure
    def post_process(self, predictions):
        """
        Post-processes the model generated output tokens.

        Args:
            predictions: Tensor of shape (batch_size, max_seq_len)
                containing the model's predictions
        """
        for gen_tokens in predictions:
            # An empty metadata entry is skipped without advancing
            # `self.sample_idx`.
            if not self.metadata[self.sample_idx]:
                continue

            sample_idx, _ = self.metadata[self.sample_idx]
            assert sample_idx == self.sample_idx, "Mismatching sample indices"

            # Grab generation tokens: everything before the first start token.
            try:
                start_token_idx = gen_tokens.tolist().index(self.start_token)
                gen_tokens = gen_tokens[:start_token_idx].numpy()
            except ValueError:  # Generated string spans msl
                pass

            self.gen_token_dict[sample_idx].append(gen_tokens)
            self.sample_idx += 1
class BigCodeEvalHarness(ValidationCallback):
    """
    ValidationCallback class to run BigCode's Evaluation Harness.
    """

    # Class-level counter used to give every instance a unique default
    # name scope.
    id = 0

    def __init__(
        self,
        # BigCode Args
        bigcode_args: Union[BigCodeCLIArgs, Dict[str, Any]],
        # Cerebras specific args
        keep_data_dir: bool = False,
        every_n_vals: int = 1,
        flags: Optional[dict] = None,
        name_scope: Optional[str] = None,
        # Data Args
        batch_size: Optional[int] = None,
        data_dir: Optional[str] = None,
        max_sequence_length: Optional[int] = None,
        tokenizer_file_path: Optional[str] = None,
        eos_id: Optional[int] = None,
        **dataloader_args,
    ):
        """
        Args:
            bigcode_args: `BigCodeCLIArgs` dataclass or dict capturing BCEH's CLI args
            keep_data_dir: Specifies whether dumped data samples should be
                kept for reuse. Defaults to False, i.e. data samples are
                deleted after the run.
            every_n_vals: Run the BigCode eval harness script every N
                validations. e.g. If the eval_frequency is set to 200 and
                N=2, then BigCode eval harness runs every 400 training
                steps. The BigCode eval harness script will also always run
                after the final training iteration.
            flags: An optional dictionary of scoped global flags to set
                during the BigCode eval harness run.
            name_scope: An optional string that gets added to the trainer's
                name scope.
            batch_size: Batch size to BigCodeEvalHarness to preprocess input
                data samples from the specified eval harness tasks.
            data_dir: Path to data directory
            max_sequence_length: Maximum sequence length
            tokenizer_file_path: Path to tokenizer file
            eos_id: End of sentence token id
            dataloader_args: Any additional dataloader args, e.g. num_workers.

        Raises:
            TypeError: If `data_dir` is not provided.
        """
        # Handling parsing for creating trainer from yaml
        if isinstance(bigcode_args, dict):
            self.bigcode_args = BigCodeCLIArgs(**bigcode_args)
        else:
            self.bigcode_args = bigcode_args

        self.bceh_runner = BigCodeEvalHarnessRunner(
            bigcode_args=self.bigcode_args
        )

        # `os.path.realpath(None)` fails with an opaque TypeError; surface
        # the actual problem instead.
        if data_dir is None:
            raise TypeError(
                "`data_dir` must be specified for BigCodeEvalHarness, "
                "but got None."
            )

        self.dataloader_args = dict(
            batch_size=batch_size,
            data_dir=os.path.realpath(data_dir),
            keep_data_dir=keep_data_dir,
            max_sequence_length=max_sequence_length,
            tokenizer_file_path=tokenizer_file_path,
            eos_id=eos_id,
            **dataloader_args,
        )

        # Removes annoying logs relating to process forking
        appliance_environ["TOKENIZERS_PARALLELISM"] = "false"

        self.every_n_vals = every_n_vals

        self.scoped_flags = ScopedBigCodeEvalHarnessFlags(**(flags or {}))

        # Take the next instance id from the class-level counter.
        self._id = BigCodeEvalHarness.id
        BigCodeEvalHarness.id += 1

        if name_scope is None:
            name_scope = f"bigcode_{self._id}"

        self.name_scope = name_scope

    @staticmethod
    def _anchor_output_path(trainer, path: str) -> str:
        """Anchors a relative dump path under the trainer's summary dir and
        makes sure its parent directory exists."""
        if not os.path.isabs(path):
            path = os.path.join(
                trainer.summary_dir, trainer.name_scope_path, path
            )
        os.makedirs(os.path.dirname(path), exist_ok=True)
        return path

    def run(self, trainer):
        """Run BigCode Eval Harness.

        Relative generation/reference dump paths are resolved under
        `trainer.summary_dir / trainer.name_scope_path` before the runner is
        invoked with the scoped flags applied.

        Args:
            trainer: the Trainer object
        """
        # NOTE(review): assumes the trainer exposes a `logger` attribute --
        # confirm against the Trainer API.
        trainer.logger.info("Running BigCode Eval Harness")

        # If no absolute file paths for output dumps are provided, dump inside model_dir
        runner_args = self.bceh_runner.args
        runner_args.save_generations_path = self._anchor_output_path(
            trainer, runner_args.save_generations_path
        )
        runner_args.save_references_path = self._anchor_output_path(
            trainer, runner_args.save_references_path
        )

        bc_evaluator = BigCodeEvaluator(
            trainer,
            deepcopy(self.bigcode_args),
            deepcopy(self.dataloader_args),
        )

        with self.scoped_flags:
            self.bceh_runner.evaluate(trainer=trainer, evaluator=bc_evaluator)

    def run_validation(self, trainer, loop_idx, is_last):
        """Runs the harness on every `every_n_vals`-th validation and always
        on the last one.

        Args:
            trainer: the Trainer object
            loop_idx: Zero-based index of the current validation loop.
            is_last: Whether this is the final validation.
        """
        if not is_last and (loop_idx + 1) % self.every_n_vals != 0:
            return

        with trainer.name_scope(self.name_scope):
            self.run(trainer)
class ScopedBigCodeEvalHarnessFlags(_ScopedFlags):
    """
    Scoped-flags helper bound to the `bigcode_eval_harness` hooks: global
    flags are applied when the BigCode Evaluation Harness run starts and
    put back when it ends.
    """

    def on_bigcode_eval_harness_start(
        self, trainer, model, val_dataloader, loop
    ):
        """Hook fired at the start of the run; applies all global flags."""
        self._set_all_flags()

    def on_bigcode_eval_harness_end(self, trainer, model, loop):
        """Hook fired at the end of the run; restores all global flags."""
        self._restore_all_flags()