# Copyright 2022 Cerebras Systems.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Defines utils for running Eval Harness on CSX."""
import glob
import json
import os
import re
import sys
from collections import defaultdict
from copy import deepcopy
from dataclasses import dataclass
from functools import cached_property
from pathlib import Path
from typing import Any, Dict, List, Literal, Optional, Union
from warnings import warn
import numpy as np
from lm_eval import evaluator, utils
from lm_eval.__main__ import (
DEFAULT_RESULTS_FILE,
_handle_non_serializable,
_int_or_none_list_arg_type,
)
from lm_eval.api.model import LM
from lm_eval.api.task import Task
from lm_eval.tasks import TaskManager, get_task_dict
from lm_eval.utils import make_table
from cerebras.appliance.log import ClassLogger, named_class_logger
# Model family names recognised by this module, kept in alphabetical order.
# NOTE(review): not referenced in the visible part of this file; presumably
# consumed by importers of this module -- confirm before renaming entries.
SUPPORTED_MODELS = {
    "bloom",
    "btlm",
    "falcon",
    "gpt-neox",
    "gpt2",
    "gpt3",
    "gptj",
    "jais",
    "llama",
    "mistral",
    "mixtral",
    "mpt",
    "santacoder",
    "starcoder",
}
@dataclass
class EleutherCLIArgs:
    """Captures EEH's CLI arguments with defaults.

    Fields:
        tasks: List of tasks to evaluate
            To get full list of tasks, use the command ``lm-eval --tasks list``
        num_fewshot: Number of examples in few-shot context
        output_path: The path to the output file where the result metrics
            will be saved. If the path is a directory and log_samples is true,
            the results will be saved in the directory. Else the parent
            directory will be used.
        limit: Limit the number of examples per task.
            If <1, limit is a percentage of the total number of examples.
        use_cache: A path to a sqlite db file for caching model responses.
            `None` if not caching.
        cache_requests: Speed up evaluation by caching the building of
            dataset requests. `None` if not caching.
        check_integrity: Whether to run the relevant part of the test suite
            for the tasks.
        write_out: Prints the prompt for the first few documents.
        log_samples: If True, write out all model outputs and documents for
            per-sample measurement and post-hoc analysis. Use with
            --output_path.
        show_config: If True, shows the full config of all tasks at the
            end of the evaluation.
        include_path: Additional path to include if there are external tasks
            to include.
        predict_only: Use with --log_samples. Only model outputs will be
            saved and metrics will not be evaluated.
        seed: Set seed for python's random, numpy and torch.
            Accepts a comma-separated list of 3 values for python's random,
            numpy, and torch seeds, respectively, or a single integer to set
            the same seed for all three. The values are either an integer
            or ``None`` to not set the seed. Default is ``0,1234,1234`` (for
            backward compatibility). E.g. ``--seed 0,None,8`` sets
            ``random.seed(0)`` and ``torch.manual_seed(8)``. Here numpy's seed
            is not set since the second value is ``None``. E.g, ``--seed 42``
            sets all three seeds to 42.
            After construction this field holds the parsed value returned by
            lm_eval's ``_int_or_none_list_arg_type`` (indexed ``[0]``..``[2]``
            by the runner), not the original string.
        trust_remote_code: Sets trust_remote_code to True to execute code to
            create HF Datasets from the Hub
        verbosity: EEH logging level
        max_length_generation: Maximum length of generated sequence (prompt+generation).
        temperature: Sampling temperature used for generation.
        top_k: Top-k parameter used for generation.
        top_p: Top-p parameter used for nucleus sampling.
    """

    tasks: Union[str, List[str]]
    num_fewshot: Optional[int] = None
    output_path: Optional[str] = None
    limit: Optional[float] = None
    use_cache: Optional[str] = None
    cache_requests: Optional[Literal["true", "refresh", "delete"]] = None
    check_integrity: bool = False
    write_out: bool = False
    log_samples: bool = False
    show_config: bool = False
    include_path: Optional[str] = None
    predict_only: bool = False
    seed: Union[int, str, List[Optional[int]]] = "0,1234,1234"
    trust_remote_code: bool = False
    verbosity: str = "INFO"
    max_length_generation: Optional[int] = None
    temperature: Optional[float] = None
    top_k: Optional[int] = None
    top_p: Optional[float] = None

    def __post_init__(self):
        """Parse ``seed`` with lm_eval's seed-list parser.

        A seed that is already a list/tuple (e.g. when the instance is rebuilt
        from a previously parsed one) is first flattened back into its
        comma-separated form, so it goes through the same validation instead
        of being stringified as ``"[0, 1234, 1234]"``.
        """
        seed = self.seed
        if isinstance(seed, (list, tuple)):
            seed = ",".join(str(item) for item in seed)
        self.seed = _int_or_none_list_arg_type(3, str(seed))
