Pipe System

Erfahren Sie, wie Open Ticket AI Pipes Daten durch sequentielle Ausführung, Kontextweitergabe, Abhängigkeiten und bedingte Logik verarbeiten.

Pipe System

Pipes sind die grundlegenden Verarbeitungseinheiten in Open Ticket AI. Jede Pipe führt eine spezifische Aufgabe aus, erhält Kontext von vorherigen Pipes, führt ihre Logik aus und gibt aktualisierten Kontext weiter.

Grundlegender Pipeline-Ablauf

Eine Pipeline ist eine Sequenz von Pipes, die nacheinander ausgeführt werden:

flowchart TD
    Start([Start])
    Pipe1["Pipe 1 - Fetch Tickets"]
    Pipe2["Pipe 2 - Classify Tickets"]
    Pipe3["Pipe 3 - Update Tickets"]
    End([Complete])
    Start --> Pipe1
    Pipe1 --> Pipe2
    Pipe2 --> Pipe3
    Pipe3 --> End

Jede Pipe:

  1. Erhält den PipeContext (enthält Ergebnisse von vorherigen Pipes)
  2. Führt ihre spezifische Aufgabe aus
  3. Erstellt ein PipeResult mit Ausgabedaten

Was ist eine Pipe?

Eine Pipe ist eine eigenständige Verarbeitungseinheit, die:

  • Spezifische Geschäftslogik implementiert (Daten abrufen, klassifizieren, aktualisieren, etc.)
  • Eingabe über PipeContext erhält
  • Ausgabe als PipeResult produziert
flowchart LR
    subgraph Rendering
        PC["PipeContext"]:::value --> JR[["Jinja Renderer"]]:::renderer --> PP["PipeParams"]:::value
    end

    P(("Pipe")):::pipe
    PR["PipeResult"]:::value
    PP --> P --> PR
    classDef pipe fill: #0b1220, stroke: #7c4dff, stroke-width: 2px, color: #e6e7ea
    classDef renderer fill: #0b1220, stroke: #22c55e, stroke-dasharray: 5 3, stroke-width: 2px, color: #e6e7ea
    classDef value fill: #111827, stroke: #475569, color: #e6e7ea

Pipe-Typen

Simple Pipes

Atomare Verarbeitungseinheiten, die spezifische Geschäftslogik implementieren:

Yaml-Beispiel

- id: fetch_tickets
  use: open_ticket_ai.base:FetchTicketsPipe
  injects:
    ticket_system: 'otobo_znuny'
  params:
    search_criteria:
      queue:
        name: 'Support'
      limit: 10

Eigenschaften:

  • Führt spezifische Logik aus
  • Keine Kind-Pipes

Spezifische Simple Pipes:

  • AddNotePipe — registryKey: base:AddNotePipe
  • FetchTicketsPipe — registryKey: base:FetchTicketsPipe
  • UpdateTicketPipe — registryKey: base:UpdateTicketPipe

Expression Pipe (speziell)

Rendert einen Ausdruck und gibt diesen Wert zurück. Wenn es zu einem FailMarker gerendert wird, schlägt die Pipe fehl. registryKey: base:ExpressionPipe

Yaml-Beispiel

- id: check_any_tickets
  use: 'base:ExpressionPipe'
  params:
    expression: >
      {{ fail() if (get_pipe_result('fetch_tickets','fetched_tickets')|length)==0 else 'ok' }}

Composite Pipes

Orchestratoren, die Kind-Pipes enthalten und ausführen:

Flussdiagramm

flowchart TB
    subgraph CompositePipe
        A["Pipe #1"] --> B["Pipe #2"] --> C["Pipe #3"]
    end

    A --> U["Result #1"]
    B --> U["Result #2"]
    C --> U["Result #3"]
    U["union(Results #1, #2 #3)"]
    U --> CR["PipeResult"]

Yaml-Beispiel

