Evaluator

evaldeck.evaluator.Evaluator

Evaluator(graders=None, metrics=None, config=None)

Main evaluation engine.

Evaluates agent traces against test cases using graders and metrics.

Choosing sync vs async methods:

Use evaluate() (sync) when:

- Running a single quick evaluation with code-based graders
- Your graders are all CPU-bound (ContainsGrader, RegexGrader, etc.)
- You're in a sync context without an event loop

Use evaluate_async() when:

- Using LLMGrader or other I/O-bound graders
- Running multiple graders that make API calls
- You want concurrent grader execution for better throughput
- Your custom graders/metrics make async API calls

Use evaluate_suite_async() when (see the suite sketch after the performance comparison below):

- Running multiple test cases (concurrent execution)
- Your agent function is async
- You want to control concurrency with max_concurrent

Performance comparison:

# Sync: graders run sequentially
# 3 LLMGraders × 2 seconds each = ~6 seconds total
result = evaluator.evaluate(trace, test_case)

# Async: graders run concurrently
# 3 LLMGraders × 2 seconds each = ~2 seconds total
result = await evaluator.evaluate_async(trace, test_case)
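For suite-level runs, evaluate_suite_async() adds test-level concurrency on top of grader-level concurrency. A minimal sketch, assuming suite is an EvalSuite you have built or loaded elsewhere and that your agent function returns a Trace:

import asyncio

from evaldeck.evaluator import Evaluator

suite = ...  # an EvalSuite built or loaded elsewhere

async def my_async_agent(prompt):
    ...  # call your agent and return a Trace built from its run

async def main():
    evaluator = Evaluator()
    # Cap execution at 5 concurrent tests; 0 means unlimited.
    suite_result = await evaluator.evaluate_suite_async(
        suite, my_async_agent, max_concurrent=5
    )
    print(suite_result.suite_name)

asyncio.run(main())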

Initialize the evaluator.

Parameters:

    graders (list[BaseGrader] | None, default None):
        List of graders to use. If None, uses defaults based on test case.
    metrics (list[BaseMetric] | None, default None):
        List of metrics to calculate. If None, uses defaults.
    config (EvaldeckConfig | None, default None):
        Evaldeck configuration.
Source code in src/evaldeck/evaluator.py
def __init__(
    self,
    graders: list[BaseGrader] | None = None,
    metrics: list[BaseMetric] | None = None,
    config: EvaldeckConfig | None = None,
) -> None:
    """Initialize the evaluator.

    Args:
        graders: List of graders to use. If None, uses defaults based on test case.
        metrics: List of metrics to calculate. If None, uses defaults.
        config: Evaldeck configuration.
    """
    self.graders = graders
    self.metrics = metrics or self._default_metrics()
    self.config = config
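As a construction sketch: with no arguments, graders default per test case and metrics fall back to the built-in defaults. The grader import path below is an assumption; check your graders module.

from evaldeck.evaluator import Evaluator

# Defaults: graders chosen per test case, built-in metrics.
evaluator = Evaluator()

# Pin graders explicitly (import path and constructor arguments are
# assumptions; adjust to your installation):
# from evaldeck.graders import ContainsGrader
# evaluator = Evaluator(graders=[ContainsGrader()])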

evaluate

evaluate(trace, test_case)

Evaluate a single trace against a test case (sync).

Runs graders and metrics sequentially. Best for:

- Code-based graders (ContainsGrader, RegexGrader, etc.)
- Quick evaluations without I/O-bound operations
- Contexts without an async event loop

For I/O-bound graders (LLMGrader) or concurrent execution, use evaluate_async() instead.

Parameters:

    trace (Trace, required):
        The execution trace to evaluate.
    test_case (EvalCase, required):
        The test case defining expected behavior.

Returns:

    EvaluationResult:
        EvaluationResult with grades and metrics.

Source code in src/evaldeck/evaluator.py
def evaluate(
    self,
    trace: Trace,
    test_case: EvalCase,
) -> EvaluationResult:
    """Evaluate a single trace against a test case (sync).

    Runs graders and metrics sequentially. Best for:
    - Code-based graders (ContainsGrader, RegexGrader, etc.)
    - Quick evaluations without I/O-bound operations
    - Contexts without an async event loop

    For I/O-bound graders (LLMGrader) or concurrent execution,
    use evaluate_async() instead.

    Args:
        trace: The execution trace to evaluate.
        test_case: The test case defining expected behavior.

    Returns:
        EvaluationResult with grades and metrics.
    """
    started_at = datetime.now()

    # Build graders
    graders = self.graders if self.graders else self._build_graders(test_case)

    # Create result
    result = EvaluationResult(
        test_case_name=test_case.name,
        status=GradeStatus.PASS,  # Start optimistic
        started_at=started_at,
        trace_id=trace.id,
    )

    # Run graders sequentially
    for grader in graders:
        try:
            grade = grader.grade(trace, test_case)
            result.add_grade(grade)
        except Exception as e:
            result.add_grade(GradeResult.error_result(grader.name, f"Grader error: {e}"))

    # Calculate metrics
    for metric in self.metrics:
        try:
            metric_result = metric.calculate(trace, test_case)
            result.add_metric(metric_result)
        except Exception:
            pass  # Metrics are optional, don't fail on error

    # Finalize
    result.completed_at = datetime.now()
    result.duration_ms = (result.completed_at - started_at).total_seconds() * 1000

    return result
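A minimal synchronous usage sketch, assuming the trace and test case were produced elsewhere:

from evaldeck.evaluator import Evaluator

trace = ...      # a Trace produced by running your agent
test_case = ...  # an EvalCase defining expected behavior

evaluator = Evaluator()
result = evaluator.evaluate(trace, test_case)
print(result.status, result.duration_ms)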

evaluate_async async

evaluate_async(trace, test_case)

Evaluate a single trace against a test case (async).

Runs graders and metrics concurrently using asyncio.gather(). Recommended for:

- LLMGrader (makes async API calls to OpenAI/Anthropic)
- Custom async graders that call external services
- Custom async metrics that fetch benchmark data
- Any scenario with multiple I/O-bound operations

Performance benefit: With 3 LLMGraders each taking 2 seconds, sync evaluate() takes ~6 seconds while evaluate_async() takes ~2 seconds.

Code-based graders (ContainsGrader, etc.) automatically run in a thread pool via asyncio.to_thread() to avoid blocking the event loop.

Parameters:

    trace (Trace, required):
        The execution trace to evaluate.
    test_case (EvalCase, required):
        The test case defining expected behavior.

Returns:

    EvaluationResult:
        EvaluationResult with grades and metrics.

Source code in src/evaldeck/evaluator.py
async def evaluate_async(
    self,
    trace: Trace,
    test_case: EvalCase,
) -> EvaluationResult:
    """Evaluate a single trace against a test case (async).

    Runs graders and metrics concurrently using asyncio.gather().
    Recommended for:
    - LLMGrader (makes async API calls to OpenAI/Anthropic)
    - Custom async graders that call external services
    - Custom async metrics that fetch benchmark data
    - Any scenario with multiple I/O-bound operations

    Performance benefit: With 3 LLMGraders each taking 2 seconds,
    sync evaluate() takes ~6 seconds while evaluate_async() takes ~2 seconds.

    Code-based graders (ContainsGrader, etc.) automatically run in a
    thread pool via asyncio.to_thread() to avoid blocking the event loop.

    Args:
        trace: The execution trace to evaluate.
        test_case: The test case defining expected behavior.

    Returns:
        EvaluationResult with grades and metrics.
    """
    started_at = datetime.now()

    # Build graders
    graders = self.graders if self.graders else self._build_graders(test_case)

    # Create result
    result = EvaluationResult(
        test_case_name=test_case.name,
        status=GradeStatus.PASS,  # Start optimistic
        started_at=started_at,
        trace_id=trace.id,
    )

    # Run graders concurrently
    async def run_grader(grader: BaseGrader) -> GradeResult:
        try:
            return await grader.grade_async(trace, test_case)
        except Exception as e:
            return GradeResult.error_result(grader.name, f"Grader error: {e}")

    grade_results = await asyncio.gather(*[run_grader(g) for g in graders])

    for grade in grade_results:
        result.add_grade(grade)

    # Calculate metrics concurrently (supports async custom metrics)
    async def run_metric(metric: BaseMetric) -> MetricResult | None:
        try:
            return await metric.calculate_async(trace, test_case)
        except Exception:
            return None  # Metrics are optional, don't fail on error

    metric_results = await asyncio.gather(*[run_metric(m) for m in self.metrics])

    for metric_result in metric_results:
        if metric_result is not None:
            result.add_metric(metric_result)

    # Finalize
    result.completed_at = datetime.now()
    result.duration_ms = (result.completed_at - started_at).total_seconds() * 1000

    return result
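Driving this from synchronous code is a one-liner with asyncio.run(). A minimal sketch, assuming the trace and test case exist; note that a failing grader does not raise but surfaces as an error grade on the result, as run_grader above shows:

import asyncio

from evaldeck.evaluator import Evaluator

trace = ...      # a Trace produced by running your agent
test_case = ...  # an EvalCase defining expected behavior

async def main():
    evaluator = Evaluator()
    # All graders run concurrently; grader failures become error grades.
    result = await evaluator.evaluate_async(trace, test_case)
    print(result.test_case_name, result.status)

asyncio.run(main())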

evaluate_suite

evaluate_suite(suite, agent_func, on_result=None, max_concurrent=0)

Evaluate all test cases in a suite (sync wrapper).

Parameters:

    suite (EvalSuite, required):
        The test suite to evaluate.
    agent_func (Callable[[str], Trace] | Callable[[str], Awaitable[Trace]], required):
        Function that takes an input string and returns a Trace. Can be sync or async.
    on_result (Callable[[EvaluationResult], None] | None, default None):
        Optional callback called after each test case.
    max_concurrent (int, default 0):
        Maximum concurrent tests. 0 = unlimited.

Returns:

    SuiteResult:
        SuiteResult with all evaluation results.

Source code in src/evaldeck/evaluator.py
def evaluate_suite(
    self,
    suite: EvalSuite,
    agent_func: Callable[[str], Trace] | Callable[[str], Awaitable[Trace]],
    on_result: Callable[[EvaluationResult], None] | None = None,
    max_concurrent: int = 0,
) -> SuiteResult:
    """Evaluate all test cases in a suite (sync wrapper).

    Args:
        suite: The test suite to evaluate.
        agent_func: Function that takes input string and returns a Trace.
            Can be sync or async.
        on_result: Optional callback called after each test case.
        max_concurrent: Maximum concurrent tests. 0 = unlimited.

    Returns:
        SuiteResult with all evaluation results.
    """
    return asyncio.run(self.evaluate_suite_async(suite, agent_func, on_result, max_concurrent))
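Because this wrapper calls asyncio.run(), it cannot be used inside an already-running event loop; call evaluate_suite_async() from async code instead. A minimal sketch with a progress callback, assuming suite is an EvalSuite built elsewhere:

from evaldeck.evaluator import Evaluator

suite = ...  # an EvalSuite built or loaded elsewhere

def my_agent(prompt):
    ...  # run your agent and return a Trace

def print_progress(result):
    print(f"{result.test_case_name}: {result.status}")

evaluator = Evaluator()
suite_result = evaluator.evaluate_suite(
    suite, my_agent, on_result=print_progress, max_concurrent=4
)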

evaluate_suite_async async

evaluate_suite_async(suite, agent_func, on_result=None, max_concurrent=0)

Evaluate all test cases in a suite concurrently.

Parameters:

    suite (EvalSuite, required):
        The test suite to evaluate.
    agent_func (Callable[[str], Trace] | Callable[[str], Awaitable[Trace]], required):
        Function that takes an input string and returns a Trace. Can be sync or async.
    on_result (Callable[[EvaluationResult], None] | None, default None):
        Optional callback called after each test case.
    max_concurrent (int, default 0):
        Maximum concurrent tests. 0 = unlimited.

Returns:

    SuiteResult:
        SuiteResult with all evaluation results.

Source code in src/evaldeck/evaluator.py
async def evaluate_suite_async(
    self,
    suite: EvalSuite,
    agent_func: Callable[[str], Trace] | Callable[[str], Awaitable[Trace]],
    on_result: Callable[[EvaluationResult], None] | None = None,
    max_concurrent: int = 0,
) -> SuiteResult:
    """Evaluate all test cases in a suite concurrently.

    Args:
        suite: The test suite to evaluate.
        agent_func: Function that takes input string and returns a Trace.
            Can be sync or async.
        on_result: Optional callback called after each test case.
        max_concurrent: Maximum concurrent tests. 0 = unlimited.

    Returns:
        SuiteResult with all evaluation results.
    """
    suite_result = SuiteResult(
        suite_name=suite.name,
        started_at=datetime.now(),
    )

    # Detect if agent is async
    is_async = asyncio.iscoroutinefunction(agent_func)

    # Create semaphore if limiting concurrency
    semaphore = asyncio.Semaphore(max_concurrent) if max_concurrent > 0 else None

    @asynccontextmanager
    async def maybe_semaphore() -> AsyncIterator[None]:
        """Context manager that optionally acquires semaphore."""
        if semaphore:
            async with semaphore:
                yield
        else:
            yield

    async def run_test(index: int, test_case: EvalCase) -> tuple[int, EvaluationResult]:
        """Run a single test case."""
        async with maybe_semaphore():
            result = await self._evaluate_single_async(test_case, agent_func, is_async)
            if on_result:
                on_result(result)
            return index, result

    # Run all tests concurrently
    tasks = [run_test(i, tc) for i, tc in enumerate(suite.test_cases)]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Add results in original order
    results_by_index: dict[int, EvaluationResult] = {}
    for item in results:
        if isinstance(item, BaseException):
            # This shouldn't happen since _evaluate_single_async catches exceptions
            continue
        idx, res = item
        results_by_index[idx] = res

    for i in range(len(suite.test_cases)):
        if i in results_by_index:
            suite_result.add_result(results_by_index[i])
        else:
            # Handle case where gather returned an exception
            suite_result.add_result(
                EvaluationResult(
                    test_case_name=suite.test_cases[i].name,
                    status=GradeStatus.ERROR,
                    error="Test execution failed unexpectedly",
                )
            )

    suite_result.completed_at = datetime.now()
    return suite_result
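Note that results are reattached in the original test-case order even though tests finish in whatever order concurrency allows, so on_result may fire out of order while the SuiteResult ordering stays stable. A minimal async sketch, assuming suite is an EvalSuite built elsewhere:

import asyncio

from evaldeck.evaluator import Evaluator

suite = ...  # an EvalSuite built or loaded elsewhere

async def my_async_agent(prompt):
    ...  # run your agent and return a Trace

def on_each(result):
    # Fires as each test completes; completion order may differ from suite order.
    print(result.test_case_name, result.status)

async def main():
    evaluator = Evaluator()
    suite_result = await evaluator.evaluate_suite_async(
        suite, my_async_agent, on_result=on_each, max_concurrent=8
    )
    print(suite_result.suite_name)

asyncio.run(main())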

evaldeck.evaluator.EvaluationRunner

EvaluationRunner(config=None)

High-level runner for executing evaluations.

Initialize the runner.

Parameters:

    config (EvaldeckConfig | None, default None):
        Evaldeck configuration. If None, loads from file.
Source code in src/evaldeck/evaluator.py
def __init__(self, config: EvaldeckConfig | None = None) -> None:
    """Initialize the runner.

    Args:
        config: Evaldeck configuration. If None, loads from file.
    """
    if config is None:
        from evaldeck.config import EvaldeckConfig

        config = EvaldeckConfig.load()
    self.config = config
    self.evaluator = Evaluator(config=config)
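A construction sketch: with no argument, EvaldeckConfig.load() reads configuration from file, as the source above shows.

from evaldeck.config import EvaldeckConfig
from evaldeck.evaluator import EvaluationRunner

# Load config from file implicitly...
runner = EvaluationRunner()

# ...or pass one explicitly.
runner = EvaluationRunner(config=EvaldeckConfig.load())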

run

run(suites=None, agent_func=None, tags=None, on_result=None, max_concurrent=None)

Run evaluation on multiple suites (sync wrapper).

Parameters:

    suites (list[EvalSuite] | None, default None):
        Test suites to run. If None, discovers from config.
    agent_func (Callable[[str], Trace] | Callable[[str], Awaitable[Trace]] | None, default None):
        Function to run the agent. If None, loads from config. Can be sync or async.
    tags (list[str] | None, default None):
        Filter test cases by tags.
    on_result (Callable[[EvaluationResult], None] | None, default None):
        Callback for each result.
    max_concurrent (int | None, default None):
        Max concurrent tests per suite. None = use config.

Returns:

    RunResult:
        RunResult with all suite results.

Source code in src/evaldeck/evaluator.py
def run(
    self,
    suites: list[EvalSuite] | None = None,
    agent_func: Callable[[str], Trace] | Callable[[str], Awaitable[Trace]] | None = None,
    tags: list[str] | None = None,
    on_result: Callable[[EvaluationResult], None] | None = None,
    max_concurrent: int | None = None,
) -> RunResult:
    """Run evaluation on multiple suites (sync wrapper).

    Args:
        suites: Test suites to run. If None, discovers from config.
        agent_func: Function to run agent. If None, loads from config.
            Can be sync or async.
        tags: Filter test cases by tags.
        on_result: Callback for each result.
        max_concurrent: Max concurrent tests per suite. None = use config.

    Returns:
        RunResult with all suite results.
    """
    return asyncio.run(self.run_async(suites, agent_func, tags, on_result, max_concurrent))
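A minimal end-to-end sketch that relies on config discovery for the suites and the agent function, filtering to tagged test cases:

from evaldeck.evaluator import EvaluationRunner

runner = EvaluationRunner()
# Suites and the agent function come from config; only "smoke"-tagged cases run.
run_result = runner.run(tags=["smoke"])
print(run_result.completed_at)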

run_async async

run_async(suites=None, agent_func=None, tags=None, on_result=None, max_concurrent=None)

Run evaluation on multiple suites asynchronously.

Parameters:

    suites (list[EvalSuite] | None, default None):
        Test suites to run. If None, discovers from config.
    agent_func (Callable[[str], Trace] | Callable[[str], Awaitable[Trace]] | None, default None):
        Function to run the agent. If None, loads from config. Can be sync or async.
    tags (list[str] | None, default None):
        Filter test cases by tags.
    on_result (Callable[[EvaluationResult], None] | None, default None):
        Callback for each result.
    max_concurrent (int | None, default None):
        Max concurrent tests per suite. None = use config.

Returns:

    RunResult:
        RunResult with all suite results.

Source code in src/evaldeck/evaluator.py
async def run_async(
    self,
    suites: list[EvalSuite] | None = None,
    agent_func: Callable[[str], Trace] | Callable[[str], Awaitable[Trace]] | None = None,
    tags: list[str] | None = None,
    on_result: Callable[[EvaluationResult], None] | None = None,
    max_concurrent: int | None = None,
) -> RunResult:
    """Run evaluation on multiple suites asynchronously.

    Args:
        suites: Test suites to run. If None, discovers from config.
        agent_func: Function to run agent. If None, loads from config.
            Can be sync or async.
        tags: Filter test cases by tags.
        on_result: Callback for each result.
        max_concurrent: Max concurrent tests per suite. None = use config.

    Returns:
        RunResult with all suite results.
    """
    # Load suites if not provided
    if suites is None:
        suites = self._discover_suites()

    # Load agent function if not provided
    if agent_func is None:
        agent_func = self._load_agent_func()

    # Filter by tags if specified
    if tags:
        suites = [s.filter_by_tags(tags) for s in suites]

    # Determine worker count
    effective_max_concurrent = (
        max_concurrent if max_concurrent is not None else self.config.execution.workers
    )

    # Run evaluation
    run_result = RunResult(
        started_at=datetime.now(),
        config=self.config.model_dump(),
    )

    for suite in suites:
        if not suite.test_cases:
            continue

        suite_result = await self.evaluator.evaluate_suite_async(
            suite=suite,
            agent_func=agent_func,
            on_result=on_result,
            max_concurrent=effective_max_concurrent,
        )
        run_result.add_suite(suite_result)

    run_result.completed_at = datetime.now()
    return run_result
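Prefer run_async() when you are already inside an event loop, since run() wraps it in asyncio.run(), which cannot nest. As the source shows, suites execute one after another while tests within each suite run concurrently up to the effective worker count. A minimal sketch:

import asyncio

from evaldeck.evaluator import EvaluationRunner

async def main():
    runner = EvaluationRunner()
    # Override the configured worker count for this run only.
    run_result = await runner.run_async(max_concurrent=4)
    print(run_result.completed_at)

asyncio.run(main())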