@named_class_logger("EvalHarnessRunner")
class EvalHarnessRunner(ClassLogger):
    """Util class for invoking EEH's run script with CSX-specific components."""

    def __init__(self, eeh_args: EleutherCLIArgs):
        """
        Args:
            eeh_args: Eval Harness CLI args.
        """
        super().__init__()
        # Keep a private copy: `evaluate` rewrites `log_samples`/`output_path`.
        self.args = deepcopy(eeh_args)
        self.task_manager: Optional[TaskManager] = None
        self.task_names: Union[str, List[Union[str, Dict, Task]]] = []
        self.init_tasks()

    def init_tasks(self):
        # pylint: disable=line-too-long
        """Captures the task initialization logic from `Eleuther's run script <lm_eval_main>`_.

        .. _lm_eval_main: https://github.com/EleutherAI/lm-evaluation-harness/blob/4600d6bf73ba2cf7037ae7feada03315839ef185/lm_eval/__main__.py#L271-L307

        Includes CSX-specific validation for the user-specified eval harness tasks.

        ``self.args.tasks`` may be a directory of ``*.yaml`` task configs, a
        comma-separated string of task names / YAML file paths, or a list of
        such strings. On success ``self.task_names`` and ``self.task_manager``
        are populated.

        Raises:
            ValueError: If no task was specified or a requested task cannot
                be found.
            SystemExit: If ``tasks == "list"`` (after logging all tasks).
        """
        if self.args.include_path is not None:
            self.logger.info(
                f"Including path: {self.args.include_path} for externally created tasks."
            )
        task_manager = TaskManager(
            self.args.verbosity, include_path=self.args.include_path
        )

        if self.args.limit:
            self.logger.warning(
                " --limit should only be used for testing. "
                "Real metrics should not be computed using limit."
            )

        tasks = self.args.tasks
        if tasks is None:
            raise ValueError("Need to specify task to evaluate.")
        if tasks == "list":
            self.logger.info(
                "Available Tasks:\n - {}".format(
                    "\n - ".join(task_manager.all_tasks)
                )
            )
            sys.exit()

        if isinstance(tasks, str) and os.path.isdir(tasks):
            # A directory: every YAML file in it is a task config.
            # Sorted so the load order does not depend on the filesystem.
            task_names = []
            for yaml_file in sorted(glob.glob(os.path.join(tasks, "*.yaml"))):
                self.logger.info(f"Loading task from file: {yaml_file}")
                task_names.append(utils.load_yaml_config(yaml_file))
        else:
            # The dataclass allows both "a,b,c" and ["a", "b", "c"].
            task_list = tasks.split(",") if isinstance(tasks, str) else list(tasks)
            task_names = task_manager.match_tasks(task_list)

            # Entries that are not registered names may be paths to YAML
            # configs. Remember which ones were loaded: the config dict that
            # gets appended never compares equal to the path string, so
            # without this the path would be reported as missing below.
            loaded_files = set()
            for task in task_list:
                if task in task_names:
                    continue
                if os.path.isfile(task):
                    task_names.append(utils.load_yaml_config(task))
                    loaded_files.add(task)

            # we don't want errors if a wildcard ("*") task name was used
            task_missing = [
                task
                for task in task_list
                if task not in task_names
                and task not in loaded_files
                and "*" not in task
            ]
            if task_missing:
                missing = ", ".join(task_missing)
                raise ValueError(
                    f"Tasks not found: {missing}.\n"
                    f"{utils.SPACING}Try `lm-eval --tasks list` for list "
                    "of available tasks, or '--verbosity DEBUG' to "
                    "troubleshoot task registration issues."
                )

        # Validate tasks and cache task related properties.
        self.task_names = EvalHarnessRunner.validate_and_sort_tasks(
            task_names, task_manager
        )
        self.task_manager = task_manager

    @cached_property
    def task_dict(self) -> Dict[str, Any]:
        """Returns the task dictionary for the specified tasks."""
        return get_task_dict(self.task_names, self.task_manager)

    @staticmethod
    def validate_and_sort_tasks(
        task_names: Union[str, List[Union[str, Dict, Task]]],
        task_manager: Optional[TaskManager] = None,
    ) -> List[str]:
        """Validates user specification of eval harness tasks on CSX. In particular, for
        a single run we do not support.

        1) Tasks with `loglikelihood_rolling` output types
        2) Combining non-generative and generative tasks
        3) Running multiple generative tasks

        NOTE(review): only (1) and (3) are enforced by the code below; (2) is
        not rejected here -- the two kinds are merely ordered.

        Args:
            task_names: List of task names or config dicts
            task_manager: TaskManager object that stores indexed tasks

        Returns:
            The task names from the resolved task dict, generative task (if
            any) first, followed by the non-generative ones.

        Raises:
            RuntimeError: On a `loglikelihood_rolling` task or on more than
                one `generate_until` task.
        """
        task_dict = get_task_dict(task_names, task_manager)

        gen_tasks, non_gen_tasks = [], []
        for task_name, task_obj in task_dict.items():
            if isinstance(task_obj, tuple):
                _, task_obj = task_obj
            # Nothing to validate for entries that carry no task object.
            if task_obj is None:
                continue

            output_type = task_obj.get_config("output_type")
            if output_type == "loglikelihood_rolling":
                raise RuntimeError(
                    "Tasks with `loglikelihood_rolling` output types are not yet supported. "
                    f"Please unspecify task {task_name} from the specified tasks list."
                )
            if output_type == "generate_until":
                if gen_tasks:
                    raise RuntimeError(
                        "Running multiple generative eval harness tasks in the same callback "
                        "is not currently supported. Please specify only one generative task per "
                        "eval harness callback. To run multiple generative tasks, create separate "
                        "callbacks."
                    )
                gen_tasks.append(task_name)
            else:
                non_gen_tasks.append(task_name)

        # Non-generative task names are placed after the generative one. Per the
        # original note this makes EH execute the non-generative tasks first,
        # which minimizes appliance restarts (train -> non_generative reuse the
        # same appliance).
        return gen_tasks + non_gen_tasks

    def evaluate(self, trainer, model: LM) -> Optional[Dict[str, Any]]:
        # pylint: disable=line-too-long
        """Invoke's evaluation logic from `EEH's run script <lm_eval_main>`_ on the given model.

        .. _lm_eval_main: https://github.com/EleutherAI/lm-evaluation-harness/blob/4600d6bf73ba2cf7037ae7feada03315839ef185/lm_eval/__main__.py#L240

        Args:
            trainer: The Trainer object to log to.
            model: The language model object (subclass of EEH's LM abstract base class)

        Returns:
            The results dict produced by ``evaluator.simple_evaluate`` (with
            the ``"samples"`` entry removed when ``log_samples`` is set), or
            ``None`` if the evaluator returned no results.

        Raises:
            FileExistsError: If ``output_path`` points at an existing file.
        """
        if self.args.predict_only:
            self.args.log_samples = True
        # Samples need somewhere to go; default to the trainer's summary dir.
        if self.args.log_samples and not self.args.output_path:
            self.args.output_path = (
                trainer.summary_dir / trainer.name_scope_path
            )

        path = None
        output_path_file = None
        if self.args.output_path:
            path = Path(self.args.output_path)
            if not path.is_absolute():
                path = trainer.summary_dir / trainer.name_scope_path / path
            if path.is_dir():
                path.mkdir(parents=True, exist_ok=True)
            else:
                path.parent.mkdir(parents=True, exist_ok=True)

            # check if file or 'dir/results.json' exists
            if path.is_file():
                raise FileExistsError(f"File already exists at {path}")

            # Default results file, suffixed with the current global step.
            output_path_file = path.joinpath(DEFAULT_RESULTS_FILE)
            output_path_file = (
                output_path_file.parent
                / f"{output_path_file.stem}_{trainer.global_step}{output_path_file.suffix}"
            )
            if output_path_file.is_file():
                self.logger.warning(
                    f"File {output_path_file} already exists. Results will be overwritten."
                )
            # if path json then get parent dir
            elif path.suffix in (".json", ".jsonl"):
                output_path_file = path
                path.parent.mkdir(parents=True, exist_ok=True)
                path = path.parent
            else:
                output_path_file = output_path_file.resolve()
                path = path.resolve()
                path.mkdir(parents=True, exist_ok=True)

        self.logger.info(
            f"Starting Eleuther evaluation harness on selected tasks: {self.task_names}"
        )

        request_caching_args = evaluator.request_caching_arg_to_dict(
            cache_requests=self.args.cache_requests
        )

        # Set generative inference settings
        gen_kwargs = {
            "temperature": self.args.temperature,
            "top_k": self.args.top_k,
            "top_p": self.args.top_p,
            "max_tokens": self.args.max_length_generation,
        }
        model.gen_kwargs = gen_kwargs

        results = evaluator.simple_evaluate(
            model=model,
            tasks=self.task_names,
            num_fewshot=self.args.num_fewshot,
            use_cache=self.args.use_cache,
            limit=self.args.limit,
            check_integrity=self.args.check_integrity,
            write_out=self.args.write_out,
            log_samples=self.args.log_samples,
            task_manager=self.task_manager,
            verbosity=self.args.verbosity,
            predict_only=self.args.predict_only,
            random_seed=self.args.seed[0],
            numpy_random_seed=self.args.seed[1],
            torch_random_seed=self.args.seed[2],
            **request_caching_args,
        )
        if results is None:
            return None

        # Samples are logged/saved separately, keep them out of the summary.
        samples = results.pop("samples") if self.args.log_samples else None

        dumped = json.dumps(
            results,
            indent=2,
            default=_handle_non_serializable,
            ensure_ascii=False,
        )
        if self.args.show_config:
            self.logger.info(dumped)

        batch_sizes = ",".join(map(str, results["config"]["batch_sizes"]))
        # Placeholders kept so the summary line matches EEH's format.
        batch_size = None
        model_args = None

        # Logging to the trainer is best-effort: a failure here must not lose
        # the results that are written to disk below.
        try:
            self.log_eval_results(trainer, results)
            if self.args.log_samples:
                self.log_eval_samples(trainer, samples, results)
        except Exception as e:  # pylint: disable=broad-except
            self.logger.error(
                f"Logging eval results/samples failed due to: {e}"
            )

        # Same truthiness test as above, so `output_path_file` is always set here.
        if self.args.output_path:
            self.logger.info(
                f"Saving Eleuther Eval Harness results to {output_path_file}"
            )
            with output_path_file.open("w", encoding="utf-8") as f:
                f.write(dumped)

            if self.args.log_samples:
                for task_name in results["configs"]:
                    filename = path.joinpath(
                        f"{task_name}_{trainer.global_step}.json"
                    )
                    samples_dumped = json.dumps(
                        samples[task_name],
                        indent=2,
                        default=_handle_non_serializable,
                        ensure_ascii=False,
                    )
                    filename.write_text(samples_dumped, encoding="utf-8")

        self.logger.info(
            f"{model} ({model_args}), gen_kwargs: ({gen_kwargs}), "
            f"limit: {self.args.limit}, num_fewshot: {self.args.num_fewshot}, "
            f"batch_size: {batch_size}{f' ({batch_sizes})' if batch_sizes else ''}"
        )
        self.logger.info("\n" + make_table(results))
        if "groups" in results:
            self.logger.info("\n" + make_table(results, "groups"))

        return results

    def log_eval_results(self, trainer, results: Dict[str, Any]) -> None:
        """Logs the evaluation results to the trainer.

        Each metric is logged under ``<task>/<metric>`` (a trailing ``,none``
        filter suffix is dropped), then the results are logged as a table and
        as a JSON string.
        """
        results = deepcopy(results)

        # TODO: Do we need to update the wandb config?

        pattern = re.compile(r",none$")

        # Log the evaluation metrics
        trainer.log_metrics(
            **{
                # Remove None from the metric string name
                pattern.sub("", f"{task_name}/{metric_name}"): metric_value
                for task_name, task_value in results.get("results", {}).items()
                for metric_name, metric_value in task_value.items()
            }
        )

        self.log_eval_results_as_table(trainer, results)

        # Log the results dict as json
        self.log_as_json(trainer, "eval_results", results)

    def log_eval_results_as_table(  # pylint: disable=line-too-long
        self, trainer, results: Dict[str, Any]
    ) -> None:
        """Logs the eval results as a table to the trainer's loggers.

        Note, this method is adapted to construct a pandas DataFrame off the
        `original WandB specific implementation <log_table>`_ in EEH.

        .. _log_table: https://github.com/EleutherAI/lm-evaluation-harness/blob/4600d6bf73ba2cf7037ae7feada03315839ef185/lm_eval/logging_utils.py#L157-L205

        Does nothing (besides a warning) when pandas is not installed.
        """
        try:
            import pandas as pd
        except ImportError:
            warn("Pandas not installed. Skipping logging of results as table.")
            return

        group_names = set(results.get("groups", {}))
        # Tolerate results that lack these optional sections.
        versions = results.get("versions") or {}
        n_shot = results.get("n-shot") or {}

        def make_dataframe(column1: str, key: str = "results"):
            """Build one row per (task-or-group, metric, filter) entry."""
            data = []
            for k, dic in (results.get(key) or {}).items():
                # Groups get their own table; skip them in the task table.
                if k in group_names and key != "groups":
                    continue
                version = versions.get(k)
                if version == "N/A":
                    version = None
                num_fewshot = n_shot.get(k)

                for metric_filter, value in dic.items():
                    metric, _, filter_name = metric_filter.partition(",")
                    # stderr values are folded into their metric's row.
                    if metric.endswith("_stderr") or metric == "alias":
                        continue

                    stderr_key = f"{metric}_stderr,{filter_name}"
                    if stderr_key in dic:
                        stderr = dic[stderr_key]
                        if stderr != "N/A":
                            stderr = f"{stderr:.4f}"
                    else:
                        stderr = ""

                    data.append(
                        {
                            column1: k,
                            "Version": version,
                            "Filter": filter_name,
                            "num_fewshot": num_fewshot,
                            "Metric": metric,
                            "Value": str(value),
                            "Stderr": str(stderr),
                        }
                    )
            return pd.DataFrame(data=data)

        if "results" in results:
            trainer.log_metrics(
                **{
                    "evaluation/eval_results": make_dataframe(
                        "Tasks", "results"
                    )
                }
            )
        if "groups" in results:
            trainer.log_metrics(
                **{
                    "evaluation/group_eval_results": make_dataframe(
                        "Groups", "groups"
                    )
                }
            )

    def log_as_json(self, trainer, key, results: Dict[str, Any]):
        """Serializes the results dict as json and logs it to the trainer.

        Args:
            trainer: The Trainer object to log to.
            key: Name under which the JSON string is logged.
            results: The object to serialize.
        """

        def _to_serializable(o: Any) -> Union[int, str, list]:
            # Fallback for objects `json` cannot encode natively.
            if isinstance(o, (np.int32, np.int64)):
                return int(o)
            if isinstance(o, set):
                return list(o)
            return str(o)

        trainer.log_metrics(
            **{
                key: json.dumps(
                    results,
                    indent=4,
                    default=_to_serializable,
                    ensure_ascii=False,
                )
            }
        )

    def log_eval_samples(
        self, trainer, samples: Dict[str, Any], results: Dict[str, Any]
    ) -> None:
        """Logs the evaluation samples to the trainer.

        Tasks without a ``group`` in their config get one table each; tasks
        that belong to groups are merged into one table per group. Every
        task's samples are additionally logged as JSON. Does nothing (besides
        a warning) when pandas is not installed.

        Args:
            trainer: The Trainer object to log to.
            samples: Per-task samples, keyed by task name.
            results: The results dict (reads "groups", "results", "configs").
        """
        try:
            import pandas as pd
        except ImportError:
            warn("Pandas not installed. Skipping logging of eval samples")
            return

        samples = deepcopy(samples)

        def generate_dataset(*args, **kwargs) -> pd.DataFrame:
            from lm_eval.logging_utils import WandbLogger

            # Its okay to pass in `None` as self as this method
            # has no self uses
            # pylint: disable=protected-access
            return WandbLogger._generate_dataset(None, *args, **kwargs)

        all_groups = set(results.get("groups", {}))
        task_names = [
            x for x in results.get("results", {}) if x not in all_groups
        ]
        task_configs = results.get("configs", {})

        ungrouped_tasks = []
        tasks_by_groups = defaultdict(list)
        for task_name in task_names:
            task_groups = task_configs[task_name].get("group", None)
            if not task_groups:
                ungrouped_tasks.append(task_name)
                continue
            if isinstance(task_groups, str):
                task_groups = [task_groups]
            for group_name in task_groups:
                tasks_by_groups[group_name].append(task_name)

        for task_name in ungrouped_tasks:
            eval_preds = samples[task_name]
            trainer.log_metrics(
                **{
                    # log the samples as a table
                    f"{task_name}_eval_results": generate_dataset(
                        eval_preds,
                        task_configs.get(task_name),
                    ),
                }
            )
            # Log the samples dict as json
            self.log_as_json(trainer, f"{task_name}_eval_samples", eval_preds)

        for group, grouped_tasks in tasks_by_groups.items():
            # Collect the per-task frames and concatenate once per group.
            frames = []
            for task_name in grouped_tasks:
                eval_preds = samples[task_name]
                df = generate_dataset(eval_preds, task_configs.get(task_name))
                df["group"] = group
                df["task"] = task_name
                frames.append(df)

                # Log the samples dict as json
                self.log_as_json(
                    trainer, f"{task_name}_eval_samples", eval_preds
                )

            grouped_df = (
                pd.concat(frames, ignore_index=True)
                if frames
                else pd.DataFrame()
            )
            trainer.log_metrics(**{f"{group}_eval_results": grouped_df})