# Source code for cerebras.modelzoo.trainer.extensions.bigcode.bigcode_eval_harness

# Copyright 2022 Cerebras Systems.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This module provides a callback class to run BigCode's Evaluation Harness.
"""

import inspect
import json
import os
from collections import defaultdict
from copy import deepcopy
from dataclasses import asdict, dataclass
from functools import cached_property
from math import ceil
from typing import Any, Dict, List, Optional, Tuple, Union
from warnings import warn

from bigcode_eval import tasks as bigcode_tasks
from bigcode_eval.base import Task
from bigcode_eval.evaluator import Evaluator
from bigcode_eval.utils import update_code_gens
from lm_eval.utils import pattern_match

import cerebras.pytorch as cstorch
from cerebras.appliance.environment import appliance_environ
from cerebras.appliance.log import ClassLogger, named_class_logger
from cerebras.modelzoo.data.nlp.gpt.InferenceDataProcessor import RequestType
from cerebras.modelzoo.trainer import Trainer
from cerebras.modelzoo.trainer.callbacks import (
    Callback,
    ValidationCallback,
    ValidationLoop,
)
from cerebras.modelzoo.trainer.callbacks.flags import _ScopedFlags
from cerebras.modelzoo.trainer.extensions.eval_harness_adapter import (
    CSEvalHarnessAdapter,
    EvalHarnessProgress,
)


@dataclass
class BigCodeCLIArgs:
    r"""Container for the BigCode Evaluation Harness CLI arguments and their defaults.

    Fields:
        prefix: Prefix to add to the prompt. For example InCoder needs
            prefix='<| file ext=.py |>\n'
        do_sample: Sample from the language model's output distribution.
        temperature: Sampling temperature used for generation.
        top_k: Top-k parameter used for generation.
        top_p: Top-p parameter used for nucleus sampling.
        n_samples: Number of completions to generate for each sample.
        seed: Random seed used for evaluation.
        tasks: List of tasks to evaluate code evals.
        instruction_tokens: A series of instruction tokens used for
            instruction-tuning benchmarks, separated by comma, e.g.
            <user_message>,<end_user_message>,<assistant_message>
        max_tokens: Maximum number of tokens to generate.
        limit: Number of samples to solve and evaluate from the benchmark.
        limit_start: Optional offset to start from when limiting the number
            of samples.
        save_every_k_tasks: Optional saving after every k tasks.
        postprocess: Postprocess model outputs before execution, always on
            except during generation tests.
        allow_code_execution: Allow code evaluation to execute
            external/untrusted Python code on your machine.
        generation_only: Do code generation but no evaluation.
        load_generations_path: Path of file with previously generated
            solutions; if provided, generation is skipped and only
            evaluation is done.
        load_data_path: Path of additional data to load for the tasks.
        metric_output_path: Path to save the results.
        save_generations: Whether to save code generations.
        load_generations_intermediate_paths: List of paths for saving the
            intermediate code generations.
        save_generations_path: Path for saving the code generations.
        save_references: Whether to save reference solutions/tests.
        save_references_path: Path for saving the references solutions/tests.
        prompt: Prompt type to use for generation in HumanEvalPack tasks.
        check_references: Don't run generation but benchmark groundtruth
            (useful for debugging).
    """

    # --- Fields of BigCode's `EvalArguments` dataclass, as injected into its CLI ---
    prefix: str = ""
    do_sample: bool = True
    temperature: Optional[float] = None
    top_k: Optional[int] = None
    top_p: Optional[float] = None
    n_samples: int = 1
    seed: int = 0

    # --- Remaining BigCode CLI arguments ---
    tasks: Optional[Union[str, List[str]]] = None
    instruction_tokens: Optional[str] = None
    max_tokens: Optional[int] = None
    limit: Optional[int] = None
    limit_start: int = 0
    save_every_k_tasks: int = -1
    postprocess: bool = True
    allow_code_execution: bool = False
    # We only run the generation flow by default
    generation_only: bool = True
    load_generations_path: Optional[str] = None
    load_data_path: Optional[str] = None
    metric_output_path: str = "evaluation_results.json"
    # We always save for the separate code execution flow
    save_generations: bool = True
    load_generations_intermediate_paths: Optional[List[str]] = None
    save_generations_path: str = "generations.json"
    save_references: bool = True
    save_references_path: str = "references.json"
    prompt: str = "prompt"
    check_references: bool = False
@named_class_logger("BigCodeEvalHarnessRunner")
class BigCodeEvalHarnessRunner(ClassLogger):
    """Util class for invoking BigCode's run script with CSX-specific components."""

    def __init__(
        self,
        bigcode_args: BigCodeCLIArgs,
    ):
        """Constructs a `BigCodeEvalHarnessRunner` instance.

        Args:
            bigcode_args: `BigCodeCLIArgs` dataclass object capturing BCEH's CLI args

        Raises:
            ValueError: If no task is specified, or if the specified task
                pattern(s) match none of the registered BigCode tasks.
        """
        super().__init__()

        # Work on a private copy so later path rewrites don't leak to the caller
        self.args = deepcopy(bigcode_args)

        # Validate user-specified tasks
        if not self.task_names:
            raise ValueError(
                f"Task not found: {self.args.tasks}.\n"
                f"Available tasks: {','.join(bigcode_tasks.TASK_REGISTRY.keys())}"
            )

    @cached_property
    def task_names(self) -> List[str]:
        """Returns the task names list for the specified tasks.

        `args.tasks` may be a comma-separated string or a list of task
        patterns; both are matched against BigCode's `ALL_TASKS`.

        Raises:
            ValueError: If `args.tasks` is None.
        """
        requested = self.args.tasks
        if requested is None:
            raise ValueError(
                "Need to specify a bigcode task to evaluate.\n"
                f"Available tasks: {','.join(bigcode_tasks.TASK_REGISTRY.keys())}"
            )

        # `tasks` is typed as `Union[str, List[str]]`; only a string needs
        # splitting. Calling `.split` on a list would raise AttributeError.
        if isinstance(requested, str):
            patterns = requested.split(",")
        else:
            patterns = list(requested)

        return pattern_match(patterns, bigcode_tasks.ALL_TASKS)

    def evaluate(self, trainer: Trainer, evaluator: Evaluator) -> None:
        # pylint: disable=line-too-long
        """Invokes the logic of BigCode's run script on the given evaluator.

        Follows the task loop of BigCode's `main.py`:
        https://github.com/bigcode-project/bigcode-evaluation-harness/blob/a1b4a7949a24c8e3ef0d05a01097b2d14ffba56e/main.py#L372

        Only the generation-only flow is supported: for every selected task,
        text is generated through `evaluator.generate_text` and the
        generations/references are written via `evaluator.save_json_files`
        to per-task, per-step JSON files.

        Args:
            trainer: Trainer object
            evaluator: The evaluator object (subclass of BigCode's Evaluator class)

        Raises:
            ValueError: If `load_generations_intermediate_paths` is given
                but its length differs from the number of tasks.
            RuntimeError: If a code evaluation mode is requested, i.e.
                `load_generations_path` is set or `generation_only` is False.
        """
        self.logger.info(
            f"Starting BigCode evaluation harness on selected tasks: {self.task_names}"
        )

        intermediate_paths = self.args.load_generations_intermediate_paths
        if intermediate_paths and len(intermediate_paths) != len(
            self.task_names
        ):
            raise ValueError(
                "If passing --load_generations_intermediate_paths, "
                "must pass equal number of files as number of tasks"
            )

        results = {}
        for idx, task in enumerate(self.task_names):
            if self.args.load_generations_path:
                raise RuntimeError(
                    "Code evaluation mode is not yet supported. "
                    "Please specify `--generation_only` flag to run "
                    "bigcode's generation flow on CSX."
                )
            elif self.args.generation_only:
                self.logger.info("Running with generation-only mode")

                intermediate_generations = None
                if intermediate_paths:
                    with open(intermediate_paths[idx], "r") as f_in:
                        # intermediate_generations: list[list[str | None]] of len n_tasks
                        # where list[i] = generated codes or empty
                        intermediate_generations = json.load(f_in)

                generations, references = evaluator.generate_text(
                    task, intermediate_generations=intermediate_generations
                )

                # Suffix the file stem with task name and global step so
                # every task/step pair gets its own output file.
                generations_stem = os.path.splitext(
                    self.args.save_generations_path
                )[0]
                save_generations_path = (
                    f"{generations_stem}_{task}_{trainer.global_step}.json"
                )

                references_stem = os.path.splitext(
                    self.args.save_references_path
                )[0]
                save_references_path = (
                    f"{references_stem}_{task}_{trainer.global_step}.json"
                )

                evaluator.save_json_files(
                    generations,
                    references,
                    save_generations_path,
                    save_references_path,
                )
            else:
                raise RuntimeError(
                    "Code evaluation mode is not yet supported. "
                    "Please specify `--generation_only` flag to run "
                    "bigcode's generation flow on CSX."
                )

        # Save all args to config
        results["config"] = asdict(self.args)
        if not self.args.generation_only:
            dumped = json.dumps(results, indent=2)
            self.logger.info(dumped)

            with open(self.args.metric_output_path, "w") as f:
                f.write(dumped)


