Create a Source Plugin

Goal: Build a plugin that ingests documents from external sources (email, APIs, file systems)

Use When: You need to automatically pull documents from Salesforce, Dropbox, custom APIs, or any external system.


What Source Plugins Do

Source plugins connect to external systems, fetch documents, and yield them for the Engine to create. Unlike other plugins that process existing documents, source plugins create new documents.

Key Method: Yield DocumentInput from fetch() - the Engine handles document creation, state persistence, and credential management.


Prerequisites

Before starting:

  • Install the SDK: pip install bizsupply-sdk
  • Read Plugin Interface Specification
  • Decide what credentials your source needs (API keys, OAuth tokens, etc.)

How Source Plugins Work

1. Engine loads credentials from secure storage -> injects as DynamicCredential
2. Engine loads your typed state model from database -> injects as state
3. Engine calls fetch(credentials, state, configs)
4. For each DocumentInput you yield, Engine creates the document
5. Engine auto-saves state after each document (crash-resilient)

Step 1: Define Your State Model

Source plugins use a typed state model to track sync progress between runs:

from bizsupply_sdk import BaseSourceState


class MySourceState(BaseSourceState):
    """State model for tracking sync progress."""
    last_item_id: str | None = None
    last_sync_time: str | None = None
    page: int = 0

The Engine loads this state automatically and saves it after each yielded document.


Step 2: Create the Plugin Code

Create my_source.py:

from datetime import datetime, timezone

from bizsupply_sdk import (
    SourcePlugin,
    BaseSourceState,
    DocumentInput,
    DynamicCredential,
)


class MySourceState(BaseSourceState):
    """State model for tracking sync progress."""
    last_item_id: str | None = None
    last_sync_time: str | None = None


class MySourcePlugin(SourcePlugin):
    """
    Ingests documents from a custom API.

    The Engine handles credentials, state, and document creation.
    Just yield DocumentInput objects from fetch()!
    """

    # REQUIRED: Unique identifier for this source type
    source_type = "my_custom_api"

    # REQUIRED: State model class for tracking sync progress
    source_state_model = MySourceState

    # Optional: Credentials the user must configure for this source
    credential_fields = ["api_url", "api_key", "timeout_seconds"]

    # Optional: Configurable parameters
    configurable_parameters = [
        {
            "parameter_name": "max_documents",
            "parameter_type": "int",
            "default_value": 100,
            "description": "Maximum documents to fetch per run",
        },
        {
            "parameter_name": "batch_size",
            "parameter_type": "int",
            "default_value": 50,
            "description": "Number of items per API request",
        },
    ]

    async def fetch(self, credentials, state, configs):
        """
        Fetch documents from external source.

        Args:
            credentials: DynamicCredential with configured fields
            state: MySourceState loaded from database
            configs: Runtime configuration (max_documents, batch_size, etc.)

        Yields:
            DocumentInput for each document to create
        """
        self.logger.info("Starting document ingestion")

        # Access credentials by attribute or .get()
        api_url = credentials.api_url
        api_key = credentials.api_key
        timeout = int(credentials.get("timeout_seconds", 30))

        self.logger.info(f"Connecting to: {api_url}")
        self.logger.info(f"Last synced item: {state.last_item_id}")

        max_documents = int(configs.get("max_documents", 100))

        # =========================================================
        # YOUR FETCH LOGIC HERE
        # =========================================================
        # Replace with your actual API/service fetching code
        # Example:
        #
        # import httpx
        # async with httpx.AsyncClient() as client:
        #     response = await client.get(
        #         f"{api_url}/documents",
        #         headers={"Authorization": f"Bearer {api_key}"},
        #         params={"after": state.last_item_id},
        #         timeout=timeout,
        #     )
        #     items = response.json()["documents"]

        # Placeholder - replace with real fetch logic
        items = []

        count = 0
        for item in items:
            if count >= max_documents:
                break

            try:
                file_bytes = item.get("content")  # bytes
                filename = item.get("filename", "document.pdf")

                # Yield DocumentInput - Engine creates the document
                yield DocumentInput(
                    file_data=file_bytes,
                    filename=filename,
                    metadata={
                        "source": "my_api",
                        "external_id": item.get("id"),
                        "title": item.get("title"),
                        "created_at": item.get("created_at"),
                    },
                )

                # Update state - Engine auto-saves after each yield
                state.last_item_id = item.get("id")
                state.last_sync_time = datetime.now(timezone.utc).isoformat()
                count += 1

            except Exception as e:
                self.logger.error(f"Failed to process item: {e}")
                continue

        self.logger.info(f"Ingestion complete. Yielded {count} documents")

    async def has_new_data(self, credentials, state, configs):
        """
        Check if source has new data since last sync.

        Used by auto-sync to decide whether to trigger a full fetch.
        Should be lightweight (e.g., HEAD request or count query).
        """
        # Implement a lightweight check
        # Example: return await check_api_for_new_items(credentials, state)
        return True

Step 3: Validate and Register

Validate your plugin before registering:

bizsupply validate my_source.py

Then register:

curl -X POST "https://api.bizsupply.com/api/v1/plugins" \
  -H "Authorization: Bearer YOUR_TOKEN" \
  -F "name=My API Source" \
  -F "description=Ingests documents from My Custom API" \
  -F "code_file=@my_source.py"

