Pipeline Code Development
Entwicklerleitfaden zur Implementierung benutzerdefinierter Pipes in Open Ticket AI mit dem dict[str, Any]-Muster für die Parameter-Validierung.
Pipeline Code Development
Dieser Leitfaden erklärt, wie benutzerdefinierte Pipes mit dem aktuellen Parameter-Validierungsmuster implementiert werden.
Pipe-Typen
Simple Pipes
Atomare Verarbeitungseinheiten, die spezifische Geschäftslogik implementieren:
- id: fetch_tickets
use: open_ticket_ai.base:FetchTicketsPipe
injects:
ticket_system: 'otobo_znuny'
params:
search_criteria:
queue:
name: 'Support'
limit: 10
Merkmale:
- Führt spezifische Logik aus
- Keine Child-Pipes
Composite Pipes
Orchestratoren, die Child-Pipes enthalten und ausführen:
flowchart TB
subgraph CompositePipe
A["Pipe #1"] --> B["Pipe #2"] --> C["Pipe #3"]
end
A --> U["Result #1"]
B --> U["Result #2"]
C --> U["Result #3"]
U["union([Result #1, Result #2, Result #3])"]
U --> CR["CompositeResult"]
Composite Pipe Beispiel
- id: ticket_workflow
use: open_ticket_ai.base:CompositePipe
params:
threshold: 0.8
steps:
- id: fetch
use: open_ticket_ai.base:FetchTicketsPipe
injects: { ticket_system: 'otobo_znuny' }
params:
search_criteria:
queue: { name: 'Incoming' }
limit: 10
- id: classify
use: otai_hf_local:HFLocalTextClassificationPipe
params:
model: 'bert-base-german-cased'
text: "{{ get_pipe_result('fetch').data.fetched_tickets[0].subject }}"
depends_on: [fetch]
- id: update
use: open_ticket_ai.base:UpdateTicketPipe
injects: { ticket_system: 'otobo_znuny' }
params:
ticket_id: "{{ get_pipe_result('fetch').data.fetched_tickets[0].id }}"
updated_ticket:
queue:
name: "{{ get_pipe_result('classify').data.predicted_queue }}"
depends_on: [classify]Merkmale:
- Enthält
steps-Liste von Child-Pipe-Konfigurationen - Verwendet
PipeFactoryzum Erstellen von Child-Pipes - Führt Child-Pipes sequenziell aus
- Führt Ergebnisse via
PipeResult.union()zusammen - Child-Pipes können über
parent.paramsauf Parent-Parameter zugreifen
Composite-Ausführung:
- Initialisierung: Vorbereitung zum Durchlaufen der
steps-Liste - Für jeden Schritt:
- Zusammenführen: Parent-Parameter mit Schritt-Parametern kombinieren (Schritt überschreibt)
- Erstellen: Factory verwenden, um Child-Pipe-Instanz zu erstellen
- Ausführen:
child.process(context)aufrufen → aktualisiert den Kontext - Sammeln: Child-Ergebnis wird in
context.pipes[child_id]gespeichert - Weiter: Zum nächsten Schritt gehen
- Finalisierung:
- Union: Alle Child-Ergebnisse mit
PipeResult.union()zusammenführen - Speichern: Composite-Ergebnis im Kontext speichern
- Rückgabe: Finalen aktualisierten Kontext zurückgeben
- Union: Alle Child-Ergebnisse mit
Feld-Details:
-
pipes: Enthält Ergebnisse aller zuvor ausgeführten Pipes, nach Pipe-ID geordnet- Wird akkumuliert, wenn jede Pipe abgeschlossen ist
- In CompositePipe: zusammengeführte Ergebnisse aller Child-Schritte
- Zugriff via
pipe_result('pipe_id')in Templates
-
params: Parameter der aktuellen Pipe- Wird gesetzt, wenn die Pipe erstellt wird
- Zugänglich via
params.*in Templates - Für verschachtelte Pipes kann via
parent.paramsauf den Parent verwiesen werden
-
parent: Referenz auf den Parent-Kontext (wenn innerhalb einer CompositePipe)- Ermöglicht Zugriff auf Parent-Scope-Variablen
- Erstellt hierarchische Kontextkette
- Kann mehrere Ebenen durchlaufen (
parent.parent...)
Pipe-Typen (Einfache Anleitung)
Simple Pipes
Kleine, fokussierte Schritte. Beispiele:
- AddNotePipe —
registryKey: base:AddNotePipe - FetchTicketsPipe —
registryKey: base:FetchTicketsPipe - UpdateTicketPipe —
registryKey: base:UpdateTicketPipe
- id: fetch_tickets
use: 'base:FetchTicketsPipe'
injects: { ticket_system: 'otobo_znuny' }
params:
ticket_search_criteria:
queue: { name: 'Support' }
limit: 10
Expression Pipe (speziell)
Rendert einen Ausdruck und gibt diesen Wert zurück. Wenn es zu einem FailMarker gerendert wird, schlägt die Pipe fehl.
registryKey: base:ExpressionPipe
- id: check_any_tickets
use: 'base:ExpressionPipe'
params:
expression: >
{{ fail() if (get_pipe_result('fetch_tickets','fetched_tickets')|length)==0 else 'ok' }}
Composite Pipes
Führen mehrere Child-Pipes der Reihe nach aus und geben die Union ihrer Ergebnisse zurück.
registryKey: base:CompositePipe
flowchart LR
A["Pipe #1"] --> B["Pipe #2"] --> C["Pipe #3"]
classDef node fill: #111827, stroke: #374151, color: #e6e7ea
class A node
class B node
class C node
- id: ticket_flow
use: 'base:CompositePipe'
params:
steps:
- id: fetch
use: 'base:FetchTicketsPipe'
injects: { ticket_system: 'otobo_znuny' }
params:
ticket_search_criteria: { queue: { name: 'Incoming' }, limit: 10 }
- id: pick_first
use: 'base:ExpressionPipe'
params:
expression: "{{ get_pipe_result('fetch','fetched_tickets')[0] }}"
- id: classify
use: 'base:ClassificationPipe'
injects: { classification_service: 'hf_local' }
params:
text: "{{ get_pipe_result('pick_first')['subject'] }} {{ get_pipe_result('pick_first')['body'] }}"
model_name: 'softoft/otai-queue-de-bert-v1'
- id: update
use: 'base:UpdateTicketPipe'
injects: { ticket_system: 'otobo_znuny' }
params:
ticket_id: "{{ get_pipe_result('pick_first')['id'] }}"
updated_ticket:
queue:
name: "{{ get_pipe_result('classify','label') if get_pipe_result('classify','confidence') >= 0.8 else 'OpenTicketAI::Unclassified' }}"
Wie es sich verhält (nicht-technisch):
- Führt Child-Pipes nacheinander aus
- Stoppt beim ersten Fehler
- Gibt ein zusammengeführtes Ergebnis von allem zurück, was erfolgreich war Hier ist es — klein + einfach.
flowchart TB
subgraph SimpleSequentialOrchestrator
S1["step #1"] --> S2["step #2"] --> S3["step #3"]
end
S3 --> Z(("sleep"))
Z -->|loop| S1
flowchart TB
subgraph SimpleSequentialRunner
ON["on (trigger)"] -->|success| RUN["run (action)"]
ON -.->|fail| SKIP["skipped"]
end
SimpleSequentialOrchestrator (speziell)
Führt seine steps in einer Endlosschleife aus. Es ist für Hintergrund-artige Zyklen. Es macht die
Ergebnisse der Child-Pipes nicht als ein einzelnes Pipe-Ergebnis verfügbar. registryKey: base:SimpleSequentialOrchestrator
- id: orchestrator
use: 'base:SimpleSequentialOrchestrator'
params:
orchestrator_sleep: 'PT0.5S'
exception_sleep: 'PT5S'
always_retry: true
steps:
- id: tick
use: 'base:IntervalTrigger'
params: { interval: 'PT5S' }
- id: fetch
use: 'base:FetchTicketsPipe'
injects: { ticket_system: 'otobo_znuny' }
params:
ticket_search_criteria: { queue: { name: 'Incoming' }, limit: 1 }
SimpleSequentialRunner (speziell)
Hat zwei Parameter: on und run (beides sind Pipe-Konfigurationen). Wenn on erfolgreich ist, führt es run aus;
ansonsten wird es übersprungen. registryKey: base:SimpleSequentialRunner
- id: run-when-triggered
use: 'base:SimpleSequentialRunner'
params:
on:
id: gate
use: 'base:IntervalTrigger'
params: { interval: 'PT60S' }
run:
id: do-something
use: 'base:ExpressionPipe'
params: { expression: 'Triggered run' }
Kurze Notizen
- registryKey = was Sie in
useeintragen, z.B.use: "base:FetchTicketsPipe". - Auf Parent-Parameter zugreifen: Verwenden Sie
parentnur für die Parameter des direkten Parents (keine mehrstufigen Ketten).
Wenn Sie möchten, konvertiere ich dies in eine VitePress-Seite mit derselben Struktur.
Pipe-Ausführungsfluss
%%{init:{
"flowchart":{"defaultRenderer":"elk","htmlLabels":true,"curve":"linear"},
"themeVariables":{"fontSize":"14px","fontFamily":"system-ui","lineColor":"#718096"},
}}%%
flowchart TB
%% ===================== PIPE ENTRY =====================
subgraph ENTRY["📥 Pipe.process(context)"]
direction TB
Start([pipe.process]):::start
CheckShould{"should_run?<br/>(if_ condition)"}:::dec
CheckDeps{"Dependencies met?<br/>(depends_on)"}:::dec
Skip["⏭️ Skip → return context"]:::skip
Start --> CheckShould
CheckShould -- ✓ --> CheckDeps
CheckShould -- ✗ --> Skip
CheckDeps -- ✗ --> Skip
end
%% ===================== EXECUTION =====================
subgraph EXEC["⚙️ Execution"]
direction TB
ProcessAndSave["__process_and_save()"]:::proc
TryCatch["try-catch wrapper"]:::proc
RunProcess["await _process()"]:::proc
CreateResult["Create PipeResult"]:::proc
ProcessAndSave --> TryCatch --> RunProcess --> CreateResult
end
%% ===================== ERROR HANDLING =====================
subgraph ERROR["❌ Error Handling"]
direction TB
CatchEx["Catch Exception"]:::error
LogError["Logger.error + traceback"]:::log
CreateFailed["Create failed PipeResult"]:::error
CatchEx --> LogError --> CreateFailed
end
%% ===================== PERSISTENCE =====================
subgraph PERSIST["💾 Persistence"]
direction TB
SaveResult["context.pipes[pipe_id] = result"]:::ctx
LogResult["Logger.info/warning"]:::log
Return["Return updated context"]:::ctx
SaveResult --> LogResult --> Return
end
%% ===================== CONNECTIONS =====================
CheckDeps -- ✓ --> ProcessAndSave
TryCatch --> CatchEx
CreateResult --> SaveResult
CreateFailed --> SaveResult
%% ===================== STYLES =====================
classDef start fill: #2d6a4f, stroke: #1b4332, stroke-width: 3px, color: #fff, font-weight: bold
classDef dec fill: #d97706, stroke: #b45309, stroke-width: 2px, color: #fff, font-weight: bold
classDef skip fill: #374151, stroke: #1f2937, stroke-width: 2px, color: #9ca3af
classDef proc fill: #2b2d42, stroke: #14213d, stroke-width: 2px, color: #e0e0e0
classDef error fill: #dc2626, stroke: #991b1b, stroke-width: 2px, color: #fff
classDef log fill: #0891b2, stroke: #0e7490, stroke-width: 2px, color: #fff
classDef ctx fill: #165b33, stroke: #0d3b24, stroke-width: 2px, color: #e0e0e0
flowchart TB
%% ===================== PIPE ENTRY =====================
subgraph ENTRY["📥 Pipe.process()"]
direction TB
Start([pipe.process]):::start
CheckShould{"should_run?"}:::dec
CheckDeps{"Dependencies met?"}:::dec
Skip["⏭️ Skip execution"]:::skip
Start --> CheckShould
CheckShould -- ✓ True --> CheckDeps
CheckShould -- ✗ False --> Skip
CheckDeps -- ✗ Missing --> Skip
end
%% ===================== EXECUTION =====================
subgraph EXEC["⚙️ Execution"]
direction TB
ProcessAndSave["__process_and_save()"]:::proc
TryCatch["try-catch wrapper"]:::proc
RunProcess["await _process()<br/>(subclass implementation)"]:::proc
CreateResult["Create PipeResult<br/>with data"]:::proc
ProcessAndSave --> TryCatch --> RunProcess --> CreateResult
end
%% ===================== ERROR HANDLING =====================
subgraph ERROR["❌ Error Handling"]
direction TB
CatchEx["Catch Exception"]:::error
LogError["Logger.error<br/>+ traceback"]:::log
CreateFailed["Create failed<br/>PipeResult"]:::error
CatchEx --> LogError --> CreateFailed
end
%% ===================== PERSISTENCE =====================
subgraph PERSIST["💾 Context Update"]
direction TB
SaveResult["context.pipes[pipe_id]<br/>= result"]:::ctx
LogResult["Log result<br/>(info/warning)"]:::log
Return["Return updated<br/>context"]:::ctx
SaveResult --> LogResult --> Return
end
%% ===================== CONNECTIONS =====================
CheckDeps -- ✓ Met --> ProcessAndSave
TryCatch --> CatchEx
CreateResult --> SaveResult
CreateFailed --> SaveResult
%% ===================== STYLES =====================
classDef start fill: #2d6a4f, stroke: #1b4332, stroke-width: 3px, color: #fff, font-weight: bold
classDef dec fill: #d97706, stroke: #b45309, stroke-width: 2px, color: #fff, font-weight: bold
classDef skip fill: #374151, stroke: #1f2937, stroke-width: 2px, color: #9ca3af
classDef proc fill: #2b2d42, stroke: #14213d, stroke-width: 2px, color: #e0e0e0
classDef error fill: #dc2626, stroke: #991b1b, stroke-width: 2px, color: #fff
classDef log fill: #0891b2, stroke: #0e7490, stroke-width: 2px, color: #fff
classDef ctx fill: #165b33, stroke: #0d3b24, stroke-width: 2px, color: #e0e0e0
Implementierung einer benutzerdefinierten Pipe
Schritt 1: Parameter-Modell definieren
Erstellen Sie ein Pydantic-Modell für die Parameter Ihrer Pipe:
from pydantic import BaseModel
class MyPipeParams(BaseModel):
input_field: str
threshold: float = 0.5
max_items: int = 100
Schritt 2: Ergebnis-Datenmodell definieren
Erstellen Sie ein Modell für die Ausgabe Ihrer Pipe:
class MyPipeResultData(BaseModel):
processed_items: list[str]
count: int
Schritt 3: Pipe-Klasse implementieren
from typing import Any
from open_ticket_ai.core.pipes.pipe import Pipe
from open_ticket_ai.core.pipes.pipe_models import PipeConfig, PipeResult
from open_ticket_ai.core.logging.logging_iface import LoggerFactory
class MyPipe(Pipe[MyPipeParams]):
params_class = MyPipeParams # Erforderliches Klassenattribut
def __init__(
self,
pipe_config: PipeConfig[MyPipeParams],
logger_factory: LoggerFactory,
# Injected Services hier hinzufügen
*args: Any,
**kwargs: Any,
) -> None:
super().__init__(pipe_config, logger_factory)
# self.params ist jetzt eine validierte MyPipeParams-Instanz
async def _process(self) -> PipeResult[MyPipeResultData]:
# Auf validierte Parameter zugreifen
input_val = self.params.input_field
threshold = self.params.threshold
# Ihre Verarbeitungslogik hier
items = self._do_processing(input_val, threshold)
# Ergebnis zurückgeben
return PipeResult[MyPipeResultData](
success=True,
failed=False,
data=MyPipeResultData(
processed_items=items,
count=len(items)
)
)
def _do_processing(self, input_val: str, threshold: float) -> list[str]:
# Implementierungsdetails
return []
Parameter-Validierungsmuster
Wie es funktioniert
Die Parameter-Validierung erfolgt automatisch in der Pipe-Basisklasse:
# In Pipe.__init__ (src/open_ticket_ai/core/pipes/pipe.py:27-30)
if isinstance(pipe_params._config, dict):
self._config: ParamsT = self.params_class.model_validate(pipe_params._config)
else:
self._config: ParamsT = pipe_params._config
Ablauf:
- YAML-Konfiguration geladen und Templates gerendert → erzeugt
dict[str, Any] - Dict wird als
pipe_config.paramsan den Pipe-Konstruktor übergeben - Basisklasse prüft, ob params ein dict ist
- Wenn dict: validiert mit
params_class.model_validate() - Wenn bereits typisiert: verwendet es unverändert
- Ergebnis:
self.paramsist immer das validierte Pydantic-Modell
YAML-Konfigurationsbeispiel
Benutzer schreiben YAML mit Templates:
- id: my_custom_pipe
use: 'mypackage:MyPipe'
params:
input_field: "{{ pipe_result('previous_step').data.output }}"
threshold: "{{ env('THRESHOLD', '0.5') }}"
max_items: 50
Was passiert:
- Templates gerendert:
input_fielderhält Wert von vorheriger Pipe,thresholdvon env - Ergebnis als dict:
{"input_field": "some_value", "threshold": "0.5", "max_items": 50} - An
MyPipe.__init__übergeben - Zu
MyPipeParamsvalidiert: Typen umgewandelt (threshold: str → float) - Verfügbar als
self.params.threshold(float 0.5)
Dependency Injection
Fügen Sie Service-Abhängigkeiten in der __init__-Signatur hinzu:
from packages.base.src.otai_base.ticket_system_integration import TicketSystemService
class FetchTicketsPipe(Pipe[FetchTicketsParams]):
params_class = FetchTicketsParams
def __init__(
self,
ticket_system: TicketSystemService, # Wird automatisch injected
pipe_config: PipeConfig[FetchTicketsParams],
logger_factory: LoggerFactory,
*args: Any,
**kwargs: Any,
) -> None:
super().__init__(pipe_config, logger_factory)
self.ticket_system = ticket_system
async def _process(self) -> PipeResult[FetchTicketsPipeResultData]:
# Injected Service verwenden
tickets = await self.ticket_system.find_tickets(...)
return PipeResult[FetchTicketsPipeResultData](...)
YAML-Konfiguration für Service-Injection:
- id: fetch_tickets
use: 'mypackage:FetchTicketsPipe'
injects:
ticket_system: 'otobo_system' # Referenziert einen Service per ID
params:
limit: 100
Fehlerbehandlung
Die Basisklasse Pipe behandelt Fehler automatisch, aber Sie können auch spezifische Fälle behandeln:
async def _process(self) -> PipeResult[MyPipeResultData]:
try:
result = await self._risky_operation()
return PipeResult[MyPipeResultData](
success=True,
failed=False,
data=MyPipeResultData(...)
)
except SpecificError as e:
self._logger.warning(f"Handled specific error: {e}")
return PipeResult[MyPipeResultData](
success=False,
failed=True,
message=f"Operation failed: {e}",
data=MyPipeResultData(processed_items=[], count=0)
)
Hinweis: Nicht behandelte Exceptions werden von der Basisklasse abgefangen und führen zu einem fehlgeschlagenen PipeResult.
Testen benutzerdefinierter Pipes
import pytest
from open_ticket_ai.core.pipes.pipe_context_model import PipeContext
from open_ticket_ai.core.pipes.pipe_models import PipeConfig
@pytest.mark.asyncio
async def test_my_pipe_processes_correctly(logger_factory):
# Parameter als dict erstellen (simuliert YAML-Rendering)
params = {
"input_field": "test_value",
"threshold": 0.7,
"max_items": 10
}
# Pipe-Konfiguration erstellen
config = PipeConfig[MyPipeParams](
id="test_pipe",
params=params
)
# Pipe instanziieren
pipe = MyPipe(pipe_config=config, logger_factory=logger_factory)
# Ausführen
context = PipeContext()
result_context = await pipe.process(context)
# Assert
assert "test_pipe" in result_context.pipe_results
assert result_context.pipe_results["test_pipe"].succeeded
assert result_context.pipe_results["test_pipe"].data.count > 0
Häufige Muster
Auf vorherige Pipe-Ergebnisse zugreifen
async def _process(self) -> PipeResult[MyPipeResultData]:
# Zugriff via pipe_config context (falls benötigt)
# Normalerweise via Templates in YAML, kann aber auch im Code erfolgen
# self.params verwenden, die aus Templates gesetzt wurden
input_data = self._config.input_field # Bereits aus Template aufgelöst
return PipeResult[MyPipeResultData](...)
Bedingte Ausführung
Verwenden Sie das if-Feld in der YAML-Konfiguration:
- id: conditional_pipe
use: 'mypackage:MyPipe'
if: "{{ pipe_result('classifier').data.category == 'urgent' }}"
params:
# ...
Abhängige Pipes
Verwenden Sie das depends_on-Feld:
- id: step2
use: 'mypackage:Step2Pipe'
depends_on:
- step1
params:
input: "{{ pipe_result('step1').data.output }}"
Best Practices
TUN:
- ✅ Immer
params_classals Klassenattribut definieren - ✅ Eltern-
__init__die Parameter-Validierung überlassen - ✅ Beschreibende Parameternamen verwenden
- ✅ Sinnvolle Defaults im Parameter-Modell angeben
- ✅ Klare Fehlermeldungen in PipeResult zurückgeben
- ✅ Wichtige Schritte und Entscheidungen loggen
- ✅
_process()fokussiert und testbar halten
NICHT TUN:
- ❌
model_validate()nicht manuell in Ihrem__init__aufrufen - ❌ Den params_class-Mechanismus nicht umgehen
- ❌ Schwere Logik nicht in
__init__packen - ❌ Nicht alle Exceptions abfangen und verstecken
- ❌ Nicht auf unvalidierte
pipe_config.paramsdirekt zugreifen - ❌
super().__init__()nicht vergessen
Verwandte Dokumentation
- Configuration and Template Rendering - Den Rendering-Flow verstehen
- Testing Guide - Teststrategien für Pipes
- Dependency Injection - Service-Injection-Muster
