
Pipeline Code Development

This guide explains how to implement custom pipes using the current parameter validation pattern.

Atomic pipes are self-contained processing units that implement specific business logic:

```yaml
- id: fetch_tickets
  use: open_ticket_ai.base:FetchTicketsPipe
  injects:
    ticket_system: 'otobo_znuny'
  params:
    search_criteria:
      queue:
        name: 'Support'
      limit: 10
```

Characteristics:

  • Runs specific logic
  • No child pipes

Composite pipes are orchestrators that contain and execute child pipes:

Composite Pipe Example:

```yaml
- id: ticket_workflow
  use: open_ticket_ai.base:CompositePipe
  params:
    threshold: 0.8
    steps:
      - id: fetch
        use: open_ticket_ai.base:FetchTicketsPipe
        injects: { ticket_system: 'otobo_znuny' }
        params:
          search_criteria:
            queue: { name: 'Incoming' }
            limit: 10
      - id: classify
        use: otai_hf_local:HFLocalTextClassificationPipe
        params:
          model: 'bert-base-german-cased'
          text: "{{ get_pipe_result('fetch').data.fetched_tickets[0].subject }}"
        depends_on: [fetch]
      - id: update
        use: open_ticket_ai.base:UpdateTicketPipe
        injects: { ticket_system: 'otobo_znuny' }
        params:
          ticket_id: "{{ get_pipe_result('fetch').data.fetched_tickets[0].id }}"
          updated_ticket:
            queue:
              name: "{{ get_pipe_result('classify').data.predicted_queue }}"
        depends_on: [classify]
```

Characteristics:

  • Contains steps list of child pipe configs
  • Uses PipeFactory to build child pipes
  • Executes children sequentially
  • Merges results via PipeResult.union()
  • Children can access parent params via parent.params

Composite Execution:

  1. Initialization: Prepare to iterate through steps list
  2. For Each Step:
    • Merge: Combine parent params with step params (step overrides)
    • Build: Use factory to create child pipe instance
    • Execute: Call child.process(context) → updates context
    • Collect: Child result stored in context.pipes[child_id]
    • Loop: Continue to next step
  3. Finalization:
    • Union: Merge all child results using PipeResult.union()
    • Save: Store composite result in context
    • Return: Return final updated context
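The execution steps above can be sketched in plain Python. This is an illustrative sketch only: `run_composite`, `build_pipe`, `union`, and the dict-based context are hypothetical stand-ins for the real `CompositePipe`, `PipeFactory`, `PipeResult.union()`, and context types.

```python
from typing import Any, Callable

def union(results: list[dict[str, Any]]) -> dict[str, Any]:
    # Simplified stand-in for PipeResult.union(): later results win on key clashes
    merged: dict[str, Any] = {}
    for result in results:
        merged.update(result)
    return merged

def run_composite(
    parent_params: dict[str, Any],
    steps: list[dict[str, Any]],
    build_pipe: Callable[[str, dict[str, Any]], Callable[[dict], dict]],
    context: dict[str, Any],
) -> dict[str, Any]:
    results: list[dict[str, Any]] = []
    for step in steps:
        # Merge: parent params first, step params override
        params = {**parent_params, **step.get("params", {})}
        # Build: the factory creates the child pipe from its registry key
        child = build_pipe(step["use"], params)
        # Execute + Collect: the child's result is stored under its id
        result = child(context)
        context.setdefault("pipes", {})[step["id"]] = result
        results.append(result)
    # Union + Save: the merged result becomes the composite's own result
    context.setdefault("pipes", {})["composite"] = union(results)
    return context
```

Note that a step's own params take precedence over the parent's on key clashes, matching the "step overrides" rule above.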

Field Details:

  • pipes: Contains results from all previously executed pipes, keyed by pipe ID

    • Accumulated as each pipe completes
    • In CompositePipe: merged results from all child steps
    • Access via pipe_result('pipe_id') in templates
  • params: Current pipe’s parameters

    • Set when the pipe is created
    • Accessible via params.* in templates
    • For nested pipes, can reference parent via parent.params
  • parent: Reference to parent context (if inside a CompositePipe)

    • Allows access to parent scope variables
    • Creates hierarchical context chain
    • Can traverse multiple levels (parent.parent...)
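The parent chain behaves like a lexical scope chain. A minimal sketch, assuming multi-level traversal as described above; the `Ctx` class and its `lookup` method are hypothetical, not the real context model:

```python
from typing import Any, Optional

class Ctx:
    """Illustrative stand-in for the hierarchical pipe context."""

    def __init__(self, params: dict[str, Any], parent: "Optional[Ctx]" = None) -> None:
        self.params = params
        self.parent = parent

    def lookup(self, key: str) -> Any:
        # Walk up through parent, parent.parent, ... until the key is found
        ctx: Optional[Ctx] = self
        while ctx is not None:
            if key in ctx.params:
                return ctx.params[key]
            ctx = ctx.parent
        raise KeyError(key)

root = Ctx({"threshold": 0.8})
child = Ctx({"limit": 10}, parent=root)
grandchild = Ctx({}, parent=child)

assert grandchild.lookup("threshold") == 0.8  # found two levels up
```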

Atomic pipes are small, focused steps. Examples:

  • AddNotePipe (registryKey: base:AddNotePipe)
  • FetchTicketsPipe (registryKey: base:FetchTicketsPipe)
  • UpdateTicketPipe (registryKey: base:UpdateTicketPipe)
```yaml
- id: fetch_tickets
  use: 'base:FetchTicketsPipe'
  injects: { ticket_system: 'otobo_znuny' }
  params:
    ticket_search_criteria:
      queue: { name: 'Support' }
      limit: 10
```

Renders an expression and returns the resulting value. If the expression renders to a FailMarker, the pipe fails. registryKey: base:ExpressionPipe

```yaml
- id: check_any_tickets
  use: 'base:ExpressionPipe'
  params:
    expression: >
      {{ fail() if (get_pipe_result('fetch_tickets','fetched_tickets')|length) == 0 else 'ok' }}
```

Runs several child pipes in order and returns the union of their results. registryKey: base:CompositePipe

```yaml
- id: ticket_flow
  use: 'base:CompositePipe'
  params:
    steps:
      - id: fetch
        use: 'base:FetchTicketsPipe'
        injects: { ticket_system: 'otobo_znuny' }
        params:
          ticket_search_criteria: { queue: { name: 'Incoming' }, limit: 10 }
      - id: pick_first
        use: 'base:ExpressionPipe'
        params:
          expression: "{{ get_pipe_result('fetch','fetched_tickets')[0] }}"
      - id: classify
        use: 'base:ClassificationPipe'
        injects: { classification_service: 'hf_local' }
        params:
          text: "{{ get_pipe_result('pick_first')['subject'] }} {{ get_pipe_result('pick_first')['body'] }}"
          model_name: 'softoft/otai-queue-de-bert-v1'
      - id: update
        use: 'base:UpdateTicketPipe'
        injects: { ticket_system: 'otobo_znuny' }
        params:
          ticket_id: "{{ get_pipe_result('pick_first')['id'] }}"
          updated_ticket:
            queue:
              name: "{{ get_pipe_result('classify','label') if get_pipe_result('classify','confidence') >= 0.8 else 'OpenTicketAI::Unclassified' }}"
```

How it behaves (non-technical):

  • Runs children one by one
  • Stops on first failure
  • Returns a merged result of everything that succeeded

Runs its steps in an endless loop. It’s for background-style cycles. It does not expose the child pipes’ results as a single pipe result. registryKey: base:SimpleSequentialOrchestrator

```yaml
- id: orchestrator
  use: 'base:SimpleSequentialOrchestrator'
  params:
    orchestrator_sleep: 'PT0.5S'
    exception_sleep: 'PT5S'
    always_retry: true
    steps:
      - id: tick
        use: 'base:IntervalTrigger'
        params: { interval: 'PT5S' }
      - id: fetch
        use: 'base:FetchTicketsPipe'
        injects: { ticket_system: 'otobo_znuny' }
        params:
          ticket_search_criteria: { queue: { name: 'Incoming' }, limit: 1 }
```
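The orchestrator's cycle can be approximated as follows. This is a hypothetical sketch: `run_orchestrator` and its `max_cycles` cutoff are invented here so the example terminates, whereas the real orchestrator loops forever.

```python
import time
from typing import Callable

def run_orchestrator(
    steps: list[Callable[[], None]],
    orchestrator_sleep: float,
    exception_sleep: float,
    always_retry: bool,
    max_cycles: int,
) -> int:
    cycles = 0
    while cycles < max_cycles:
        try:
            for step in steps:
                step()  # run children in order
            time.sleep(orchestrator_sleep)
        except Exception:
            if not always_retry:
                raise
            time.sleep(exception_sleep)  # back off before retrying the cycle
        cycles += 1
    return cycles

calls: list[int] = []
run_orchestrator([lambda: calls.append(1)], 0.0, 0.0, True, 3)
assert len(calls) == 3
```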

Has two params: on and run (both are pipe configs). If on succeeds, it executes run; otherwise it skips. registryKey: base:SimpleSequentialRunner

```yaml
- id: run-when-triggered
  use: 'base:SimpleSequentialRunner'
  params:
    on:
      id: gate
      use: 'base:IntervalTrigger'
      params: { interval: 'PT60S' }
    run:
      id: do-something
      use: 'base:ExpressionPipe'
      params: { expression: 'Triggered run' }
```
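The on/run gate reduces to a simple conditional. A sketch with illustrative names (`sequential_runner` is not the real class):

```python
from typing import Callable, Optional

def sequential_runner(on: Callable[[], bool], run: Callable[[], str]) -> Optional[str]:
    # Execute `run` only when the `on` pipe succeeds; otherwise skip
    if on():
        return run()
    return None  # skipped

assert sequential_runner(lambda: True, lambda: "ran") == "ran"
assert sequential_runner(lambda: False, lambda: "ran") is None
```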

  • registryKey = what you put into use, e.g. use: "base:FetchTicketsPipe".
  • Accessing parent params: Use parent for the direct parent’s params only (no multi-level chains).



Create a Pydantic model for your pipe’s parameters:

```python
from pydantic import BaseModel

class MyPipeParams(BaseModel):
    input_field: str
    threshold: float = 0.5
    max_items: int = 100
```

Create a model for your pipe’s output:

```python
class MyPipeResultData(BaseModel):
    processed_items: list[str]
    count: int
```
Then implement the pipe class itself:

```python
from typing import Any

from open_ticket_ai.core.pipes.pipe import Pipe
from open_ticket_ai.core.pipes.pipe_models import PipeConfig, PipeResult
from open_ticket_ai.core.logging.logging_iface import LoggerFactory

class MyPipe(Pipe[MyPipeParams]):
    params_class = MyPipeParams  # Required class attribute

    def __init__(
        self,
        pipe_config: PipeConfig[MyPipeParams],
        logger_factory: LoggerFactory,
        # Add injected services here
        *args: Any,
        **kwargs: Any,
    ) -> None:
        super().__init__(pipe_config, logger_factory)
        # self.params is now a validated MyPipeParams instance

    async def _process(self) -> PipeResult[MyPipeResultData]:
        # Access validated parameters
        input_val = self.params.input_field
        threshold = self.params.threshold
        # Your processing logic here
        items = self._do_processing(input_val, threshold)
        # Return result
        return PipeResult[MyPipeResultData](
            success=True,
            failed=False,
            data=MyPipeResultData(
                processed_items=items,
                count=len(items),
            ),
        )

    def _do_processing(self, input_val: str, threshold: float) -> list[str]:
        # Implementation details
        return []
```

The parameter validation happens automatically in the Pipe base class:

```python
# In Pipe.__init__ (src/open_ticket_ai/core/pipes/pipe.py:27-30)
if isinstance(pipe_config.params, dict):
    self.params: ParamsT = self.params_class.model_validate(pipe_config.params)
else:
    self.params: ParamsT = pipe_config.params
```

Flow:

  1. YAML config loaded and templates rendered → produces dict[str, Any]
  2. Dict passed to Pipe constructor as pipe_config.params
  3. Base class checks if params is a dict
  4. If dict: validates using params_class.model_validate()
  5. If already typed: uses as-is
  6. Result: self.params is always the validated Pydantic model

Users write YAML with templates:

```yaml
- id: my_custom_pipe
  use: 'mypackage:MyPipe'
  params:
    input_field: "{{ pipe_result('previous_step').data.output }}"
    threshold: "{{ env('THRESHOLD', '0.5') }}"
    max_items: 50
```

What happens:

  1. Templates rendered: input_field gets value from previous pipe, threshold from env
  2. Results in dict: {"input_field": "some_value", "threshold": "0.5", "max_items": 50}
  3. Passed to MyPipe.__init__
  4. Validated to MyPipeParams: types coerced (threshold: str → float)
  5. Available as self.params.threshold (float 0.5)
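The string-to-float coercion in step 4 is standard Pydantic behavior and can be observed directly. A self-contained sketch reusing the MyPipeParams model from this guide:

```python
from pydantic import BaseModel

class MyPipeParams(BaseModel):
    input_field: str
    threshold: float = 0.5
    max_items: int = 100

# Step 2's rendered dict: threshold is still a string at this point
rendered = {"input_field": "some_value", "threshold": "0.5", "max_items": 50}
params = MyPipeParams.model_validate(rendered)

assert params.threshold == 0.5  # "0.5" coerced to float
assert isinstance(params.threshold, float)
```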

Add service dependencies in the __init__ signature:

```python
from packages.base.src.otai_base.ticket_system_integration import TicketSystemService

class FetchTicketsPipe(Pipe[FetchTicketsParams]):
    params_class = FetchTicketsParams

    def __init__(
        self,
        ticket_system: TicketSystemService,  # Injected automatically
        pipe_config: PipeConfig[FetchTicketsParams],
        logger_factory: LoggerFactory,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        super().__init__(pipe_config, logger_factory)
        self.ticket_system = ticket_system

    async def _process(self) -> PipeResult[FetchTicketsPipeResultData]:
        # Use injected service
        tickets = await self.ticket_system.find_tickets(...)
        return PipeResult[FetchTicketsPipeResultData](...)
```

YAML config for service injection:

```yaml
- id: fetch_tickets
  use: 'mypackage:FetchTicketsPipe'
  injects:
    ticket_system: 'otobo_system'  # References a service by ID
  params:
    limit: 100
```

The base Pipe class handles errors automatically, but you can also handle specific cases:

```python
async def _process(self) -> PipeResult[MyPipeResultData]:
    try:
        result = await self._risky_operation()
        return PipeResult[MyPipeResultData](
            success=True,
            failed=False,
            data=MyPipeResultData(...),
        )
    except SpecificError as e:
        self._logger.warning(f"Handled specific error: {e}")
        return PipeResult[MyPipeResultData](
            success=False,
            failed=True,
            message=f"Operation failed: {e}",
            data=MyPipeResultData(processed_items=[], count=0),
        )
```

Note: Unhandled exceptions are caught by the base class and result in a failed PipeResult.
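That catch-all behavior can be sketched as follows; `safe_process` and `SketchResult` are hypothetical stand-ins, while the real guard lives in the Pipe base class:

```python
import asyncio
from dataclasses import dataclass

@dataclass
class SketchResult:
    success: bool
    failed: bool
    message: str = ""

async def safe_process(process) -> SketchResult:
    # Simplified version of the base class's guard around _process()
    try:
        return await process()
    except Exception as exc:  # unhandled errors become a failed result
        return SketchResult(success=False, failed=True, message=str(exc))

async def boom() -> SketchResult:
    raise RuntimeError("db unreachable")

result = asyncio.run(safe_process(boom))
assert result.failed and result.message == "db unreachable"
```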

```python
import pytest

from open_ticket_ai.core.pipes.pipe_context_model import PipeContext
from open_ticket_ai.core.pipes.pipe_models import PipeConfig

@pytest.mark.asyncio
async def test_my_pipe_processes_correctly(logger_factory):
    # Create params as dict (simulates YAML rendering)
    params = {
        "input_field": "test_value",
        "threshold": 0.7,
        "max_items": 10,
    }
    # Create pipe config
    config = PipeConfig[MyPipeParams](
        id="test_pipe",
        params=params,
    )
    # Instantiate pipe
    pipe = MyPipe(pipe_config=config, logger_factory=logger_factory)
    # Execute
    context = PipeContext()
    result_context = await pipe.process(context)
    # Assert
    assert "test_pipe" in result_context.pipe_results
    assert result_context.pipe_results["test_pipe"].succeeded
    assert result_context.pipe_results["test_pipe"].data.count > 0
```
Template-resolved values arrive on self.params and are accessed the same way inside _process():

```python
async def _process(self) -> PipeResult[MyPipeResultData]:
    # Values are usually injected via templates in the YAML config and
    # arrive already resolved on self.params
    input_data = self.params.input_field  # Already resolved from template
    return PipeResult[MyPipeResultData](...)
```

Use the if field in YAML config:

```yaml
- id: conditional_pipe
  use: 'mypackage:MyPipe'
  if: "{{ pipe_result('classifier').data.category == 'urgent' }}"
  params:
    # ...
```

Use the depends_on field:

```yaml
- id: step2
  use: 'mypackage:Step2Pipe'
  depends_on:
    - step1
  params:
    input: "{{ pipe_result('step1').data.output }}"
```

DO:

  • ✅ Always define params_class as a class attribute
  • ✅ Let parent __init__ handle parameter validation
  • ✅ Use descriptive parameter names
  • ✅ Provide sensible defaults in params model
  • ✅ Return clear error messages in PipeResult
  • ✅ Log important steps and decisions
  • ✅ Keep _process() focused and testable

DON’T:

  • ❌ Don’t manually call model_validate() in your __init__
  • ❌ Don’t bypass the params_class mechanism
  • ❌ Don’t put heavy logic in __init__
  • ❌ Don’t catch and hide all exceptions
  • ❌ Don’t access unvalidated pipe_config.params directly
  • ❌ Don’t forget to call super().__init__()