Create a Pipeline

Goal: Configure and execute document processing pipelines

Prerequisites:


Overview

A pipeline combines plugins and ontologies into a reusable document processing workflow. When executed, the pipeline runs plugins in the correct order: source → classification → extraction → aggregation.

Two execution options:

  1. Pipeline Execution (recommended) - Create a reusable pipeline configuration
  2. Direct Execution - Specify plugin_ids directly (for quick testing)

Step 1: Gather Your Components

List Your Plugins

GET /api/v1/plugins
Authorization: Bearer <your-jwt-token>

Response:

{
  "count": 3,
  "plugins": [
    {
      "plugin_id": "01HQZX1A2B3C4D5E6F7G8H9J0K",
      "name": "Gmail Source Plugin",
      "plugin_type": "source"
    },
    {
      "plugin_id": "01HQZX3K4M2N5P7Q8R9S0T1U2V",
      "name": "Invoice Classifier",
      "plugin_type": "classification"
    },
    {
      "plugin_id": "01HQZX5W6X7Y8Z9A0B1C2D3E4F",
      "name": "Invoice Data Extractor",
      "plugin_type": "extraction"
    }
  ]
}

List Your Ontologies (if using extraction)

GET /api/v1/ontologies
Authorization: Bearer <your-jwt-token>

Response:

{
  "count": 1,
  "ontologies": [
    {
      "ontology_id": "01HQZY1M2N3P4Q5R6S7T8U9V0W",
      "name": "Invoice Ontology"
    }
  ]
}

Step 2: Create a Pipeline

POST /api/v1/pipelines
Authorization: Bearer <your-jwt-token>
Content-Type: application/json

{
  "name": "Invoice Processing Pipeline",
  "description": "Ingest, classify, and extract invoice data from Gmail",
  "plugin_ids": [
    "01HQZX1A2B3C4D5E6F7G8H9J0K",
    "01HQZX3K4M2N5P7Q8R9S0T1U2V",
    "01HQZX5W6X7Y8Z9A0B1C2D3E4F"
  ],
  "ontology_catalogs_ids": [
    "01HQZY1M2N3P4Q5R6S7T8U9V0W"
  ]
}

Response:

{
  "pipeline_id": "01HR001A2B3C4D5E6F7G8H9J0K",
  "name": "Invoice Processing Pipeline",
  "message": "Pipeline created successfully"
}
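
The create request above could be assembled in Python as in the following minimal sketch. It uses only the standard library. BASE_URL and both function names are placeholders introduced here for illustration, not part of the documented API; only the path, headers, and body fields come from the example above.

```python
import json
import urllib.request

# Assumption: placeholder host; substitute your deployment's base URL.
BASE_URL = "https://api.example.com"


def build_create_pipeline_request(token, name, description, plugin_ids, ontology_ids):
    """Build (but do not send) the POST /api/v1/pipelines request."""
    body = {
        "name": name,
        "description": description,
        "plugin_ids": list(plugin_ids),
        "ontology_catalogs_ids": list(ontology_ids),
    }
    return urllib.request.Request(
        url=f"{BASE_URL}/api/v1/pipelines",
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


def create_pipeline(request):
    """Send the request and return the pipeline_id from the JSON response."""
    with urllib.request.urlopen(request) as response:
        return json.load(response)["pipeline_id"]
```

Keeping request construction separate from sending makes it possible to inspect the payload before any network call is made.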

Step 3: Execute the Pipeline

POST /api/v1/jobs/from-pipeline
Authorization: Bearer <your-jwt-token>
Content-Type: application/json

{
  "pipeline_id": "01HR001A2B3C4D5E6F7G8H9J0K"
}

Response:

{
  "job_ids": ["01HQZZ8P9Q0R1S2T3U4V5W6X7Y"],
  "message": "Execution started for 1 job(s)"
}

Save the ID returned in job_ids; you need it to monitor progress in Step 4.


Step 4: Monitor Job Status

Poll the status endpoint:

GET /api/v1/jobs/01HQZZ8P9Q0R1S2T3U4V5W6X7Y
Authorization: Bearer <your-jwt-token>

Running:

{
  "job_id": "01HQZZ8P9Q0R1S2T3U4V5W6X7Y",
  "status": "running",
  "current_plugin": "Invoice Classifier",
  "progress": "2/3 plugins executed",
  "started_at": "2025-10-28T10:30:00Z"
}

Completed:

{
  "job_id": "01HQZZ8P9Q0R1S2T3U4V5W6X7Y",
  "status": "completed",
  "documents_processed": 15,
  "documents_with_labels": 12,
  "documents_with_data": 12,
  "execution_time_seconds": 45.2,
  "started_at": "2025-10-28T10:30:00Z",
  "completed_at": "2025-10-28T10:30:45Z"
}

Failed:

{
  "job_id": "01HQZZ8P9Q0R1S2T3U4V5W6X7Y",
  "status": "failed",
  "error": "Plugin 'Invoice Classifier' failed: LLM API rate limit exceeded",
  "failed_plugin": "Invoice Classifier",
  "started_at": "2025-10-28T10:30:00Z",
  "failed_at": "2025-10-28T10:30:23Z"
}
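
Polling could be wrapped in a small helper like the sketch below. The fetch_status callable is hypothetical: it stands for whatever code performs GET /api/v1/jobs/{job_id} and returns the decoded JSON as a dict. Only "completed" and "failed" are treated as terminal because those are the terminal statuses shown above; the status reported for a cancelled job is not documented here, so handling for it is left out.

```python
import time


def wait_for_job(fetch_status, job_id, interval_seconds=5.0, timeout_seconds=600.0,
                 sleep=time.sleep, clock=time.monotonic):
    """Poll until the job completes or fails, then return the final payload.

    fetch_status is a caller-supplied function (hypothetical) that returns
    the job status response as a dict.
    """
    deadline = clock() + timeout_seconds
    while True:
        job = fetch_status(job_id)
        status = job.get("status")
        if status == "completed":
            return job
        if status == "failed":
            # Surface the error message from the failed-job response.
            raise RuntimeError(f"Job {job_id} failed: {job.get('error', 'unknown error')}")
        if clock() >= deadline:
            raise TimeoutError(f"Job {job_id} still '{status}' after {timeout_seconds}s")
        # Any other status (for example "pending" or "running"): wait and retry.
        sleep(interval_seconds)
```

The interval and timeout defaults are arbitrary choices for illustration; tune them to how long your pipelines typically run.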

Step 5: Retrieve Processed Documents

Get all documents:

GET /api/v1/documents
Authorization: Bearer <your-jwt-token>

Filter by labels:

GET /api/v1/documents?labels=invoice
Authorization: Bearer <your-jwt-token>

Filter by date range:

GET /api/v1/documents?labels=invoice&start_date=2025-10-01&end_date=2025-10-31
Authorization: Bearer <your-jwt-token>

Response:

{
  "count": 12,
  "documents": [
    {
      "id": "01HR001A2B3C4D5E6F7G8H9J0K",
      "metadata": {
        "source": "gmail",
        "subject": "Invoice #12345",
        "sender": "<sender-email>"
      },
      "labels": ["invoice"],
      "data": {
        "invoice_total": 1500.00,
        "invoice_date": "2025-10-15",
        "invoice_number": "INV-12345",
        "vendor_name": "Acme Corp"
      }
    }
  ]
}
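
The filtered URLs above can be built programmatically rather than by string concatenation. A minimal sketch follows; the helper name and the base URL argument are illustrative assumptions, and only the three query parameters shown above (labels, start_date, end_date) are handled.

```python
from urllib.parse import urlencode


def documents_url(base_url, labels=None, start_date=None, end_date=None):
    """Return the GET /api/v1/documents URL with only the filters that were given."""
    params = {}
    if labels:
        params["labels"] = labels
    if start_date:
        params["start_date"] = start_date
    if end_date:
        params["end_date"] = end_date
    path = f"{base_url}/api/v1/documents"
    query = urlencode(params)  # percent-encodes values where needed
    return f"{path}?{query}" if query else path
```

Calling it with no filters returns the plain documents URL, which corresponds to the "Get all documents" request.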

Plugin Execution Order

Plugins execute in this order, regardless of how you list them:

Order  Type            Purpose
1      Source          Ingest documents from external sources
2      Classification  Analyze content, add labels
3      Extraction      Extract structured data
4      Aggregation     Link related documents

Example flow:

1. Gmail Source Plugin (source)
2. Invoice Classifier (classification)
3. Invoice Data Extractor (extraction)
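
To predict the order in which a given plugin list will run, the rule above can be reproduced client-side. This is only an illustrative sketch, not the platform's implementation. It assumes that plugins of the same type keep the order in which they were listed, which the text above does not specify.

```python
# Execution rank per plugin_type, taken from the order described above.
EXECUTION_ORDER = {
    "source": 1,
    "classification": 2,
    "extraction": 3,
    "aggregation": 4,
}


def order_plugins(plugins):
    """Sort plugin dicts by type rank; sorted() is stable, so ties keep input order."""
    return sorted(plugins, key=lambda plugin: EXECUTION_ORDER[plugin["plugin_type"]])


listed = [
    {"name": "Invoice Data Extractor", "plugin_type": "extraction"},
    {"name": "Gmail Source Plugin", "plugin_type": "source"},
    {"name": "Invoice Classifier", "plugin_type": "classification"},
]
for plugin in order_plugins(listed):
    print(plugin["name"])
```

An unknown plugin_type raises KeyError here, which is a deliberate choice so that typos surface early.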

Pre-Conditions

Plugins can define conditions that filter which documents they process:

spec:
  type: "extraction"
  pre_conditions:
    - key: "document.labels"
      operator: "contains"
      value: "invoice"

This extraction plugin only runs on documents labeled "invoice".

Operators:

  • contains - Array contains value
  • equals - Exact match
  • in - Value in list
  • greater_than, less_than - Numeric comparison
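
To reason about which documents a plugin will pick up, the operators can be modeled locally. The sketch below is an assumption-laden illustration rather than the platform's evaluator: it assumes keys are dotted paths rooted at "document", that a missing field never matches a numeric comparison, and that all listed conditions must hold (logical AND). None of these details is confirmed by the text above.

```python
def resolve(document, key):
    """Look up a dotted key such as 'document.labels' in a document dict."""
    parts = key.split(".")
    if parts[0] == "document":
        parts = parts[1:]
    value = document
    for part in parts:
        if not isinstance(value, dict) or part not in value:
            return None  # missing field
        value = value[part]
    return value


# One function per operator: (actual value from the document, expected value).
OPERATORS = {
    "contains": lambda actual, expected: isinstance(actual, (list, tuple)) and expected in actual,
    "equals": lambda actual, expected: actual == expected,
    "in": lambda actual, expected: actual in expected,
    "greater_than": lambda actual, expected: actual is not None and actual > expected,
    "less_than": lambda actual, expected: actual is not None and actual < expected,
}


def matches(document, pre_conditions):
    """Return True if the document satisfies every pre-condition (assumed AND)."""
    return all(
        OPERATORS[cond["operator"]](resolve(document, cond["key"]), cond["value"])
        for cond in pre_conditions
    )
```

For instance, a document with labels ["invoice"] satisfies the contains condition from the YAML example, while one labeled ["receipt"] does not.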

Direct Execution (Alternative)

For quick testing without creating a pipeline:

POST /api/v1/jobs
Authorization: Bearer <your-jwt-token>
Content-Type: application/json

{
  "plugin_ids": [
    "01HQZX1A2B3C4D5E6F7G8H9J0K",
    "01HQZX3K4M2N5P7Q8R9S0T1U2V",
    "01HQZX5W6X7Y8Z9A0B1C2D3E4F"
  ],
  "ontology_catalogs_ids": [
    "01HQZY1M2N3P4Q5R6S7T8U9V0W"
  ],
  "source_ids": ["<source-account-email>"]
}

Pipeline Management

List Pipelines

GET /api/v1/pipelines
Authorization: Bearer <your-jwt-token>

Get Pipeline Details

GET /api/v1/pipelines/{pipeline_id}
Authorization: Bearer <your-jwt-token>

Update Pipeline

PUT /api/v1/pipelines/{pipeline_id}
Authorization: Bearer <your-jwt-token>
Content-Type: application/json

{
  "name": "Updated Invoice Pipeline",
  "plugin_ids": [...],
  "ontology_catalogs_ids": [...]
}

Delete Pipeline

DELETE /api/v1/pipelines/{pipeline_id}
Authorization: Bearer <your-jwt-token>

Job Management

List All Jobs

GET /api/v1/jobs
Authorization: Bearer <your-jwt-token>

Cancel a Running Job

POST /api/v1/jobs/{job_id}/cancel
Authorization: Bearer <your-jwt-token>

Common Issues

Job Stuck in "pending"

Cause: System at maximum capacity

Solution: Wait for other jobs to complete, or cancel unnecessary jobs with POST /api/v1/jobs/{job_id}/cancel

Job Failed: "Plugin not found"

Cause: Plugin ID doesn't exist or doesn't belong to your tenant

Check: GET /api/v1/plugins/{plugin_id}

Job Failed: "Credential not found"

Cause: Source plugin requires credentials that aren't configured

Solution: Contact support to configure source credentials

No Documents Processed

Check:

  1. Source plugin ran successfully
  2. Pre-conditions aren't filtering out all documents
  3. Job logs show no errors

Documents Missing Data Fields

Check:

  1. Extraction plugin ran
  2. Documents have required labels (classification ran first)
  3. Ontology fields match what extraction plugin populates

Best Practices

  1. Use Pipelines - Create pipelines for workflows you'll run repeatedly
  2. Test Incrementally - Test each plugin individually before combining
  3. Monitor Jobs - Check status regularly; don't assume success
  4. Start Small - Test with 1-5 documents before running on hundreds
  5. Use Pre-Conditions - Filter documents to avoid unnecessary processing

Next Steps