Composite Pipe Beispiel
- id: ticket_workflow
  use: open_ticket_ai.base:CompositePipe
  params:
    threshold: 0.8
  steps:
    - id: fetch
      use: open_ticket_ai.base:FetchTicketsPipe
      injects: { ticket_system: 'otobo_znuny' }
      params:
        search_criteria:
          queue: { name: 'Incoming' }
          limit: 10

    - id: classify
      use: otai_hf_local:HFLocalTextClassificationPipe
      params:
        model: 'bert-base-german-cased'
        text: "{{ get_pipe_result('fetch').data.fetched_tickets[0].subject }}"

    - id: update
      use: open_ticket_ai.base:UpdateTicketPipe
      injects: { ticket_system: 'otobo_znuny' }
      params:
        ticket_id: "{{ get_pipe_result('fetch').data.fetched_tickets[0].id }}"
        updated_ticket:
          queue:
            name: "{{ get_pipe_result('classify', 'predicted_queue') }}"

Eigenschaften

  • Enthält steps-Liste von Kind-Pipe-Konfigurationen
  • Führt Kinder sequentiell aus
  • Führt Ergebnisse zusammen
  • Kinder können über parent.params auf Eltern-Parameter zugreifen

Composite-Ausführung:

  1. Für jeden Schritt:
    • Rendern: Rendert Parameter mit Jinja2 unter Verwendung des aktuellen Kontexts
    • Ausführen: Führt nächste Kind-Pipe mit gerenderten Parametern aus
    • Schleife: Fährt mit dem nächsten Schritt fort
  2. Finalisierung:
    • Union: Führt alle Kindergebnisse zusammen unter Verwendung von
    • Rückgabe: Gibt den finalen aktualisierten Kontext zurück

SimpleSequentialOrchestrator (speziell)

Führt seine steps in einer Endlosschleife aus. Dient für Hintergrund-artige Zyklen. Er stellt die Ergebnisse der Kind-Pipes nicht als ein einzelnes Pipe-Ergebnis zur Verfügung. registryKey: base:SimpleSequentialOrchestrator

Flussdiagramm

flowchart TB
    subgraph SimpleSequentialOrchestrator
        S1["step #1"] --> S2["step #2"] --> S3["step #3"]
    end
    S3 --> Z(("sleep"))
    Z -->|loop| S1

Yaml-Beispiel

SimpleSequentialOrchestrator Beispiel
- id: orchestrator
  use: 'base:SimpleSequentialOrchestrator'
  params:
    orchestrator_sleep: 'PT0.5S'
    exception_sleep: 'PT5S'
    always_retry: true
    steps:
      - id: tick
        use: 'base:IntervalTrigger'
        params: { interval: 'PT5S' }
      - id: fetch
        use: 'base:FetchTicketsPipe'
        injects: { ticket_system: 'otobo_znuny' }
        params:
          ticket_search_criteria: { queue: { name: 'Incoming' }, limit: 1 }

SimpleSequentialRunner (speziell)

Hat zwei Parameter: on und run (beides sind Pipe-Konfigurationen). Wenn on erfolgreich ist, führt es run aus; ansonsten wird es übersprungen. registryKey: base:SimpleSequentialRunner

Flussdiagramm

flowchart TB
    subgraph SimpleSequentialRunner
        ON{"on (trigger) succeded ?"} -->|yes| RUN["run (action)"]
        ON -.->|no| SKIP["skipped"]
    end

Yaml-Beispiel

- id: run-when-triggered
  use: 'base:SimpleSequentialRunner'
  params:
    on:
      id: gate
      use: 'base:IntervalTrigger'
      params: { interval: 'PT60S' }
    run:
      id: do-something
      use: 'base:ExpressionPipe'
      params: { expression: 'Triggered run' }

Kontextweitergabe zwischen Pipes

Felddetails:

  • pipes: Enthält Ergebnisse aller zuvor ausgeführten Pipes, nach Pipe-ID geordnet

    • Zugriff über pipe_result('pipe_id') in Templates
    • Wird akkumuliert, wenn jede Pipe abgeschlossen ist
    • In CompositePipe: zusammengeführte Ergebnisse aller Kind-Schritte
  • params: Parameter der aktuellen Pipe

    • Wird gesetzt, wenn die Pipe erstellt wird
    • Zugänglich über params.* in Templates
    • Für verschachtelte Pipes kann über parent auf Eltern verwiesen werden
  • parent: Referenz auf Eltern-Parameter

PipeResult-Struktur

Jede Pipe erzeugt ein PipeResult, das Ausführungsstatus und Daten enthält:

