Plugin Interface Specification

Plugin Interface Specification

This document defines the REQUIRED contract for all bizsupply plugins and benchmarks.

Components that do not follow this specification will fail at registration or runtime.


Install the SDK

This is the first step. Everything below requires the SDK.

pip install bizsupply-sdk

All base classes, models, result types, and CLI tools are provided by the bizsupply-sdk package.


Quick Reference

RequirementStatusWhat Happens If Violated
Install bizsupply-sdkREQUIREDImport errors
Inherit from correct base classREQUIREDRegistration fails
Implement type-specific method(s)REQUIREDRuntime error
Method must be async def (plugins only)REQUIREDRuntime error
Correct return typeREQUIREDJob fails
Use await for async callsREQUIREDBlocks event loop, timeout

Plugin Types and Required Methods

Plugin TypeBase ClassRequired MethodReturn Type
SourceSourcePluginfetch() + has_new_data()AsyncIterator[DocumentInput] / bool
ClassificationClassificationPluginclassify()str | None
ExtractionExtractionPluginextract()ExtractionResult

Benchmark Type

TypeBase ClassRequired Methods (sync)Return Type
BenchmarkBaseBenchmarkscore(), compute(), compare()float | None, float, bool

Base Class Inheritance (REQUIRED)

Every plugin MUST inherit from the appropriate base class:

from bizsupply_sdk import ClassificationPlugin

class MyClassifier(ClassificationPlugin):
    async def classify(self, document, file_data, mime_type, available_labels, current_path, configs):
        ...

WRONG - Will Fail

# NO base class - WILL FAIL
class MyClassifier:
    async def classify(self, ...):
        pass

# Wrong base class - WILL FAIL
from bizsupply_sdk import ExtractionPlugin

class MyClassifier(ExtractionPlugin):  # Should be ClassificationPlugin
    async def classify(self, ...):
        pass

Classification: classify() Method

Classification plugins implement classify() to categorize a document at a single level of the ontology hierarchy. The Engine calls this method once per level and handles tree traversal automatically.

Signature

async def classify(
    self,
    document: Document,
    file_data: bytes | None,
    mime_type: str | None,
    available_labels: list[str],
    current_path: list[str],
    configs: dict[str, Any],
) -> str | None:

Parameters

ParameterTypeDescription
documentDocumentThe document being classified. See Document model for all attributes.
file_databytes | NoneRaw file bytes. Engine pre-fetches once and passes to every classify() call at each level. None if the document has no file.
mime_typestr | NoneMIME type of the file (e.g., "application/pdf", "image/png"). None if no file.
available_labelslist[str]Ontology labels at this level of the tree (e.g., ["invoice", "contract", "receipt"]). Your response must be one of these, or None.
current_pathlist[str]Labels already selected at previous levels (e.g., ["contract", "energy"]). Empty [] at the root level.
configsdict[str, Any]Runtime configuration from configurable_parameters. Access values with configs.get("param_name").

Return Value

ReturnWhat Happens
Label from available_labelsEngine continues traversal to children of that label
Label NOT in available_labelsTracked as llm_suggested, traversal stops
NoneEngine triggers suggestion workflow, traversal stops

Example

from bizsupply_sdk import ClassificationPlugin, Document

class InvoiceClassifier(ClassificationPlugin):
    async def classify(self, document, file_data, mime_type, available_labels, current_path, configs):
        path_str = " > ".join(current_path) if current_path else "Root"

        result = await self.prompt_llm(
            prompt=f"Path: {path_str}\nOptions: {available_labels}\nSelect the best category.",
            file_data=file_data,
            mime_type=mime_type,
        )

        return result.get("category") if result else None

Extraction: extract() Method

Extraction plugins implement extract() to pull structured data from a classified document. The Engine resolves ontology fields based on the document's labels and injects them.

Signature

async def extract(
    self,
    document: Document,
    file_data: bytes | None,
    mime_type: str | None,
    fields: list[OntologyField],
    configs: dict[str, Any],
) -> ExtractionResult:

Parameters

ParameterTypeDescription
documentDocumentThe document to extract from. Already classified (has document.labels). See Document model.
file_databytes | NoneRaw file bytes (Engine pre-fetches). None if the document has no file.
mime_typestr | NoneMIME type of the file. None if no file.
fieldslist[OntologyField]Fields to extract, resolved by Engine from document.labels. Each field has .name, .dtype, .description. See OntologyField model.
configsdict[str, Any]Runtime configuration from configurable_parameters.

Return Value

Return an ExtractionResult. See ExtractionResult model for all fields.

ExtractionResult(
    data={"invoice_total": 1500.00, "vendor_name": "ACME Corp"},
    llm_fields=["invoice_total", "vendor_name"],  # Optional: track which fields came from LLM
    document_type="invoice",  # Optional: for analytics
)

Example

from bizsupply_sdk import ExtractionPlugin, ExtractionResult, Document