class BigCodeEvaluator(CSEvalHarnessAdapter, Evaluator):
    """
    Subclasses BigCode's `Evaluator` base class, overriding the
    `generate_text` method.
    """

    def __init__(
        self,
        trainer,
        bigcode_args: BigCodeCLIArgs,
        dataloader_args: Dict[str, Any],
    ):
        """
        Args:
            trainer: Trainer object
            bigcode_args: `BigCodeCLIArgs` dataclass object capturing BCEH's CLI args
            dataloader_args: Dict of dataloader args.
        """
        # Declared for type checkers only; presumably assigned by the two
        # base-class initializers below (not visible in this file).
        self.args: BigCodeCLIArgs
        self.dataloader_args: Dict[str, Any]

        # BigCode's Evaluator takes three leading positional arguments that
        # are passed as None here.
        Evaluator.__init__(self, None, None, None, args=bigcode_args)
        CSEvalHarnessAdapter.__init__(
            self, trainer=trainer, dataloader_args=dataloader_args
        )

    def evaluate(
        self,
        task_name: str,
        intermediate_generations: Optional[
            List[Optional[List[Optional[str]]]]
        ] = None,
    ):
        """Override of the BCEH's Evaluator class' method.

        Note: Code evaluation flow is not yet supported.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError("Code evaluation flow is not yet supported.")

    def _construct_prompts(
        self,
        task: Any,
        dataset: Any,
        n_tasks: int,
        limit_start: int = 0,
        n_copies: int = 1,
        instruction_tokens: Optional[List[str]] = None,
    ) -> List[Tuple[str, List[str]]]:
        """Helper from BigCode's implementation to preprocess the task
        dataset into a list of raw text samples.

        Args:
            task: Code evaluation task object.
            dataset: Indexable task dataset.
            n_tasks: Number of dataset samples to build prompts for.
            limit_start: Index of the first dataset sample to use.
            n_copies: Accepted for signature parity; not used here.
            instruction_tokens: `[user, end, assistant]` delimiter tokens
                for instruction-tuning prompts, or None.

        Returns:
            One `(prompt, stop_words)` tuple per sample, where `stop_words`
            is an independent copy of the task's stop words.

        Raises:
            ValueError: If string prompts are mixed with dict prompts, if a
                prompt has an unsupported format, or if infilling is
                requested for an unsupported tokenizer.
        """

        # NOTE: both helpers are closures over `self` and
        # `instruction_tokens`. They must not declare their own `self`
        # parameter because they are invoked as plain functions below.
        def _make_infill_prompt(prefix, suffix, preprefix=""):
            """Make a prompt for infilling.
            Currently supported only for official InCoder and SantaCoder implementations.
            """
            model_id = self.tokenizer.name_or_path
            if model_id in ["facebook/incoder-1B", "facebook/incoder-6B"]:
                self.tokenizer.add_special_tokens({"pad_token": "<pad>"})
                return f"{preprefix}{prefix}<|mask:0|>{suffix}<|mask:0|>"
            elif model_id in ["bigcode/santacoder"]:
                return f"<fim-prefix>{preprefix}{prefix}<fim-suffix>{suffix}<fim-middle>"
            elif model_id in ["bigcode/starcoder", "bigcode/starcoderbase"]:
                return f"<fim_prefix>{preprefix}{prefix}<fim_suffix>{suffix}<fim_middle>"
            else:
                raise ValueError(f"Infilling not yet supported for: {model_id}")

        def _make_instruction_prompt(instruction, context, prefix=""):
            """Make a prompt for instruction-tuning. Delimit instruction and
            context with specific tokens if provided.
            """
            if not instruction_tokens:
                warn(
                    "Instruction-tuning tokens are not provided for an "
                    "instruction-tuning task, we will leave them empty."
                )
                user_token, end_token, assistant_token = "", "", "\n"
            else:
                user_token, end_token, assistant_token = instruction_tokens
                if not user_token or not assistant_token or not end_token:
                    warn(
                        "Instruction-tuning tokens provided but one or more are empty. "
                        "Ignore warning if this was intended"
                    )
            return (
                prefix
                + user_token
                + instruction
                + end_token
                + assistant_token
                + context
            )

        # Extract stop words
        stopping_criteria = list(task.stop_words) if task.stop_words else []

        prompts = []
        # Track which prompt families were seen so that plain-text prompts
        # and structured (dict) prompts are never mixed within one task.
        seen_text_prompt = False
        seen_dict_prompt = False
        mixed_error_log = (
            "Mixing tasks with infill/instruction "
            "and completion prompts is not supported."
        )

        for sample in range(limit_start, limit_start + n_tasks):
            prompt_contents = task.get_prompt(dataset[sample])

            if isinstance(prompt_contents, str):
                # Normal code completion mode
                if seen_dict_prompt:
                    raise ValueError(mixed_error_log)
                seen_text_prompt = True
                prompt = self.args.prefix + prompt_contents
            elif isinstance(prompt_contents, dict):
                if seen_text_prompt:
                    raise ValueError(mixed_error_log)
                seen_dict_prompt = True

                keys = set(prompt_contents.keys())
                if keys == {"prefix", "suffix"}:
                    # Infilling mode (Currently supported only for official
                    # InCoder and SantaCoder implementations.)
                    prompt = _make_infill_prompt(
                        **prompt_contents, preprefix=self.args.prefix
                    )
                elif keys == {"instruction", "context"}:
                    # Instruction-tuning mode
                    prompt = _make_instruction_prompt(
                        **prompt_contents, prefix=self.args.prefix
                    )
                else:
                    # Without this branch `prompt` would be unbound (or
                    # silently reuse the previous sample's prompt).
                    raise ValueError(
                        "Unsupported prompt format: dict with keys "
                        f"{list(prompt_contents)}"
                    )
            else:
                raise ValueError(
                    f"Unsupported prompt format: {type(prompt_contents)}"
                )

            prompts.append((prompt, deepcopy(stopping_criteria)))

        return prompts

    def generate_on_csx(
        self,
        task: Any,
        prompts: List[Tuple[str, List[str]]],
        gen_kwargs: Dict[str, Any],
        n_tasks: int,
        limit_start: int = 0,
        intermediate_generations: Optional[
            List[Optional[List[Optional[str]]]]
        ] = None,
        instruction_tokens: Optional[List[str]] = None,
    ) -> List[List[Optional[str]]]:
        """Generate code samples on CSX from the given prompts.

        Args:
            task: Code evaluation task object
            prompts: List of prompts as produced by `_construct_prompts`
            gen_kwargs: Dict specifying settings for generative inference
            n_tasks: Number of data samples
            limit_start: Offset to limit the number of samples. Defaults to 0.
            intermediate_generations: List of previously loaded generations.
                Defaults to None. When given, the new generations are
                appended to this list in place.
            instruction_tokens: List of instruction tokens used for
                instruction-tuning benchmarks.

        Returns:
            List of generated code samples
        """
        (
            samples_file_list,
            dataset_size,
            metadata,
        ) = self.preprocess_dataset(
            prompts,
            request_type=RequestType.bigcode_eh,
            max_tokens=gen_kwargs.get("max_tokens"),
        )

        # keep track of the list of generated codes
        # where len(code_gens) = n_tasks and len(code_gens[0]) = number of generated code samples
        code_gens: List[List[Optional[str]]] = [[] for _ in range(n_tasks)]
        generations = (
            [] if not intermediate_generations else intermediate_generations
        )

        # Generate tokens on appliance
        with GenerateTokens(metadata["requests"], gen_kwargs) as gen:
            self.trainer.validate(
                val_dataloader=cstorch.utils.data.DataLoader(
                    self.input_fn,
                    self.dataloader_args,
                    samples_file_list,
                    dataset_size,
                    RequestType.bigcode_eh.value,
                    **metadata["dataset_kwargs"],
                ),
                loop=BigCodeEvalHarnessLoop(),
                ckpt_path=None,
            )

        self.logger.debug(f"Output results: {gen.gen_token_dict}")

        code_gens = update_code_gens(
            task,
            self.tokenizer,
            limit_start,
            self.args.prefix,
            instruction_tokens,
            self.args.postprocess,
            code_gens,
            gen.gen_token_dict,
        )

        generations.extend(code_gens)

        return generations

    def generate_text(
        self,
        task_name: str,
        intermediate_generations: Optional[
            List[Optional[List[Optional[str]]]]
        ] = None,
    ) -> Tuple[List[List[str]], List[str]]:
        """Override of the BCEH's Evaluator class' method.

        Args:
            task_name: Name of the BigCode task to evaluate
            intermediate_generations: List of intermediate generations, if loaded

        Returns:
            Tuple of list of generated code samples and list of references

        Raises:
            RuntimeError: If the task defines a truthy `max_length_multiplier`.
            ValueError: If `args.instruction_tokens` does not split into
                exactly three comma-separated tokens.
        """
        task: Task = bigcode_tasks.get_task(task_name, self.args)

        if (
            hasattr(task, "max_length_multiplier")
            and task.max_length_multiplier
        ):
            raise RuntimeError(
                f"BigCode task {task_name} specifies a max_length_multipler "
                f"stopping criterion, which is currently not supported. Please "
                f"choose a different task."
            )

        dataset = task.get_dataset()

        # if args.limit is None, use all samples
        # if args.limit is used, make sure args.limit_start + args.limit <= len(dataset)
        n_tasks = (
            min(self.args.limit, len(dataset) - self.args.limit_start)
            if self.args.limit
            else len(dataset)
        )
        # when args.limit is None
        # adjust n_tasks by args.limit_start to prevent out of bounds issues
        if not self.args.limit:
            n_tasks -= self.args.limit_start

        sample_range = range(
            self.args.limit_start, self.args.limit_start + n_tasks
        )
        references = [task.get_reference(dataset[i]) for i in sample_range]

        if self.args.check_references:
            # Benchmark the ground truth instead of generating anything
            if (
                "get_solution"
                in inspect.signature(task.get_reference).parameters
            ):
                solutions = [
                    [task.get_reference(dataset[i], get_solution=True)]
                    for i in sample_range
                ]
            else:
                solutions = [[ref] for ref in references]
            return solutions, references

        curr_generations = []  # list[list[str | None] | None]
        if intermediate_generations:
            # Samples that already have generations are skipped
            curr_generations = [gen for gen in intermediate_generations if gen]
            n_tasks -= len(curr_generations)
        curr_sample_idx = len(curr_generations)

        self.logger.info(f"Number of problems for this task is {n_tasks}")
        n_copies = ceil(self.args.n_samples / self.batch_size)
        limit_start = self.args.limit_start + curr_sample_idx

        if self.args.instruction_tokens:
            instruction_tokens = self.args.instruction_tokens.split(",")
            if len(instruction_tokens) != 3:
                raise ValueError(
                    "Instruction tokens should contain exactly 3 tokens "
                    "separated by a comma. If a token is empty, represent it as ''"
                )
            # Non-empty instruction tokens double as stop words
            for token in instruction_tokens:
                if token.strip() != "":
                    task.stop_words.append(token)
        else:
            instruction_tokens = None

        # Set up generation settings
        gen_kwargs = {
            "do_sample": self.args.do_sample,
            "temperature": self.args.temperature,
            "top_p": self.args.top_p,
            "top_k": self.args.top_k,
            "max_tokens": self.args.max_tokens,
        }
        stopping_criteria = list(task.stop_words) if task.stop_words else []
        if stopping_criteria:
            gen_kwargs["stopping_criteria"] = stopping_criteria

        # Fetch list of prompts
        prompts = self._construct_prompts(
            task,
            dataset,
            n_tasks=n_tasks,
            limit_start=limit_start,
            n_copies=n_copies,
            instruction_tokens=instruction_tokens,
        )

        # Generate tokens on CSX for the given prompts data
        generations = self.generate_on_csx(
            task,
            prompts,
            gen_kwargs=gen_kwargs,
            intermediate_generations=curr_generations,
            n_tasks=n_tasks,
            limit_start=limit_start,
            instruction_tokens=instruction_tokens,
        )

        # Guard against an empty result (e.g. nothing left to generate)
        # before peeking at the first entry.
        if generations and len(generations[0]) > self.args.n_samples:
            generations = [g[: self.args.n_samples] for g in generations]
            warn(
                f"Number of tasks wasn't proportional to number of devices, we "
                f"removed extra predictions to only keep nsamples={self.args.n_samples}"
            )

        return generations, references


class BigCodeEvalHarnessLoop(ValidationLoop):
    """Subclass of `ValidationLoop` to run BigCode's Evaluation Harness."""

    def __init__(self):
        """Initializes the BigCodeEvalHarnessLoop object."""
        super().__init__(hook="bigcode_eval_harness")

    def on_bigcode_eval_harness_start(
        self, trainer, model, val_dataloader, loop
    ):
        """
        Run ValidationLoop's `on_validate_start` method to ensure that
        eval_steps is being computed correctly.
        """
        model.eval()
        self.on_validate_start(trainer, model, val_dataloader, loop)