AttributDatentypBeschreibung
succeededtrue/falseOb die Pipe erfolgreich ohne Fehler abgeschlossen wurde
dataName:Wert-PaareAusgabedaten, die von der Pipe für nachfolgende Pipes oder externe Systeme erzeugt wurden
was_skippedtrue/falseOb die Pipe aufgrund fehlgeschlagener Abhängigkeiten oder bedingter Ausführung übersprungen wurde
messageTEXTMenschenlesbare Nachricht, die das Ergebnis oder Probleme beschreibt

Auf diese Ergebnisse greift man in den Parametern einer anderen Pipe mit diesen Funktionen zu. Derzeit gibt es keine Möglichkeit, was_skipped oder message zu lesen;

FunktionParameterRückgabewertFehler wenn…
has_failedpipe_id: textTrue, wenn das gegebene Pipe-Ergebnis als fehlgeschlagen markiert istUnbekannte Pipe-ID
get_pipe_resultpipe_id: text, data_key: text; default = valueWert, der im vorherigen Pipe-Ergebnis unter dem gegebenen data_key gespeichert istPipe oder Schlüssel fehlt

Wenn das Piperesult also beispielsweise ist:

- id: ticket_fetcher
  result:
    succeeded: true
    data:
      fetched_tickets:
        - id: 123
          subject: 'Help me!'
        - id: 124
          subject: 'Another ticket'

Um auf die abgerufenen Tickets zuzugreifen, würden Sie verwenden:

{ { get_pipe_result('ticket_fetcher', 'fetched_tickets') } }

Gibt zurück:

- id: 123
  subject: 'Help me!'
- id: 124
  subject: 'Another ticket'

Um auf den Betreff des ersten Tickets zuzugreifen, würden Sie verwenden:

