- Add bubbletea/lipgloss/glamour dependencies for TUI - Create internal/tui package with EventWriter, styles, and bubbletea Model - Support streaming output display in conversation window - Add right panel with statistics and active agent status - Implement multi-agent collaboration with sub-agents - Add AgentCallTool for delegating tasks to sub-agents - Support parallel tool execution with goroutines - Auto-discover sub-agents from ~/.orca/prompts/ directory - Fix orchestrator routing based on msg.To field - Add non-blocking event writer with timeout to prevent blocking
170 lines
4.0 KiB
Go
170 lines
4.0 KiB
Go
package actor
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
|
|
"github.com/orca/orca/pkg/bus"
|
|
)
|
|
|
|
// Orchestrator is an agent that coordinates task execution across a pool of workers.
|
|
//
|
|
// It receives task requests, delegates them to available workers, and
|
|
// collects responses. The orchestrator maintains a registry of worker
|
|
// agents and can dynamically add or remove them.
|
|
type Orchestrator struct {
|
|
*BaseAgent
|
|
workers map[string]Agent
|
|
defaultWorker Agent
|
|
bus bus.MessageBus
|
|
mu sync.RWMutex
|
|
}
|
|
|
|
// NewOrchestrator creates a new Orchestrator agent with the given id and message bus.
|
|
// The agent is started automatically upon creation.
|
|
func NewOrchestrator(id string, mb bus.MessageBus) *Orchestrator {
|
|
o := &Orchestrator{
|
|
BaseAgent: NewBaseAgent(id, "orchestrator"),
|
|
workers: make(map[string]Agent),
|
|
bus: mb,
|
|
}
|
|
o.SetHandler(o.handleMessage)
|
|
// Start the agent's processing loop
|
|
if err := o.Start(); err != nil {
|
|
// This should not happen since handler is set above
|
|
panic(fmt.Sprintf("orchestrator: failed to start: %v", err))
|
|
}
|
|
return o
|
|
}
|
|
|
|
// handleMessage routes incoming messages to the appropriate handler.
|
|
func (o *Orchestrator) handleMessage(ctx context.Context, msg bus.Message) (bus.Message, error) {
|
|
switch msg.Type {
|
|
case bus.MsgTypeTaskRequest:
|
|
return o.handleTask(ctx, msg)
|
|
case bus.MsgTypeSystem:
|
|
return o.handleSystem(ctx, msg)
|
|
default:
|
|
return bus.Message{}, fmt.Errorf("orchestrator %s: unsupported message type %s", o.ID(), msg.Type)
|
|
}
|
|
}
|
|
|
|
func (o *Orchestrator) handleTask(ctx context.Context, msg bus.Message) (bus.Message, error) {
|
|
o.mu.RLock()
|
|
defer o.mu.RUnlock()
|
|
|
|
if len(o.workers) == 0 {
|
|
return bus.Message{}, fmt.Errorf("orchestrator %s: no workers available", o.ID())
|
|
}
|
|
|
|
if msg.To != "" {
|
|
if w, ok := o.workers[msg.To]; ok {
|
|
return w.Process(ctx, msg)
|
|
}
|
|
for _, w := range o.workers {
|
|
if w.Role() == msg.To || containsIgnoreCase(w.Role(), msg.To) {
|
|
return w.Process(ctx, msg)
|
|
}
|
|
}
|
|
}
|
|
|
|
if o.defaultWorker != nil {
|
|
return o.defaultWorker.Process(ctx, msg)
|
|
}
|
|
|
|
for _, w := range o.workers {
|
|
return w.Process(ctx, msg)
|
|
}
|
|
|
|
return bus.Message{}, fmt.Errorf("orchestrator %s: no workers available", o.ID())
|
|
}
|
|
|
|
func containsIgnoreCase(s, substr string) bool {
|
|
if len(substr) > len(s) {
|
|
return false
|
|
}
|
|
for i := 0; i <= len(s)-len(substr); i++ {
|
|
match := true
|
|
for j := 0; j < len(substr); j++ {
|
|
if toLower(s[i+j]) != toLower(substr[j]) {
|
|
match = false
|
|
break
|
|
}
|
|
}
|
|
if match {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func toLower(b byte) byte {
|
|
if b >= 'A' && b <= 'Z' {
|
|
return b + ('a' - 'A')
|
|
}
|
|
return b
|
|
}
|
|
|
|
// handleSystem processes internal system messages.
|
|
func (o *Orchestrator) handleSystem(ctx context.Context, msg bus.Message) (bus.Message, error) {
|
|
return bus.Message{
|
|
ID: msg.ID + "-ack",
|
|
Type: bus.MsgTypeSystem,
|
|
From: o.ID(),
|
|
To: msg.From,
|
|
Content: "orchestrator acknowledged",
|
|
}, nil
|
|
}
|
|
|
|
// AddWorker registers a worker agent with this orchestrator.
|
|
func (o *Orchestrator) AddWorker(w Agent) {
|
|
o.mu.Lock()
|
|
defer o.mu.Unlock()
|
|
o.workers[w.ID()] = w
|
|
}
|
|
|
|
// RemoveWorker unregisters a worker agent from this orchestrator.
|
|
func (o *Orchestrator) RemoveWorker(id string) {
|
|
o.mu.Lock()
|
|
defer o.mu.Unlock()
|
|
delete(o.workers, id)
|
|
}
|
|
|
|
func (o *Orchestrator) SetDefaultWorker(w Agent) {
|
|
o.mu.Lock()
|
|
defer o.mu.Unlock()
|
|
o.defaultWorker = w
|
|
}
|
|
|
|
// WorkerCount returns the number of registered workers.
|
|
func (o *Orchestrator) WorkerCount() int {
|
|
o.mu.RLock()
|
|
defer o.mu.RUnlock()
|
|
return len(o.workers)
|
|
}
|
|
|
|
// GetWorker retrieves a registered worker by ID.
|
|
func (o *Orchestrator) GetWorker(id string) (Agent, bool) {
|
|
o.mu.RLock()
|
|
defer o.mu.RUnlock()
|
|
w, ok := o.workers[id]
|
|
return w, ok
|
|
}
|
|
|
|
// ListWorkers returns all registered workers.
|
|
func (o *Orchestrator) ListWorkers() []Agent {
|
|
o.mu.RLock()
|
|
defer o.mu.RUnlock()
|
|
workers := make([]Agent, 0, len(o.workers))
|
|
for _, w := range o.workers {
|
|
workers = append(workers, w)
|
|
}
|
|
return workers
|
|
}
|
|
|
|
// Bus returns the orchestrator's message bus reference.
|
|
func (o *Orchestrator) Bus() bus.MessageBus {
|
|
return o.bus
|
|
}
|