Skip to main content

Queue

Manage background tasks, monitor queue health, and view processing statistics. Chaos Cypher uses a Valkey-backed queue with two processing lanes: LLM (1 concurrent) for AI operations and Operations (8 concurrent) for source processing, exports, and workflows.

Base path: /api/v1/queue


Tasks

Create Task

POST /api/v1/queue/tasks

Queues a new background task for processing.

Request Body

FieldTypeRequiredDefaultDescription
queuestringYes--Queue name (e.g., "operations", "llm")
operationstringYes--Operation name (e.g., "import_ccx", "chat_completion")
dataobjectYes--Operation-specific payload
priorityintegerNo50Task priority (0-100, higher = higher priority — ZPOPMAX)
metadataobjectNo{}Arbitrary metadata for filtering and tracking
{
"queue": "operations",
"operation": "import_ccx",
"data": {
"source_id": "src-uuid-1",
"file_path": "/data/uploads/graph.ccx"
},
"priority": 50,
"metadata": {
"source_id": "src-uuid-1",
"user_initiated": true
}
}

Response

Status: 201 Created

{
"task_id": "task-abc123def456"
}

Errors

StatusDescription
503Queue service unavailable

curl Example

curl -s -X POST http://localhost:8080/api/v1/queue/tasks \
-H "Content-Type: application/json" \
-d '{
"queue": "operations",
"operation": "import_ccx",
"data": {"source_id": "src-uuid-1"},
"priority": 50,
"metadata": {"source_id": "src-uuid-1"}
}'

List Tasks

GET /api/v1/queue/tasks

Returns recent tasks across all queues or filtered by specific queues. Supports pagination.

Query Parameters

ParameterTypeRequiredDefaultDescription
limitintegerNoServer defaultMaximum number of tasks to return (>= 1, capped at server max)
offsetintegerNo0Number of tasks to skip (0-1000)
queuesstringNononeComma-separated queue names to filter by (e.g., "operations,llm")

Response

Status: 200 OK

{
"tasks": [
{
"task_id": "task-abc123def456",
"queue": "operations",
"operation": "import_ccx",
"status": "completed",
"priority": 50,
"data": {"source_id": "src-uuid-1"},
"metadata": {"source_id": "src-uuid-1"},
"attempts": 1,
"created_at": "2026-03-09T14:30:00.000000",
"started_at": "2026-03-09T14:30:01.000000",
"completed_at": "2026-03-09T14:30:15.000000"
},
{
"task_id": "task-xyz789ghi012",
"queue": "llm",
"operation": "chat_completion",
"status": "running",
"priority": 10,
"data": {"chat_id": "chat-uuid-1"},
"metadata": {},
"attempts": 1,
"created_at": "2026-03-09T14:31:00.000000",
"started_at": "2026-03-09T14:31:02.000000",
"completed_at": null
}
],
"total": 2,
"total_in_queue": 5,
"queues": null,
"pagination": {
"limit": 50,
"offset": 0,
"total": 2,
"total_in_queue": 5,
"has_more": false
}
}

curl Example

# List all recent tasks
curl -s http://localhost:8080/api/v1/queue/tasks

# With pagination
curl -s "http://localhost:8080/api/v1/queue/tasks?limit=10&offset=0"

# Filter by queue
curl -s "http://localhost:8080/api/v1/queue/tasks?queues=operations"

# Filter by multiple queues
curl -s "http://localhost:8080/api/v1/queue/tasks?queues=operations,llm"

Get Task

GET /api/v1/queue/tasks/{task_id}

Returns full details for a single task including status, data, attempts, and timestamps.

Path Parameters

ParameterTypeRequiredDescription
task_idstringYesTask ID

Response

Status: 200 OK

Returns a single task object (same schema as the items in List Tasks), plus an error field that contains the error message for failed tasks or null otherwise.

Errors

StatusDescription
404Task not found
503Queue service unavailable

curl Example

curl -s http://localhost:8080/api/v1/queue/tasks/task-abc123def456

Get Task Result

GET /api/v1/queue/tasks/{task_id}/result