class GenerateTokens(Callback):
    """
    Callback class to post-process model output tokens.
    """

    def __init__(
        self,
        metadata: List[Tuple[int, int]],
        gen_kwargs: Dict[str, Any],
    ):
        """
        Args:
            metadata: List of tuples of (sample idx, prompt encoding length)
                for each sample in the batch
            gen_kwargs: Dict specifying settings for generative inference.
        """
        self.metadata = metadata
        self.start_token = None
        # Running index into `metadata`, advanced once per processed sample
        self.sample_idx = 0
        # dict of list of generated tokens
        self.gen_token_dict = defaultdict(list)

        # Generation settings
        self.temperature = gen_kwargs.get("temperature")
        self.top_p = gen_kwargs.get("top_p")
        self.top_k = gen_kwargs.get("top_k")
        self.max_tokens = gen_kwargs.get("max_tokens")

        self.progress = EvalHarnessProgress("BigCode Generative Eval")

    def on_bigcode_eval_harness_start(
        self, trainer, model, val_dataloader, loop
    ):
        """Runs before the BigCode Evaluation Harness starts.

        Reads the model's start token and copies every generation setting
        that is not None onto the model.

        Raises:
            RuntimeError: If the model has no (or a None) `start_token`.
        """
        self.start_token = getattr(model, "start_token", None)
        if self.start_token is None:
            raise RuntimeError(
                "No start token specified under `model.start_token`. "
                "Please specify a start token for generative tasks."
            )

        if self.max_tokens is not None:
            model.max_tokens = self.max_tokens

        if self.temperature is not None:
            model.temperature = self.temperature

        if self.top_p is not None:
            model.top_p = self.top_p

        if self.top_k is not None:
            model.top_k = self.top_k

    def on_bigcode_eval_harness_batch_end(
        self, trainer, model, outputs, batch, batch_idx
    ):
        """Runs after every batch is processed."""
        self.progress.print(trainer, batch_idx)

    def on_before_forward(self, trainer, model, batch, args, kwargs):
        """Flags the forward call as autoregressive via its kwargs."""
        kwargs["autoregressive"] = True

    def on_after_forward(self, trainer, model, outputs, batch):
        """Hands the model's `output` predictions to `post_process`."""
        self.post_process(predictions=outputs["output"])

    @cstorch.step_closure
    def post_process(self, predictions):
        """
        Post-processes the model generated output tokens.

        Args:
            predictions: Tensor of shape (batch_size, max_seq_len)
                containing the model's predictions
        """
        for gen_tokens in predictions:
            # NOTE(review): a falsy metadata entry is skipped WITHOUT
            # advancing `sample_idx`, so every later row hits the same
            # entry. Presumably this only happens for trailing padding
            # rows; confirm against the data preprocessing.
            if not self.metadata[self.sample_idx]:
                continue

            sample_idx, _ = self.metadata[self.sample_idx]
            assert sample_idx == self.sample_idx, "Mismatching sample indices"

            # Grab generation tokens: keep everything before the first
            # occurrence of the start token.
            try:
                start_token_idx = gen_tokens.tolist().index(self.start_token)
                gen_tokens = gen_tokens[:start_token_idx].numpy()
            except ValueError:  # Generated string spans msl
                pass

            self.gen_token_dict[sample_idx].append(gen_tokens)
            self.sample_idx += 1
