
Task Scheduling and Execution

In Cleo, tasks represent specific actions that agents must perform to achieve their objectives. The task scheduler is responsible for managing these tasks, ensuring they are executed at the right time, and providing the agents with the context needed to carry out their duties. Cleo supports flexible task execution, from simple one-off tasks to more complex workflows with dependencies and retries.

This guide provides an overview of how task scheduling and execution work in Cleo, covering basic task management, task dependencies, and best practices for scalability and reliability.


Task Scheduling Basics

Cleo supports a variety of scheduling mechanisms that allow you to define when and how tasks should be executed. You can schedule tasks to run immediately, after a delay, or at a recurring interval. The scheduler also ensures that tasks are executed in the correct order, and it can manage concurrent executions where necessary.

1. Immediate Task Execution

For tasks that need to be executed right away, Cleo provides a straightforward method to initiate task execution. Tasks are triggered as soon as they are added to the task queue.

Example: Executing a Task Immediately

def execute_task(agent, task):
    try:
        task_result = task.run(agent)
        return task_result
    except Exception as e:
        return f"Error executing task: {e}"

In this example, the task runs as soon as execute_task is called, and any exception raised during execution is caught and returned as an error message.
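The queue-driven behavior described in this section (a task starts as soon as it is added to the queue) can be sketched with Python's standard library. This is a minimal illustration under stated assumptions, not Cleo's actual scheduler: `TaskQueue`, `submit`, `wait`, and `EchoTask` are hypothetical names, and the only convention carried over from the example above is that a task exposes `run(agent)`.

```python
import queue
import threading


class TaskQueue:
    """Hypothetical in-process queue: a worker runs each task as soon as it arrives."""

    def __init__(self, agent):
        self.agent = agent
        self.results = []  # (task, outcome) pairs, in completion order
        self._queue = queue.Queue()
        self._worker = threading.Thread(target=self._drain, daemon=True)
        self._worker.start()

    def submit(self, task):
        # Enqueueing is the trigger: the worker is already blocked on get().
        self._queue.put(task)

    def _drain(self):
        while True:
            task = self._queue.get()
            try:
                outcome = task.run(self.agent)
            except Exception as e:
                outcome = f"Error executing task: {e}"
            self.results.append((task, outcome))
            self._queue.task_done()

    def wait(self):
        # Block until every submitted task has been processed.
        self._queue.join()


class EchoTask:
    """Hypothetical task used only for this illustration."""

    def __init__(self, name):
        self.name = name

    def run(self, agent):
        return f"{self.name} ran for {agent}"


task_queue = TaskQueue(agent="agent-1")
task_queue.submit(EchoTask("fetch"))
task_queue.submit(EchoTask("summarize"))
task_queue.wait()
outcomes = [outcome for _, outcome in task_queue.results]
```

A single worker thread keeps execution in submission order; adding more workers would trade that ordering guarantee for throughput.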

2. Delayed Task Execution

For tasks that need to be delayed or scheduled for a later time, you can use a timer or a scheduling library like APScheduler or Celery to add time-based delays.

Example: Delayed Task Execution

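from datetime import datetime, timedelta

from apscheduler.schedulers.background import BackgroundScheduler

scheduler = BackgroundScheduler()
scheduler.start()  # Start once; starting a running scheduler raises an error

def delayed_task(agent, task, delay_seconds):
    # The 'date' trigger fires exactly once at run_date.
    # An 'interval' trigger would repeat the task every delay_seconds.
    run_at = datetime.now() + timedelta(seconds=delay_seconds)
    scheduler.add_job(task, 'date', run_date=run_at, args=[agent])

def task_function(agent):
    print(f"Task executed for agent {agent.id}")

# Example usage: run the task once, 10 seconds from now
# (assumes an existing agent object with an id attribute)
delayed_task(agent, task_function, 10)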

Here, the task is scheduled to run after a delay of 10 seconds. The APScheduler library helps manage the delayed execution.
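Where a full scheduling library is more than you need, the "timer" option mentioned above can be sketched with the standard library's `threading.Timer`. The helper name `run_after` and the string used as a stand-in agent are assumptions for illustration. A `Timer` fires once, lives only in the current process, and is not persisted across restarts.

```python
import threading
import time


def run_after(delay_seconds, func, *args):
    """Hypothetical helper: call func(*args) once, delay_seconds from now."""
    timer = threading.Timer(delay_seconds, func, args=args)
    timer.daemon = True  # do not keep the process alive just for the timer
    timer.start()
    return timer  # keep the handle so the job can be cancelled with timer.cancel()


executed = []


def task_function(agent):
    # Record which agent ran and when, instead of printing.
    executed.append((agent, time.monotonic()))


started = time.monotonic()
timer = run_after(0.2, task_function, "agent-1")  # 0.2 s keeps the demo quick
timer.join()  # wait for the timer thread to finish
elapsed = executed[0][1] - started
```

Returning the `Timer` handle lets the caller cancel a pending task, which is the main thing a bare `time.sleep` call cannot offer.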


Task Dependencies

In some cases, tasks need to be executed in a specific order or have dependencies on other tasks. Cleo provides a flexible mechanism for managing task dependencies, allowing you to chain tasks and ensure that they are executed only when the required preconditions are met.

1. Simple Task Dependencies

You can define task dependencies by making the execution of one task contingent on the completion of another.

Example: Dependent Task Execution

def task_a(agent):
    print("Task A executed")
    return "Task A completed"

def task_b(agent):
    print("Task B executed after Task A")
    return "Task B completed"

def run_tasks_in_sequence(agent):
    result_a = task_a(agent)
    if result_a:
        return task_b(agent)

In this example, Task B is executed only after Task A completes successfully. You can add more complex dependencies and conditions as needed.

2. Task Dependency Graphs

For more complex workflows, you may need to define a dependency graph where multiple tasks depend on the results of others. You can use libraries like NetworkX or Celery's canvas workflows (chain, group, chord) to build a directed acyclic graph (DAG) of tasks.

Example: Task Dependency Graph

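import networkx as nx

# Create a directed graph for task dependencies
task_graph = nx.DiGraph()

# Define tasks and their dependencies: task_a -> task_b -> task_c
task_graph.add_edge("task_a", "task_b")
task_graph.add_edge("task_b", "task_c")

# Map node names to callables explicitly instead of looking them up in globals().
# task_c is assumed to be defined in the same way as task_a and task_b.
task_registry = {"task_a": task_a, "task_b": task_b, "task_c": task_c}

def execute_task_graph(agent, task_graph, task_registry):
    # topological_sort yields each task after all of its dependencies
    for task_name in nx.topological_sort(task_graph):
        result = task_registry[task_name](agent)
        print(result)

# Example usage: Execute tasks based on dependency graph
execute_task_graph(agent, task_graph, task_registry)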

Here, NetworkX is used to represent the task dependencies as a directed graph. The tasks will be executed in the correct order, ensuring that each task is executed only after its dependencies are satisfied.
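If adding NetworkX as a dependency is undesirable, the same ordering can be sketched with `graphlib.TopologicalSorter` from the standard library (Python 3.9 or later). The task functions and the `task_registry` mapping below are hypothetical; the explicit registry stands in for a name lookup so that only registered callables can run.

```python
from graphlib import TopologicalSorter


def task_a(agent):
    return "Task A completed"


def task_b(agent):
    return "Task B completed"


def task_c(agent):
    return "Task C completed"


# Hypothetical registry: node name -> callable.
task_registry = {"task_a": task_a, "task_b": task_b, "task_c": task_c}

# Each key maps to the set of tasks it depends on.
dependencies = {
    "task_b": {"task_a"},
    "task_c": {"task_b"},
}


def execute_task_graph(agent, dependencies, registry):
    results = {}
    # static_order() yields every node after all of its predecessors
    # and raises graphlib.CycleError if the graph contains a cycle.
    for name in TopologicalSorter(dependencies).static_order():
        results[name] = registry[name](agent)
    return results


results = execute_task_graph("agent-1", dependencies, task_registry)
execution_order = list(results)
```

Because `results` is filled in execution order, `execution_order` doubles as a record of the order in which the tasks actually ran.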


Task Retries and Error Handling

When tasks fail due to unexpected errors or timeouts, Cleo allows for automatic retries and provides error handling mechanisms to ensure that agents can recover and continue working.

1. Task Retries

If a task fails, Cleo can be configured to retry the task a certain number of times, with exponential backoff or a fixed delay between retries.

Example: Automatic Task Retry

import time

def retry_task(agent, task, retries=3, delay_seconds=5):
    for attempt in range(retries):
        try:
            return task.run(agent)
        except Exception as e:
            print(f"Task failed on attempt {attempt + 1}: {e}")
            # Wait before the next attempt, but not after the final one
            if attempt < retries - 1:
                time.sleep(delay_seconds)
    return "Task failed after multiple attempts"

In this example, the task is attempted up to three times in total, with a 5-second delay between attempts.
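The exponential backoff option mentioned above could look like the following sketch. The helper name `retry_with_backoff`, its parameters, and the `FlakyTask` class are assumptions made for illustration, not part of Cleo's API. The `sleep` parameter is injectable so the waits can be inspected without actually pausing.

```python
import time


def retry_with_backoff(agent, task, retries=3, base_delay=1.0, factor=2.0, sleep=time.sleep):
    """Hypothetical helper: retry task.run(agent), multiplying the wait by factor after each failure."""
    delay = base_delay
    for attempt in range(1, retries + 1):
        try:
            return task.run(agent)
        except Exception:
            if attempt == retries:
                raise  # out of attempts: surface the last error to the caller
            sleep(delay)
            delay *= factor


class FlakyTask:
    """Hypothetical task that fails twice, then succeeds."""

    def __init__(self):
        self.calls = 0

    def run(self, agent):
        self.calls += 1
        if self.calls < 3:
            raise RuntimeError("temporary failure")
        return f"done for {agent}"


waits = []
flaky = FlakyTask()
# Record the waits instead of sleeping so the demo finishes instantly.
result = retry_with_backoff("agent-1", flaky, retries=3, base_delay=1.0, sleep=waits.append)
```

Unlike the fixed-delay example, this sketch re-raises the final exception instead of returning a message string, so callers cannot mistake a failure for a valid result.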

2. Task Error Handling

Cleo provides customizable error handling mechanisms to catch specific exceptions and take appropriate actions, such as logging the error or triggering fallback tasks.

Example: Error Handling

def fallback_task(agent):
    # Placeholder fallback: replace with your own recovery logic
    print(f"Running fallback task for agent {agent.id}")

def handle_task_error(agent, task, error):
    # Log the error or notify relevant parties
    print(f"Error in task {task}: {error}")

    # Trigger a fallback task if needed
    fallback_task(agent)

def run_task_with_error_handling(agent, task):
    try:
        return task.run(agent)
    except Exception as e:
        handle_task_error(agent, task, e)
        return f"Error: {e}"

Here, a fallback task is triggered if an error occurs during task execution. You can extend this approach by notifying users or logging the error for later investigation.
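To catch specific exceptions rather than every `Exception`, one possible shape is sketched below. `TaskTimeout`, the logger name, the status dictionary, and the helper names are all hypothetical choices for this illustration. Exceptions that are not listed propagate to the caller unchanged.

```python
import logging

logger = logging.getLogger("cleo.tasks")  # hypothetical logger name


class TaskTimeout(Exception):
    """Hypothetical exception a task might raise when it runs too long."""


def run_task_with_specific_handling(agent, task, fallback=None):
    try:
        return {"status": "ok", "result": task.run(agent)}
    except TaskTimeout as e:
        # Recoverable: log it and hand over to the fallback, if one was given.
        logger.warning("Task timed out: %s", e)
        if fallback is not None:
            return {"status": "fallback", "result": fallback(agent)}
        return {"status": "timeout", "error": str(e)}
    except ValueError as e:
        # Bad input will not improve on a second try, so report it and stop.
        logger.error("Invalid task input: %s", e)
        return {"status": "invalid", "error": str(e)}


class SlowTask:
    """Hypothetical task that always times out."""

    def run(self, agent):
        raise TaskTimeout("no response after 30s")


def cached_answer(agent):
    return f"cached result for {agent}"


outcome = run_task_with_specific_handling("agent-1", SlowTask(), fallback=cached_answer)
no_fallback = run_task_with_specific_handling("agent-1", SlowTask())
```

Returning a status dictionary keeps successful results and failures distinguishable, which a plain error string does not.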


Scaling Task Execution

For large systems that require the execution of thousands or even millions of tasks, Cleo supports the parallel execution of tasks, load balancing, and distributed task management to ensure performance at scale.

1. Parallel Task Execution

Cleo supports running multiple tasks concurrently, either within a single agent or across multiple agents. Libraries like Celery or Dask can be used to manage the distribution of tasks across workers.

Example: Parallel Execution with Threading

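import threading

def execute_tasks_in_parallel(agent, tasks):
    # Start every thread before joining any of them; joining right after
    # start() would run the tasks one at a time
    threads = [threading.Thread(target=task.run, args=(agent,)) for task in tasks]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()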

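This example demonstrates how to run tasks on threads using Python's threading module. In CPython, the global interpreter lock means threads mainly benefit I/O-bound tasks; CPU-bound work is better served by multiple processes. For distributed systems, more advanced tools like Celery can be used to distribute tasks across multiple nodes.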
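When task results need to be collected, or the number of threads must be capped, `concurrent.futures.ThreadPoolExecutor` from the standard library is a common alternative to managing threads by hand. In this sketch, `SleepTask` and `run_tasks_concurrently` are hypothetical names, and the short sleep stands in for I/O such as an API call.

```python
from concurrent.futures import ThreadPoolExecutor
import time


class SleepTask:
    """Hypothetical I/O-bound task, simulated with a short sleep."""

    def __init__(self, name, seconds):
        self.name = name
        self.seconds = seconds

    def run(self, agent):
        time.sleep(self.seconds)
        return f"{self.name} done for {agent}"


def run_tasks_concurrently(agent, tasks, max_workers=4):
    # The pool caps the number of live threads; map() returns results in input order.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(lambda task: task.run(agent), tasks))


tasks = [SleepTask(f"task-{i}", 0.2) for i in range(4)]
started = time.monotonic()
results = run_tasks_concurrently("agent-1", tasks)
elapsed = time.monotonic() - started
```

With four workers the four 0.2-second tasks overlap, so the batch should finish in well under the 0.8 seconds a sequential run would need.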

2. Distributed Task Management

For cloud-based applications or systems requiring high availability, Cleo can integrate with distributed infrastructure such as Celery (a distributed task queue), Kubernetes (container orchestration), or Apache Kafka (event streaming) so that tasks are managed and executed efficiently across multiple systems.


Last updated 1 month ago