class InvoiceExtractor(ExtractionPlugin):
    async def extract(self, document, file_data, mime_type, fields, configs):
        if not fields:
            return ExtractionResult(data={})

        fields_json = self.format_fields_for_prompt(fields)

        result = await self.prompt_llm(
            prompt=f"Extract these fields: {fields_json}",
            file_data=file_data,
            mime_type=mime_type,
        )

        return ExtractionResult(
            data=result or {},
            llm_fields=list(result.keys()) if result else None,
        )

Source: fetch() and has_new_data() Methods

Source plugins implement fetch() to ingest documents from external systems, and has_new_data() for auto-sync support. The Engine handles credentials, state persistence, and document creation.

fetch() Signature

async def fetch(
    self,
    credentials: DynamicCredential,
    state: YourStateModel,
    configs: dict[str, Any],
) -> AsyncIterator[DocumentInput]:

has_new_data() Signature

async def has_new_data(
    self,
    credentials: DynamicCredential,
    state: YourStateModel,
    configs: dict[str, Any],
) -> bool:

Parameters (both methods share the same parameters)

ParameterTypeDescription
credentialsDynamicCredentialCredentials injected by Engine from secure storage. Access via attribute (credentials.api_key) or .get("field_name", default). See DynamicCredential model for all methods.
stateYour BaseSourceState subclassTyped state model loaded from database by Engine. Mutate fields directly (state.cursor = "123") - Engine auto-saves after each yielded document. See BaseSourceState model.
configsdict[str, Any]Runtime configuration from configurable_parameters.

Required Class Attributes

AttributeTypeRequiredDescription
source_typestrYesUnique identifier for this source (e.g., "gmail", "salesforce")
source_state_modeltype[BaseSourceState]YesYour state model class (must inherit BaseSourceState)
credential_fieldslist[str]NoCredential field names users must configure (e.g., ["api_key", "api_url"])
configurable_parameterslist[dict]NoRuntime-configurable parameters

Example

from bizsupply_sdk import SourcePlugin, BaseSourceState, DocumentInput, DynamicCredential

class MySourceState(BaseSourceState):
    cursor: str | None = None

class MySource(SourcePlugin):
    source_type = "my_api"
    source_state_model = MySourceState
    credential_fields = ["api_key", "api_url"]

    async def fetch(self, credentials, state, configs):
        api_key = credentials.api_key

        yield DocumentInput(
            file_data=b"...",
            filename="document.pdf",
            metadata={"source_id": "123"},
        )
        state.cursor = "123"  # Engine auto-saves

    async def has_new_data(self, credentials, state, configs):
        return True  # Check if source has new items

Benchmark: score(), compute(), compare() Methods

Benchmarks are synchronous (no async/await) and calculate scores to compare documents. The Engine handles document retrieval, aggregation pre-fetching, slot-to-field resolution, and score persistence.

Required Class Attributes

AttributeTypeRequiredDescription
namestrYesUnique benchmark identifier (e.g., "energy_contract_price_portugal")
target_labelslist[str]YesDocument labels this benchmark applies to (e.g., ["contract", "energy"])
metric_unitstrYesUnit of measurement (e.g., "EUR/kWh", "%", "days")
group_bylist[str]NoField names to group documents by (e.g., ["region"]). Default: []
MATCH_RULESlist[MatchRule]NoAggregation rules for linking related documents. See MatchRule model.

score() - Score a Single Document

def score(self, document: ExtendedDocument) -> float | None:
ParameterTypeDescription
documentExtendedDocumentDocument with semantic field access and aggregations. See ExtendedDocument model.

Returns: float (the score) or None (document cannot be scored, Engine skips it).

compute() - Compute Benchmark Value

def compute(self, results: list[ScoredDocument]) -> float:
ParameterTypeDescription
resultslist[ScoredDocument]Non-empty list of scored documents. Each has .document (ExtendedDocument) and .score (float). See ScoredDocument model.

Returns: float - Single benchmark value (e.g., min price, average score).

compare() - Compare Score to Benchmark

def compare(self, document_score: float, benchmark_score: float) -> bool:
ParameterTypeDescription
document_scorefloatThe document's calculated score from score()
benchmark_scorefloatThe benchmark value from compute()

Returns: bool - True if the document scores unfavorably (requires action).

Example

from bizsupply_sdk import BaseBenchmark, ExtendedDocument, ScoredDocument, MatchRule, MatchCondition

class EnergyPriceBenchmark(BaseBenchmark):
    name = "energy_contract_price_portugal"
    target_labels = ["contract", "energy"]
    metric_unit = "EUR/kWh"
    group_by = ["region"]

    MATCH_RULES = [
        MatchRule(
            name="contract_invoice_match",
            left_group=["contract", "energy"],
            right_group=["invoice", "energy"],
            conditions=[
                MatchCondition(
                    left_field="client_tax_id",
                    right_field="client_tax_id",
                    match_type="==",
                ),
            ],
        ),
    ]

    def score(self, document):
        # document.aggregations = related invoices (linked by MATCH_RULES)
        prices = [inv.get("price_per_kwh") for inv in document.aggregations]
        prices = [p for p in prices if p is not None]
        if not prices:
            return None
        return sum(prices) / len(prices)

    def compute(self, results):
        # results = list of ScoredDocument (each has .document and .score)
        return min(r.score for r in results)

    def compare(self, document_score, benchmark_score):
        # True = document is worse than benchmark (higher price)
        return document_score > benchmark_score