Returns the result data for a completed task. Results may expire after a configured retention period.

Path Parameters

ParameterTypeRequiredDescription
task_idstringYesTask ID

Response

Status: 200 OK

{
"result": {
"entities_created": 42,
"relationships_created": 18,
"processing_time_ms": 14320
}
}

Errors

StatusDescription
404Result not found or expired
503Queue service unavailable

curl Example

curl -s http://localhost:8080/api/v1/queue/tasks/task-abc123def456/result

Cancel Task

DELETE /api/v1/queue/tasks/{task_id}

Cancels a single task. Both queued and running tasks can be cancelled. Running tasks are cancelled cooperatively — a cancellation flag is set in Valkey that the worker checks between processing batches.

Path Parameters

ParameterTypeRequiredDescription
task_idstringYesTask ID

Response

Status: 200 OK

{
"status": "cancelled"
}

Errors

StatusDescription
400Task cannot be cancelled (already completed or failed)
404Task not found
503Queue service unavailable

curl Example

curl -s -X DELETE http://localhost:8080/api/v1/queue/tasks/task-abc123def456

Retry Task

POST /api/v1/queue/tasks/{task_id}/retry

Re-enqueues a failed task with the same parameters. Creates a new task with a new ID.

Path Parameters

ParameterTypeRequiredDescription
task_idstringYesTask ID of the failed task

Response

Status: 200 OK

{
"new_task_id": "task-new789xyz012",
"original_task_id": "task-abc123def456"
}

Errors

StatusDescription
400Task is not in failed status
404Task not found
503Queue service unavailable

curl Example

curl -s -X POST http://localhost:8080/api/v1/queue/tasks/task-abc123def456/retry

Cancel All Tasks

DELETE /api/v1/queue/tasks

Cancels all active (queued + running) tasks. Optionally filtered by queue name.

Query Parameters

ParameterTypeRequiredDefaultDescription
queuestringNononeQueue name filter. Omit to cancel across all queues.

Response

Status: 200 OK

{
"cancelled": 12,
"queue": null
}

With queue filter:

{
"cancelled": 3,
"queue": "operations"
}

Errors

StatusDescription
503Queue service unavailable
warning

This permanently cancels all active tasks. Use with caution.

curl Example

# Cancel all tasks across all queues
curl -s -X DELETE http://localhost:8080/api/v1/queue/tasks

# Cancel all tasks in a specific queue
curl -s -X DELETE "http://localhost:8080/api/v1/queue/tasks?queue=operations"

Cancel Tasks (Batch or by Metadata)

POST /api/v1/queue/tasks/cancel

Cancels tasks using one of two modes: batch (by task IDs) or metadata (by matching key-value pairs). Batch mode is preferred for performance.

Request Body

FieldTypeRequiredDescription
task_idsstring[]NoList of task IDs to cancel (batch mode)
metadataobjectNoMetadata key-value pairs to match (metadata mode)
queuestringNoQueue name filter (metadata mode only)
note

You must provide either task_ids or metadata, but not both. Batch mode is preferred to avoid SCAN overhead.

Batch mode request:

{
"task_ids": ["task-abc123", "task-def456", "task-ghi789"]
}

Metadata mode request:

{
"metadata": {"source_id": "src-uuid-1"},
"queue": "operations"
}

Response (Batch Mode)

Status: 200 OK

{
"cancelled_count": 2,
"requested_count": 3,
"failed": [
{
"task_id": "task-ghi789",
"reason": "Task is currently running"
}
]
}

Response (Metadata Mode)

Status: 200 OK

{
"cancelled": 4
}

Errors

StatusDescription
400Must provide either task_ids or metadata
503Queue service unavailable

curl Example

# Batch cancel by task IDs
curl -s -X POST http://localhost:8080/api/v1/queue/tasks/cancel \
-H "Content-Type: application/json" \
-d '{"task_ids": ["task-abc123", "task-def456"]}'

# Cancel by metadata
curl -s -X POST http://localhost:8080/api/v1/queue/tasks/cancel \
-H "Content-Type: application/json" \
-d '{
"metadata": {"source_id": "src-uuid-1"},
"queue": "operations"
}'

