Pipe System

Pipes are the fundamental processing units in Open Ticket AI. Each pipe performs a specific task, receives context from previous pipes, executes its logic, and passes updated context forward.

A pipeline is a sequence of pipes that execute one after another.

Each pipe:

  1. Receives the PipeContext (containing results from previous pipes)
  2. Executes its specific task
  3. Creates a PipeResult with output data
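
The three steps above can be sketched in a few lines of Python. This is a minimal illustration, not Open Ticket AI's implementation: the `run_pipeline` helper, the `Pipe` callable type, and the two example pipes are hypothetical names introduced here, and the `PipeContext` and `PipeResult` classes only model the fields this page describes.

```python
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class PipeResult:
    succeeded: bool
    data: dict[str, Any] = field(default_factory=dict)
    message: str = ""


@dataclass
class PipeContext:
    # Results of previously executed pipes, keyed by pipe ID.
    pipes: dict[str, PipeResult] = field(default_factory=dict)


# Hypothetical pipe signature: takes the context, returns a result.
Pipe = Callable[[PipeContext], PipeResult]


def run_pipeline(steps: list[tuple[str, Pipe]]) -> PipeContext:
    context = PipeContext()
    for pipe_id, pipe in steps:
        result = pipe(context)           # steps 1 and 2: receive context, execute
        context.pipes[pipe_id] = result  # step 3: keep the result for later pipes
    return context


def fetch(context: PipeContext) -> PipeResult:
    tickets = [{"id": 123, "subject": "Help me!"}]
    return PipeResult(True, {"fetched_tickets": tickets})


def count(context: PipeContext) -> PipeResult:
    # Reads the output of the earlier "fetch" pipe from the context.
    tickets = context.pipes["fetch"].data["fetched_tickets"]
    return PipeResult(True, {"value": len(tickets)})


final_context = run_pipeline([("fetch", fetch), ("count", count)])
print(final_context.pipes["count"].data["value"])
```

The second pipe never calls the first one directly; it only reads what the context already holds, which is what keeps each pipe self-contained.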

A pipe is a self-contained processing unit that:

  • Implements specific business logic (fetch data, classify, update, etc.)
  • Receives input via PipeContext
  • Produces output as PipeResult

Most pipes are atomic processing units that implement specific business logic:

- id: fetch_tickets
  use: open_ticket_ai.base:FetchTicketsPipe
  injects:
    ticket_system: 'otobo_znuny'
  params:
    ticket_search_criteria:
      queue:
        name: 'Support'
      limit: 10
  • Runs specific logic
  • No child pipes
  • AddNotePipe — registryKey: base:AddNotePipe
  • FetchTicketsPipe — registryKey: base:FetchTicketsPipe
  • UpdateTicketPipe — registryKey: base:UpdateTicketPipe

ExpressionPipe renders an expression and returns the resulting value. If the expression renders to a FailMarker, the pipe fails. registryKey: base:ExpressionPipe

- id: check_any_tickets
  use: 'base:ExpressionPipe'
  params:
    expression: >
      {{ fail() if (get_pipe_result('fetch_tickets', 'fetched_tickets') | length) == 0 else 'ok' }}
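
The guard behaviour can be sketched in Python as follows. This is an illustration under assumptions, not the library's code: the `FailMarker` class body, the `fail()` helper, `run_expression_pipe`, and the `"value"` data key are stand-ins chosen here to mirror the description above.

```python
from typing import Any


class FailMarker:
    """Hypothetical sentinel that an expression returns to signal failure."""


def fail() -> FailMarker:
    return FailMarker()


def run_expression_pipe(rendered_value: Any) -> dict[str, Any]:
    # A rendered FailMarker turns into a failed result; anything else
    # is returned as the pipe's output (stored under "value" here, an assumption).
    if isinstance(rendered_value, FailMarker):
        return {"succeeded": False, "data": {}}
    return {"succeeded": True, "data": {"value": rendered_value}}


def guard(fetched_tickets: list) -> Any:
    # Python equivalent of the template expression in the YAML example.
    return fail() if len(fetched_tickets) == 0 else "ok"


empty_result = run_expression_pipe(guard([]))
filled_result = run_expression_pipe(guard([{"id": 123}]))
print(empty_result["succeeded"], filled_result["data"]["value"])
```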

Composite pipes are orchestrators that contain and execute child pipes:

Composite Pipe Example
- id: ticket_workflow
  use: open_ticket_ai.base:CompositePipe
  params:
    threshold: 0.8
    steps:
      - id: fetch
        use: open_ticket_ai.base:FetchTicketsPipe
        injects: { ticket_system: 'otobo_znuny' }
        params:
          ticket_search_criteria:
            queue: { name: 'Incoming' }
            limit: 10
      - id: classify
        use: otai_hf_local:HFLocalTextClassificationPipe
        params:
          model: 'bert-base-german-cased'
          text: "{{ get_pipe_result('fetch', 'fetched_tickets')[0].subject }}"
      - id: update
        use: open_ticket_ai.base:UpdateTicketPipe
        injects: { ticket_system: 'otobo_znuny' }
        params:
          ticket_id: "{{ get_pipe_result('fetch', 'fetched_tickets')[0].id }}"
          updated_ticket:
            queue:
              name: "{{ get_pipe_result('classify', 'predicted_queue') }}"
  • Contains steps list of child pipe configs
  • Executes children sequentially
  • Merges results
  • Children can access parent params via parent.params

Composite Execution:

  1. For each step:
    • Render: Render the step's params with Jinja2 using the current context
    • Execute: Execute the child pipe with the rendered params
    • Loop: Continue to the next step; stop at the first failure
  2. Finalization:
    • Union: Merge all child results into one combined result
    • Return: Return the final updated context
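
The execution order above can be sketched as a small Python loop. This is a simplified model, not the CompositePipe source: `Step`, `run_composite`, and the `render_params` callables are hypothetical, and a plain function stands in for Jinja2 rendering so the example needs only the standard library. Merging is modelled as a dictionary update of the child data, which is an assumption about what the union looks like.

```python
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class PipeResult:
    succeeded: bool
    data: dict[str, Any] = field(default_factory=dict)


@dataclass
class Step:
    id: str
    # Stand-in for Jinja2 rendering: builds params from earlier results.
    render_params: Callable[[dict[str, PipeResult]], dict[str, Any]]
    execute: Callable[[dict[str, Any]], PipeResult]


def run_composite(steps: list[Step]) -> PipeResult:
    results: dict[str, PipeResult] = {}
    for step in steps:
        params = step.render_params(results)  # Render
        result = step.execute(params)         # Execute
        results[step.id] = result
        if not result.succeeded:              # stop at the first failure
            break
    merged: dict[str, Any] = {}
    for result in results.values():           # Union
        merged.update(result.data)
    return PipeResult(all(r.succeeded for r in results.values()), merged)


steps = [
    Step(
        "fetch",
        lambda results: {"limit": 1},
        lambda params: PipeResult(
            True, {"fetched_tickets": [{"id": 123, "subject": "Help me!"}]}
        ),
    ),
    Step(
        "classify",
        lambda results: {
            "text": results["fetch"].data["fetched_tickets"][0]["subject"]
        },
        lambda params: PipeResult(True, {"classified_text": params["text"]}),
    ),
]

composite_result = run_composite(steps)
print(composite_result.succeeded, sorted(composite_result.data))
```

Rendering happens inside the loop rather than up front, so each step's params can refer to results produced by the steps before it.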

SimpleSequentialOrchestrator runs its steps in an endless loop, which suits background-style cycles. It does not expose the child pipes’ results as a single pipe result. registryKey: base:SimpleSequentialOrchestrator

SimpleSequentialOrchestrator Example
- id: orchestrator
  use: 'base:SimpleSequentialOrchestrator'
  params:
    orchestrator_sleep: 'PT0.5S'
    exception_sleep: 'PT5S'
    always_retry: true
    steps:
      - id: tick
        use: 'base:IntervalTrigger'
        params: { interval: 'PT5S' }
      - id: fetch
        use: 'base:FetchTicketsPipe'
        injects: { ticket_system: 'otobo_znuny' }
        params:
          ticket_search_criteria: { queue: { name: 'Incoming' }, limit: 1 }
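
The loop behind this configuration could look roughly like the sketch below. It is written under assumptions rather than taken from the library: `run_orchestrator` is a hypothetical function, the sleeps are given in seconds instead of ISO 8601 strings, `always_retry` is assumed to mean "keep looping after an exception", and `max_cycles` plus the injectable `sleep` exist only so the example terminates and can be observed.

```python
import time
from typing import Callable, Optional


def run_orchestrator(
    steps: list[Callable[[], None]],
    orchestrator_sleep: float = 0.5,
    exception_sleep: float = 5.0,
    always_retry: bool = True,
    max_cycles: Optional[int] = None,  # None means loop forever
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    cycles = 0
    while max_cycles is None or cycles < max_cycles:
        cycles += 1
        try:
            for step in steps:  # run the steps in order, once per cycle
                step()
        except Exception:
            if not always_retry:
                raise
            sleep(exception_sleep)  # back off after a failed cycle
            continue
        sleep(orchestrator_sleep)   # normal pause between cycles
    return cycles


calls: list[str] = []
naps: list[float] = []
attempts = {"n": 0}


def tick() -> None:
    calls.append("tick")


def flaky_fetch() -> None:
    attempts["n"] += 1
    if attempts["n"] == 2:  # fails only on its second invocation
        raise RuntimeError("ticket system unavailable")
    calls.append("fetch")


cycles = run_orchestrator([tick, flaky_fetch], max_cycles=3, sleep=naps.append)
print(cycles, naps)
```

Passing `naps.append` as the sleep function records the requested pauses instead of waiting, which makes the two sleep settings visible without slowing the example down.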

SimpleSequentialRunner has two params, on and run (both are pipe configs). If on succeeds, it executes run; otherwise it skips run. registryKey: base:SimpleSequentialRunner

- id: run-when-triggered
  use: 'base:SimpleSequentialRunner'
  params:
    on:
      id: gate
      use: 'base:IntervalTrigger'
      params: { interval: 'PT60S' }
    run:
      id: do-something
      use: 'base:ExpressionPipe'
      params: { expression: 'Triggered run' }
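
The on/run relationship can be sketched as a two-step function. This is an illustration only: `run_when` is a hypothetical name, and reporting a skipped run as `succeeded=True, was_skipped=True` is an assumption, since this page does not state what the runner returns when it skips.

```python
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class PipeResult:
    succeeded: bool
    was_skipped: bool = False
    data: dict[str, Any] = field(default_factory=dict)


def run_when(
    on: Callable[[], PipeResult], run: Callable[[], PipeResult]
) -> PipeResult:
    gate = on()
    if not gate.succeeded:
        # Assumption: a closed gate is reported as a skip, not as an error.
        return PipeResult(succeeded=True, was_skipped=True)
    return run()


ran: list[str] = []


def do_something() -> PipeResult:
    ran.append("run")
    return PipeResult(True, data={"value": "Triggered run"})


closed = run_when(lambda: PipeResult(False), do_something)
opened = run_when(lambda: PipeResult(True), do_something)
print(closed.was_skipped, opened.data["value"], ran)
```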

PipeContext field details:

  • pipes: Contains results from all previously executed pipes, keyed by pipe ID
    • Access via get_pipe_result('pipe_id') in templates
    • Accumulated as each pipe completes
    • In CompositePipe: merged results from all child steps
  • params: Current pipe’s parameters
    • Set when the pipe is created
    • Accessible via params.* in templates
    • For nested pipes, can reference parent via parent
  • parent: Reference to parent params
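
As a rough mental model, the three fields can be written as a small dataclass. The class below is a sketch and not the library's definition: modelling `parent` as a nested context (so that `parent.params` works) and the `child_context` helper are assumptions made for illustration.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class PipeContext:
    # Results of previously executed pipes, keyed by pipe ID.
    pipes: dict[str, dict[str, Any]] = field(default_factory=dict)
    # Parameters of the pipe that is currently running.
    params: dict[str, Any] = field(default_factory=dict)
    # Assumption: the parent is modelled as another context.
    parent: Optional["PipeContext"] = None


def child_context(parent: PipeContext, params: dict[str, Any]) -> PipeContext:
    # The child starts with the results known so far and its own params.
    return PipeContext(pipes=dict(parent.pipes), params=params, parent=parent)


root = PipeContext(params={"threshold": 0.8})
root.pipes["fetch"] = {"succeeded": True, "data": {"fetched_tickets": []}}

child = child_context(root, {"model": "bert-base-german-cased"})
print(child.params["model"], child.parent.params["threshold"])
```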

Each pipe produces a PipeResult containing execution outcome and data:

| Attribute | Data Type | Description |
|---|---|---|
| succeeded | true/false | Whether the pipe completed successfully without errors |
| data | name:value pairs | Output data produced by the pipe for use by following pipes or external systems |
| was_skipped | true/false | Whether the pipe was skipped due to failed dependencies or conditional execution |
| message | TEXT | Human-readable message describing the result or any issues |

You access these results in the params of another pipe using the functions below. Currently there is no way to read was_skipped or message.

| Function | Parameters | Returns | Errors if… |
|---|---|---|---|
| has_failed | pipe_id: text | True if the given pipe result is marked failed | Unknown pipe ID |
| get_pipe_result | pipe_id: text, data_key: text (default: value) | Value stored in previous pipe result under given data_key | Pipe or key missing |
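
The contract in the table can be mirrored by two plain Python functions. This is a sketch of the documented behaviour, not the template engine's code: `make_template_functions` is a hypothetical factory, "marked failed" is assumed to mean `succeeded` is false, the default `data_key` is assumed to be the string `"value"`, and `KeyError` is an arbitrary choice of error type.

```python
from typing import Any, Callable


def make_template_functions(
    pipes: dict[str, dict[str, Any]],
) -> tuple[Callable[[str], bool], Callable[..., Any]]:
    def has_failed(pipe_id: str) -> bool:
        if pipe_id not in pipes:
            raise KeyError(f"unknown pipe ID: {pipe_id}")
        return not pipes[pipe_id]["succeeded"]

    def get_pipe_result(pipe_id: str, data_key: str = "value") -> Any:
        if pipe_id not in pipes:
            raise KeyError(f"unknown pipe ID: {pipe_id}")
        data = pipes[pipe_id]["data"]
        if data_key not in data:
            raise KeyError(f"pipe {pipe_id} has no data key {data_key}")
        return data[data_key]

    return has_failed, get_pipe_result


pipes = {
    "ticket_fetcher": {
        "succeeded": True,
        "data": {
            "fetched_tickets": [
                {"id": 123, "subject": "Help me!"},
                {"id": 124, "subject": "Another ticket"},
            ]
        },
    }
}

has_failed, get_pipe_result = make_template_functions(pipes)
first_subject = get_pipe_result("ticket_fetcher", "fetched_tickets")[0]["subject"]
print(has_failed("ticket_fetcher"), first_subject)
```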

For example, suppose the pipe result is:

- id: ticket_fetcher
  result:
    succeeded: true
    data:
      fetched_tickets:
        - id: 123
          subject: 'Help me!'
        - id: 124
          subject: 'Another ticket'

To access the fetched tickets you would use:

{{ get_pipe_result('ticket_fetcher', 'fetched_tickets') }}

Returning:

- id: 123
  subject: 'Help me!'
- id: 124
  subject: 'Another ticket'

To access the subject of the first ticket you would use:

{{ (get_pipe_result('ticket_fetcher', 'fetched_tickets') | first)['subject'] }}

Returning:

Help me!

To check if the ticket_fetcher pipe failed you would use:

{{ has_failed('ticket_fetcher') }}

Returning:

false

Checking the success can be used in combination with fail() to create guards:

- id: fail_no_tickets
  use: 'base:ExpressionPipe'
  params:
    expression: >
      {{ fail() if has_failed('ticket_fetcher') else 'ok' }}

The pipeline/flow would stop executing at this point if the ticket_fetcher pipe failed.

| Name | registryKey | Params (names, comma-separated) |
|---|---|---|
| AddNotePipe | base:AddNotePipe | ticket_id, note |
| FetchTicketsPipe | base:FetchTicketsPipe | ticket_search_criteria |
| UpdateTicketPipe | base:UpdateTicketPipe | ticket_id, updated_ticket |
| ClassificationPipe | base:ClassificationPipe | text, model_name, api_token |
| ExpressionPipe | base:ExpressionPipe | expression |
| IntervalTrigger | base:IntervalTrigger | interval |
| SimpleSequentialRunner | base:SimpleSequentialRunner | on, run |
| SimpleSequentialOrchestrator | base:SimpleSequentialOrchestrator | orchestrator_sleep, exception_sleep, always_retry, steps |
| CompositePipe | base:CompositePipe | steps |

Use these values in your YAML as: use: "<registryKey>".
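
One way to picture how a `use` value selects a pipe is a lookup table from registryKey to class. The sketch below illustrates that idea only and makes no claim about how Open Ticket AI resolves keys internally: `PIPE_REGISTRY`, `register`, `build_pipe`, and the stub `ExpressionPipe` class are all hypothetical.

```python
from typing import Any, Callable

# Hypothetical registry: maps a registryKey string to a pipe class.
PIPE_REGISTRY: dict[str, type] = {}


def register(registry_key: str) -> Callable[[type], type]:
    def decorator(cls: type) -> type:
        PIPE_REGISTRY[registry_key] = cls
        return cls

    return decorator


@register("base:ExpressionPipe")
class ExpressionPipe:
    # Stub class for the example; it only stores its configuration.
    def __init__(self, pipe_id: str, params: dict[str, Any]) -> None:
        self.pipe_id = pipe_id
        self.params = params


def build_pipe(config: dict[str, Any]) -> Any:
    key = config["use"]
    if key not in PIPE_REGISTRY:
        raise ValueError(f"unknown registryKey: {key!r}")
    return PIPE_REGISTRY[key](config["id"], config.get("params", {}))


pipe = build_pipe(
    {"id": "my-expression", "use": "base:ExpressionPipe", "params": {"expression": "ok"}}
)
print(type(pipe).__name__, pipe.params["expression"])
```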


The Pipe AddNotePipe adds a structured note (subject/body) to a given ticket in the ticket system. It logs a short preview of the note and writes it via the ticket system service.

registryKey: base:AddNotePipe. To use it, set use to this registryKey.

- id: 'my-add_note_pipe'
  use: 'base:AddNotePipe'
  params:
    ticket_id: '<ticket id>'
    note:
      subject: '<subject>'
      body: '<body>'

The Pipe FetchTicketsPipe queries tickets from the ticket system using a unified TicketSearchCriteria (queue, limit, offset, etc.) and returns them as fetched_tickets.

registryKey: base:FetchTicketsPipe

- id: 'my-fetch_tickets'
  use: 'base:FetchTicketsPipe'
  params:
    ticket_search_criteria:
      queue:
        name: '<QueueName>'
      limit: 10

The Pipe UpdateTicketPipe updates an existing ticket by ID using a UnifiedTicket payload (e.g., queue, priority, fields). It returns the success state reported by the ticket system.

registryKey: base:UpdateTicketPipe

- id: 'my-update_ticket'
  use: 'base:UpdateTicketPipe'
  params:
    ticket_id: '<ticket id>'
    updated_ticket:
      queue:
        name: '<QueueName>'

The Pipe ClassificationPipe classifies input text using a configured ClassificationService and model, returning label, confidence, and scores (if available).

registryKey: base:ClassificationPipe

- id: 'my-classify'
  use: 'base:ClassificationPipe'
  injects: { classification_service: 'hf_local' }
  params:
    text: '{{ some_text }}'
    model_name: 'softoft/otai-queue-de-bert-v1'
    api_token: '{{ env.HF_TOKEN | default(None) }}'

The Pipe ExpressionPipe returns the value of expression; if it evaluates to a FailMarker, the pipe fails (useful for control-flow/guards).

registryKey: base:ExpressionPipe

- id: 'my-expression'
  use: 'base:ExpressionPipe'
  params:
    expression: "{{ fail() if condition else 'ok' }}"

The Pipe IntervalTrigger gates downstream execution based on a time interval; it succeeds once the interval has passed since the last trigger, otherwise fails.

registryKey: base:IntervalTrigger

- id: 'my-interval'
  use: 'base:IntervalTrigger'
  params:
    interval: 'PT60S'
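
The interval values on this page (`PT60S`, `PT5M`, `PT0.5S`) are ISO 8601 durations. The sketch below shows how such a gate might behave; it is not the library's implementation. `parse_duration` handles only the hours/minutes/seconds subset seen here, the `IntervalTrigger` class is a stand-in with a hypothetical `check` method, and starting the timer at construction time is an assumption.

```python
import re
import time
from datetime import timedelta
from typing import Callable

_NUMBER = r"(\d+(?:\.\d+)?)"
_DURATION = re.compile(rf"^PT(?:{_NUMBER}H)?(?:{_NUMBER}M)?(?:{_NUMBER}S)?$")


def parse_duration(text: str) -> timedelta:
    """Parse the PT..H..M..S subset of ISO 8601 durations."""
    match = _DURATION.match(text)
    if match is None or not any(match.groups()):
        raise ValueError(f"unsupported duration: {text!r}")
    hours, minutes, seconds = (float(g) if g else 0.0 for g in match.groups())
    return timedelta(hours=hours, minutes=minutes, seconds=seconds)


class IntervalTrigger:
    def __init__(
        self, interval: str, clock: Callable[[], float] = time.monotonic
    ) -> None:
        self._interval = parse_duration(interval).total_seconds()
        self._clock = clock
        self._last = clock()  # assumption: the timer starts at creation

    def check(self) -> bool:
        # Succeeds once the interval has passed since the last trigger.
        now = self._clock()
        if now - self._last >= self._interval:
            self._last = now
            return True
        return False


fake_now = [0.0]
trigger = IntervalTrigger("PT60S", clock=lambda: fake_now[0])

fake_now[0] = 30.0
first = trigger.check()   # 30 s elapsed, interval not reached
fake_now[0] = 61.0
second = trigger.check()  # 61 s elapsed, trigger fires and resets
fake_now[0] = 90.0
third = trigger.check()   # only 29 s since the last trigger
print(first, second, third)
```

Injecting the clock keeps the example deterministic; with the default `time.monotonic` the same class works against real time.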

The Pipe SimpleSequentialRunner runs on and, only if it succeeds, runs run. It’s a minimal two-step control runner.

registryKey: base:SimpleSequentialRunner

- id: 'my-simple_runner'
  use: 'base:SimpleSequentialRunner'
  params:
    on:
      id: 'interval'
      use: 'base:IntervalTrigger'
      params: { interval: 'PT5M' }
    run:
      id: 'do-something'
      use: 'base:ExpressionPipe'
      params: { expression: 'go!' }

The Pipe SimpleSequentialOrchestrator loops forever, executing steps in order each cycle; it optionally sleeps between cycles and on exceptions, and can auto-retry.

registryKey: base:SimpleSequentialOrchestrator. For SimpleSequentialOrchestrator you don’t need to set params, since all attributes are optional.

- id: 'my-orchestrator'
  use: 'base:SimpleSequentialOrchestrator'
  params:
    orchestrator_sleep: 'PT0.5S'
    exception_sleep: 'PT5S'
    always_retry: true
    steps:
      - id: 'step-1'
        use: 'base:ExpressionPipe'
        params: { expression: 'ok' }

The Pipe CompositePipe executes its steps sequentially, stops on the first failure, and returns a union of prior results. It’s the base building block for multi-step flows.

registryKey: base:CompositePipe. For CompositePipe you don’t need to set params, since all attributes are optional.

- id: 'my-composite'
  use: 'base:CompositePipe'
  params:
    steps:
      - id: 'one'
        use: 'base:ExpressionPipe'
        params: { expression: 'first' }
      - id: 'two'
        use: 'base:ExpressionPipe'
        params: { expression: 'second' }