Plugin Interface Specification
This document defines the REQUIRED contract for all bizsupply plugins and benchmarks.
Components that do not follow this specification will fail at registration or runtime.
Install the SDK
This is the first step. Everything below requires the SDK.
```shell
pip install bizsupply-sdk
```
All base classes, models, result types, and CLI tools are provided by the `bizsupply-sdk` package.
Quick Reference
| Requirement | Status | What Happens If Violated |
|---|---|---|
| Install bizsupply-sdk | REQUIRED | Import errors |
| Inherit from correct base class | REQUIRED | Registration fails |
| Implement type-specific method(s) | REQUIRED | Runtime error |
| Method must be async def (plugins only) | REQUIRED | Runtime error |
| Correct return type | REQUIRED | Job fails |
| Use await for async calls | REQUIRED | Blocks event loop, timeout |
Plugin Types and Required Methods
| Plugin Type | Base Class | Required Method | Return Type |
|---|---|---|---|
| Source | SourcePlugin | fetch() + has_new_data() | AsyncIterator[DocumentInput] / bool |
| Classification | ClassificationPlugin | classify() | str \| None |
| Extraction | ExtractionPlugin | extract() | ExtractionResult |
Benchmark Type
| Type | Base Class | Required Methods (sync) | Return Type |
|---|---|---|---|
| Benchmark | BaseBenchmark | score(), compute(), compare() | float \| None, float, bool |
Base Class Inheritance (REQUIRED)
Every plugin MUST inherit from the appropriate base class:
```python
from bizsupply_sdk import ClassificationPlugin

class MyClassifier(ClassificationPlugin):
    async def classify(self, document, file_data, mime_type, available_labels, current_path, configs):
        ...
```

WRONG - Will Fail

```python
# NO base class - WILL FAIL
class MyClassifier:
    async def classify(self, ...):
        pass

# Wrong base class - WILL FAIL
from bizsupply_sdk import ExtractionPlugin

class MyClassifier(ExtractionPlugin):  # Should be ClassificationPlugin
    async def classify(self, ...):
        pass
```

Classification: classify() Method
Classification plugins implement classify() to categorize a document at a single level of the ontology hierarchy. The Engine calls this method once per level and handles tree traversal automatically.
Signature
```python
async def classify(
    self,
    document: Document,
    file_data: bytes | None,
    mime_type: str | None,
    available_labels: list[str],
    current_path: list[str],
    configs: dict[str, Any],
) -> str | None:
```

Parameters
| Parameter | Type | Description |
|---|---|---|
| document | Document | The document being classified. See Document model for all attributes. |
| file_data | bytes \| None | Raw file bytes. Engine pre-fetches once and passes to every classify() call at each level. None if the document has no file. |
| mime_type | str \| None | MIME type of the file (e.g., "application/pdf", "image/png"). None if no file. |
| available_labels | list[str] | Ontology labels at this level of the tree (e.g., ["invoice", "contract", "receipt"]). Your response must be one of these, or None. |
| current_path | list[str] | Labels already selected at previous levels (e.g., ["contract", "energy"]). Empty [] at the root level. |
| configs | dict[str, Any] | Runtime configuration from configurable_parameters. Access values with configs.get("param_name"). |
Return Value
| Return | What Happens |
|---|---|
| Label from available_labels | Engine continues traversal to children of that label |
| Label NOT in available_labels | Tracked as llm_suggested, traversal stops |
| None | Engine triggers suggestion workflow, traversal stops |
Example
```python
from bizsupply_sdk import ClassificationPlugin, Document

class InvoiceClassifier(ClassificationPlugin):
    async def classify(self, document, file_data, mime_type, available_labels, current_path, configs):
        path_str = " > ".join(current_path) if current_path else "Root"
        result = await self.prompt_llm(
            prompt=f"Path: {path_str}\nOptions: {available_labels}\nSelect the best category.",
            file_data=file_data,
            mime_type=mime_type,
        )
        return result.get("category") if result else None
```

Extraction: extract() Method
Extraction plugins implement extract() to pull structured data from a classified document. The Engine resolves ontology fields based on the document's labels and injects them.
Signature
```python
async def extract(
    self,
    document: Document,
    file_data: bytes | None,
    mime_type: str | None,
    fields: list[OntologyField],
    configs: dict[str, Any],
) -> ExtractionResult:
```

Parameters
| Parameter | Type | Description |
|---|---|---|
| document | Document | The document to extract from. Already classified (has document.labels). See Document model. |
| file_data | bytes \| None | Raw file bytes (Engine pre-fetches). None if the document has no file. |
| mime_type | str \| None | MIME type of the file. None if no file. |
| fields | list[OntologyField] | Fields to extract, resolved by Engine from document.labels. Each field has .name, .dtype, .description. See OntologyField model. |
| configs | dict[str, Any] | Runtime configuration from configurable_parameters. |
Return Value
Return an ExtractionResult. See ExtractionResult model for all fields.
```python
ExtractionResult(
    data={"invoice_total": 1500.00, "vendor_name": "ACME Corp"},
    llm_fields=["invoice_total", "vendor_name"],  # Optional: track which fields came from LLM
    document_type="invoice",  # Optional: for analytics
)
```

Example
```python
from bizsupply_sdk import ExtractionPlugin, ExtractionResult, Document

class InvoiceExtractor(ExtractionPlugin):
    async def extract(self, document, file_data, mime_type, fields, configs):
        if not fields:
            return ExtractionResult(data={})
        fields_json = self.format_fields_for_prompt(fields)
        result = await self.prompt_llm(
            prompt=f"Extract these fields: {fields_json}",
            file_data=file_data,
            mime_type=mime_type,
        )
        return ExtractionResult(
            data=result or {},
            llm_fields=list(result.keys()) if result else None,
        )
```

Source: fetch() and has_new_data() Methods
Source plugins implement fetch() to ingest documents from external systems, and has_new_data() for auto-sync support. The Engine handles credentials, state persistence, and document creation.
fetch() Signature
```python
async def fetch(
    self,
    credentials: DynamicCredential,
    state: YourStateModel,
    configs: dict[str, Any],
) -> AsyncIterator[DocumentInput]:
```

has_new_data() Signature

```python
async def has_new_data(
    self,
    credentials: DynamicCredential,
    state: YourStateModel,
    configs: dict[str, Any],
) -> bool:
```

Parameters (both methods share the same parameters)
| Parameter | Type | Description |
|---|---|---|
| credentials | DynamicCredential | Credentials injected by Engine from secure storage. Access via attribute (credentials.api_key) or .get("field_name", default). See DynamicCredential model for all methods. |
| state | Your BaseSourceState subclass | Typed state model loaded from database by Engine. Mutate fields directly (state.cursor = "123") - Engine auto-saves after each yielded document. See BaseSourceState model. |
| configs | dict[str, Any] | Runtime configuration from configurable_parameters. |
Required Class Attributes
| Attribute | Type | Required | Description |
|---|---|---|---|
| source_type | str | Yes | Unique identifier for this source (e.g., "gmail", "salesforce") |
| source_state_model | type[BaseSourceState] | Yes | Your state model class (must inherit BaseSourceState) |
| credential_fields | list[str] | No | Credential field names users must configure (e.g., ["api_key", "api_url"]) |
| configurable_parameters | list[dict] | No | Runtime-configurable parameters |
Example
```python
from bizsupply_sdk import SourcePlugin, BaseSourceState, DocumentInput, DynamicCredential

class MySourceState(BaseSourceState):
    cursor: str | None = None

class MySource(SourcePlugin):
    source_type = "my_api"
    source_state_model = MySourceState
    credential_fields = ["api_key", "api_url"]

    async def fetch(self, credentials, state, configs):
        api_key = credentials.api_key
        yield DocumentInput(
            file_data=b"...",
            filename="document.pdf",
            metadata={"source_id": "123"},
        )
        state.cursor = "123"  # Engine auto-saves

    async def has_new_data(self, credentials, state, configs):
        return True  # Check if source has new items
```

Benchmark: score(), compute(), compare() Methods
Benchmarks are synchronous (no async/await) and calculate scores to compare documents. The Engine handles document retrieval, aggregation pre-fetching, slot-to-field resolution, and score persistence.
Required Class Attributes
| Attribute | Type | Required | Description |
|---|---|---|---|
| name | str | Yes | Unique benchmark identifier (e.g., "energy_contract_price_portugal") |
| target_labels | list[str] | Yes | Document labels this benchmark applies to (e.g., ["contract", "energy"]) |
| metric_unit | str | Yes | Unit of measurement (e.g., "EUR/kWh", "%", "days") |
| group_by | list[str] | No | Field names to group documents by (e.g., ["region"]). Default: [] |
| MATCH_RULES | list[MatchRule] | No | Aggregation rules for linking related documents. See MatchRule model. |
score() - Score a Single Document
```python
def score(self, document: ExtendedDocument) -> float | None:
```

| Parameter | Type | Description |
|---|---|---|
| document | ExtendedDocument | Document with semantic field access and aggregations. See ExtendedDocument model. |
Returns: float (the score) or None (document cannot be scored, Engine skips it).
compute() - Compute Benchmark Value
```python
def compute(self, results: list[ScoredDocument]) -> float:
```

| Parameter | Type | Description |
|---|---|---|
| results | list[ScoredDocument] | Non-empty list of scored documents. Each has .document (ExtendedDocument) and .score (float). See ScoredDocument model. |
Returns: float - Single benchmark value (e.g., min price, average score).
compare() - Compare Score to Benchmark
```python
def compare(self, document_score: float, benchmark_score: float) -> bool:
```

| Parameter | Type | Description |
|---|---|---|
| document_score | float | The document's calculated score from score() |
| benchmark_score | float | The benchmark value from compute() |
Returns: bool - True if the document scores unfavorably (requires action).
Example
```python
from bizsupply_sdk import BaseBenchmark, ExtendedDocument, ScoredDocument, MatchRule, MatchCondition

class EnergyPriceBenchmark(BaseBenchmark):
    name = "energy_contract_price_portugal"
    target_labels = ["contract", "energy"]
    metric_unit = "EUR/kWh"
    group_by = ["region"]
    MATCH_RULES = [
        MatchRule(
            name="contract_invoice_match",
            left_group=["contract", "energy"],
            right_group=["invoice", "energy"],
            conditions=[
                MatchCondition(
                    left_field="client_tax_id",
                    right_field="client_tax_id",
                    match_type="==",
                ),
            ],
        ),
    ]

    def score(self, document):
        # document.aggregations = related invoices (linked by MATCH_RULES)
        prices = [inv.get("price_per_kwh") for inv in document.aggregations]
        prices = [p for p in prices if p is not None]
        if not prices:
            return None
        return sum(prices) / len(prices)

    def compute(self, results):
        # results = list of ScoredDocument (each has .document and .score)
        return min(r.score for r in results)

    def compare(self, document_score, benchmark_score):
        # True = document is worse than benchmark (higher price)
        return document_score > benchmark_score
```

Key Differences from Plugins
| Aspect | Plugins | Benchmarks |
|---|---|---|
| Async | async def required | Sync def (no async/await) |
| Services | prompt_llm(), get_prompt() | None (pure calculation, no I/O) |
| Input model | Document | ExtendedDocument (has .aggregations, .get()) |
| Persistence | Return results, Engine persists | Return floats/bools, Engine builds score records |
Async/Await (REQUIRED for Plugins)
CRITICAL: Plugin service methods are async and MUST use await. Forgetting await will cause your plugin to fail or hang.
```python
# CORRECT - using await
result = await self.prompt_llm(prompt="Classify this document")
template = await self.get_prompt(prompt_id)

# WRONG - missing await (will return coroutine, not result)
result = self.prompt_llm(prompt="Classify this document")  # Returns coroutine!
```

Async Methods (Must Use await)
| Method | Returns |
|---|---|
| await self.prompt_llm(prompt, file_data=None, mime_type=None, schema=None, model_name=None) | dict \| list \| None |
| await self.get_prompt(prompt_id) | str |
The model_name parameter in prompt_llm() allows you to override the default LLM model for a specific call (e.g., model_name="gemini-2.0-flash").
Sync Methods (No await)
| Method | Returns |
|---|---|
| self.format_fields_for_prompt(fields) | str |
| self.logger.info(msg) | None |
Note: Benchmark methods (score, compute, compare) are all synchronous.
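The missing-await pitfall is easy to reproduce with a stdlib stand-in for an async service method. `prompt_llm_stub` below is illustrative, not the SDK call; calling it without `await` hands you a coroutine object instead of the result.

```python
import asyncio
import inspect

async def prompt_llm_stub(prompt: str) -> dict:
    # Stand-in for an async SDK service method such as prompt_llm().
    return {"category": "invoice"}

async def main() -> dict:
    wrong = prompt_llm_stub("classify")        # missing await: a coroutine object
    assert inspect.iscoroutine(wrong)          # not the dict you wanted
    wrong.close()                              # silence the "never awaited" warning
    right = await prompt_llm_stub("classify")  # awaited: the actual result
    return right

result = asyncio.run(main())
```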
Configurable Parameters
Plugins and benchmarks can declare runtime-configurable parameters as class attributes. Each parameter entry has these fields:
| Field | Type | Description |
|---|---|---|
| parameter_name | str | Name used to retrieve the value from configs dict |
| parameter_type | str | Type hint for the UI: "str", "int", "float", "bool" |
| default_value | Any | Default value when user doesn't override |
| description | str | Human-readable description shown in platform UI |
```python
from bizsupply_sdk import ClassificationPlugin, ConfigurableParameter

class MyClassifier(ClassificationPlugin):
    configurable_parameters = [
        ConfigurableParameter(
            parameter_name="classification_prompt_id",
            parameter_type="str",
            default_value=None,
            description="Prompt ID for classification",
        ),
        ConfigurableParameter(
            parameter_name="confidence_threshold",
            parameter_type="float",
            default_value=0.8,
            description="Minimum confidence score",
        ),
    ]

    async def classify(self, document, file_data, mime_type, available_labels, current_path, configs):
        prompt_id = configs.get("classification_prompt_id")
        threshold = configs.get("confidence_threshold", 0.8)
        ...
```

Common Mistakes
1. No Base Class
```python
# WRONG
class ComplianceClassifier:
    async def classify(self, ...):
        pass

# CORRECT
from bizsupply_sdk import ClassificationPlugin

class ComplianceClassifier(ClassificationPlugin):
    async def classify(self, document, file_data, mime_type, available_labels, current_path, configs):
        ...
```

2. Wrong Method Name
```python
# WRONG - using old v1.0 method name
class MyPlugin(ClassificationPlugin):
    async def execute(self, context):  # Wrong!
        pass

# CORRECT - use type-specific method
class MyPlugin(ClassificationPlugin):
    async def classify(self, document, file_data, mime_type, available_labels, current_path, configs):
        ...
```

3. Wrong Return Type
```python
# WRONG - classify() must return str | None, not list
return ["invoice", "utility"]

# CORRECT
return "invoice"  # Single label or None

# WRONG - extract() must return ExtractionResult, not dict
return {"invoice_total": 1500.00}

# CORRECT
return ExtractionResult(data={"invoice_total": 1500.00})
```

4. Missing SDK Import
```python
# WRONG - no import
class MyPlugin(ClassificationPlugin):  # NameError!
    ...

# CORRECT
from bizsupply_sdk import ClassificationPlugin

class MyPlugin(ClassificationPlugin):
    ...
```

5. Synchronous Plugin Method
```python
# WRONG - Not async (for plugins)
def classify(self, document, file_data, mime_type, available_labels, current_path, configs):
    ...

# CORRECT - Async
async def classify(self, document, file_data, mime_type, available_labels, current_path, configs):
    ...
```

6. Using Async in Benchmarks
```python
# WRONG - benchmarks must be synchronous
async def score(self, document):
    ...

# CORRECT - sync
def score(self, document):
    ...
```

Validation
Use the SDK CLI to validate your code before registering:
```shell
bizsupply validate my_plugin.py
bizsupply validate my_benchmark.py
```

The validator checks:
- Correct base class inheritance
- Required method name(s) and signature
- `async def` for plugins / `def` for benchmarks
- Correct return type annotations
- SDK import presence
Full Validation Checklist
- `bizsupply-sdk` installed (`pip install bizsupply-sdk`)
- Class imports from `bizsupply_sdk`
- Class inherits from correct base class
- Correct method name(s)
- Method is `async def` (plugins) or `def` (benchmarks)
- Correct return type
- All async operations use `await`
- Uses `self.logger` for logging (not `print`)
- Configuration accessed via `configs` parameter
- `bizsupply validate <file>` passes
Next Steps
- Plugin Service API - All service methods and complete data model reference
- Create a Plugin - Step-by-step plugin development guide