class BigCodeEvalHarness(ValidationCallback):
    """
    ValidationCallback class to run BigCode's Evaluation Harness.
    """

    # Class-wide counter used to give every instance a unique default name scope
    id = 0

    def __init__(
        self,
        # BigCode Args
        bigcode_args: Union[BigCodeCLIArgs, Dict[str, Any]],
        # Cerebras specific args
        keep_data_dir: bool = False,
        every_n_vals: int = 1,
        flags: Optional[dict] = None,
        name_scope: Optional[str] = None,
        # Data Args
        batch_size: Optional[int] = None,
        data_dir: Optional[str] = None,
        max_sequence_length: Optional[int] = None,
        tokenizer_file_path: Optional[str] = None,
        eos_id: Optional[int] = None,
        **dataloader_args,
    ):
        """
        Args:
            bigcode_args: `BigCodeCLIArgs` dataclass or dict capturing BCEH's CLI args
            keep_data_dir: Specifies whether dumped data samples should be kept for reuse.
                Defaults to False, i.e. data samples are deleted after the run.
            every_n_vals: Run the BigCode eval harness script every N validations.
                e.g. If the eval_frequency is set to 200 and N=2,
                then BigCode eval harness runs every 400 training steps.
                The BigCode eval harness script will also always run after the
                final training iteration.
            flags: An optional dictionary of scoped global flags to set
                during the BigCode eval harness run.
            name_scope: An optional string that gets added to the trainer's name scope.
            batch_size: Batch size to BigCodeEvalHarness to preprocess
                input data samples from the specified eval harness tasks.
            data_dir: Path to data directory. Required, despite the `None`
                default in the signature.
            max_sequence_length: Maximum sequence length
            tokenizer_file_path: Path to tokenizer file
            eos_id: End of sentence token id
            dataloader_args: Any additional dataloader args, e.g. num_workers.

        Raises:
            ValueError: If `data_dir` is None.
        """
        # Handling parsing for creating trainer from yaml
        if isinstance(bigcode_args, dict):
            self.bigcode_args = BigCodeCLIArgs(**bigcode_args)
        else:
            self.bigcode_args = bigcode_args

        self.bceh_runner = BigCodeEvalHarnessRunner(
            bigcode_args=self.bigcode_args
        )

        # `os.path.realpath(None)` fails with an opaque TypeError; report
        # the missing argument explicitly instead.
        if data_dir is None:
            raise ValueError(
                "`data_dir` must be specified for the BigCode eval harness."
            )

        self.dataloader_args = dict(
            batch_size=batch_size,
            data_dir=os.path.realpath(data_dir),
            keep_data_dir=keep_data_dir,
            max_sequence_length=max_sequence_length,
            tokenizer_file_path=tokenizer_file_path,
            eos_id=eos_id,
            **dataloader_args,
        )

        # Removes annoying logs relating to process forking
        appliance_environ["TOKENIZERS_PARALLELISM"] = "false"

        self.every_n_vals = every_n_vals

        self.scoped_flags = ScopedBigCodeEvalHarnessFlags(**(flags or {}))

        self._id = BigCodeEvalHarness.id
        BigCodeEvalHarness.id += 1

        if name_scope is None:
            name_scope = f"bigcode_{self._id}"

        self.name_scope = name_scope

    def run(self, trainer):
        """Run BigCode Eval Harness.

        Resolves the runner's output paths, builds a fresh
        `BigCodeEvaluator` from copies of the stored arguments and runs the
        harness with the scoped flags applied.

        Args:
            trainer: the Trainer object
        """
        trainer.logger.info("Running BigCode Eval Harness")

        runner_args = self.bceh_runner.args
        for path_attr in ("save_generations_path", "save_references_path"):
            out_path = getattr(runner_args, path_attr)

            # If no absolute file path for the output dump is provided,
            # dump inside the trainer's summary dir under the name scope.
            if not os.path.isabs(out_path):
                out_path = os.path.join(
                    trainer.summary_dir,
                    trainer.name_scope_path,
                    out_path,
                )
                setattr(runner_args, path_attr, out_path)

            # Make sure the target directory exists in either case
            os.makedirs(os.path.dirname(out_path), exist_ok=True)

        # Copies keep the evaluator from mutating this callback's stored args
        bc_evaluator = BigCodeEvaluator(
            trainer,
            deepcopy(self.bigcode_args),
            deepcopy(self.dataloader_args),
        )

        with self.scoped_flags:
            self.bceh_runner.evaluate(trainer=trainer, evaluator=bc_evaluator)

    def run_validation(self, trainer, loop_idx, is_last):
        """Runs the harness on every `every_n_vals`-th validation and on the last one.

        Args:
            trainer: the Trainer object
            loop_idx: Zero-based index of the current validation loop
            is_last: Whether this is the final validation
        """
        if not is_last and (loop_idx + 1) % self.every_n_vals != 0:
            return

        with trainer.name_scope(self.name_scope):
            self.run(trainer)
class ScopedBigCodeEvalHarnessFlags(_ScopedFlags):
    """
    Scoped-flags helper tied to the BigCode Evaluation Harness hooks:
    global flags are set when the harness run starts and restored when
    it ends.
    """

    def on_bigcode_eval_harness_start(
        self,
        trainer,
        model,
        val_dataloader,
        loop,
    ):
        """Applies all global flags right before the harness run begins."""
        self._set_all_flags()

    def on_bigcode_eval_harness_end(
        self,
        trainer,
        model,
        loop,
    ):
        """Puts all global flags back once the harness run has finished."""
        self._restore_all_flags()