package actor import ( "context" "errors" "sync/atomic" "testing" "time" "github.com/orca/orca/pkg/bus" ) // ============================================================ // BaseAgent Tests // ============================================================ func TestNewBaseAgent(t *testing.T) { a := NewBaseAgent("test-1", "worker") if a == nil { t.Fatal("NewBaseAgent() returned nil") } if a.ID() != "test-1" { t.Errorf("expected id 'test-1', got %q", a.ID()) } if a.Role() != "worker" { t.Errorf("expected role 'worker', got %q", a.Role()) } if s := a.Status(); s != StatusIdle { t.Errorf("expected initial StatusIdle, got %s", s) } } func TestBaseAgentStartAndStop(t *testing.T) { a := NewBaseAgent("test-2", "worker") a.SetHandler(func(ctx context.Context, msg bus.Message) (bus.Message, error) { return bus.Message{ID: "response"}, nil }) if err := a.Start(); err != nil { t.Fatalf("Start failed: %v", err) } if !a.IsStarted() { t.Error("expected agent to be started") } if err := a.Stop(); err != nil { t.Fatalf("Stop failed: %v", err) } if a.IsStarted() { t.Error("expected agent to be stopped after Stop()") } } func TestBaseAgentDoubleStart(t *testing.T) { a := NewBaseAgent("test-3", "worker") a.SetHandler(func(ctx context.Context, msg bus.Message) (bus.Message, error) { return bus.Message{ID: "response"}, nil }) if err := a.Start(); err != nil { t.Fatalf("first Start failed: %v", err) } err := a.Start() if err == nil { t.Error("expected error on double start") } a.Stop() } func TestBaseAgentStartWithoutHandler(t *testing.T) { a := NewBaseAgent("test-4", "worker") err := a.Start() if err == nil { t.Error("expected error starting agent without handler") } } func TestBaseAgentProcessAndResponse(t *testing.T) { a := NewBaseAgent("test-5", "worker") a.SetHandler(func(ctx context.Context, msg bus.Message) (bus.Message, error) { return bus.Message{ ID: msg.ID + "-resp", Type: bus.MsgTypeTaskResponse, From: a.ID(), To: msg.From, Content: "processed: " + msg.ID, }, nil }) a.Start() defer a.Stop() ctx := context.Background() resp, err := a.Process(ctx, bus.Message{ ID: "task-1", Type: bus.MsgTypeTaskRequest, From: "caller", }) if err != nil { t.Fatalf("Process failed: %v", err) } if resp.ID != "task-1-resp" { t.Errorf("expected response ID 'task-1-resp', got %q", resp.ID) } if resp.Content != "processed: task-1" { t.Errorf("expected content 'processed: task-1', got %v", resp.Content) } if resp.From != "test-5" { t.Errorf("expected From 'test-5', got %q", resp.From) } } func TestBaseAgentProcessReturnsError(t *testing.T) { expectedErr := errors.New("processing failed") a := NewBaseAgent("test-6", "worker") a.SetHandler(func(ctx context.Context, msg bus.Message) (bus.Message, error) { return bus.Message{}, expectedErr }) a.Start() defer a.Stop() _, err := a.Process(context.Background(), bus.Message{ID: "task-1"}) if err == nil { t.Fatal("expected error from Process") } if !errors.Is(err, expectedErr) { t.Errorf("expected error %v, got %v", expectedErr, err) } } func TestBaseAgentProcessOnStoppedAgent(t *testing.T) { a := NewBaseAgent("test-7", "worker") a.SetHandler(func(ctx context.Context, msg bus.Message) (bus.Message, error) { return bus.Message{ID: "response"}, nil }) a.Start() a.Stop() _, err := a.Process(context.Background(), bus.Message{ID: "task-1"}) if err == nil { t.Error("expected error processing on stopped agent") } } func TestBaseAgentContextCancellation(t *testing.T) { a := NewBaseAgent("test-8", "worker") a.SetHandler(func(ctx context.Context, msg bus.Message) (bus.Message, error) { // Simulate long processing select { case <-time.After(5 * time.Second): return bus.Message{ID: "response"}, nil case <-ctx.Done(): return bus.Message{}, ctx.Err() } }) a.Start() defer a.Stop() ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) defer cancel() _, err := a.Process(ctx, bus.Message{ID: "task-1"}) if err == nil { t.Error("expected error from context cancellation") } } func TestBaseAgentStatusTransitions(t *testing.T) { a := NewBaseAgent("test-9", "worker") a.SetHandler(func(ctx context.Context, msg bus.Message) (bus.Message, error) { if a.Status() != StatusProcessing { t.Errorf("expected StatusProcessing inside handler, got %s", a.Status()) } return bus.Message{ID: "response"}, nil }) a.Start() defer a.Stop() // Should be idle before processing if s := a.Status(); s != StatusIdle { t.Errorf("expected StatusIdle before Process, got %s", s) } _, err := a.Process(context.Background(), bus.Message{ID: "task-1"}) if err != nil { t.Fatalf("Process failed: %v", err) } // Should be completed after processing // Give the goroutine a moment to update the status time.Sleep(5 * time.Millisecond) if s := a.Status(); s != StatusCompleted { t.Errorf("expected StatusCompleted after Process, got %s", s) } } func TestBaseAgentConcurrentProcess(t *testing.T) { a := NewBaseAgent("test-10", "worker") var counter int32 a.SetHandler(func(ctx context.Context, msg bus.Message) (bus.Message, error) { atomic.AddInt32(&counter, 1) return bus.Message{ID: "response"}, nil }) a.Start() defer a.Stop() var completed int32 for i := 0; i < 10; i++ { go func(i int) { _, err := a.Process(context.Background(), bus.Message{ID: "task"}) if err == nil { atomic.AddInt32(&completed, 1) } }(i) } time.Sleep(200 * time.Millisecond) if n := atomic.LoadInt32(&completed); n != 10 { t.Errorf("expected 10 completed tasks, got %d", n) } if n := atomic.LoadInt32(&counter); n != 10 { t.Errorf("expected 10 handler calls, got %d", n) } } func TestBaseAgentStopIdempotent(t *testing.T) { a := NewBaseAgent("test-11", "worker") a.SetHandler(func(ctx context.Context, msg bus.Message) (bus.Message, error) { return bus.Message{ID: "response"}, nil }) a.Start() if err := a.Stop(); err != nil { t.Fatalf("first Stop failed: %v", err) } if err := a.Stop(); err != nil { t.Fatalf("second Stop should be idempotent: %v", err) } } func TestActorStatusString(t *testing.T) { tests := []struct { status ActorStatus want string }{ {StatusIdle, "idle"}, {StatusProcessing, "processing"}, {StatusWaitingForTool, "waiting_for_tool"}, {StatusCompleted, "completed"}, {StatusStopped, "stopped"}, {ActorStatus(99), "unknown"}, } for _, tt := range tests { if got := tt.status.String(); got != tt.want { t.Errorf("ActorStatus(%d).String() = %q, want %q", tt.status, got, tt.want) } } } // ============================================================ // Worker Tests // ============================================================ func TestNewWorker(t *testing.T) { w := NewWorker("worker-1") if w == nil { t.Fatal("NewWorker() returned nil") } if w.ID() != "worker-1" { t.Errorf("expected id 'worker-1', got %q", w.ID()) } if w.Role() != "worker" { t.Errorf("expected role 'worker', got %q", w.Role()) } if !w.IsStarted() { t.Error("expected worker to be started automatically") } w.Stop() } func TestWorkerProcessTask(t *testing.T) { w := NewWorker("worker-2") defer w.Stop() resp, err := w.Process(context.Background(), bus.Message{ ID: "task-1", Type: bus.MsgTypeTaskRequest, From: "caller", Content: "do something", }) if err != nil { t.Fatalf("Process failed: %v", err) } if resp.Type != bus.MsgTypeTaskResponse { t.Errorf("expected MsgTypeTaskResponse, got %s", resp.Type) } if resp.From != "worker-2" { t.Errorf("expected From 'worker-2', got %q", resp.From) } if resp.Metadata["processed_by"] != "worker-2" { t.Errorf("expected processed_by 'worker-2', got %q", resp.Metadata["processed_by"]) } } func TestWorkerProcessToolCall(t *testing.T) { w := NewWorker("worker-3") defer w.Stop() resp, err := w.Process(context.Background(), bus.Message{ ID: "tool-1", Type: bus.MsgTypeToolCall, From: "caller", Content: "execute command", }) if err != nil { t.Fatalf("Process failed: %v", err) } if resp.Type != bus.MsgTypeToolResult { t.Errorf("expected MsgTypeToolResult, got %s", resp.Type) } } func TestWorkerStatusDuringToolCall(t *testing.T) { w := NewWorker("worker-4") // Perform a tool call _, err := w.Process(context.Background(), bus.Message{ ID: "tool-1", Type: bus.MsgTypeToolCall, From: "caller", }) if err != nil { t.Fatalf("Process failed: %v", err) } // After tool call, status should be Processing (set back by defer) time.Sleep(5 * time.Millisecond) // Status could be Processing or Completed depending on timing s := w.Status() if s != StatusProcessing && s != StatusIdle && s != StatusCompleted { t.Errorf("expected Processing/Idle/Completed after tool call, got %s", s) } w.Stop() } func TestWorkerUnsupportedMessage(t *testing.T) { w := NewWorker("worker-5") defer w.Stop() _, err := w.Process(context.Background(), bus.Message{ ID: "unknown-1", Type: bus.MsgTypeObservation, From: "caller", }) if err == nil { t.Error("expected error for unsupported message type") } } // ============================================================ // Orchestrator Tests // ============================================================ func TestNewOrchestrator(t *testing.T) { o := NewOrchestrator("orch-1", nil) if o == nil { t.Fatal("NewOrchestrator() returned nil") } if o.ID() != "orch-1" { t.Errorf("expected id 'orch-1', got %q", o.ID()) } if o.Role() != "orchestrator" { t.Errorf("expected role 'orchestrator', got %q", o.Role()) } if !o.IsStarted() { t.Error("expected orchestrator to be started automatically") } o.Stop() } func TestOrchestratorAddWorker(t *testing.T) { o := NewOrchestrator("orch-2", nil) defer o.Stop() w := NewWorker("worker-10") defer w.Stop() o.AddWorker(w) if n := o.WorkerCount(); n != 1 { t.Errorf("expected 1 worker, got %d", n) } got, ok := o.GetWorker("worker-10") if !ok { t.Fatal("expected to find worker-10") } if got.ID() != "worker-10" { t.Errorf("expected worker ID 'worker-10', got %q", got.ID()) } } func TestOrchestratorRemoveWorker(t *testing.T) { o := NewOrchestrator("orch-3", nil) defer o.Stop() w := NewWorker("worker-11") defer w.Stop() o.AddWorker(w) o.RemoveWorker("worker-11") if n := o.WorkerCount(); n != 0 { t.Errorf("expected 0 workers after removal, got %d", n) } } func TestOrchestratorListWorkers(t *testing.T) { o := NewOrchestrator("orch-4", nil) defer o.Stop() workers := []string{"w-1", "w-2", "w-3"} for _, name := range workers { w := NewWorker(name) defer w.Stop() o.AddWorker(w) } list := o.ListWorkers() if len(list) != len(workers) { t.Errorf("expected %d workers, got %d", len(workers), len(list)) } ids := make(map[string]bool) for _, w := range list { ids[w.ID()] = true } for _, name := range workers { if !ids[name] { t.Errorf("missing worker %q in list", name) } } } func TestOrchestratorDelegatesToWorker(t *testing.T) { o := NewOrchestrator("orch-5", nil) defer o.Stop() w := NewWorker("worker-20") defer w.Stop() o.AddWorker(w) resp, err := o.Process(context.Background(), bus.Message{ ID: "task-1", Type: bus.MsgTypeTaskRequest, From: "caller", Content: "do work", }) if err != nil { t.Fatalf("Process failed: %v", err) } if resp.From != "worker-20" { t.Errorf("expected response from 'worker-20', got %q", resp.From) } if resp.Metadata["processed_by"] != "worker-20" { t.Errorf("expected processed_by 'worker-20', got %q", resp.Metadata["processed_by"]) } } func TestOrchestratorNoWorkers(t *testing.T) { o := NewOrchestrator("orch-6", nil) defer o.Stop() _, err := o.Process(context.Background(), bus.Message{ ID: "task-1", Type: bus.MsgTypeTaskRequest, From: "caller", }) if err == nil { t.Error("expected error when no workers available") } } func TestOrchestratorSystemMessage(t *testing.T) { o := NewOrchestrator("orch-7", nil) defer o.Stop() resp, err := o.Process(context.Background(), bus.Message{ ID: "sys-1", Type: bus.MsgTypeSystem, From: "caller", }) if err != nil { t.Fatalf("Process failed: %v", err) } if resp.Content != "orchestrator acknowledged" { t.Errorf("expected acknowledged message, got %v", resp.Content) } } // ============================================================ // System Tests // ============================================================ func TestNewSystem(t *testing.T) { s := NewSystem() if s == nil { t.Fatal("NewSystem() returned nil") } if n := s.AgentCount(); n != 0 { t.Errorf("expected 0 agents, got %d", n) } } func TestSystemCreateWorker(t *testing.T) { s := NewSystem() w, err := s.CreateWorker() if err != nil { t.Fatalf("CreateWorker failed: %v", err) } if w == nil { t.Fatal("CreateWorker returned nil") } if w.Role() != "worker" { t.Errorf("expected role 'worker', got %q", w.Role()) } if n := s.AgentCount(); n != 1 { t.Errorf("expected 1 agent, got %d", n) } s.StopAll() } func TestSystemStopAgent(t *testing.T) { s := NewSystem() w, _ := s.CreateWorker() if err := s.StopAgent(w.ID()); err != nil { t.Fatalf("StopAgent failed: %v", err) } if n := s.AgentCount(); n != 0 { t.Errorf("expected 0 agents, got %d", n) } _, ok := s.GetAgent(w.ID()) if ok { t.Error("expected agent to be removed after StopAgent") } } func TestSystemStopAgentNotFound(t *testing.T) { s := NewSystem() err := s.StopAgent("nonexistent") if err == nil { t.Error("expected error stopping nonexistent agent") } } func TestSystemListAgents(t *testing.T) { s := NewSystem() s.CreateWorker() s.CreateWorker() agents := s.ListAgents() if len(agents) != 2 { t.Errorf("expected 2 agents, got %d", len(agents)) } s.StopAll() } func TestSystemAgentInfos(t *testing.T) { s := NewSystem() w, _ := s.CreateWorker() infos := s.AgentInfos() if len(infos) != 1 { t.Fatalf("expected 1 agent info, got %d", len(infos)) } if infos[0].ID != w.ID() { t.Errorf("expected ID %q, got %q", w.ID(), infos[0].ID) } if infos[0].Role != "worker" { t.Errorf("expected Role 'worker', got %q", infos[0].Role) } if infos[0].Status != StatusIdle { t.Errorf("expected Status StatusIdle, got %s", infos[0].Status) } s.StopAll() } func TestSystemStopAll(t *testing.T) { s := NewSystem() s.CreateWorker() s.CreateWorker() s.CreateWorker() if err := s.StopAll(); err != nil { t.Fatalf("StopAll failed: %v", err) } if n := s.AgentCount(); n != 0 { t.Errorf("expected 0 agents after StopAll, got %d", n) } } // ============================================================ // ToolWorker Tests // ============================================================ func TestNewToolWorker(t *testing.T) { tw := NewToolWorker("tool-1", nil) if tw == nil { t.Fatal("NewToolWorker() returned nil") } if tw.ID() != "tool-1" { t.Errorf("expected id 'tool-1', got %q", tw.ID()) } if tw.Role() != "tool_worker" { t.Errorf("expected role 'tool_worker', got %q", tw.Role()) } if !tw.IsStarted() { t.Error("expected tool worker to be started automatically") } tw.Stop() } func TestToolWorkerProcessSystemMessage(t *testing.T) { tw := NewToolWorker("tool-2", nil) defer tw.Stop() resp, err := tw.Process(context.Background(), bus.Message{ ID: "sys-1", Type: bus.MsgTypeSystem, From: "caller", }) if err != nil { t.Fatalf("Process failed: %v", err) } if resp.Content != "tool_worker acknowledged" { t.Errorf("expected 'tool_worker acknowledged', got %v", resp.Content) } } func TestToolWorkerUnsupportedMessage(t *testing.T) { tw := NewToolWorker("tool-3", nil) defer tw.Stop() _, err := tw.Process(context.Background(), bus.Message{ ID: "obs-1", Type: bus.MsgTypeObservation, From: "caller", }) if err == nil { t.Error("expected error for unsupported message type") } } func TestParseToolCallContentMap(t *testing.T) { name, args, err := parseToolCallContent(map[string]interface{}{ "name": "exec", "arguments": map[string]interface{}{"command": "ls"}, }) if err != nil { t.Fatalf("parseToolCallContent failed: %v", err) } if name != "exec" { t.Errorf("expected name 'exec', got %q", name) } if args["command"] != "ls" { t.Errorf("expected args['command'] = 'ls', got %v", args["command"]) } } func TestParseToolCallContentMissingName(t *testing.T) { _, _, err := parseToolCallContent(map[string]interface{}{"foo": "bar"}) if err == nil { t.Error("expected error for missing name") } } // ============================================================ // System ToolWorker Tests // ============================================================ func TestSystemCreateToolWorker(t *testing.T) { s := NewSystem() tw, err := s.CreateToolWorker(nil) if err != nil { t.Fatalf("CreateToolWorker failed: %v", err) } if tw == nil { t.Fatal("CreateToolWorker returned nil") } if tw.Role() != "tool_worker" { t.Errorf("expected role 'tool_worker', got %q", tw.Role()) } if n := s.AgentCount(); n != 1 { t.Errorf("expected 1 agent, got %d", n) } s.StopAll() }