// Package actor implements the Actor model for the Orca framework. // // An Agent is an independent goroutine that communicates via channels. // Each agent has a state machine: Idle -> Processing -> [ToolCall] -> // WaitingForTool -> Processing -> Completed. package actor import ( "context" "fmt" "sync" "sync/atomic" "github.com/orca/orca/pkg/bus" ) // ActorStatus represents the current state of an agent in its state machine. type ActorStatus int const ( // StatusIdle indicates the agent is ready to accept messages. StatusIdle ActorStatus = iota // StatusProcessing indicates the agent is actively handling a message. StatusProcessing // StatusWaitingForTool indicates the agent has called a tool and is awaiting its result. StatusWaitingForTool // StatusCompleted indicates the agent has finished processing the last message. StatusCompleted // StatusStopped indicates the agent has been shut down. StatusStopped ) // String returns the human-readable name of the actor status. func (s ActorStatus) String() string { switch s { case StatusIdle: return "idle" case StatusProcessing: return "processing" case StatusWaitingForTool: return "waiting_for_tool" case StatusCompleted: return "completed" case StatusStopped: return "stopped" default: return "unknown" } } // Agent is the interface that all actors in the Orca framework must implement. // // Each Agent runs as an independent goroutine processing messages // through an internal channel. The Process method provides a synchronous // API to submit messages and await responses. type Agent interface { // ID returns the unique identifier for this agent. ID() string // Role returns the role/type of this agent (e.g., "orchestrator", "worker"). Role() string // Process sends a message to this agent and waits for a response. // This is a synchronous call; the agent's goroutine handles the message. Process(ctx context.Context, msg bus.Message) (bus.Message, error) // Stop gracefully shuts down this agent, waiting for in-flight processing to complete. Stop() error } // agentRequest wraps a message and provides a response channel. type agentRequest struct { ctx context.Context msg bus.Message resp chan agentResponse } // agentResponse wraps the result of processing a message. type agentResponse struct { msg bus.Message err error } // BaseAgent provides shared infrastructure for all agent implementations. // // It manages the message channel, goroutine lifecycle, and status tracking. // Concrete agents should embed BaseAgent and set a handler via SetHandler. type BaseAgent struct { id string role string msgCh chan agentRequest stopCh chan struct{} status atomic.Value wg sync.WaitGroup mu sync.Mutex started bool handler func(context.Context, bus.Message) (bus.Message, error) } // NewBaseAgent creates a new BaseAgent with the given id and role. // The agent is not started until Start() is called and a handler is set. func NewBaseAgent(id, role string) *BaseAgent { a := &BaseAgent{ id: id, role: role, msgCh: make(chan agentRequest, 64), stopCh: make(chan struct{}), } a.status.Store(StatusIdle) return a } // ID returns the agent's unique identifier. func (a *BaseAgent) ID() string { return a.id } // Role returns the agent's role. func (a *BaseAgent) Role() string { return a.role } // Status returns the current ActorStatus of this agent. func (a *BaseAgent) Status() ActorStatus { s, _ := a.status.Load().(ActorStatus) return s } // setStatus atomically updates the agent's status. func (a *BaseAgent) setStatus(s ActorStatus) { a.status.Store(s) } // SetHandler sets the message handler function for this agent. // Must be called before Start(). func (a *BaseAgent) SetHandler(handler func(context.Context, bus.Message) (bus.Message, error)) { a.mu.Lock() defer a.mu.Unlock() a.handler = handler } // Start launches the agent's message processing goroutine. // The handler must be set before calling Start. func (a *BaseAgent) Start() error { a.mu.Lock() defer a.mu.Unlock() if a.started { return fmt.Errorf("agent %s is already started", a.id) } if a.handler == nil { return fmt.Errorf("agent %s has no handler set", a.id) } a.started = true a.status.Store(StatusIdle) a.wg.Add(1) go a.loop() return nil } // loop is the main goroutine that reads messages from msgCh and processes them. func (a *BaseAgent) loop() { defer a.wg.Done() for { select { case req := <-a.msgCh: a.setStatus(StatusProcessing) resp, err := a.handler(req.ctx, req.msg) if err != nil { a.setStatus(StatusIdle) } else { a.setStatus(StatusCompleted) } req.resp <- agentResponse{msg: resp, err: err} case <-a.stopCh: a.setStatus(StatusStopped) return } } } // Process sends a message to the agent's processing loop and waits for a response. // It respects context cancellation and the agent's stop signal. func (a *BaseAgent) Process(ctx context.Context, msg bus.Message) (bus.Message, error) { respCh := make(chan agentResponse, 1) select { case a.msgCh <- agentRequest{ctx: ctx, msg: msg, resp: respCh}: case <-ctx.Done(): return bus.Message{}, ctx.Err() case <-a.stopCh: return bus.Message{}, fmt.Errorf("agent %s is stopped", a.id) } select { case r := <-respCh: return r.msg, r.err case <-ctx.Done(): return bus.Message{}, ctx.Err() } } // Stop gracefully shuts down the agent. // It signals the processing loop to exit and waits for it to finish. func (a *BaseAgent) Stop() error { a.mu.Lock() started := a.started a.started = false a.mu.Unlock() if !started { return nil } close(a.stopCh) a.wg.Wait() return nil } // IsStarted returns whether the agent's processing loop is running. func (a *BaseAgent) IsStarted() bool { a.mu.Lock() defer a.mu.Unlock() return a.started }