package actor import ( "context" "fmt" "sync" "github.com/orca/orca/pkg/bus" ) // Orchestrator is an agent that coordinates task execution across a pool of workers. // // It receives task requests, delegates them to available workers, and // collects responses. The orchestrator maintains a registry of worker // agents and can dynamically add or remove them. type Orchestrator struct { *BaseAgent workers map[string]Agent bus bus.MessageBus mu sync.RWMutex } // NewOrchestrator creates a new Orchestrator agent with the given id and message bus. // The agent is started automatically upon creation. func NewOrchestrator(id string, mb bus.MessageBus) *Orchestrator { o := &Orchestrator{ BaseAgent: NewBaseAgent(id, "orchestrator"), workers: make(map[string]Agent), bus: mb, } o.SetHandler(o.handleMessage) // Start the agent's processing loop if err := o.Start(); err != nil { // This should not happen since handler is set above panic(fmt.Sprintf("orchestrator: failed to start: %v", err)) } return o } // handleMessage routes incoming messages to the appropriate handler. func (o *Orchestrator) handleMessage(ctx context.Context, msg bus.Message) (bus.Message, error) { switch msg.Type { case bus.MsgTypeTaskRequest: return o.handleTask(ctx, msg) case bus.MsgTypeSystem: return o.handleSystem(ctx, msg) default: return bus.Message{}, fmt.Errorf("orchestrator %s: unsupported message type %s", o.ID(), msg.Type) } } // handleTask processes a task request by delegating to an available worker. func (o *Orchestrator) handleTask(ctx context.Context, msg bus.Message) (bus.Message, error) { o.mu.RLock() defer o.mu.RUnlock() if len(o.workers) == 0 { return bus.Message{}, fmt.Errorf("orchestrator %s: no workers available", o.ID()) } // Simple round-robin: pick the first available worker for _, w := range o.workers { return w.Process(ctx, msg) } return bus.Message{}, fmt.Errorf("orchestrator %s: no workers available", o.ID()) } // handleSystem processes internal system messages. 
func (o *Orchestrator) handleSystem(ctx context.Context, msg bus.Message) (bus.Message, error) {
	// Every system message gets the same acknowledgement, addressed back to
	// its sender and identified by the incoming ID with an "-ack" suffix.
	ack := bus.Message{
		ID:      msg.ID + "-ack",
		Type:    bus.MsgTypeSystem,
		From:    o.ID(),
		To:      msg.From,
		Content: "orchestrator acknowledged",
	}
	return ack, nil
}

// AddWorker registers a worker agent with this orchestrator under the
// worker's own ID. A worker already registered under that ID is replaced.
func (o *Orchestrator) AddWorker(w Agent) {
	o.mu.Lock()
	defer o.mu.Unlock()

	key := w.ID()
	o.workers[key] = w
}

// RemoveWorker unregisters the worker with the given id from this
// orchestrator. Removing an id that is not registered does nothing.
func (o *Orchestrator) RemoveWorker(id string) {
	o.mu.Lock()
	defer o.mu.Unlock()

	delete(o.workers, id)
}

// WorkerCount reports how many workers are currently registered.
func (o *Orchestrator) WorkerCount() int {
	o.mu.RLock()
	defer o.mu.RUnlock()

	count := len(o.workers)
	return count
}

// GetWorker looks up a registered worker by ID. The boolean result reports
// whether a worker is registered under that ID.
func (o *Orchestrator) GetWorker(id string) (Agent, bool) {
	o.mu.RLock()
	defer o.mu.RUnlock()

	agent, found := o.workers[id]
	return agent, found
}

// ListWorkers returns a newly allocated slice holding every registered
// worker. The order follows map iteration and is therefore unspecified.
func (o *Orchestrator) ListWorkers() []Agent {
	o.mu.RLock()
	defer o.mu.RUnlock()

	// Allocate at full length and fill by index instead of appending.
	snapshot := make([]Agent, len(o.workers))
	i := 0
	for _, agent := range o.workers {
		snapshot[i] = agent
		i++
	}
	return snapshot
}

// Bus returns the message bus this orchestrator was constructed with.
func (o *Orchestrator) Bus() bus.MessageBus {
	mb := o.bus
	return mb
}