Key Differences from Plugins

AspectPluginsBenchmarks
Asyncasync def requiredSync def (no async/await)
Servicesprompt_llm(), get_prompt()None (pure calculation, no I/O)
Input modelDocumentExtendedDocument (has .aggregations, .get())
PersistenceReturn results, Engine persistsReturn floats/bools, Engine builds score records

Async/Await (REQUIRED for Plugins)

CRITICAL: Plugin service methods are async and MUST use await. Forgetting await will cause your plugin to fail or hang.

# CORRECT - using await
result = await self.prompt_llm(prompt="Classify this document")
template = await self.get_prompt(prompt_id)

# WRONG - missing await (will return coroutine, not result)
result = self.prompt_llm(prompt="Classify this document")  # Returns coroutine!

Async Methods (Must Use await)

MethodReturns
await self.prompt_llm(prompt, file_data=None, mime_type=None, schema=None, model_name=None)dict | list | None
await self.get_prompt(prompt_id)str

The model_name parameter in prompt_llm() allows you to override the default LLM model for a specific call (e.g., model_name="gemini-2.0-flash").

Sync Methods (No await)

MethodReturns
self.format_fields_for_prompt(fields)str
self.logger.info(msg)None

Note: Benchmark methods (score, compute, compare) are all synchronous.


Configurable Parameters

Plugins and benchmarks can declare runtime-configurable parameters as class attributes. Each parameter entry has these fields:

FieldTypeDescription
parameter_namestrName used to retrieve the value from configs dict
parameter_typestrType hint for the UI: "str", "int", "float", "bool"
default_valueAnyDefault value when user doesn't override
descriptionstrHuman-readable description shown in platform UI
from bizsupply_sdk import ClassificationPlugin, ConfigurableParameter

class MyClassifier(ClassificationPlugin):
    configurable_parameters = [
        ConfigurableParameter(
            parameter_name="classification_prompt_id",
            parameter_type="str",
            default_value=None,
            description="Prompt ID for classification",
        ),
        ConfigurableParameter(
            parameter_name="confidence_threshold",
            parameter_type="float",
            default_value=0.8,
            description="Minimum confidence score",
        ),
    ]

    async def classify(self, document, file_data, mime_type, available_labels, current_path, configs):
        prompt_id = configs.get("classification_prompt_id")
        threshold = configs.get("confidence_threshold", 0.8)
        ...

Common Mistakes

1. No Base Class

# WRONG
class ComplianceClassifier:
    async def classify(self, ...):
        pass

# CORRECT
from bizsupply_sdk import ClassificationPlugin

class ComplianceClassifier(ClassificationPlugin):
    async def classify(self, document, file_data, mime_type, available_labels, current_path, configs):
        ...

2. Wrong Method Name

# WRONG - using old v1.0 method name
class MyPlugin(ClassificationPlugin):
    async def execute(self, context):  # Wrong!
        pass

# CORRECT - use type-specific method
class MyPlugin(ClassificationPlugin):
    async def classify(self, document, file_data, mime_type, available_labels, current_path, configs):
        ...

3. Wrong Return Type

# WRONG - classify() must return str | None, not list
return ["invoice", "utility"]

# CORRECT
return "invoice"  # Single label or None

# WRONG - extract() must return ExtractionResult, not dict
return {"invoice_total": 1500.00}

# CORRECT
return ExtractionResult(data={"invoice_total": 1500.00})

4. Missing SDK Import

# WRONG - no import
class MyPlugin(ClassificationPlugin):  # NameError!
    ...

# CORRECT
from bizsupply_sdk import ClassificationPlugin

class MyPlugin(ClassificationPlugin):
    ...

5. Synchronous Plugin Method

# WRONG - Not async (for plugins)
def classify(self, document, file_data, mime_type, available_labels, current_path, configs):
    ...

# CORRECT - Async
async def classify(self, document, file_data, mime_type, available_labels, current_path, configs):
    ...

6. Using Async in Benchmarks

# WRONG - benchmarks must be synchronous
async def score(self, document):
    ...

# CORRECT - sync
def score(self, document):
    ...

Validation

Use the SDK CLI to validate your code before registering:

bizsupply validate my_plugin.py
bizsupply validate my_benchmark.py

The validator checks:

  • Correct base class inheritance
  • Required method name(s) and signature
  • async def for plugins / def for benchmarks
  • Correct return type annotations
  • SDK import presence

Full Validation Checklist

  • bizsupply-sdk installed (pip install bizsupply-sdk)
  • Class imports from bizsupply_sdk
  • Class inherits from correct base class
  • Correct method name(s)
  • Method is async def (plugins) or def (benchmarks)
  • Correct return type
  • All async operations use await
  • Uses self.logger for logging (not print)
  • Configuration accessed via configs parameter
  • bizsupply validate <file> passes

Next Steps