Pipeline Code Development
This guide explains how to implement custom pipes using the current parameter validation pattern.
Pipe Types

Simple Pipes

Atomic processing units that implement specific business logic:

    - id: fetch_tickets
      use: open_ticket_ai.base:FetchTicketsPipe
      injects:
        ticket_system: 'otobo_znuny'
      params:
        search_criteria:
          queue:
            name: 'Support'
          limit: 10

Characteristics:
- Runs specific logic
- No child pipes
Composite Pipes

Orchestrators that contain and execute child pipes:

Composite Pipe Example

    - id: ticket_workflow
      use: open_ticket_ai.base:CompositePipe
      params:
        threshold: 0.8
        steps:
          - id: fetch
            use: open_ticket_ai.base:FetchTicketsPipe
            injects: { ticket_system: 'otobo_znuny' }
            params:
              search_criteria:
                queue: { name: 'Incoming' }
                limit: 10

          - id: classify
            use: otai_hf_local:HFLocalTextClassificationPipe
            params:
              model: 'bert-base-german-cased'
              text: "{{ get_pipe_result('fetch').data.fetched_tickets[0].subject }}"
            depends_on: [fetch]

          - id: update
            use: open_ticket_ai.base:UpdateTicketPipe
            injects: { ticket_system: 'otobo_znuny' }
            params:
              ticket_id: "{{ get_pipe_result('fetch').data.fetched_tickets[0].id }}"
              updated_ticket:
                queue:
                  name: "{{ get_pipe_result('classify').data.predicted_queue }}"
            depends_on: [classify]

Characteristics:
- Contains a steps list of child pipe configs
- Uses PipeFactory to build child pipes
- Executes children sequentially
- Merges results via PipeResult.union()
- Children can access parent params via parent.params
Composite Execution:
- Initialization: Prepare to iterate through the steps list
- For Each Step:
  - Merge: Combine parent params with step params (step overrides)
  - Build: Use factory to create child pipe instance
  - Execute: Call child.process(context) → updates context
  - Collect: Child result stored in context.pipes[child_id]
  - Loop: Continue to next step
- Finalization:
  - Union: Merge all child results using PipeResult.union()
  - Save: Store composite result in context
  - Return: Return final updated context
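
The merge, build, execute, collect, and union steps above can be pictured with a small standalone sketch. It does not use the library: Result, Context, run_composite, and the registry dict are hypothetical stand-ins for PipeResult, the pipe context, the composite pipe, and PipeFactory, and the union semantics shown (data merged, success only if every child succeeded) are an assumption.

```python
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class Result:
    """Hypothetical stand-in for PipeResult."""
    success: bool
    data: dict[str, Any] = field(default_factory=dict)

    def union(self, other: "Result") -> "Result":
        # Assumed semantics: merged data, success only if both succeeded.
        return Result(self.success and other.success, {**self.data, **other.data})


@dataclass
class Context:
    """Hypothetical stand-in for the pipe context."""
    pipes: dict[str, Result] = field(default_factory=dict)


# In this sketch a child pipe is just a function of (params, context).
ChildFn = Callable[[dict[str, Any], Context], Result]


def run_composite(
    composite_id: str,
    parent_params: dict[str, Any],
    steps: list[dict[str, Any]],
    registry: dict[str, ChildFn],
    context: Context,
) -> Context:
    merged = Result(success=True)
    for step in steps:
        # Merge: step params override parent params.
        params = {**parent_params, **step.get("params", {})}
        # Build: look up the child (stands in for the factory).
        child = registry[step["use"]]
        # Execute and collect: store the child result under its id.
        result = child(params, context)
        context.pipes[step["id"]] = result
        merged = merged.union(result)
    # Finalization: save the union under the composite's own id.
    context.pipes[composite_id] = merged
    return context


registry: dict[str, ChildFn] = {
    "fetch": lambda p, ctx: Result(True, {"tickets": ["T1", "T2"][: p["limit"]]}),
    "count": lambda p, ctx: Result(True, {"count": len(ctx.pipes["fetch"].data["tickets"])}),
}
steps = [
    {"id": "fetch", "use": "fetch", "params": {"limit": 1}},
    {"id": "count", "use": "count"},
]
ctx = run_composite("flow", {"limit": 10}, steps, registry, Context())
```

Here the fetch step overrides the parent's limit, the count step reads the earlier result from the context, and the composite result stored under "flow" carries the data of both children.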
Field Details:
- pipes: Contains results from all previously executed pipes, keyed by pipe ID
  - Accumulated as each pipe completes
  - In CompositePipe: merged results from all child steps
  - Access via pipe_result('pipe_id') in templates
- params: Current pipe’s parameters
  - Set when the pipe is created
  - Accessible via params.* in templates
  - For nested pipes, can reference parent via parent.params
- parent: Reference to parent context (if inside a CompositePipe)
  - Allows access to parent scope variables
  - Creates hierarchical context chain
  - Can traverse multiple levels (parent.parent...)
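
As a rough illustration of how these three fields relate, the sketch below models a context as a plain dataclass. SketchContext and its child() helper are hypothetical names, and sharing one pipes dict between parent and child is an assumption made for the sketch, not a statement about the real context class.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


@dataclass
class SketchContext:
    """Hypothetical context holding the three fields described above."""
    pipes: dict[str, Any] = field(default_factory=dict)
    params: dict[str, Any] = field(default_factory=dict)
    parent: SketchContext | None = None

    def child(self, params: dict[str, Any]) -> SketchContext:
        # Assumption: the child sees the same accumulated results,
        # gets its own params, and keeps a link to its parent.
        return SketchContext(pipes=self.pipes, params=params, parent=self)


root = SketchContext(params={"threshold": 0.8})
step = root.child({"limit": 10})
step.pipes["fetch"] = {"fetched_tickets": ["T1"]}

# The step reads its own params directly and the parent's via the link.
own_limit = step.params["limit"]
parent_threshold = step.parent.params["threshold"]
```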
Pipe Types (Simple Guide)

Simple Pipes

Small, focused steps. Examples:
- AddNotePipe (registryKey: base:AddNotePipe)
- FetchTicketsPipe (registryKey: base:FetchTicketsPipe)
- UpdateTicketPipe (registryKey: base:UpdateTicketPipe)

    - id: fetch_tickets
      use: 'base:FetchTicketsPipe'
      injects: { ticket_system: 'otobo_znuny' }
      params:
        ticket_search_criteria:
          queue: { name: 'Support' }
          limit: 10

Expression Pipe (special)

Renders an expression and returns that value. If it renders to a FailMarker, the pipe fails.
registryKey: base:ExpressionPipe

    - id: check_any_tickets
      use: 'base:ExpressionPipe'
      params:
        expression: >
          {{ fail() if (get_pipe_result('fetch_tickets','fetched_tickets')|length)==0 else 'ok' }}

Composite Pipes

Run several child pipes in order and return the union of their results.
registryKey: base:CompositePipe

    - id: ticket_flow
      use: 'base:CompositePipe'
      params:
        steps:
          - id: fetch
            use: 'base:FetchTicketsPipe'
            injects: { ticket_system: 'otobo_znuny' }
            params:
              ticket_search_criteria: { queue: { name: 'Incoming' }, limit: 10 }

          - id: pick_first
            use: 'base:ExpressionPipe'
            params:
              expression: "{{ get_pipe_result('fetch','fetched_tickets')[0] }}"

          - id: classify
            use: 'base:ClassificationPipe'
            injects: { classification_service: 'hf_local' }
            params:
              text: "{{ get_pipe_result('pick_first')['subject'] }} {{ get_pipe_result('pick_first')['body'] }}"
              model_name: 'softoft/otai-queue-de-bert-v1'

          - id: update
            use: 'base:UpdateTicketPipe'
            injects: { ticket_system: 'otobo_znuny' }
            params:
              ticket_id: "{{ get_pipe_result('pick_first')['id'] }}"
              updated_ticket:
                queue:
                  name: "{{ get_pipe_result('classify','label') if get_pipe_result('classify','confidence') >= 0.8 else 'OpenTicketAI::Unclassified' }}"

How it behaves (non-technical):
- Runs children one by one
- Stops on first failure
- Returns a merged result of everything that succeeded
SimpleSequentialOrchestrator (special)

Runs its steps in an endless loop. It’s for background-style cycles. It does not expose the child pipes’ results as a single pipe result.
registryKey: base:SimpleSequentialOrchestrator
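
The endless-loop behaviour can be sketched with plain asyncio. This is only an approximation under stated assumptions: run_cycles and its max_cycles argument are hypothetical (max_cycles exists so the sketch can terminate), sleeps are given in seconds rather than as ISO 8601 durations, and the except branch reflects one plausible reading of the exception_sleep and always_retry parameters.

```python
import asyncio
from typing import Awaitable, Callable, Optional

Step = Callable[[], Awaitable[None]]


async def run_cycles(
    steps: list[Step],
    orchestrator_sleep: float,
    exception_sleep: float,
    max_cycles: Optional[int] = None,
) -> int:
    """Run all steps in order, over and over.

    max_cycles is a hypothetical knob for demos and tests;
    None means loop forever.
    """
    cycles = 0
    while max_cycles is None or cycles < max_cycles:
        try:
            for step in steps:
                await step()
            # Normal pause between two cycles.
            await asyncio.sleep(orchestrator_sleep)
        except Exception:
            # Assumed retry behaviour: back off, then start a new cycle.
            await asyncio.sleep(exception_sleep)
        cycles += 1
    return cycles


calls: list[str] = []


async def tick() -> None:
    calls.append("tick")


async def flaky() -> None:
    # Fails only during the first cycle.
    if len(calls) == 1:
        calls.append("boom")
        raise RuntimeError("boom")
    calls.append("ok")


completed = asyncio.run(run_cycles([tick, flaky], 0.0, 0.0, max_cycles=2))
```

The first cycle fails in the second step and the loop carries on with a fresh cycle. No combined result is returned, which matches the note that child results are not exposed as a single pipe result.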

    - id: orchestrator
      use: 'base:SimpleSequentialOrchestrator'
      params:
        orchestrator_sleep: 'PT0.5S'
        exception_sleep: 'PT5S'
        always_retry: true
        steps:
          - id: tick
            use: 'base:IntervalTrigger'
            params: { interval: 'PT5S' }
          - id: fetch
            use: 'base:FetchTicketsPipe'
            injects: { ticket_system: 'otobo_znuny' }
            params:
              ticket_search_criteria: { queue: { name: 'Incoming' }, limit: 1 }

SimpleSequentialRunner (special)

Has two params: on and run (both are pipe configs). If on succeeds, it executes run; otherwise it skips.
registryKey: base:SimpleSequentialRunner
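
The on/run gating amounts to a single conditional. In the sketch below, Outcome and run_when are hypothetical names, and returning None for a skipped run is an assumption; the source does not say what the real runner reports when it skips.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class Outcome:
    """Hypothetical stand-in for a pipe result."""
    success: bool
    data: Any = None


def run_when(on: Callable[[], Outcome], run: Callable[[], Outcome]) -> Optional[Outcome]:
    # Execute `run` only if the `on` gate succeeds; otherwise skip it.
    if on().success:
        return run()
    return None  # Assumption: a skipped run yields no result.


triggered = run_when(lambda: Outcome(True), lambda: Outcome(True, "Triggered run"))
skipped = run_when(lambda: Outcome(False), lambda: Outcome(True, "Triggered run"))
```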

    - id: run-when-triggered
      use: 'base:SimpleSequentialRunner'
      params:
        on:
          id: gate
          use: 'base:IntervalTrigger'
          params: { interval: 'PT60S' }
        run:
          id: do-something
          use: 'base:ExpressionPipe'
          params: { expression: 'Triggered run' }

Short Notes

- registryKey = what you put into use, e.g. use: "base:FetchTicketsPipe".
- Accessing parent params: Use parent for the direct parent’s params only (no multi-level chains).
Pipe Execution Flow

Implementing a Custom Pipe

Step 1: Define Parameters Model

Create a Pydantic model for your pipe’s parameters:

    from pydantic import BaseModel


    class MyPipeParams(BaseModel):
        input_field: str
        threshold: float = 0.5
        max_items: int = 100

Step 2: Define Result Data Model

Create a model for your pipe’s output:

    class MyPipeResultData(BaseModel):
        processed_items: list[str]
        count: int

Step 3: Implement the Pipe Class

    from typing import Any

    from open_ticket_ai.core.logging.logging_iface import LoggerFactory
    from open_ticket_ai.core.pipes.pipe import Pipe
    from open_ticket_ai.core.pipes.pipe_models import PipeConfig, PipeResult


    class MyPipe(Pipe[MyPipeParams]):
        params_class = MyPipeParams  # Required class attribute

        def __init__(
            self,
            pipe_config: PipeConfig[MyPipeParams],
            logger_factory: LoggerFactory,
            # Add injected services here
            *args: Any,
            **kwargs: Any,
        ) -> None:
            super().__init__(pipe_config, logger_factory)
            # self.params is now a validated MyPipeParams instance

        async def _process(self) -> PipeResult[MyPipeResultData]:
            # Access validated parameters
            input_val = self.params.input_field
            threshold = self.params.threshold

            # Your processing logic here
            items = self._do_processing(input_val, threshold)

            # Return result
            return PipeResult[MyPipeResultData](
                success=True,
                failed=False,
                data=MyPipeResultData(
                    processed_items=items,
                    count=len(items),
                ),
            )

        def _do_processing(self, input_val: str, threshold: float) -> list[str]:
            # Implementation details
            return []

Parameter Validation Pattern

How It Works

The parameter validation happens automatically in the Pipe base class:

    # In Pipe.__init__ (src/open_ticket_ai/core/pipes/pipe.py:27-30)
    if isinstance(pipe_params._config, dict):
        self._config: ParamsT = self.params_class.model_validate(pipe_params._config)
    else:
        self._config: ParamsT = pipe_params._config

Flow:
- YAML config loaded and templates rendered → produces dict[str, Any]
- Dict passed to Pipe constructor as pipe_config.params
- Base class checks if params is a dict
- If dict: validates using params_class.model_validate()
- If already typed: uses as-is
- Result: self.params is always the validated Pydantic model
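
The dict-or-typed branch in this flow can be tried without the library or Pydantic. In the sketch below, SketchParams and SketchPipe are hypothetical, and the hand-written model_validate classmethod only stands in for Pydantic's method of the same name; it builds the typed object and does no real validation.

```python
from dataclasses import dataclass
from typing import Any, Union


@dataclass
class SketchParams:
    """Hypothetical params model (a dataclass instead of a Pydantic model)."""
    input_field: str
    threshold: float = 0.5

    @classmethod
    def model_validate(cls, raw: dict[str, Any]) -> "SketchParams":
        # Stand-in for Pydantic's model_validate: build the typed object.
        return cls(**raw)


class SketchPipe:
    params_class = SketchParams

    def __init__(self, params: Union[dict[str, Any], SketchParams]) -> None:
        if isinstance(params, dict):
            # Rendered YAML arrives as a dict and is turned into the model.
            self.params = self.params_class.model_validate(params)
        else:
            # Already typed: used as-is.
            self.params = params


from_dict = SketchPipe({"input_field": "x", "threshold": 0.7})
typed = SketchParams(input_field="y")
from_typed = SketchPipe(typed)
```

Either way, code that runs after the constructor only ever sees a typed params object.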
YAML Configuration Example

Users write YAML with templates:

    - id: my_custom_pipe
      use: 'mypackage:MyPipe'
      params:
        input_field: "{{ pipe_result('previous_step').data.output }}"
        threshold: "{{ env('THRESHOLD', '0.5') }}"
        max_items: 50

What happens:
- Templates rendered: input_field gets its value from the previous pipe, threshold from the environment
- Results in dict: {"input_field": "some_value", "threshold": "0.5", "max_items": 50}
- Passed to MyPipe.__init__
- Validated to MyPipeParams: types coerced (threshold: str → float)
- Available as self.params.threshold (float 0.5)
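
The coercion step matters because values read from the environment are always strings. The sketch below imitates it with the standard library only; FIELD_TYPES, coerce, and the SKETCH_THRESHOLD variable name are hypothetical, and the real conversion is done by Pydantic during validation rather than by a helper like this.

```python
import os
from typing import Any

# What template rendering might hand over: the env lookup yields a str.
rendered: dict[str, Any] = {
    "input_field": "some_value",
    "threshold": os.environ.get("SKETCH_THRESHOLD", "0.5"),
    "max_items": 50,
}

# Hypothetical field-to-type table mirroring MyPipeParams.
FIELD_TYPES: dict[str, type] = {
    "input_field": str,
    "threshold": float,
    "max_items": int,
}


def coerce(raw: dict[str, Any], field_types: dict[str, type]) -> dict[str, Any]:
    # Convert each raw value to its declared type; a value that cannot
    # be converted raises ValueError here.
    return {name: typ(raw[name]) for name, typ in field_types.items()}


params = coerce(rendered, FIELD_TYPES)
```

After coercion, a comparison such as params["threshold"] > 0.3 works on a float. Against the raw string it would raise a TypeError.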
Dependency Injection

Add service dependencies in the __init__ signature:

    from packages.base.src.otai_base.ticket_system_integration import TicketSystemService


    class FetchTicketsPipe(Pipe[FetchTicketsParams]):
        params_class = FetchTicketsParams

        def __init__(
            self,
            ticket_system: TicketSystemService,  # Injected automatically
            pipe_config: PipeConfig[FetchTicketsParams],
            logger_factory: LoggerFactory,
            *args: Any,
            **kwargs: Any,
        ) -> None:
            super().__init__(pipe_config, logger_factory)
            self.ticket_system = ticket_system

        async def _process(self) -> PipeResult[FetchTicketsPipeResultData]:
            # Use injected service
            tickets = await self.ticket_system.find_tickets(...)
            return PipeResult[FetchTicketsPipeResultData](...)

YAML config for service injection:

    - id: fetch_tickets
      use: 'mypackage:FetchTicketsPipe'
      injects:
        ticket_system: 'otobo_system'  # References a service by ID
      params:
        limit: 100

Error Handling

The base Pipe class handles errors automatically, but you can also handle specific cases:

    async def _process(self) -> PipeResult[MyPipeResultData]:
        try:
            result = await self._risky_operation()
            return PipeResult[MyPipeResultData](
                success=True,
                failed=False,
                data=MyPipeResultData(...),
            )
        except SpecificError as e:
            self._logger.warning(f"Handled specific error: {e}")
            return PipeResult[MyPipeResultData](
                success=False,
                failed=True,
                message=f"Operation failed: {e}",
                data=MyPipeResultData(processed_items=[], count=0),
            )

Note: Unhandled exceptions are caught by the base class and result in a failed PipeResult.
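
One way to picture the base-class safety net from the note above is a wrapper around _process. This is a simplified sketch, not the library's code: SketchResult, SketchBasePipe, and ExplodingPipe are hypothetical, the message format is invented, and process() takes no context argument here.

```python
import asyncio
from dataclasses import dataclass
from typing import Any


@dataclass
class SketchResult:
    """Hypothetical stand-in for PipeResult."""
    success: bool
    failed: bool
    message: str = ""
    data: Any = None


class SketchBasePipe:
    async def process(self) -> SketchResult:
        try:
            return await self._process()
        except Exception as exc:
            # Anything the subclass did not handle becomes a failed result.
            return SketchResult(
                success=False,
                failed=True,
                message=f"{type(exc).__name__}: {exc}",
            )

    async def _process(self) -> SketchResult:
        raise NotImplementedError


class ExplodingPipe(SketchBasePipe):
    async def _process(self) -> SketchResult:
        raise ValueError("bad input")


result = asyncio.run(ExplodingPipe().process())
```

With a net like this in place, a try/except inside _process is only needed when the pipe wants a more specific message or partial data in the failed result.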
Testing Custom Pipes

    import pytest

    from open_ticket_ai.core.pipes.pipe_context_model import PipeContext
    from open_ticket_ai.core.pipes.pipe_models import PipeConfig


    @pytest.mark.asyncio
    async def test_my_pipe_processes_correctly(logger_factory):
        # Create params as dict (simulates YAML rendering)
        params = {
            "input_field": "test_value",
            "threshold": 0.7,
            "max_items": 10,
        }

        # Create pipe config
        config = PipeConfig[MyPipeParams](
            id="test_pipe",
            params=params,
        )

        # Instantiate pipe
        pipe = MyPipe(pipe_config=config, logger_factory=logger_factory)

        # Execute
        context = PipeContext()
        result_context = await pipe.process(context)

        # Assert
        assert "test_pipe" in result_context.pipe_results
        assert result_context.pipe_results["test_pipe"].succeeded
        assert result_context.pipe_results["test_pipe"].data.count > 0

Common Patterns

Accessing Previous Pipe Results

    async def _process(self) -> PipeResult[MyPipeResultData]:
        # Access via pipe_config context (if needed)
        # Usually accessed via templates in YAML, but can also be done in code

        # Use self.params which were set from templates
        input_data = self.params.input_field  # Already resolved from template
        return PipeResult[MyPipeResultData](...)

Conditional Execution

Use the if field in YAML config:

    - id: conditional_pipe
      use: 'mypackage:MyPipe'
      if: "{{ pipe_result('classifier').data.category == 'urgent' }}"
      params:
        # ...

Dependent Pipes

Use the depends_on field:

    - id: step2
      use: 'mypackage:Step2Pipe'
      depends_on:
        - step1
      params:
        input: "{{ pipe_result('step1').data.output }}"

Best Practices

DO:
- ✅ Always define params_class as a class attribute
- ✅ Let the parent __init__ handle parameter validation
- ✅ Use descriptive parameter names
- ✅ Provide sensible defaults in the params model
- ✅ Return clear error messages in PipeResult
- ✅ Log important steps and decisions
- ✅ Keep _process() focused and testable

DON’T:
- ❌ Don’t manually call model_validate() in your __init__
- ❌ Don’t bypass the params_class mechanism
- ❌ Don’t put heavy logic in __init__
- ❌ Don’t catch and hide all exceptions
- ❌ Don’t access unvalidated pipe_config.params directly
- ❌ Don’t forget to call super().__init__()
Related Documentation

- Configuration and Template Rendering - Understanding the rendering flow
- Configuration Reference - YAML configuration syntax
- Testing Guide - Testing strategies for pipes
- Dependency Injection - Service injection patterns