Clear History

DELETE /api/v1/queue/tasks/history

Permanently removes completed, failed, and cancelled tasks from history. Does not affect queued or running tasks.

Query Parameters

ParameterTypeRequiredDefaultDescription
queuestringNononeQueue name filter. Omit to clear across all queues.
older_than_hoursintegerNo0Clear only tasks older than this many hours. 0 clears all history. Max: 8760 (1 year).

Response

Status: 200 OK

{
"cleared": 156,
"queue": null
}

With filters:

{
"cleared": 42,
"queue": "operations"
}

Errors

StatusDescription
503Queue service unavailable
warning

This permanently removes task history and cannot be undone.

curl Example

# Clear all task history
curl -s -X DELETE http://localhost:8080/api/v1/queue/tasks/history

# Clear history for a specific queue
curl -s -X DELETE "http://localhost:8080/api/v1/queue/tasks/history?queue=operations"

# Clear tasks older than 24 hours
curl -s -X DELETE "http://localhost:8080/api/v1/queue/tasks/history?older_than_hours=24"

# Combine filters
curl -s -X DELETE "http://localhost:8080/api/v1/queue/tasks/history?queue=llm&older_than_hours=48"

Statistics

Get All Queue Stats

GET /api/v1/queue/stats

Returns statistics for all known queues.

Response

Status: 200 OK

{
"queues": [
{
"queue": "llm",
"queued": 2,
"running": 1,
"completed": 145,
"failed": 3
},
{
"queue": "operations",
"queued": 0,
"running": 4,
"completed": 892,
"failed": 12
}
],
"note": "Queue configuration managed in worker/config.py"
}

If the queue service is unavailable, the response returns an empty list with a note:

{
"queues": [],
"note": "Queue service unavailable"
}

curl Example

curl -s http://localhost:8080/api/v1/queue/stats

Get Per-Queue Stats

GET /api/v1/queue/stats/{queue_name}

Returns statistics for a single queue.

Path Parameters

ParameterTypeRequiredDescription
queue_namestringYesQueue name (e.g., "llm", "operations")

Response

Status: 200 OK

{
"queue": "operations",
"queued": 0,
"running": 4,
"completed": 892,
"failed": 12
}

Errors

StatusDescription
503Queue service unavailable

curl Example

curl -s http://localhost:8080/api/v1/queue/stats/operations

Health

Health Check

GET /api/v1/queue/health

Returns the health status of the queue system, including Valkey connectivity and worker configuration.

Response

Status: 200 OK

{
"status": "healthy",
"enabled": true,
"connected": true,
"system": "valkey",
"note": "Workers run in separate container. See worker/config.py for concurrency settings."
}

When the queue is unavailable:

{
"status": "unavailable",
"enabled": true,
"connected": false,
"system": "valkey",
"note": "Workers run in separate container. See worker/config.py for concurrency settings."
}

curl Example

curl -s http://localhost:8080/api/v1/queue/health

Maintenance

Reconcile Queue

POST /api/v1/queue/reconcile

Triggers an immediate queue reconciliation pass. This self-healing admin endpoint inspects running sets across the specified queue (or all queues if omitted), removes orphan task IDs that have no backing hash, and requeues or fails tasks that were abandoned by crashed workers.

Request Body

FieldTypeRequiredDefaultDescription
queuestringNonullQueue name to reconcile. Omit or set to null to reconcile all queues.
{
"queue": "operations"
}

Response

Status: 200 OK

{
"recovered_orphans": 3,
"recovered_crashed": 1,
"failed_unrecoverable": 0
}
FieldTypeDescription
recovered_orphansintegerTask IDs found in the running set with no backing hash — removed
recovered_crashedintegerTasks abandoned by a crashed worker that were requeued
failed_unrecoverableintegerAbandoned tasks that exhausted retries or had retry_on_crash=false — marked failed

Errors

StatusDescription
503Queue service unavailable

curl Example

# Reconcile all queues
curl -s -X POST http://localhost:8080/api/v1/queue/reconcile \
-H "Content-Type: application/json" \
-d '{}'