When you register:

  1. The plugin is validated
  2. Plugin type, source_type, credential_fields, and other metadata are automatically extracted from your code
  3. If source_type is new, it's automatically registered
  4. Your credential schema (credential_fields) is stored for this source type

Step 4: Configure Source Credentials

After registration, users configure credentials through the bizsupply UI:

  1. Go to Settings > Sources > Add Source
  2. Select your source type
  3. Fill in the credential fields you defined
  4. Save

The credentials are securely stored and injected into your plugin via the credentials parameter.


Key Methods

| Method | Purpose |
| --- | --- |
| `await self.prompt_llm(prompt, file_data, mime_type)` | Call LLM (if needed) |
| `await self.get_prompt(prompt_id)` | Load prompt template (if needed) |
| `self.logger` | Plugin-specific logger |

The Engine handles credential injection, state loading/saving, and document creation automatically.


Accessing Credentials

Credentials are injected as a DynamicCredential object. Access fields by attribute or .get():

async def fetch(self, credentials, state, configs):
    # Attribute access (raises AttributeError if missing)
    api_key = credentials.api_key
    api_url = credentials.api_url

    # .get() access (returns None or default if missing)
    timeout = credentials.get("timeout_seconds", 30)

    # Validate required fields early
    credentials.validate_required_fields(["api_key", "api_url"])

    # Check if a field exists
    if credentials.has_field("refresh_token"):
        # Handle OAuth refresh
        ...

All credential values are stored as strings. Convert to other types in your plugin code as needed:

timeout = int(credentials.get("timeout_seconds", "30"))
enabled = credentials.get("enabled") == "true"

State Management

State is injected as your typed model and auto-saved by the Engine after each yielded document:

class MySourceState(BaseSourceState):
    cursor: str | None = None
    page: int = 0

async def fetch(self, credentials, state, configs):
    # State is loaded from database with previous values
    self.logger.info(f"Resuming from cursor: {state.cursor}")

    for item in items:
        yield DocumentInput(...)

        # Mutate state directly - Engine auto-saves
        state.cursor = item.id
        state.page += 1

If the plugin crashes mid-run, the Engine resumes from the last saved state.


Document Metadata

When yielding documents, include relevant metadata:

yield DocumentInput(
    file_data=file_bytes,
    filename="invoice_123.pdf",
    mime_type="application/pdf",  # Optional: auto-detected if not provided
    metadata={
        "source": "my_api",
        "external_id": "doc_123",
        "title": "Invoice #456",
        "author": "[email protected]",
        "created_at": "2025-01-15",
    },
)

Common Mistakes

Using Old execute() Method

# WRONG - old v1.0 API
async def execute(self, context: PluginContext):
    credentials = self.get_source_credentials()
    state = await self.get_source_state()
    for item in items:
        doc = await self.create_document(file_data, filename, metadata)
    await self.update_source_state(state)
    return created_documents

# CORRECT - yield DocumentInput from fetch()
async def fetch(self, credentials, state, configs):
    for item in items:
        yield DocumentInput(file_data=..., filename=...)
        state.cursor = item.id  # Engine auto-saves

Missing source_state_model

# WRONG - no state model defined
class MySource(SourcePlugin):
    source_type = "my_api"
    # Missing source_state_model!

# CORRECT
class MySource(SourcePlugin):
    source_type = "my_api"
    source_state_model = MySourceState

Hardcoding Credentials

# WRONG - security risk
api_key = "sk-1234567890"

# CORRECT - use injected credentials
api_key = credentials.api_key

Missing SDK Import

# WRONG
class MyPlugin(SourcePlugin):  # NameError!
    ...

# CORRECT
from bizsupply_sdk import SourcePlugin, BaseSourceState, DocumentInput, DynamicCredential

class MyPlugin(SourcePlugin):
    ...

Example: OAuth Source (Gmail/Outlook)

For OAuth sources like Gmail, define OAuth-specific credential fields:

from bizsupply_sdk import SourcePlugin, BaseSourceState, DocumentInput, DynamicCredential


class GmailState(BaseSourceState):
    last_message_id: str | None = None


class GmailSourcePlugin(SourcePlugin):
    source_type = "gmail"
    source_state_model = GmailState
    credential_fields = ["access_token", "refresh_token", "token_uri"]

    async def fetch(self, credentials, state, configs):
        access_token = credentials.access_token
        # ... use credentials to connect to Gmail API
        # ... yield DocumentInput for each email/attachment

Example: Salesforce Source

from bizsupply_sdk import SourcePlugin, BaseSourceState, DocumentInput, DynamicCredential


class SalesforceState(BaseSourceState):
    last_sync_id: str | None = None


class SalesforceSourcePlugin(SourcePlugin):
    """Ingests documents from Salesforce."""

    source_type = "salesforce"
    source_state_model = SalesforceState
    credential_fields = ["instance_url", "access_token", "api_version"]

    configurable_parameters = [
        {
            "parameter_name": "object_type",
            "parameter_type": "str",
            "default_value": "ContentDocument",
            "description": "Salesforce object to query",
        },
    ]

    async def fetch(self, credentials, state, configs):
        instance_url = credentials.instance_url
        access_token = credentials.access_token
        # ... connect to Salesforce, yield DocumentInput objects

    async def has_new_data(self, credentials, state, configs):
        # Check Salesforce for new items since last sync
        return True

Next Steps