orca.ai/pkg/actor/actor.go
大森 6b94476347 Initial commit: Orca Agent Framework
Core features:
- Microkernel architecture with Actor model
- Session management with JSONL persistence
- Tool system (5 built-in tools)
- Skill system with SKILL.md parsing
- Sandbox security execution
- Ollama integration with gemma4:e4b
- Prompt-based tool calling (compatible with native function calling)
- REPL interface

11 packages, all tests passing
2026-05-08 00:55:48 +08:00

221 lines
5.7 KiB
Go

// Package actor implements the Actor model for the Orca framework.
//
// An Agent is an independent goroutine that communicates via channels.
// Each agent has a state machine: Idle -> Processing -> [ToolCall] ->
// WaitingForTool -> Processing -> Completed.
package actor
import (
"context"
"fmt"
"sync"
"sync/atomic"
"github.com/orca/orca/pkg/bus"
)
// ActorStatus enumerates the states of an agent's state machine.
type ActorStatus int

// Agent states, numbered from zero in declaration order.
const (
	// StatusIdle means the agent is ready to accept messages.
	StatusIdle ActorStatus = iota
	// StatusProcessing means the agent is actively handling a message.
	StatusProcessing
	// StatusWaitingForTool means the agent has called a tool and is
	// awaiting its result.
	StatusWaitingForTool
	// StatusCompleted means the agent finished processing the last message.
	StatusCompleted
	// StatusStopped means the agent has been shut down.
	StatusStopped
)

// String returns the lowercase name of the status, or "unknown" for any
// value outside the declared set of states.
func (s ActorStatus) String() string {
	// Keyed array literal: each constant indexes its own label.
	names := [...]string{
		StatusIdle:           "idle",
		StatusProcessing:     "processing",
		StatusWaitingForTool: "waiting_for_tool",
		StatusCompleted:      "completed",
		StatusStopped:        "stopped",
	}
	if s < 0 || int(s) >= len(names) {
		return "unknown"
	}
	return names[s]
}
// Agent is the contract every actor in the Orca framework must satisfy.
//
// An Agent runs as its own goroutine and consumes messages from an
// internal channel; Process is the synchronous entry point used to hand
// it a message and wait for the reply.
type Agent interface {
	// ID reports the unique identifier of the agent.
	ID() string

	// Role reports the agent's role or type, e.g. "orchestrator" or "worker".
	Role() string

	// Process submits msg to the agent and blocks until a response is
	// available. The work itself is carried out on the agent's goroutine.
	Process(ctx context.Context, msg bus.Message) (bus.Message, error)

	// Stop shuts the agent down gracefully, waiting for in-flight
	// processing to complete.
	Stop() error
}
// agentRequest is the unit of work queued on an agent's message channel:
// the message to handle plus the channel on which the result is returned.
type agentRequest struct {
	// ctx is the caller's context; it is passed through to the handler.
	ctx context.Context
	// msg is the message to be processed.
	msg bus.Message
	// resp receives the single agentResponse produced for this request.
	resp chan agentResponse
}
// agentResponse carries the outcome of handling one message back to the
// caller: the handler's reply and the error it returned, if any.
type agentResponse struct {
	// msg is the message returned by the handler.
	msg bus.Message
	// err is the error returned by the handler; nil on success.
	err error
}
// BaseAgent is the shared plumbing for agent implementations.
//
// It owns the message channel, the lifecycle of the processing goroutine,
// and status tracking. Concrete agents should embed BaseAgent and install
// their message handler with SetHandler.
type BaseAgent struct {
	// id and role are fixed at construction and returned by ID and Role.
	id   string
	role string

	// msgCh queues incoming requests for the processing loop.
	msgCh chan agentRequest
	// stopCh is closed by Stop to tell the loop (and waiting callers) to quit.
	stopCh chan struct{}

	// status holds the current ActorStatus.
	status atomic.Value
	// wg tracks the processing goroutine so Stop can wait for it.
	wg sync.WaitGroup

	// mu is held by SetHandler, Start, Stop and IsStarted while they
	// access started and handler.
	mu      sync.Mutex
	started bool
	handler func(context.Context, bus.Message) (bus.Message, error)
}
// NewBaseAgent builds a BaseAgent identified by id and role.
//
// The returned agent is idle and not yet running: install a handler with
// SetHandler and then call Start to launch its processing loop.
func NewBaseAgent(id, role string) *BaseAgent {
	agent := new(BaseAgent)
	agent.id, agent.role = id, role
	// Requests are queued on a channel with room for 64 pending entries.
	agent.msgCh = make(chan agentRequest, 64)
	agent.stopCh = make(chan struct{})
	agent.status.Store(StatusIdle)
	return agent
}
// ID reports the unique identifier given to the agent at construction.
func (a *BaseAgent) ID() string {
	return a.id
}
// Role reports the role given to the agent at construction.
func (a *BaseAgent) Role() string {
	return a.role
}
// Status reports the agent's current ActorStatus.
//
// If no status has been stored yet, the zero ActorStatus is returned.
func (a *BaseAgent) Status() ActorStatus {
	var current ActorStatus
	if stored, ok := a.status.Load().(ActorStatus); ok {
		current = stored
	}
	return current
}
// setStatus records s as the agent's current status. The write goes
// through atomic.Value, so it needs no additional locking.
func (a *BaseAgent) setStatus(s ActorStatus) {
	a.status.Store(s)
}
// SetHandler installs the function that processes incoming messages.
//
// It must be called before Start; Start refuses to run without a handler.
func (a *BaseAgent) SetHandler(handler func(context.Context, bus.Message) (bus.Message, error)) {
	a.mu.Lock()
	a.handler = handler
	a.mu.Unlock()
}
// Start launches the agent's message processing goroutine.
//
// It returns an error if the agent is already running, if it has already
// been stopped, or if no handler has been set. An agent cannot be
// restarted: its stop channel stays closed after Stop, so a new loop
// would exit immediately and leave the agent marked as started while
// nothing is consuming messages.
func (a *BaseAgent) Start() error {
	a.mu.Lock()
	defer a.mu.Unlock()

	if a.started {
		return fmt.Errorf("agent %s is already started", a.id)
	}
	// Non-blocking probe: a closed stopCh means Stop has already run.
	select {
	case <-a.stopCh:
		return fmt.Errorf("agent %s is stopped and cannot be restarted", a.id)
	default:
	}
	if a.handler == nil {
		return fmt.Errorf("agent %s has no handler set", a.id)
	}

	a.started = true
	a.status.Store(StatusIdle)
	// Register the goroutine before launching it so Stop can wait on it.
	a.wg.Add(1)
	go a.loop()
	return nil
}
// loop is the agent's processing goroutine. It handles requests from
// msgCh one at a time until stopCh is closed.
//
// The stop signal is checked before each request is taken, so shutdown
// does not depend on select's random choice between a closed stopCh and
// a non-empty queue. On exit, requests still queued are answered with a
// "stopped" error so their callers are not left waiting for a reply.
func (a *BaseAgent) loop() {
	defer a.wg.Done()
	for {
		// Stop takes priority over queued work.
		select {
		case <-a.stopCh:
			a.setStatus(StatusStopped)
			a.rejectPending()
			return
		default:
		}

		select {
		case req := <-a.msgCh:
			a.setStatus(StatusProcessing)
			resp, err := a.handler(req.ctx, req.msg)
			if err != nil {
				a.setStatus(StatusIdle)
			} else {
				a.setStatus(StatusCompleted)
			}
			req.resp <- agentResponse{msg: resp, err: err}
		case <-a.stopCh:
			// Handled by the priority check at the top of the next iteration.
		}
	}
}

