Pipe System
Erfahren Sie, wie Open Ticket AI Pipes Daten durch sequentielle Ausführung, Kontextweitergabe, Abhängigkeiten und bedingte Logik verarbeiten.
Pipe System
Pipes sind die grundlegenden Verarbeitungseinheiten in Open Ticket AI. Jede Pipe führt eine spezifische Aufgabe aus, erhält Kontext von vorherigen Pipes, führt ihre Logik aus und gibt aktualisierten Kontext weiter.
Grundlegender Pipeline-Ablauf
Eine Pipeline ist eine Sequenz von Pipes, die nacheinander ausgeführt werden:
flowchart TD
Start([Start])
Pipe1["Pipe 1 - Fetch Tickets"]
Pipe2["Pipe 2 - Classify Tickets"]
Pipe3["Pipe 3 - Update Tickets"]
End([Complete])
Start --> Pipe1
Pipe1 --> Pipe2
Pipe2 --> Pipe3
Pipe3 --> End
Jede Pipe:
- Erhält den
PipeContext(enthält Ergebnisse von vorherigen Pipes) - Führt ihre spezifische Aufgabe aus
- Erstellt ein
PipeResultmit Ausgabedaten
Was ist eine Pipe?
Eine Pipe ist eine eigenständige Verarbeitungseinheit, die:
- Spezifische Geschäftslogik implementiert (Daten abrufen, klassifizieren, aktualisieren, etc.)
- Eingabe über
PipeContexterhält - Ausgabe als
PipeResultproduziert
flowchart LR
subgraph Rendering
PC["PipeContext"]:::value --> JR[["Jinja Renderer"]]:::renderer --> PP["PipeParams"]:::value
end
P(("Pipe")):::pipe
PR["PipeResult"]:::value
PP --> P --> PR
classDef pipe fill: #0b1220, stroke: #7c4dff, stroke-width: 2px, color: #e6e7ea
classDef renderer fill: #0b1220, stroke: #22c55e, stroke-dasharray: 5 3, stroke-width: 2px, color: #e6e7ea
classDef value fill: #111827, stroke: #475569, color: #e6e7ea
Pipe-Typen
Simple Pipes
Atomare Verarbeitungseinheiten, die spezifische Geschäftslogik implementieren:
Yaml-Beispiel
- id: fetch_tickets
use: open_ticket_ai.base:FetchTicketsPipe
injects:
ticket_system: 'otobo_znuny'
params:
search_criteria:
queue:
name: 'Support'
limit: 10
Eigenschaften:
- Führt spezifische Logik aus
- Keine Kind-Pipes
Spezifische Simple Pipes:
- AddNotePipe — registryKey: base:AddNotePipe
- FetchTicketsPipe — registryKey: base:FetchTicketsPipe
- UpdateTicketPipe — registryKey: base:UpdateTicketPipe
Expression Pipe (speziell)
Rendert einen Ausdruck und gibt diesen Wert zurück. Wenn es zu einem FailMarker gerendert wird, schlägt die Pipe fehl.
registryKey: base:ExpressionPipe
Yaml-Beispiel
- id: check_any_tickets
use: 'base:ExpressionPipe'
params:
expression: >
{{ fail() if (get_pipe_result('fetch_tickets','fetched_tickets')|length)==0 else 'ok' }}
Composite Pipes
Orchestratoren, die Kind-Pipes enthalten und ausführen:
Flussdiagramm
flowchart TB
subgraph CompositePipe
A["Pipe #1"] --> B["Pipe #2"] --> C["Pipe #3"]
end
A --> U["Result #1"]
B --> U["Result #2"]
C --> U["Result #3"]
U["union(Results #1, #2 #3)"]
U --> CR["PipeResult"]
Yaml-Beispiel
Composite Pipe Beispiel
- id: ticket_workflow
use: open_ticket_ai.base:CompositePipe
params:
threshold: 0.8
steps:
- id: fetch
use: open_ticket_ai.base:FetchTicketsPipe
injects: { ticket_system: 'otobo_znuny' }
params:
search_criteria:
queue: { name: 'Incoming' }
limit: 10
- id: classify
use: otai_hf_local:HFLocalTextClassificationPipe
params:
model: 'bert-base-german-cased'
text: "{{ get_pipe_result('fetch').data.fetched_tickets[0].subject }}"
- id: update
use: open_ticket_ai.base:UpdateTicketPipe
injects: { ticket_system: 'otobo_znuny' }
params:
ticket_id: "{{ get_pipe_result('fetch').data.fetched_tickets[0].id }}"
updated_ticket:
queue:
name: "{{ get_pipe_result('classify', 'predicted_queue') }}"Eigenschaften
- Enthält
steps-Liste von Kind-Pipe-Konfigurationen - Führt Kinder sequentiell aus
- Führt Ergebnisse zusammen
- Kinder können über
parent.paramsauf Eltern-Parameter zugreifen
Composite-Ausführung:
- Für jeden Schritt:
- Rendern: Rendert Parameter mit Jinja2 unter Verwendung des aktuellen Kontexts
- Ausführen: Führt nächste Kind-Pipe mit gerenderten Parametern aus
- Schleife: Fährt mit dem nächsten Schritt fort
- Finalisierung:
- Union: Führt alle Kindergebnisse zusammen unter Verwendung von
- Rückgabe: Gibt den finalen aktualisierten Kontext zurück
SimpleSequentialOrchestrator (speziell)
Führt seine steps in einer Endlosschleife aus. Dient für Hintergrund-artige Zyklen. Er stellt die
Ergebnisse der Kind-Pipes nicht als ein einzelnes Pipe-Ergebnis zur Verfügung. registryKey: base:SimpleSequentialOrchestrator
Flussdiagramm
flowchart TB
subgraph SimpleSequentialOrchestrator
S1["step #1"] --> S2["step #2"] --> S3["step #3"]
end
S3 --> Z(("sleep"))
Z -->|loop| S1
Yaml-Beispiel
SimpleSequentialOrchestrator Beispiel
- id: orchestrator
use: 'base:SimpleSequentialOrchestrator'
params:
orchestrator_sleep: 'PT0.5S'
exception_sleep: 'PT5S'
always_retry: true
steps:
- id: tick
use: 'base:IntervalTrigger'
params: { interval: 'PT5S' }
- id: fetch
use: 'base:FetchTicketsPipe'
injects: { ticket_system: 'otobo_znuny' }
params:
ticket_search_criteria: { queue: { name: 'Incoming' }, limit: 1 }SimpleSequentialRunner (speziell)
Hat zwei Parameter: on und run (beides sind Pipe-Konfigurationen). Wenn on erfolgreich ist, führt es run aus;
ansonsten wird es übersprungen. registryKey: base:SimpleSequentialRunner
Flussdiagramm
flowchart TB
subgraph SimpleSequentialRunner
ON{"on (trigger) succeded ?"} -->|yes| RUN["run (action)"]
ON -.->|no| SKIP["skipped"]
end
Yaml-Beispiel
- id: run-when-triggered
use: 'base:SimpleSequentialRunner'
params:
on:
id: gate
use: 'base:IntervalTrigger'
params: { interval: 'PT60S' }
run:
id: do-something
use: 'base:ExpressionPipe'
params: { expression: 'Triggered run' }
Kontextweitergabe zwischen Pipes
Felddetails:
-
pipes: Enthält Ergebnisse aller zuvor ausgeführten Pipes, nach Pipe-ID geordnet- Zugriff über
pipe_result('pipe_id')in Templates - Wird akkumuliert, wenn jede Pipe abgeschlossen ist
- In CompositePipe: zusammengeführte Ergebnisse aller Kind-Schritte
- Zugriff über
-
params: Parameter der aktuellen Pipe- Wird gesetzt, wenn die Pipe erstellt wird
- Zugänglich über
params.*in Templates - Für verschachtelte Pipes kann über
parentauf Eltern verwiesen werden
-
parent: Referenz auf Eltern-Parameter
PipeResult-Struktur
Jede Pipe erzeugt ein PipeResult, das Ausführungsstatus und Daten enthält:
| Attribut | Datentyp | Beschreibung |
|---|---|---|
| succeeded | true/false | Ob die Pipe erfolgreich ohne Fehler abgeschlossen wurde |
| data | Name:Wert-Paare | Ausgabedaten, die von der Pipe für nachfolgende Pipes oder externe Systeme erzeugt wurden |
| was_skipped | true/false | Ob die Pipe aufgrund fehlgeschlagener Abhängigkeiten oder bedingter Ausführung übersprungen wurde |
| message | TEXT | Menschenlesbare Nachricht, die das Ergebnis oder Probleme beschreibt |
Auf diese Ergebnisse greift man in den Parametern einer anderen Pipe mit diesen Funktionen zu. Derzeit gibt es keine Möglichkeit, was_skipped oder message zu lesen;
| Funktion | Parameter | Rückgabewert | Fehler wenn… |
|---|---|---|---|
has_failed | pipe_id: text | True, wenn das gegebene Pipe-Ergebnis als fehlgeschlagen markiert ist | Unbekannte Pipe-ID |
get_pipe_result | pipe_id: text, data_key: text; default = value | Wert, der im vorherigen Pipe-Ergebnis unter dem gegebenen data_key gespeichert ist | Pipe oder Schlüssel fehlt |
Wenn das Piperesult also beispielsweise ist:
- id: ticket_fetcher
result:
succeeded: true
data:
fetched_tickets:
- id: 123
subject: 'Help me!'
- id: 124
subject: 'Another ticket'
Um auf die abgerufenen Tickets zuzugreifen, würden Sie verwenden:
{ { get_pipe_result('ticket_fetcher', 'fetched_tickets') } }
Gibt zurück:
- id: 123
subject: 'Help me!'
- id: 124
subject: 'Another ticket'
Um auf den Betreff des ersten Tickets zuzugreifen, würden Sie verwenden:
{ { (get_pipe_result('ticket_fetcher', 'fetched_tickets') | first)[ 'subject' ] }
Gibt zurück:
Help me!
Um zu prüfen, ob die ticket_fetcher Pipe fehlgeschlagen ist, würden Sie verwenden:
{ { has_failed('ticket_fetcher') } }
Gibt zurück:
false
Die Erfolgsprüfung kann in Kombination mit fail() verwendet werden, um Guards zu erstellen:
- id: fail_no_tickets
use: 'base:ExpressionPipe'
params:
expression: >
{{ fail() if has_failed('ticket_fetcher' else 'ok' }}
Die Pipeline/der Flow würde an dieser Stelle die Ausführung stoppen, wenn die ticket_fetcher Pipe fehlgeschlagen ist.
Pipe-Registry-Übersicht
| Name | registryKey | Parameter (Namen, kommagetrennt) |
|---|---|---|
| AddNotePipe | base:AddNotePipe | ticket_id, note |
| FetchTicketsPipe | base:FetchTicketsPipe | ticket_search_criteria |
| UpdateTicketPipe | base:UpdateTicketPipe | ticket_id, updated_ticket |
| ClassificationPipe | base:ClassificationPipe | text, model_name, api_token |
| ExpressionPipe | base:ExpressionPipe | expression |
| IntervalTrigger | base:IntervalTrigger | interval |
| SimpleSequentialRunner | base:SimpleSequentialRunner | on, run |
| SimpleSequentialOrchestrator | base:SimpleSequentialOrchestrator | orchestrator_sleep, exception_sleep, always_retry, steps |
| CompositePipe | base:CompositePipe | steps |
Verwenden Sie diese Werte in Ihrer YAML als:
use: "<registryKey>".
Pipe-Details
AddNotePipe
Die Pipe AddNotePipe fügt einem gegebenen Ticket im Ticket-System eine strukturierte Notiz (Betreff/Text) hinzu.
Sie protokolliert eine kurze Vorschau der Notiz und schreibt sie über den Ticket-System-Service.
registryKey: base:AddNotePipe
Um sie zu “verwenden”, setzen Sie use auf diesen registryKey.
- id: 'my-add_note_pipe'
use: 'base:AddNotePipe'
params:
ticket_id: '<ticket id>'
note:
subject: '<subject>'
body: '<body>'
FetchTicketsPipe
Die Pipe FetchTicketsPipe fragt Tickets vom Ticket-System unter Verwendung eines vereinheitlichten
TicketSearchCriteria (Queue, Limit, Offset, etc.) ab und gibt sie als fetched_tickets zurück.
registryKey: base:FetchTicketsPipe
- id: 'my-fetch_tickets'
use: 'base:FetchTicketsPipe'
params:
ticket_search_criteria:
queue:
name: '<QueueName>'
limit: 10
UpdateTicketPipe
Die Pipe UpdateTicketPipe aktualisiert ein bestehendes Ticket anhand der ID unter Verwendung einer vereinheitlichten UnifiedTicket-
Nutzlast (z.B. Queue, Priorität, Felder). Gibt den Erfolgsstatus vom Ticket-System zurück.
registryKey: base:UpdateTicketPipe
- id: 'my-update_ticket'
use: 'base:UpdateTicketPipe'
params:
ticket_id: '<ticket id>'
updated_ticket:
queue:
name: '<QueueName>'
ClassificationPipe
Die Pipe ClassificationPipe klassifiziert Eingabetext unter Verwendung eines konfigurierten ClassificationService und
Modells und gibt Label, Konfidenz und Scores (falls verfügbar) zurück.
registryKey: base:ClassificationPipe
- id: 'my-classify'
use: 'base:ClassificationPipe'
injects: { classification_service: 'hf_local' }
params:
text: '{{ some_text }}'
model_name: 'softoft/otai-queue-de-bert-v1'
api_token: '{{ env.HF_TOKEN | default(None) }}'
ExpressionPipe
Die Pipe ExpressionPipe gibt den Wert von expression zurück; wenn er zu einem FailMarker ausgewertet wird, schlägt die
Pipe fehl (nützlich für Kontrollfluss/Guards).
registryKey: base:ExpressionPipe
- id: 'my-expression'
use: 'base:ExpressionPipe'
params:
expression: "{{ fail() if condition else 'ok' }}"
IntervalTrigger
Die Pipe IntervalTrigger steuert die nachgelagerte Ausführung basierend auf einem Zeitintervall; sie ist erfolgreich, sobald das
Intervall seit dem letzten Trigger vergangen ist, andernfalls schlägt sie fehl.
registryKey: base:IntervalTrigger
- id: 'my-interval'
use: 'base:IntervalTrigger'
params:
interval: 'PT60S'
SimpleSequentialRunner
Die Pipe SimpleSequentialRunner führt on aus und, nur wenn es erfolgreich ist, führt es run aus. Es ist ein minimaler
Zwei-Schritt-Kontroll-Runner.
registryKey: base:SimpleSequentialRunner
- id: 'my-simple_runner'
use: 'base:SimpleSequentialRunner'
params:
on:
id: 'interval'
use: 'base:IntervalTrigger'
params: { interval: 'PT5M' }
run:
id: 'do-something'
use: 'base:ExpressionPipe'
params: { expression: 'go!' }
SimpleSequentialOrchestrator
Die Pipe SimpleSequentialOrchestrator läuft endlos in einer Schleife und führt steps in jeder Runde der Reihe nach aus; er
schläft optional zwischen den Zyklen und bei Ausnahmen und kann automatisch wiederholen.
registryKey: base:SimpleSequentialOrchestrator
Für SimpleSequentialOrchestrator müssen Sie keine Parameter setzen, da alle Attribute optional sind.
- id: 'my-orchestrator'
use: 'base:SimpleSequentialOrchestrator'
params:
orchestrator_sleep: 'PT0.5S'
exception_sleep: 'PT5S'
always_retry: true
steps:
- id: 'step-1'
use: 'base:ExpressionPipe'
params: { expression: 'ok' }
CompositePipe
Die Pipe CompositePipe führt ihre steps sequentiell aus, stoppt beim ersten Fehler und gibt
eine Vereinigung der vorherigen Ergebnisse zurück. Sie ist der grundlegende Baustein für mehrstufige Flows.
registryKey: base:CompositePipe
Für CompositePipe müssen Sie keine Parameter setzen, da alle Attribute optional sind.
- id: 'my-composite'
use: 'base:CompositePipe'
params:
steps:
- id: 'one'
use: 'base:ExpressionPipe'
params: { expression: 'first' }
- id: 'two'
use: 'base:ExpressionPipe'
params: { expression: 'second' } 