Create a Benchmark

Goal: Score documents, compare metrics, and flag outliers across your document collection.

Use When: You need to calculate a metric per document (e.g., price per kWh), compute a reference value across documents, and compare each document against that reference.


What Benchmarks Do

Benchmarks are pure calculation -- no async, no services, no I/O. The Engine handles everything else.

Engine resolves fields   →  score() per doc   →  compute() once   →  compare() per doc
(slot → semantic name)      ExtendedDocument     ScoredDocuments     bool (unfavorable?)
                            + aggregations       → single float

Responsibility                                   Owner
Query documents from database                    Engine
Resolve slot names to semantic field names       Engine
Pre-fetch aggregations and attach to document    Engine
Calculate a score for one document               You
Reduce scores to a single benchmark value        You
Compare each document against the benchmark      You
Persist score records                            Engine

Key difference from plugins: Benchmarks are synchronous and have no access to prompt_llm() or any services. They receive ExtendedDocument objects (not Document) with pre-resolved semantic field names.
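
The division of labor above can be simulated in plain Python. This is only a sketch of the call order, not the Engine's implementation: FakeScored and run_benchmark are hypothetical names, plain dicts stand in for documents, and the three callables play the roles of score(), compute(), and compare().

```python
from dataclasses import dataclass


@dataclass
class FakeScored:
    """Hypothetical stand-in for ScoredDocument: a document plus its score."""
    document: dict
    score: float


def run_benchmark(docs, score, compute, compare):
    """Simulate the score -> compute -> compare cycle for one set of documents."""
    # score() runs once per document; None-scored documents are skipped.
    scored = []
    for doc in docs:
        value = score(doc)
        if value is not None:
            scored.append(FakeScored(doc, value))
    if not scored:
        return None, {}
    # compute() runs once over all scored documents.
    benchmark = compute(scored)
    # compare() runs once per scored document; True means "unfavorable".
    flags = {s.document["id"]: compare(s.score, benchmark) for s in scored}
    return benchmark, flags


docs = [
    {"id": "A", "price": 0.12},
    {"id": "B", "price": 0.15},
    {"id": "C", "price": None},  # cannot be scored, so it is skipped
]
benchmark, flags = run_benchmark(
    docs,
    score=lambda d: d["price"],
    compute=lambda results: min(r.score for r in results),
    compare=lambda doc_score, bench: doc_score > bench,
)
print(benchmark, flags)
```

Document A sets the benchmark (lowest price), B is flagged because it pays more, and C never reaches compute() or compare().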


Prerequisites

Before starting:


Step 1: Scaffold

Use the CLI to generate a starting template:

bizsupply init benchmark --name energy_price

This creates energy_price.py with a ready-to-edit benchmark class:

from bizsupply_sdk import (
    BaseBenchmark,
    ExtendedDocument,
    ScoredDocument,
    MatchCondition,
    MatchRule,
)


class EnergyPrice(BaseBenchmark):
    """Benchmark implementation."""

    name = "energy_price"
    target_labels = ["contract"]      # TODO: Set target labels
    metric_unit = "EUR"               # TODO: Set metric unit
    group_by = []                     # TODO: Set grouping dimensions

    MATCH_RULES = [
        MatchRule(
            name="energy_price_match",
            left_group=["contract"],   # TODO: Set source document labels
            right_group=["invoice"],   # TODO: Set target document labels
            conditions=[
                MatchCondition(
                    left_field="client_tax_id",
                    right_field="client_tax_id",
                    match_type="==",
                ),
            ],
            description="Match contracts to invoices by client tax ID",
        ),
    ]

    def score(self, document: ExtendedDocument) -> float | None:
        # TODO: Implement scoring logic
        ...

    def compute(self, results: list[ScoredDocument]) -> float:
        # TODO: Choose computation method
        ...

    def compare(self, document_score: float, benchmark_score: float) -> bool:
        # TODO: Implement comparison logic
        ...

Step 2: Identity Properties

Every benchmark must define these class attributes:

Property       Type       Description                                  Example
name           str        Unique identifier                            "energy_contract_price_portugal"
target_labels  list[str]  Which documents to score (ontology labels)   ["contract", "energy"]
metric_unit    str        Unit of measurement                          "EUR/kWh"
group_by       list[str]  Optional grouping dimensions                 ["region"]

class EnergyContractPrice(BaseBenchmark):
    name = "energy_contract_price_portugal"
    target_labels = ["contract", "energy"]
    metric_unit = "EUR/kWh"
    group_by = ["region"]

The Engine uses target_labels to select which documents to score and group_by to partition scores into separate benchmarks.
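
To make the effect of group_by concrete, here is a rough sketch of partitioning scores by one field and reducing each partition separately. It assumes a single grouping field and a min() reduction; benchmark_per_group is a hypothetical helper, and how the Engine actually partitions (for example with several group_by fields) is not specified here.

```python
from collections import defaultdict


def benchmark_per_group(scored, group_field):
    """Partition (fields, score) pairs by one field and take the lowest score per group."""
    groups = defaultdict(list)
    for fields, score in scored:
        groups[fields.get(group_field)].append(score)
    # One benchmark value per distinct group key.
    return {key: min(values) for key, values in groups.items()}


scored = [
    ({"region": "north"}, 0.14),
    ({"region": "north"}, 0.11),
    ({"region": "south"}, 0.16),
]
per_region = benchmark_per_group(scored, "region")
print(per_region)
```

With group_by = ["region"], a contract in the north would be compared against the northern benchmark only, not against the lowest price overall.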


Step 3: Aggregation Rules

Aggregation rules tell the Engine which related documents to attach to each scored document. For example, an energy contract benchmark might need invoices linked by client tax ID and delivery point.

Define rules as a MATCH_RULES class attribute:

from bizsupply_sdk import BaseBenchmark, MatchCondition, MatchRule

class EnergyContractPrice(BaseBenchmark):
    # ... identity properties ...

    MATCH_RULES = [
        MatchRule(
            name="contract_invoice_match",
            left_group=["contract", "energy"],
            right_group=["invoice", "energy"],
            conditions=[
                MatchCondition(
                    left_field="client_tax_id",
                    right_field="client_tax_id",
                    match_type="==",
                ),
                MatchCondition(
                    left_field="cpe_point_of_delivery",
                    right_field="cpe_point_of_delivery",
                    match_type="==",
                ),
            ],
            description="Match energy contracts to invoices by client and CPE",
        ),
    ]

How it works: The Engine finds all right_group documents where ALL conditions match (AND logic), then attaches them as document.aggregations.
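
The AND logic can be illustrated without the SDK. The sketch below handles only the == operator, uses plain dicts as documents, and treats a missing value as a non-match; matches and find_aggregations are hypothetical helpers, not Engine functions.

```python
def matches(left, right, conditions):
    """True only if every (left_field, right_field) pair is present and equal."""
    for left_field, right_field in conditions:
        left_value = left.get(left_field)
        right_value = right.get(right_field)
        if left_value is None or right_value is None:
            return False
        if left_value != right_value:
            return False
    return True


def find_aggregations(left, candidates, conditions):
    """Collect the candidates that satisfy all conditions against the left document."""
    return [c for c in candidates if matches(left, c, conditions)]


CONDITIONS = [
    ("client_tax_id", "client_tax_id"),
    ("cpe_point_of_delivery", "cpe_point_of_delivery"),
]
contract = {"client_tax_id": "PT123", "cpe_point_of_delivery": "CPE-1"}
invoices = [
    {"id": "inv-1", "client_tax_id": "PT123", "cpe_point_of_delivery": "CPE-1"},
    {"id": "inv-2", "client_tax_id": "PT123", "cpe_point_of_delivery": "CPE-2"},
    {"id": "inv-3", "client_tax_id": None, "cpe_point_of_delivery": "CPE-1"},
]
linked = find_aggregations(contract, invoices, CONDITIONS)
print([inv["id"] for inv in linked])
```

Only inv-1 is attached: inv-2 fails the second condition and inv-3 has no tax ID to compare.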

MatchCondition Operators

Operator     Description                      Example
==           Equal                            Same client tax ID
!=           Not equal                        Different supplier
<            Less than                        Start date before end date
<=           Less than or equal               Score at most threshold
>            Greater than                     Amount exceeds minimum
>=           Greater than or equal            End date on or after emission
contains     Left contains right (string)     Address contains postal code
starts_with  Left starts with right (string)  Reference starts with prefix

All field names use semantic names (e.g., client_tax_id, price_per_kwh), not slot names. The Engine resolves semantic names to physical slots automatically.
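
As a mental model for that resolution step, the sketch below wraps slot-keyed data behind a semantic-name lookup. ResolvedDocument, the slot_map argument, and the text_1 slot are hypothetical and exist only for illustration; the real ExtendedDocument may behave differently for unknown names.

```python
class ResolvedDocument:
    """Hypothetical stand-in showing how semantic names could map onto slots."""

    def __init__(self, raw, slot_map):
        self.raw = raw             # slot-keyed data, e.g. {"number_1": 0.142}
        self._slot_map = slot_map  # semantic name -> slot name

    def get(self, semantic_name):
        """Look up a value by semantic name; unknown names yield None."""
        slot = self._slot_map.get(semantic_name)
        if slot is None:
            return None
        return self.raw.get(slot)


doc = ResolvedDocument(
    raw={"number_1": 0.142, "text_1": "PT123456789"},
    slot_map={"price_per_kwh": "number_1", "client_tax_id": "text_1"},
)
print(doc.get("price_per_kwh"))  # looked up through the mapping
print(doc.get("number_1"))       # a slot name is not a semantic name
```

In this model, passing a slot name to get() finds nothing, which is why benchmark code should only ever use semantic names.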


Step 4: Implement score()

score() calculates a score for a single document. The document arrives as an ExtendedDocument with its aggregations already attached.

ExtendedDocument API

# Access fields by semantic name
price = document.get("price_per_kwh")        # Returns value or None
supplier = document.get("supplier")

# Access related documents
for invoice in document.aggregations:
    inv_price = invoice.get("price_per_kwh")

# Escape hatch for direct dict access
raw_data = document.raw

Full Example

def score(self, document: ExtendedDocument) -> float | None:
    """Calculate average price per kWh from linked invoices."""
    if not document.aggregations:
        return None

    prices = [inv.get("price_per_kwh") for inv in document.aggregations]
    prices = [p for p in prices if p is not None]

    if not prices:
        return None

    return sum(prices) / len(prices)

Return None when a document cannot be scored (missing data, no aggregations). The Engine skips None-scored documents.
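
Because score() is pure calculation, it can be exercised locally with a small test double. The sketch below assumes only the get() and aggregations surface described above; StubDocument is a hypothetical class, not part of the SDK, and score() is written as a free function so it runs on its own.

```python
import math


class StubDocument:
    """Hypothetical test double with the get() / aggregations shape used above."""

    def __init__(self, fields=None, aggregations=None):
        self._fields = fields or {}
        self.aggregations = aggregations or []

    def get(self, name):
        return self._fields.get(name)


def score(document):
    """Same logic as the Full Example, as a free function for testing."""
    if not document.aggregations:
        return None
    prices = [inv.get("price_per_kwh") for inv in document.aggregations]
    prices = [p for p in prices if p is not None]
    if not prices:
        return None
    return sum(prices) / len(prices)


invoices = [
    StubDocument({"price_per_kwh": 0.10}),
    StubDocument({"price_per_kwh": None}),  # dropped by the None filter
    StubDocument({"price_per_kwh": 0.20}),
]
with_invoices = StubDocument(aggregations=invoices)
no_invoices = StubDocument()
unpriced = StubDocument(aggregations=[StubDocument({})])

assert math.isclose(score(with_invoices), 0.15)
assert score(no_invoices) is None
assert score(unpriced) is None
```

The three cases cover the two None paths (no aggregations, no usable prices) and the normal averaging path.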


Step 5: Implement compute()

compute() reduces all scored documents to a single benchmark value. It receives a list of ScoredDocument objects, each pairing a document with its score.

ScoredDocument

Attribute  Type              Description
.document  ExtendedDocument  The original document (with aggregations)
.score     float             The score calculated by score()

Patterns

# Minimum (best/lowest price)
def compute(self, results: list[ScoredDocument]) -> float:
    return min(r.score for r in results)

# Maximum (best/highest rating)
def compute(self, results: list[ScoredDocument]) -> float:
    return max(r.score for r in results)

# Weighted average (by supplier volume)
def compute(self, results: list[ScoredDocument]) -> float:
    total_weight = sum(r.document.get("volume") or 1 for r in results)
    weighted = sum(r.score * (r.document.get("volume") or 1) for r in results)
    return weighted / total_weight

Full Example

def compute(self, results: list[ScoredDocument]) -> float:
    """Benchmark is the lowest average price across all contracts."""
    return min(r.score for r in results)
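
The choice of reduction changes the benchmark value, which a small worked example makes visible. This sketch uses a hypothetical StubScored class in place of ScoredDocument and plain dicts as documents (dict.get has the same call shape as document.get); the volume figures are made up for illustration.

```python
import math
from dataclasses import dataclass


@dataclass
class StubScored:
    """Hypothetical stand-in for ScoredDocument; a dict plays the document."""
    document: dict
    score: float


def lowest(results):
    """Minimum pattern: the best (lowest) score becomes the benchmark."""
    return min(r.score for r in results)


def volume_weighted(results):
    """Weighted-average pattern: a missing volume counts as weight 1."""
    total_weight = sum(r.document.get("volume") or 1 for r in results)
    weighted = sum(r.score * (r.document.get("volume") or 1) for r in results)
    return weighted / total_weight


results = [
    StubScored({"volume": 300}, 0.10),
    StubScored({"volume": 100}, 0.20),
]
# Weighted: (0.10 * 300 + 0.20 * 100) / (300 + 100) = 50 / 400 = 0.125
print(lowest(results), volume_weighted(results))
```

With min(), every contract above 0.10 would be flagged by a "lower is better" compare(); with the weighted average, only those above roughly 0.125 would be.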

Step 6: Implement compare()

compare() determines whether a document's score is unfavorable relative to the benchmark. It returns True when the document needs attention and False otherwise.

Patterns

# Lower is better (price benchmarks)
def compare(self, document_score: float, benchmark_score: float) -> bool:
    return document_score > benchmark_score

# Higher is better (quality/rating benchmarks)
def compare(self, document_score: float, benchmark_score: float) -> bool:
    return document_score < benchmark_score

# Threshold (flag if deviation exceeds 10%; assumes benchmark_score is non-zero)
def compare(self, document_score: float, benchmark_score: float) -> bool:
    return abs(document_score - benchmark_score) / benchmark_score > 0.10

Full Example

def compare(self, document_score: float, benchmark_score: float) -> bool:
    """Flag contracts paying more than the best price."""
    return document_score > benchmark_score
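
A relative-deviation comparison divides by the benchmark, so a benchmark of zero needs separate handling. The sketch below is one possible guard, not SDK behavior: compare_with_tolerance and its tolerance parameter are hypothetical, and treating any non-zero score as a deviation from a zero benchmark is an assumption.

```python
def compare_with_tolerance(document_score, benchmark_score, tolerance=0.10):
    """Flag a score whose relative deviation from the benchmark exceeds tolerance."""
    if benchmark_score == 0:
        # Relative deviation is undefined; assume any non-zero score deviates.
        return document_score != 0
    deviation = abs(document_score - benchmark_score) / abs(benchmark_score)
    return deviation > tolerance


print(compare_with_tolerance(0.12, 0.10))   # deviation of about 20%
print(compare_with_tolerance(0.105, 0.10))  # deviation of about 5%
print(compare_with_tolerance(0.0, 0.0))     # zero benchmark, zero score
```

Inside a benchmark class the same body would go in compare(self, document_score, benchmark_score), with the tolerance held as a class constant.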

Step 7: Validate and Register

Validate

Run the SDK validator to check your benchmark before registering:

bizsupply validate energy_price.py

The validator checks:

  • Inherits from BaseBenchmark
  • Implements all three methods (score, compute, compare)
  • Methods are synchronous (not async def)
  • Required class attributes are defined (name, target_labels, metric_unit)
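
Some of these checks can be approximated locally before running the validator. The sketch below is not the validator's implementation: precheck is a hypothetical helper built on the standard inspect module, and it skips the BaseBenchmark inheritance check so that it runs without the SDK installed.

```python
import inspect

REQUIRED_ATTRIBUTES = ("name", "target_labels", "metric_unit")
REQUIRED_METHODS = ("score", "compute", "compare")


def precheck(cls):
    """Return a list of problems found on a benchmark-like class."""
    problems = []
    for attr in REQUIRED_ATTRIBUTES:
        if not hasattr(cls, attr):
            problems.append(f"missing attribute: {attr}")
    for method in REQUIRED_METHODS:
        fn = getattr(cls, method, None)
        if not callable(fn):
            problems.append(f"missing method: {method}")
        elif inspect.iscoroutinefunction(fn):
            problems.append(f"method must be synchronous: {method}")
    return problems


class Good:
    name = "energy_price"
    target_labels = ["contract"]
    metric_unit = "EUR"

    def score(self, document): ...
    def compute(self, results): ...
    def compare(self, document_score, benchmark_score): ...


class Bad:
    name = "broken"
    metric_unit = "EUR"

    async def score(self, document): ...          # async is not allowed
    def compare(self, document_score, benchmark_score): ...


print(precheck(Good))
print(precheck(Bad))
```

An empty list means the class passed these local checks; bizsupply validate remains the authoritative check.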

Register

Upload the validated file to the plugins endpoint, replacing YOUR_TOKEN with your bearer token:

curl -X POST "https://api.bizsupply.com/api/v1/plugins" \
  -H "Authorization: Bearer YOUR_TOKEN" \
  -F "name=Energy Contract Price" \
  -F "description=Benchmarks energy contract prices against lowest invoice average" \
  -F "code_file=@energy_price.py"

The benchmark type and identity properties are automatically extracted from your code.


Complete Example

A full benchmark that scores energy contracts by average invoice price:

"""Benchmark: Energy contract price comparison."""

from bizsupply_sdk import (
    BaseBenchmark,
    ExtendedDocument,
    ScoredDocument,
    MatchCondition,
    MatchRule,
)


class EnergyContractPrice(BaseBenchmark):
    """Compare energy contract prices against the best available rate.

    Scores each contract by the average price_per_kwh from its linked
    invoices, computes the benchmark as the lowest average, and flags
    contracts paying above the best price.
    """

    name = "energy_contract_price_portugal"
    target_labels = ["contract", "energy"]
    metric_unit = "EUR/kWh"
    group_by = ["region"]

    MATCH_RULES = [
        MatchRule(
            name="contract_invoice_match",
            left_group=["contract", "energy"],
            right_group=["invoice", "energy"],
            conditions=[
                MatchCondition(
                    left_field="client_tax_id",
                    right_field="client_tax_id",
                    match_type="==",
                ),
                MatchCondition(
                    left_field="cpe_point_of_delivery",
                    right_field="cpe_point_of_delivery",
                    match_type="==",
                ),
            ],
            description="Match energy contracts to invoices by client and CPE",
        ),
    ]

    def score(self, document: ExtendedDocument) -> float | None:
        """Average price per kWh from linked invoices."""
        if not document.aggregations:
            return None

        prices = [inv.get("price_per_kwh") for inv in document.aggregations]
        prices = [p for p in prices if p is not None]

        if not prices:
            return None

        return sum(prices) / len(prices)

    def compute(self, results: list[ScoredDocument]) -> float:
        """Benchmark is the lowest average price."""
        return min(r.score for r in results)

    def compare(self, document_score: float, benchmark_score: float) -> bool:
        """Flag contracts paying above the best price."""
        return document_score > benchmark_score

Key Concepts Reference

ExtendedDocument

Method / Property   Returns                 Description
.get("field_name")  Any | None              Get field value by semantic name
.aggregations       list[ExtendedDocument]  Related documents matched by MATCH_RULES
.raw                dict[str, Any]          Direct dict access (escape hatch)

ScoredDocument

Attribute  Type              Description
.document  ExtendedDocument  The scored document with aggregations
.score     float             Score calculated by score()

MatchCondition Operators

Operator     left vs right                  None handling
==           Equal                          None == None returns False
!=           Not equal                      None != None returns False
<            Left less than right           Either None returns False
<=           Left less than or equal        Either None returns False
>            Left greater than right        Either None returns False
>=           Left greater than or equal     Either None returns False
contains     Left string contains right     Either None returns False
starts_with  Left string starts with right  Either None returns False
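
The None-handling column can be expressed as a single rule: if either side is None, the condition fails regardless of operator. The sketch below models that rule with the standard operator module; evaluate and OPERATORS are hypothetical names, and converting operands to strings for contains and starts_with is an assumption.

```python
import operator

OPERATORS = {
    "==": operator.eq,
    "!=": operator.ne,
    "<": operator.lt,
    "<=": operator.le,
    ">": operator.gt,
    ">=": operator.ge,
    "contains": lambda left, right: str(right) in str(left),
    "starts_with": lambda left, right: str(left).startswith(str(right)),
}


def evaluate(left, match_type, right):
    """Apply one operator; a None on either side never matches."""
    if left is None or right is None:
        return False
    return OPERATORS[match_type](left, right)


print(evaluate(None, "==", None))                                  # None never matches
print(evaluate("Rua A, 1000-001 Lisboa", "contains", "1000-001"))  # postal code in address
print(evaluate("INV-2024-01", "starts_with", "INV-"))              # reference prefix
```

One practical consequence is that documents with a missing match field are never linked, even under !=, so they end up with empty aggregations.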

Common Mistakes

Using async/await

# WRONG - benchmarks are synchronous
async def score(self, document):
    return await some_calculation()

# CORRECT - plain def, no await
def score(self, document):
    return some_calculation()

Using Document Instead of ExtendedDocument

# WRONG - Document is for plugins, not benchmarks
from bizsupply_sdk import Document

def score(self, document: Document) -> float | None:
    return document.data.get("price")

# CORRECT - benchmarks receive ExtendedDocument
from bizsupply_sdk import ExtendedDocument

def score(self, document: ExtendedDocument) -> float | None:
    return document.get("price")

Using Slot Names Instead of Semantic Names

# WRONG - slot names (the Engine resolves these for you)
price = document.get("number_1")

# CORRECT - semantic names (from your ontology)
price = document.get("price_per_kwh")

Not Handling None Values

# WRONG - crashes if any invoice has no price
def score(self, document):
    prices = [inv.get("price_per_kwh") for inv in document.aggregations]
    return sum(prices) / len(prices)  # TypeError if None in list

# CORRECT - filter None values
def score(self, document):
    prices = [inv.get("price_per_kwh") for inv in document.aggregations]
    prices = [p for p in prices if p is not None]
    if not prices:
        return None
    return sum(prices) / len(prices)

Missing MATCH_RULES

# WRONG - no aggregation rules, document.aggregations will be empty
class MyBenchmark(BaseBenchmark):
    name = "my_benchmark"
    target_labels = ["contract"]
    metric_unit = "EUR"

    def score(self, document):
        # document.aggregations is always [] without MATCH_RULES
        for inv in document.aggregations:  # Never executes
            ...

# CORRECT - define MATCH_RULES to link related documents
class MyBenchmark(BaseBenchmark):
    name = "my_benchmark"
    target_labels = ["contract"]
    metric_unit = "EUR"

    MATCH_RULES = [
        MatchRule(
            name="contract_invoice_match",
            left_group=["contract"],
            right_group=["invoice"],
            conditions=[
                MatchCondition(
                    left_field="client_tax_id",
                    right_field="client_tax_id",
                    match_type="==",
                ),
            ],
        ),
    ]