{ { (get_pipe_result('ticket_fetcher', 'fetched_tickets') | first)[ 'subject' ] }

Gibt zurück:

Help me!

Um zu prüfen, ob die ticket_fetcher Pipe fehlgeschlagen ist, würden Sie verwenden:

{ { has_failed('ticket_fetcher') } }

Gibt zurück:

false

Die Erfolgsprüfung kann in Kombination mit fail() verwendet werden, um Guards zu erstellen:

- id: fail_no_tickets
  use: 'base:ExpressionPipe'
  params:
    expression: >
      {{ fail() if has_failed('ticket_fetcher' else 'ok' }}

Die Pipeline/der Flow würde an dieser Stelle die Ausführung stoppen, wenn die ticket_fetcher Pipe fehlgeschlagen ist.

Pipe-Registry-Übersicht

NameregistryKeyParameter (Namen, kommagetrennt)
AddNotePipebase:AddNotePipeticket_id, note
FetchTicketsPipebase:FetchTicketsPipeticket_search_criteria
UpdateTicketPipebase:UpdateTicketPipeticket_id, updated_ticket
ClassificationPipebase:ClassificationPipetext, model_name, api_token
ExpressionPipebase:ExpressionPipeexpression
IntervalTriggerbase:IntervalTriggerinterval
SimpleSequentialRunnerbase:SimpleSequentialRunneron, run
SimpleSequentialOrchestratorbase:SimpleSequentialOrchestratororchestrator_sleep, exception_sleep, always_retry, steps
CompositePipebase:CompositePipesteps

Verwenden Sie diese Werte in Ihrer YAML als: use: "<registryKey>".


Pipe-Details

AddNotePipe

Die Pipe AddNotePipe fügt einem gegebenen Ticket im Ticket-System eine strukturierte Notiz (Betreff/Text) hinzu. Sie protokolliert eine kurze Vorschau der Notiz und schreibt sie über den Ticket-System-Service.

registryKey: base:AddNotePipe Um sie zu “verwenden”, setzen Sie use auf diesen registryKey.

- id: 'my-add_note_pipe'
  use: 'base:AddNotePipe'
  params:
    ticket_id: '<ticket id>'
    note:
      subject: '<subject>'
      body: '<body>'

FetchTicketsPipe

Die Pipe FetchTicketsPipe fragt Tickets vom Ticket-System unter Verwendung eines vereinheitlichten TicketSearchCriteria (Queue, Limit, Offset, etc.) ab und gibt sie als fetched_tickets zurück.

registryKey: base:FetchTicketsPipe

- id: 'my-fetch_tickets'
  use: 'base:FetchTicketsPipe'
  params:
    ticket_search_criteria:
      queue:
        name: '<QueueName>'
      limit: 10

UpdateTicketPipe

Die Pipe UpdateTicketPipe aktualisiert ein bestehendes Ticket anhand der ID unter Verwendung einer vereinheitlichten UnifiedTicket- Nutzlast (z.B. Queue, Priorität, Felder). Gibt den Erfolgsstatus vom Ticket-System zurück.

registryKey: base:UpdateTicketPipe

- id: 'my-update_ticket'
  use: 'base:UpdateTicketPipe'
  params:
    ticket_id: '<ticket id>'
    updated_ticket:
      queue:
        name: '<QueueName>'

ClassificationPipe

Die Pipe ClassificationPipe klassifiziert Eingabetext unter Verwendung eines konfigurierten ClassificationService und Modells und gibt Label, Konfidenz und Scores (falls verfügbar) zurück.

registryKey: base:ClassificationPipe

- id: 'my-classify'
  use: 'base:ClassificationPipe'
  injects: { classification_service: 'hf_local' }
  params:
    text: '{{ some_text }}'
    model_name: 'softoft/otai-queue-de-bert-v1'
    api_token: '{{ env.HF_TOKEN | default(None) }}'

ExpressionPipe

Die Pipe ExpressionPipe gibt den Wert von expression zurück; wenn er zu einem FailMarker ausgewertet wird, schlägt die Pipe fehl (nützlich für Kontrollfluss/Guards).

registryKey: base:ExpressionPipe

- id: 'my-expression'
  use: 'base:ExpressionPipe'
  params:
    expression: "{{ fail() if condition else 'ok' }}"

IntervalTrigger

Die Pipe IntervalTrigger steuert die nachgelagerte Ausführung basierend auf einem Zeitintervall; sie ist erfolgreich, sobald das Intervall seit dem letzten Trigger vergangen ist, andernfalls schlägt sie fehl.

registryKey: base:IntervalTrigger

- id: 'my-interval'
  use: 'base:IntervalTrigger'
  params:
    interval: 'PT60S'

SimpleSequentialRunner

Die Pipe SimpleSequentialRunner führt on aus und, nur wenn es erfolgreich ist, führt es run aus. Es ist ein minimaler Zwei-Schritt-Kontroll-Runner.

registryKey: base:SimpleSequentialRunner

- id: 'my-simple_runner'
  use: 'base:SimpleSequentialRunner'
  params:
    on:
      id: 'interval'
      use: 'base:IntervalTrigger'
      params: { interval: 'PT5M' }
    run:
      id: 'do-something'
      use: 'base:ExpressionPipe'
      params: { expression: 'go!' }

SimpleSequentialOrchestrator

Die Pipe SimpleSequentialOrchestrator läuft endlos in einer Schleife und führt steps in jeder Runde der Reihe nach aus; er schläft optional zwischen den Zyklen und bei Ausnahmen und kann automatisch wiederholen.

registryKey: base:SimpleSequentialOrchestrator Für SimpleSequentialOrchestrator müssen Sie keine Parameter setzen, da alle Attribute optional sind.

- id: 'my-orchestrator'
  use: 'base:SimpleSequentialOrchestrator'
  params:
    orchestrator_sleep: 'PT0.5S'
    exception_sleep: 'PT5S'
    always_retry: true
    steps:
      - id: 'step-1'
        use: 'base:ExpressionPipe'
        params: { expression: 'ok' }

CompositePipe

Die Pipe CompositePipe führt ihre steps sequentiell aus, stoppt beim ersten Fehler und gibt eine Vereinigung der vorherigen Ergebnisse zurück. Sie ist der grundlegende Baustein für mehrstufige Flows.

registryKey: base:CompositePipe Für CompositePipe müssen Sie keine Parameter setzen, da alle Attribute optional sind.

- id: 'my-composite'
  use: 'base:CompositePipe'
  params:
    steps:
      - id: 'one'
        use: 'base:ExpressionPipe'
        params: { expression: 'first' }
      - id: 'two'
        use: 'base:ExpressionPipe'
        params: { expression: 'second' }