Pipeline Code Development

This guide explains how to implement custom pipes using the current parameter validation pattern.

Atomic pipes are processing units that implement specific business logic:

- id: fetch_tickets
  use: open_ticket_ai.base:FetchTicketsPipe
  injects:
    ticket_system: 'otobo_znuny'
  params:
    search_criteria:
      queue:
        name: 'Support'
      limit: 10

Characteristics:

  • Runs specific logic
  • No child pipes

Composite pipes are orchestrators that contain and execute child pipes:

Composite Pipe Example
- id: ticket_workflow
  use: open_ticket_ai.base:CompositePipe
  params:
    threshold: 0.8
    steps:
      - id: fetch
        use: open_ticket_ai.base:FetchTicketsPipe
        injects: { ticket_system: 'otobo_znuny' }
        params:
          search_criteria:
            queue: { name: 'Incoming' }
            limit: 10
      - id: classify
        use: otai_hf_local:HFLocalTextClassificationPipe
        params:
          model: 'bert-base-german-cased'
          text: "{{ get_pipe_result('fetch').data.fetched_tickets[0].subject }}"
        depends_on: [fetch]
      - id: update
        use: open_ticket_ai.base:UpdateTicketPipe
        injects: { ticket_system: 'otobo_znuny' }
        params:
          ticket_id: "{{ get_pipe_result('fetch').data.fetched_tickets[0].id }}"
          updated_ticket:
            queue:
              name: "{{ get_pipe_result('classify').data.predicted_queue }}"
        depends_on: [classify]

Characteristics:

  • Contains steps list of child pipe configs
  • Uses PipeFactory to build child pipes
  • Executes children sequentially
  • Merges results via PipeResult.union()
  • Children can access parent params via parent.params

Composite Execution (see the code sketch after these steps):

  1. Initialization: Prepare to iterate through steps list
  2. For Each Step:
    • Merge: Combine parent params with step params (step overrides)
    • Build: Use factory to create child pipe instance
    • Execute: Call child.process(context) → updates context
    • Collect: Child result stored in context.pipes[child_id]
    • Loop: Continue to next step
  3. Finalization:
    • Union: Merge all child results using PipeResult.union()
    • Save: Store composite result in context
    • Return: Return final updated context
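
The loop above, as a minimal Python sketch. This is illustrative only: the factory, the context's pipes mapping, and the result-union callable are assumptions standing in for the real PipeFactory, PipeContext, and PipeResult.union() named in this guide.

from typing import Any


async def run_composite_sketch(
    parent_params: dict[str, Any],
    steps: list[dict[str, Any]],
    factory: Any,       # assumed: builds a child pipe from a step config
    context: Any,       # assumed: pipeline context exposing a `pipes` mapping
    result_union: Any,  # assumed: stands in for PipeResult.union
) -> Any:
    child_results = []
    for step in steps:
        # Merge: parent params first, step params override on conflict
        merged = {**parent_params, **step.get("params", {})}
        # Build: create the child pipe instance via the factory
        child = factory.build({**step, "params": merged})
        # Execute: the child updates and returns the shared context
        context = await child.process(context)
        # Collect: the child's result is stored under its pipe id
        child_results.append(context.pipes[step["id"]])
    # Union: merge all child results into a single composite result
    context.pipes["composite"] = result_union(child_results)
    # Return: the final updated context
    return context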

Field Details (see the sketch after this list):

  • pipes: Contains results from all previously executed pipes, keyed by pipe ID

    • Accumulated as each pipe completes
    • In CompositePipe: merged results from all child steps
    • Access via pipe_result('pipe_id') in templates
  • params: Current pipe’s parameters

    • Set when the pipe is created
    • Accessible via params.* in templates
    • For nested pipes, can reference parent via parent.params
  • parent: Reference to parent context (if inside a CompositePipe)

    • Allows access to parent scope variables
    • Creates hierarchical context chain
    • Can traverse multiple levels (parent.parent...)
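
To make the hierarchical chain concrete, here is a minimal illustrative sketch. It is not the real PipeContext class; the attribute names simply mirror the fields listed above.

from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class ContextSketch:
    # Illustrative stand-in for the context fields described above
    pipes: dict[str, Any] = field(default_factory=dict)
    params: dict[str, Any] = field(default_factory=dict)
    parent: Optional["ContextSketch"] = None


root = ContextSketch(params={"threshold": 0.8})
child = ContextSketch(params={"limit": 10}, parent=root)
grandchild = ContextSketch(parent=child)

# Traversing multiple levels: parent.parent reaches the root scope
assert grandchild.parent.parent.params["threshold"] == 0.8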

The pipes in the base package are small, focused steps. Examples:

  • AddNotePipe (registryKey: base:AddNotePipe)
  • FetchTicketsPipe (registryKey: base:FetchTicketsPipe)
  • UpdateTicketPipe (registryKey: base:UpdateTicketPipe)
- id: fetch_tickets
  use: 'base:FetchTicketsPipe'
  injects: { ticket_system: 'otobo_znuny' }
  params:
    ticket_search_criteria:
      queue: { name: 'Support' }
      limit: 10

ExpressionPipe renders an expression and returns that value. If it renders to a FailMarker, the pipe fails. registryKey: base:ExpressionPipe

- id: check_any_tickets
  use: 'base:ExpressionPipe'
  params:
    expression: >
      {{ fail() if (get_pipe_result('fetch_tickets','fetched_tickets')|length)==0 else 'ok' }}

CompositePipe runs several child pipes in order and returns the union of their results. registryKey: base:CompositePipe

- id: ticket_flow
  use: 'base:CompositePipe'
  params:
    steps:
      - id: fetch
        use: 'base:FetchTicketsPipe'
        injects: { ticket_system: 'otobo_znuny' }
        params:
          ticket_search_criteria: { queue: { name: 'Incoming' }, limit: 10 }
      - id: pick_first
        use: 'base:ExpressionPipe'
        params:
          expression: "{{ get_pipe_result('fetch','fetched_tickets')[0] }}"
      - id: classify
        use: 'base:ClassificationPipe'
        injects: { classification_service: 'hf_local' }
        params:
          text: "{{ get_pipe_result('pick_first')['subject'] }} {{ get_pipe_result('pick_first')['body'] }}"
          model_name: 'softoft/otai-queue-de-bert-v1'
      - id: update
        use: 'base:UpdateTicketPipe'
        injects: { ticket_system: 'otobo_znuny' }
        params:
          ticket_id: "{{ get_pipe_result('pick_first')['id'] }}"
          updated_ticket:
            queue:
              name: "{{ get_pipe_result('classify','label') if get_pipe_result('classify','confidence') >= 0.8 else 'OpenTicketAI::Unclassified' }}"

How it behaves (non-technical):

  • Runs children one by one
  • Stops on first failure
  • Returns a merged result of everything that succeeded

SimpleSequentialOrchestrator runs its steps in an endless loop; it is intended for background-style cycles. It does not expose the child pipes’ results as a single pipe result. registryKey: base:SimpleSequentialOrchestrator

- id: orchestrator
  use: 'base:SimpleSequentialOrchestrator'
  params:
    orchestrator_sleep: 'PT0.5S'
    exception_sleep: 'PT5S'
    always_retry: true
    steps:
      - id: tick
        use: 'base:IntervalTrigger'
        params: { interval: 'PT5S' }
      - id: fetch
        use: 'base:FetchTicketsPipe'
        injects: { ticket_system: 'otobo_znuny' }
        params:
          ticket_search_criteria: { queue: { name: 'Incoming' }, limit: 1 }

SimpleSequentialRunner has two params: on and run (both are pipe configs). If on succeeds, it executes run; otherwise it skips. registryKey: base:SimpleSequentialRunner

- id: run-when-triggered
  use: 'base:SimpleSequentialRunner'
  params:
    on:
      id: gate
      use: 'base:IntervalTrigger'
      params: { interval: 'PT60S' }
    run:
      id: do-something
      use: 'base:ExpressionPipe'
      params: { expression: 'Triggered run' }

  • registryKey = what you put into use, e.g. use: "base:FetchTicketsPipe".
  • Accessing parent params: Use parent for the direct parent’s params only (no multi-level chains).

Create a Pydantic model for your pipe’s parameters:

from pydantic import BaseModel


class MyPipeParams(BaseModel):
    input_field: str
    threshold: float = 0.5
    max_items: int = 100

Create a model for your pipe’s output:

class MyPipeResultData(BaseModel):
    processed_items: list[str]
    count: int

Implement the pipe class itself:

from typing import Any

from open_ticket_ai.core.pipes.pipe import Pipe
from open_ticket_ai.core.pipes.pipe_models import PipeConfig, PipeResult
from open_ticket_ai.core.logging.logging_iface import LoggerFactory


class MyPipe(Pipe[MyPipeParams]):
    params_class = MyPipeParams  # Required class attribute

    def __init__(
        self,
        pipe_config: PipeConfig[MyPipeParams],
        logger_factory: LoggerFactory,
        # Add injected services here
        *args: Any,
        **kwargs: Any,
    ) -> None:
        super().__init__(pipe_config, logger_factory)
        # self.params is now a validated MyPipeParams instance

    async def _process(self) -> PipeResult[MyPipeResultData]:
        # Access validated parameters
        input_val = self.params.input_field
        threshold = self.params.threshold

        # Your processing logic here
        items = self._do_processing(input_val, threshold)

        # Return result
        return PipeResult[MyPipeResultData](
            success=True,
            failed=False,
            data=MyPipeResultData(
                processed_items=items,
                count=len(items),
            ),
        )

    def _do_processing(self, input_val: str, threshold: float) -> list[str]:
        # Implementation details
        return []

The parameter validation happens automatically in the Pipe base class:

# In Pipe.__init__ (src/open_ticket_ai/core/pipes/pipe.py:27-30)
if isinstance(pipe_params._config, dict):
    self._config: ParamsT = self.params_class.model_validate(pipe_params._config)
else:
    self._config: ParamsT = pipe_params._config

Flow:

  1. YAML config loaded and templates rendered → produces dict[str, Any]
  2. Dict passed to Pipe constructor as pipe_config.params
  3. Base class checks if params is a dict
  4. If dict: validates using params_class.model_validate()
  5. If already typed: uses as-is
  6. Result: self.params is always the validated Pydantic model

Users write YAML with templates:

- id: my_custom_pipe
  use: 'mypackage:MyPipe'
  params:
    input_field: "{{ pipe_result('previous_step').data.output }}"
    threshold: "{{ env('THRESHOLD', '0.5') }}"
    max_items: 50

What happens (see the validation sketch after this list):

  1. Templates rendered: input_field gets value from previous pipe, threshold from env
  2. Results in dict: {"input_field": "some_value", "threshold": "0.5", "max_items": 50}
  3. Passed to MyPipe.__init__
  4. Validated to MyPipeParams: types coerced (threshold: str → float)
  5. Available as self.params.threshold (float 0.5)
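
Step 4 can be reproduced directly with Pydantic; a small sketch using the MyPipeParams model from earlier in this guide (repeated here so the snippet runs standalone). Lax-mode validation coerces the numeric string:

from pydantic import BaseModel


class MyPipeParams(BaseModel):
    input_field: str
    threshold: float = 0.5
    max_items: int = 100


# The rendered dict from step 2; threshold is still a string here
rendered = {"input_field": "some_value", "threshold": "0.5", "max_items": 50}

params = MyPipeParams.model_validate(rendered)
print(params.threshold)        # 0.5
print(type(params.threshold))  # <class 'float'>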

Add service dependencies in the __init__ signature:

from packages.base.src.otai_base.ticket_system_integration import TicketSystemService


class FetchTicketsPipe(Pipe[FetchTicketsParams]):
    params_class = FetchTicketsParams

    def __init__(
        self,
        ticket_system: TicketSystemService,  # Injected automatically
        pipe_config: PipeConfig[FetchTicketsParams],
        logger_factory: LoggerFactory,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        super().__init__(pipe_config, logger_factory)
        self.ticket_system = ticket_system

    async def _process(self) -> PipeResult[FetchTicketsPipeResultData]:
        # Use injected service
        tickets = await self.ticket_system.find_tickets(...)
        return PipeResult[FetchTicketsPipeResultData](...)

YAML config for service injection:

- id: fetch_tickets
  use: 'mypackage:FetchTicketsPipe'
  injects:
    ticket_system: 'otobo_system' # References a service by ID
  params:
    limit: 100

The base Pipe class handles errors automatically, but you can also handle specific cases:

async def _process(self) -> PipeResult[MyPipeResultData]:
    try:
        result = await self._risky_operation()
        return PipeResult[MyPipeResultData](
            success=True,
            failed=False,
            data=MyPipeResultData(...),
        )
    except SpecificError as e:
        self._logger.warning(f"Handled specific error: {e}")
        return PipeResult[MyPipeResultData](
            success=False,
            failed=True,
            message=f"Operation failed: {e}",
            data=MyPipeResultData(processed_items=[], count=0),
        )

Note: Unhandled exceptions are caught by the base class and result in a failed PipeResult.
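
Conceptually, that wrapping behaves roughly like the sketch below. This is an assumption about the behavior described in the note, not the base class's actual code, and the exact PipeResult constructor arguments may differ.

from open_ticket_ai.core.pipes.pipe_models import PipeResult


# Illustrative only: how unhandled exceptions can become a failed result
async def process_with_guard(pipe, context):
    try:
        return await pipe._process()
    except Exception as exc:  # the real base class may catch more narrowly
        pipe._logger.error(f"Pipe failed: {exc}")
        return PipeResult(success=False, failed=True, message=str(exc))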

import pytest

from open_ticket_ai.core.pipes.pipe_context_model import PipeContext
from open_ticket_ai.core.pipes.pipe_models import PipeConfig


@pytest.mark.asyncio
async def test_my_pipe_processes_correctly(logger_factory):
    # Create params as dict (simulates YAML rendering)
    params = {
        "input_field": "test_value",
        "threshold": 0.7,
        "max_items": 10,
    }

    # Create pipe config
    config = PipeConfig[MyPipeParams](
        id="test_pipe",
        params=params,
    )

    # Instantiate pipe
    pipe = MyPipe(pipe_config=config, logger_factory=logger_factory)

    # Execute
    context = PipeContext()
    result_context = await pipe.process(context)

    # Assert
    assert "test_pipe" in result_context.pipe_results
    assert result_context.pipe_results["test_pipe"].succeeded
    assert result_context.pipe_results["test_pipe"].data.count > 0

Accessing rendered parameter values inside _process:

async def _process(self) -> PipeResult[MyPipeResultData]:
    # Parameters are usually filled via templates in the YAML config,
    # but they can also be read in code via self.params, which was set
    # from the rendered template values.
    input_data = self.params.input_field  # Already resolved from template
    return PipeResult[MyPipeResultData](...)

Use the if field in YAML config:

- id: conditional_pipe
  use: 'mypackage:MyPipe'
  if: "{{ pipe_result('classifier').data.category == 'urgent' }}"
  params:
    # ...

Use the depends_on field:

- id: step2
  use: 'mypackage:Step2Pipe'
  depends_on:
    - step1
  params:
    input: "{{ pipe_result('step1').data.output }}"

DO:

  • ✅ Always define params_class as a class attribute
  • ✅ Let parent __init__ handle parameter validation
  • ✅ Use descriptive parameter names
  • ✅ Provide sensible defaults in params model
  • ✅ Return clear error messages in PipeResult
  • ✅ Log important steps and decisions
  • ✅ Keep _process() focused and testable

DON’T:

  • ❌ Don’t manually call model_validate() in your __init__ (see the contrast sketch after these lists)
  • ❌ Don’t bypass the params_class mechanism
  • ❌ Don’t put heavy logic in __init__
  • ❌ Don’t catch and hide all exceptions
  • ❌ Don’t access unvalidated pipe_config.params directly
  • ❌ Don’t forget to call super().__init__()
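
To make the validation rule concrete, a minimal contrast sketch (anti-pattern first, supported pattern second; it reuses the Pipe and MyPipeParams names from the examples above):

# DON'T: manually re-validate in __init__
class MyPipeWrong(Pipe[MyPipeParams]):
    params_class = MyPipeParams

    def __init__(self, pipe_config, logger_factory, *args, **kwargs):
        super().__init__(pipe_config, logger_factory)
        # Redundant: the base __init__ has already validated the params,
        # and this reads the unvalidated pipe_config.params directly
        self.params = MyPipeParams.model_validate(pipe_config.params)


# DO: declare params_class and let the base __init__ validate
class MyPipeRight(Pipe[MyPipeParams]):
    params_class = MyPipeParams

    def __init__(self, pipe_config, logger_factory, *args, **kwargs):
        super().__init__(pipe_config, logger_factory)
        # self.params is already a validated MyPipeParams instance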