// rejectPending answers every request currently queued on msgCh with a
// "stopped" error, without blocking.
func (a *BaseAgent) rejectPending() {
	for {
		select {
		case req := <-a.msgCh:
			// Non-blocking send: never stall shutdown on a full or
			// unbuffered response channel.
			select {
			case req.resp <- agentResponse{err: fmt.Errorf("agent %s is stopped", a.id)}:
			default:
			}
		default:
			return
		}
	}
}
// Process sends msg to the agent's processing loop and waits for the
// response.
//
// It returns ctx.Err() if ctx is done before the request is queued or
// before a response arrives. It returns a "stopped" error if the agent
// has been stopped before the request is queued, or is stopped while the
// caller is still waiting and no response has been delivered yet. The
// stop signal is honored in both phases so that a caller with a
// non-cancellable context cannot block forever on an agent whose loop
// has exited.
func (a *BaseAgent) Process(ctx context.Context, msg bus.Message) (bus.Message, error) {
	// Fail fast: without this check the select below may randomly choose
	// the send even though the agent is already stopped.
	select {
	case <-a.stopCh:
		return bus.Message{}, fmt.Errorf("agent %s is stopped", a.id)
	default:
	}

	// Buffered so the loop's send never blocks, even if this caller has
	// already given up.
	respCh := make(chan agentResponse, 1)
	select {
	case a.msgCh <- agentRequest{ctx: ctx, msg: msg, resp: respCh}:
	case <-ctx.Done():
		return bus.Message{}, ctx.Err()
	case <-a.stopCh:
		return bus.Message{}, fmt.Errorf("agent %s is stopped", a.id)
	}

	select {
	case r := <-respCh:
		return r.msg, r.err
	case <-ctx.Done():
		return bus.Message{}, ctx.Err()
	case <-a.stopCh:
		// Prefer a response that has already been delivered over the
		// stop signal.
		select {
		case r := <-respCh:
			return r.msg, r.err
		default:
			return bus.Message{}, fmt.Errorf("agent %s is stopped", a.id)
		}
	}
}
// Stop gracefully shuts down the agent.
//
// It signals the processing loop to exit and waits for it to finish.
// Calling Stop on an agent that is not running is a no-op. Stop always
// returns nil.
//
// The stop channel is closed at most once, so a Start/Stop/Start/Stop
// sequence cannot panic with "close of closed channel".
func (a *BaseAgent) Stop() error {
	a.mu.Lock()
	wasStarted := a.started
	a.started = false
	if wasStarted {
		// Close under the mutex and only if not already closed.
		select {
		case <-a.stopCh:
		default:
			close(a.stopCh)
		}
	}
	a.mu.Unlock()

	if !wasStarted {
		return nil
	}
	// Wait outside the lock so other methods stay usable during shutdown.
	a.wg.Wait()
	return nil
}
// IsStarted reports whether the agent is currently marked as started,
// i.e. Start has succeeded and Stop has not been called since.
func (a *BaseAgent) IsStarted() bool {
	a.mu.Lock()
	running := a.started
	a.mu.Unlock()
	return running
}