# Reconcile a specific queue
curl -s -X POST http://localhost:8080/api/v1/queue/reconcile \
-H "Content-Type: application/json" \
-d '{"queue": "operations"}'

Task Lifecycle

A task progresses through the following statuses:

StatusDescription
queuedTask is waiting to be picked up by a worker
runningTask is currently being processed
completedTask finished successfully (result available)
failedTask encountered an error (can be retried)
cancelledTask was cancelled before completion

Example: Submit and Track a Task

1. Create the task:

curl -s -X POST http://localhost:8080/api/v1/queue/tasks \
-H "Content-Type: application/json" \
-d '{
"queue": "operations",
"operation": "import_ccx",
"data": {"source_id": "src-uuid-1"},
"metadata": {"source_id": "src-uuid-1"}
}'
{"task_id": "task-abc123def456"}

2. Poll for status (returns the full task object -- see Get Task):

curl -s http://localhost:8080/api/v1/queue/tasks/task-abc123def456

3. Retrieve the result once completed (see Get Task Result):

curl -s http://localhost:8080/api/v1/queue/tasks/task-abc123def456/result

4. If the task failed, retry it:

curl -s -X POST http://localhost:8080/api/v1/queue/tasks/task-abc123def456/retry
{
"new_task_id": "task-new789xyz012",
"original_task_id": "task-abc123def456"
}

Response Models Reference

QueueTaskRequest

Request body for creating a new task.

FieldTypeRequiredDefaultDescription
queuestringYes--Target queue name
operationstringYes--Operation to perform
dataobjectYes--Operation-specific payload
priorityintegerNo50Priority (0-100)
metadataobjectNo{}Arbitrary metadata

QueueTaskResponse

Returned when a task is created.

FieldTypeDescription
task_idstringUnique task identifier

TaskListResponse

Returned by the list tasks endpoint.

FieldTypeDescription
tasksobject[]List of task detail objects
totalintegerNumber of tasks in this response
total_in_queueintegerActive tasks across queues (queued + running)
queuesstring[] or nullQueue filter applied, or null if unfiltered
paginationPaginationInfo or nullPagination metadata

PaginationInfo

Pagination metadata included in list responses.

FieldTypeDescription
limitintegerItems per page
offsetintegerCurrent offset
totalintegerTotal items in the list
total_in_queueintegerActive tasks (queued + running)
has_morebooleanWhether more pages exist

TaskResultResponse

Returned by the get result endpoint.

FieldTypeDescription
resultanyTask result data (structure varies by operation)

CancelTaskResponse

Returned when a single task is cancelled.

FieldTypeDescription
statusstringCancellation status (e.g., "cancelled")

RetryTaskResponse

Returned when a failed task is retried.

FieldTypeDescription
new_task_idstringID of the newly created task
original_task_idstringID of the original failed task

CancelAllResponse

Returned when all tasks are cancelled.

FieldTypeDescription
cancelledintegerNumber of tasks cancelled
queuestring or nullQueue filter applied, or null if all queues

CancelBatchResponse

Returned by batch cancel (via task IDs).

FieldTypeDescription
cancelled_countintegerNumber of tasks successfully cancelled
requested_countintegerNumber of task IDs requested
failedobject[]List of tasks that could not be cancelled (with task_id and reason)

CancelByMetadataResponse

Returned by metadata-based cancel.

FieldTypeDescription
cancelledintegerNumber of tasks cancelled

ClearHistoryResponse

Returned when task history is cleared.

FieldTypeDescription
clearedintegerNumber of history entries removed
queuestring or nullQueue filter applied, or null if all queues

QueueStatsResponse

Returned by the all-queues stats endpoint.

FieldTypeDescription
queuesobject[]List of per-queue statistics
notestring or nullInformational note about configuration

QueueHealthResponse

Returned by the health check endpoint.

FieldTypeDescription
statusstringHealth status ("healthy" or "unavailable")
enabledbooleanWhether the queue system is enabled
connectedbooleanWhether Valkey is connected
systemstringQueue backend system (e.g., "valkey")
notestring or nullInformational note about worker configuration