Create a Source Plugin
Goal: Build a plugin that ingests documents from external sources (email, APIs, file systems)
Use When: You need to automatically pull documents from Salesforce, Dropbox, custom APIs, or any external system.
What Source Plugins Do
Source plugins connect to external systems, fetch documents, and yield them to the Engine, which creates the documents. Unlike other plugin types, which process existing documents, source plugins bring new documents into the system.
Key Method: Yield DocumentInput from fetch() - the Engine handles document creation, state persistence, and credential management.
Prerequisites
Before starting:
- Install the SDK: pip install bizsupply-sdk
- Read the Plugin Interface Specification
- Decide what credentials your source needs (API keys, OAuth tokens, etc.)
How Source Plugins Work
1. Engine loads credentials from secure storage -> injects as DynamicCredential
2. Engine loads your typed state model from database -> injects as state
3. Engine calls fetch(credentials, state, configs)
4. For each DocumentInput you yield, Engine creates the document
5. Engine auto-saves state after each document (crash-resilient)
Step 1: Define Your State Model
Source plugins use a typed state model to track sync progress between runs:
from bizsupply_sdk import BaseSourceState
class MySourceState(BaseSourceState):
"""State model for tracking sync progress."""
last_item_id: str | None = None
last_sync_time: str | None = None
page: int = 0

The Engine loads this state automatically and saves it after each yielded document.
Step 2: Create the Plugin Code
Create my_source.py:
from datetime import datetime, timezone
from bizsupply_sdk import (
SourcePlugin,
BaseSourceState,
DocumentInput,
DynamicCredential,
)
class MySourceState(BaseSourceState):
"""State model for tracking sync progress."""
last_item_id: str | None = None
last_sync_time: str | None = None
class MySourcePlugin(SourcePlugin):
"""
Ingests documents from a custom API.
The Engine handles credentials, state, and document creation.
Just yield DocumentInput objects from fetch()!
"""
# REQUIRED: Unique identifier for this source type
source_type = "my_custom_api"
# REQUIRED: State model class for tracking sync progress
source_state_model = MySourceState
# Optional: Credentials the user must configure for this source
credential_fields = ["api_url", "api_key", "timeout_seconds"]
# Optional: Configurable parameters
configurable_parameters = [
{
"parameter_name": "max_documents",
"parameter_type": "int",
"default_value": 100,
"description": "Maximum documents to fetch per run",
},
{
"parameter_name": "batch_size",
"parameter_type": "int",
"default_value": 50,
"description": "Number of items per API request",
},
]
async def fetch(self, credentials, state, configs):
"""
Fetch documents from external source.
Args:
credentials: DynamicCredential with configured fields
state: MySourceState loaded from database
configs: Runtime configuration (max_documents, batch_size, etc.)
Yields:
DocumentInput for each document to create
"""
self.logger.info("Starting document ingestion")
# Access credentials by attribute or .get()
api_url = credentials.api_url
api_key = credentials.api_key
timeout = int(credentials.get("timeout_seconds", 30))
self.logger.info(f"Connecting to: {api_url}")
self.logger.info(f"Last synced item: {state.last_item_id}")
max_documents = int(configs.get("max_documents", 100))
# =========================================================
# YOUR FETCH LOGIC HERE
# =========================================================
# Replace with your actual API/service fetching code
# Example:
#
# import httpx
# async with httpx.AsyncClient() as client:
# response = await client.get(
# f"{api_url}/documents",
# headers={"Authorization": f"Bearer {api_key}"},
# params={"after": state.last_item_id},
# timeout=timeout,
# )
# items = response.json()["documents"]
# Placeholder - replace with real fetch logic
items = []
count = 0
for item in items:
if count >= max_documents:
break
try:
file_bytes = item.get("content") # bytes
filename = item.get("filename", "document.pdf")
# Yield DocumentInput - Engine creates the document
yield DocumentInput(
file_data=file_bytes,
filename=filename,
metadata={
"source": "my_api",
"external_id": item.get("id"),
"title": item.get("title"),
"created_at": item.get("created_at"),
},
)
# Update state - Engine auto-saves after each yield
state.last_item_id = item.get("id")
state.last_sync_time = datetime.now(timezone.utc).isoformat()
count += 1
except Exception as e:
self.logger.error(f"Failed to process item: {e}")
continue
self.logger.info(f"Ingestion complete. Yielded {count} documents")
async def has_new_data(self, credentials, state, configs):
"""
Check if source has new data since last sync.
Used by auto-sync to decide whether to trigger a full fetch.
Should be lightweight (e.g., HEAD request or count query).
"""
# Implement a lightweight check
# Example: return await check_api_for_new_items(credentials, state)
return True

Step 3: Validate and Register
Validate your plugin before registering:
bizsupply validate my_source.py

Then register:
curl -X POST "https://api.bizsupply.com/api/v1/plugins" \
-H "Authorization: Bearer YOUR_TOKEN" \
-F "name=My API Source" \
-F "description=Ingests documents from My Custom API" \
-F "code_file=@my_source.py"

When you register:
- The plugin is validated
- Plugin type, source_type, credential_fields, and other metadata are automatically extracted from your code
- If source_type is new, it's automatically registered
- Your credential schema (credential_fields) is stored for this source type
Step 4: Configure Source Credentials
After registration, users configure credentials through the bizsupply UI:
- Go to Settings > Sources > Add Source
- Select your source type
- Fill in the credential fields you defined
- Save
The credentials are securely stored and injected into your plugin via the credentials parameter.
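To make the injection contract concrete, here is a minimal sketch of a credential object supporting the two access styles described later in this guide. FakeCredential is a hypothetical stand-in, not the SDK's DynamicCredential:

```python
# Hypothetical stand-in for DynamicCredential, for illustration only.
class FakeCredential:
    def __init__(self, fields):
        self._fields = dict(fields)  # credential values are stored as strings

    def __getattr__(self, name):
        # Attribute access: raises AttributeError if the field is missing.
        try:
            return self._fields[name]
        except KeyError:
            raise AttributeError(name)

    def get(self, name, default=None):
        # .get() access: returns the default if the field is missing.
        return self._fields.get(name, default)

creds = FakeCredential({"api_url": "https://example.invalid", "api_key": "dummy-key"})
print(creds.api_url)                               # attribute access
print(creds.get("timeout_seconds", "30"))          # missing field -> default "30"
timeout = int(creds.get("timeout_seconds", "30"))  # convert string values yourself
```

In your plugin you never construct this object; the Engine builds it from the fields the user configured and passes it into fetch().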
Key Methods
| Method | Purpose |
|---|---|
| await self.prompt_llm(prompt, file_data, mime_type) | Call LLM (if needed) |
| await self.get_prompt(prompt_id) | Load prompt template (if needed) |
| self.logger | Plugin-specific logger |
The Engine handles credential injection, state loading/saving, and document creation automatically.
Accessing Credentials
Credentials are injected as a DynamicCredential object. Access fields by attribute or .get():
async def fetch(self, credentials, state, configs):
# Attribute access (raises AttributeError if missing)
api_key = credentials.api_key
api_url = credentials.api_url
# .get() access (returns None or default if missing)
timeout = credentials.get("timeout_seconds", 30)
# Validate required fields early
credentials.validate_required_fields(["api_key", "api_url"])
# Check if a field exists
if credentials.has_field("refresh_token"):
# Handle OAuth refresh
...

All credential values are stored as strings. Convert to other types in your plugin code as needed:
timeout = int(credentials.get("timeout_seconds", "30"))
enabled = credentials.get("enabled") == "true"

State Management
State is injected as your typed model and auto-saved by the Engine after each yielded document:
class MySourceState(BaseSourceState):
cursor: str | None = None
page: int = 0
async def fetch(self, credentials, state, configs):
# State is loaded from database with previous values
self.logger.info(f"Resuming from cursor: {state.cursor}")
for item in items:
yield DocumentInput(...)
# Mutate state directly - Engine auto-saves
state.cursor = item.id
state.page += 1

If the plugin crashes mid-run, the Engine resumes from the last saved state.
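A toy crash-and-resume cycle illustrates why this works. All names here are hypothetical, and a plain dict stands in for the typed state model; the point is only that a cursor saved after each yield lets a second run skip already-ingested items:

```python
import asyncio

ITEMS = ["a1", "a2", "a3", "a4"]  # pretend external items, in order

async def fetch(state, crash_after=None):
    # Resume just past the saved cursor, or from the start on a first run.
    start = ITEMS.index(state["cursor"]) + 1 if state["cursor"] else 0
    for n, item in enumerate(ITEMS[start:]):
        if crash_after is not None and n == crash_after:
            raise RuntimeError("simulated crash")
        yield item
        state["cursor"] = item  # Engine snapshots state after each yield

async def run(state, **kw):
    done = []
    try:
        async for item in fetch(state, **kw):
            done.append(item)
    except RuntimeError:
        pass  # crash: state still holds the last saved cursor
    return done

state = {"cursor": None}
first = asyncio.run(run(state, crash_after=2))  # ingests a1, a2, then "crashes"
second = asyncio.run(run(state))                # resumes after the saved cursor
print(first, state["cursor"], second)           # ['a1', 'a2'] a4 ['a3', 'a4']
```

Because the cursor advanced to "a2" before the crash, the second run starts at "a3" and nothing is ingested twice.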
Document Metadata
When yielding documents, include relevant metadata:
yield DocumentInput(
file_data=file_bytes,
filename="invoice_123.pdf",
mime_type="application/pdf", # Optional: auto-detected if not provided
metadata={
"source": "my_api",
"external_id": "doc_123",
"title": "Invoice #456",
"author": "[email protected]",
"created_at": "2025-01-15",
},
)

Common Mistakes
Using Old execute() Method
# WRONG - old v1.0 API
async def execute(self, context: PluginContext):
credentials = self.get_source_credentials()
state = await self.get_source_state()
for item in items:
doc = await self.create_document(file_data, filename, metadata)
await self.update_source_state(state)
return created_documents
# CORRECT - yield DocumentInput from fetch()
async def fetch(self, credentials, state, configs):
for item in items:
yield DocumentInput(file_data=..., filename=...)
state.cursor = item.id  # Engine auto-saves

Missing source_state_model
# WRONG - no state model defined
class MySource(SourcePlugin):
source_type = "my_api"
# Missing source_state_model!
# CORRECT
class MySource(SourcePlugin):
source_type = "my_api"
source_state_model = MySourceState

Hardcoding Credentials
# WRONG - security risk
api_key = "sk-1234567890"
# CORRECT - use injected credentials
api_key = credentials.api_key

Missing SDK Import
# WRONG
class MyPlugin(SourcePlugin): # NameError!
...
# CORRECT
from bizsupply_sdk import SourcePlugin, BaseSourceState, DocumentInput, DynamicCredential
class MyPlugin(SourcePlugin):
...

Example: OAuth Source (Gmail/Outlook)
For OAuth sources like Gmail, define OAuth-specific credential fields:
from bizsupply_sdk import SourcePlugin, BaseSourceState, DocumentInput, DynamicCredential
class GmailState(BaseSourceState):
last_message_id: str | None = None
class GmailSourcePlugin(SourcePlugin):
source_type = "gmail"
source_state_model = GmailState
credential_fields = ["access_token", "refresh_token", "token_uri"]
async def fetch(self, credentials, state, configs):
access_token = credentials.access_token
# ... use credentials to connect to Gmail API
# ... yield DocumentInput for each email/attachment

Example: Salesforce Source
from bizsupply_sdk import SourcePlugin, BaseSourceState, DocumentInput, DynamicCredential
class SalesforceState(BaseSourceState):
last_sync_id: str | None = None
class SalesforceSourcePlugin(SourcePlugin):
"""Ingests documents from Salesforce."""
source_type = "salesforce"
source_state_model = SalesforceState
credential_fields = ["instance_url", "access_token", "api_version"]
configurable_parameters = [
{
"parameter_name": "object_type",
"parameter_type": "str",
"default_value": "ContentDocument",
"description": "Salesforce object to query",
},
]
async def fetch(self, credentials, state, configs):
instance_url = credentials.instance_url
access_token = credentials.access_token
# ... connect to Salesforce, yield DocumentInput objects
async def has_new_data(self, credentials, state, configs):
# Check Salesforce for new items since last sync
return True

Next Steps
- Use Plugins - Execute your plugin in a pipeline
- Create a Classification Plugin - Classify ingested documents
- Plugin Service API - All available service methods