orca.ai/pkg/kernel/kernel.go
大森 e18dde7c15 feat: implement TUI with bubbletea and multi-agent collaboration
- Add bubbletea/lipgloss/glamour dependencies for TUI
- Create internal/tui package with EventWriter, styles, and bubbletea Model
- Support streaming output display in conversation window
- Add right panel with statistics and active agent status
- Implement multi-agent collaboration with sub-agents
- Add AgentCallTool for delegating tasks to sub-agents
- Support parallel tool execution with goroutines
- Auto-discover sub-agents from ~/.orca/prompts/ directory
- Fix orchestrator routing based on msg.To field
- Add non-blocking event writer with timeout to prevent blocking
2026-05-10 14:28:17 +08:00

506 lines
12 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Package kernel 实现了 Orca 框架的微内核核心。
//
// 内核是一个最小化运行时,负责管理插件生命周期、
// 消息路由和组件间通信。
package kernel
import (
"context"
"fmt"
"io"
"log"
"os"
"path/filepath"
"strings"
"sync"
"github.com/orca/orca/internal/config"
"github.com/orca/orca/pkg/actor"
"github.com/orca/orca/pkg/bus"
"github.com/orca/orca/pkg/llm"
"github.com/orca/orca/pkg/plugin"
"github.com/orca/orca/pkg/session"
"github.com/orca/orca/pkg/skill"
"github.com/orca/orca/pkg/tool"
)
// Kernel is the microkernel core of the Orca framework.
//
// It orchestrates plugin lifecycle, message routing, and inter-component
// communication. The kernel initializes and manages the following components:
//   - a message bus for inter-component communication
//   - a plugin registry for extensions
//   - a session manager for conversation persistence
//   - a tool manager holding the built-in tools
//   - a skill manager for skill-based automation
//   - an actor system containing the orchestrator, workers, and the LLM agent
type Kernel struct {
// mu is taken by Start, Stop, IsRunning, RegisterPlugin, UnregisterPlugin
// and findAgent.
mu sync.RWMutex
// mb is the message bus; it is closed by Stop.
mb bus.MessageBus
// registry holds every registered plugin and its state.
registry *plugin.Registry
// plugins lists the plugins whose Init succeeded in Start; Stop shuts
// them down in reverse order.
plugins []plugin.Plugin
// started is true between a successful Start and the next Stop.
started bool
// Integration components
// config is the configuration the kernel was built with (never nil after
// NewWithConfig).
config *config.Config
// sessionMgr stays nil when the session store could not be created.
sessionMgr *session.Manager
// toolMgr holds the built-in tools plus the agent_call tool.
toolMgr *tool.Manager
// skillMgr loads skills; see InitPlugins.
skillMgr *skill.Manager
// actorSystem creates the orchestrator and tool worker and is stopped
// first in Stop.
actorSystem *actor.System
// orch, llmAgent and toolWorker are set by initializeActorSystem and may
// remain nil if that function returned early.
orch *actor.Orchestrator
llmAgent *actor.LLMAgent
toolWorker *actor.ToolWorker
// subAgents maps a sub-agent name (prompt file name without ".md") to
// the agent created for it.
subAgents map[string]actor.Agent
}
// New creates a Kernel from the configuration returned by config.LoadConfig.
//
// Per the original note, that is ./config.toml or ~/.orca/config.toml by
// default (the lookup itself lives in the config package). When loading
// fails, a warning is logged and config.DefaultConfig is used instead, so New
// always returns a kernel.
func New() *Kernel {
	loaded, loadErr := config.LoadConfig()
	if loadErr == nil {
		return NewWithConfig(loaded)
	}

	log.Printf("kernel: warning: failed to load config: %v, using defaults", loadErr)
	return NewWithConfig(config.DefaultConfig())
}
// NewWithConfig creates a Kernel from the given configuration.
//
// A nil cfg is replaced by config.DefaultConfig. Construction is best-effort:
// if the session store cannot be created a warning is logged and the kernel
// is returned without a session manager.
func NewWithConfig(cfg *config.Config) *Kernel {
	if cfg == nil {
		cfg = config.DefaultConfig()
	}

	k := &Kernel{
		config:    cfg,
		subAgents: make(map[string]actor.Agent),
	}
	k.mb = bus.New()
	k.registry = plugin.NewRegistry()
	k.actorSystem = actor.NewSystem()

	// Session manager: optional, left nil when the store is unavailable.
	if store, err := session.NewJSONLStore(cfg.Session.StorageDir); err == nil {
		k.sessionMgr = session.NewManager(store, k.mb)
	} else {
		log.Printf("kernel: warning: failed to create session store: %v", err)
	}

	// Tool manager, pre-populated with the built-in tools.
	k.toolMgr = tool.NewManager()
	k.registerBuiltinTools()

	// Skill manager.
	k.skillMgr = skill.NewManager("~/.agents/skills")

	// Actor system last: it wires in the tool, skill and session managers
	// created above.
	k.initializeActorSystem()

	return k
}
// registerBuiltinTools registers every built-in tool with the tool manager.
//
// A failed registration is logged as a warning and does not stop the
// remaining tools from being registered.
func (k *Kernel) registerBuiltinTools() {
	builtins := []tool.Tool{
		tool.NewExecTool(nil),     // exec - shell commands
		tool.NewReadFileTool(),    // read_file
		tool.NewWriteFileTool(),   // write_file
		tool.NewListDirTool(),     // list_dir
		tool.NewSearchFilesTool(), // search_files
	}

	for idx := range builtins {
		builtin := builtins[idx]
		err := k.toolMgr.Register(builtin)
		if err == nil {
			continue
		}
		log.Printf("kernel: warning: failed to register tool %q: %v", builtin.Name(), err)
	}
}
// initializeActorSystem sets up the orchestrator, the tool worker, the
// sub-agents and the main LLM agent.
//
// If the orchestrator or the tool worker cannot be created, a warning is
// logged and the function returns early, leaving the later fields (including
// llmAgent) nil.
func (k *Kernel) initializeActorSystem() {
	orchestrator, err := k.actorSystem.CreateOrchestrator(k)
	if err != nil {
		log.Printf("kernel: warning: failed to create orchestrator: %v", err)
		return
	}
	k.orch = orchestrator

	worker, err := k.actorSystem.CreateToolWorker(k.toolMgr)
	if err != nil {
		log.Printf("kernel: warning: failed to create tool worker: %v", err)
		return
	}
	k.toolWorker = worker

	// The backend may be Ollama or DeepSeek depending on the configuration.
	backend := k.createLLMBackend()

	// Sub-agents are created only after k.orch is set, because
	// createSubAgents adds each of them to the orchestrator.
	k.createSubAgents(backend)

	if regErr := k.toolMgr.Register(tool.NewAgentCallTool(k.findAgent)); regErr != nil {
		log.Printf("kernel: warning: failed to register agent_call tool: %v", regErr)
	}

	agentID := fmt.Sprintf("llm-%d", len(k.actorSystem.ListAgents())+1)

	opts := []actor.LLMAgentOption{
		actor.WithToolManager(k.toolMgr),
		actor.WithToolWorker(k.toolWorker),
		actor.WithWindowSize(k.config.Session.MaxHistory),
	}

	if sysPrompt := k.config.GetSystemPrompt(); sysPrompt != "" {
		opts = append(opts, actor.WithSystemPrompt(sysPrompt))
	}

	if k.skillMgr != nil {
		opts = append(opts, actor.WithSkillManager(k.skillMgr))
	}

	if k.sessionMgr != nil {
		sid := "default"
		// Any lookup error is treated as "session missing" and triggers a
		// create; the result of CreateSession is not inspected.
		if _, lookupErr := k.sessionMgr.GetSession(sid); lookupErr != nil {
			k.sessionMgr.CreateSession(sid, map[string]string{
				"source": "kernel",
			})
		}
		opts = append(opts,
			actor.WithSessionManager(k.sessionMgr),
			actor.WithSessionID(sid),
		)
	}

	if len(k.subAgents) > 0 {
		// Advertise each sub-agent to the main agent by its system prompt.
		descriptions := make(map[string]string)
		for agentName, candidate := range k.subAgents {
			sub, isSub := candidate.(*actor.SubAgent)
			if !isSub {
				continue
			}
			descriptions[agentName] = sub.SystemPrompt()
		}
		opts = append(opts, actor.WithSubAgents(descriptions))
	}

	mainAgent := actor.NewLLMAgent(agentID, backend, opts...)
	k.llmAgent = mainAgent

	k.orch.AddWorker(mainAgent)
	k.orch.AddWorker(worker)
	k.orch.SetDefaultWorker(mainAgent)
}
// createSubAgents discovers sub-agent prompts in ~/.orca/prompts and creates
// one sub-agent per prompt file, all sharing llmBackend.
//
// Only regular "*.md" entries are considered. "assistant.md" is skipped
// (presumably it is reserved for the main assistant prompt; confirm against
// the config package). Unreadable or blank files are logged and skipped. Each
// created agent is stored in k.subAgents under the file name without ".md"
// and added to the orchestrator, so k.orch must already be set.
func (k *Kernel) createSubAgents(llmBackend llm.LLM) {
	dir := expandHomeDir("~/.orca/prompts")

	files, err := os.ReadDir(dir)
	if err != nil {
		log.Printf("kernel: warning: cannot read prompts dir %s: %v", dir, err)
		return
	}

	for _, file := range files {
		if file.IsDir() {
			continue
		}
		fileName := file.Name()
		if !strings.HasSuffix(fileName, ".md") || fileName == "assistant.md" {
			continue
		}

		role := strings.TrimSuffix(fileName, ".md")
		fullPath := filepath.Join(dir, fileName)

		raw, readErr := os.ReadFile(fullPath)
		if readErr != nil {
			log.Printf("kernel: warning: failed to read prompt %s: %v", fullPath, readErr)
			continue
		}

		text := string(raw)
		if strings.TrimSpace(text) == "" {
			log.Printf("kernel: warning: empty prompt file %s, skipping", fullPath)
			continue
		}

		sub := actor.NewSubAgent(role, llmBackend,
			actor.WithSubAgentRole(role),
			actor.WithSubAgentSystemPrompt(text),
		)
		k.subAgents[role] = sub
		k.orch.AddWorker(sub)
		log.Printf("kernel: created sub-agent %q from %s", role, fullPath)
	}

	log.Printf("kernel: created %d sub-agents", len(k.subAgents))
}
// expandHomeDir replaces a leading "~" in path with the current user's home
// directory.
//
// Only a bare "~" or a path that starts with "~/" (or "~" followed by the
// platform path separator) is expanded. Forms such as "~other/file" refer to
// another user's home in shell syntax and are returned unchanged rather than
// being glued onto the current user's home directory. The path is also
// returned unchanged when the home directory cannot be determined.
func expandHomeDir(path string) string {
	if path != "~" &&
		!strings.HasPrefix(path, "~/") &&
		!strings.HasPrefix(path, "~"+string(os.PathSeparator)) {
		return path
	}

	home, err := os.UserHomeDir()
	if err != nil {
		// Best effort: fall back to the literal path.
		return path
	}
	return home + path[1:]
}
// findAgent looks up a sub-agent by name and returns it as a tool.Agent.
//
// It is passed to tool.NewAgentCallTool as the resolver for agent_call. The
// boolean result is false when no sub-agent is registered under name, or when
// the registered agent cannot be used as a tool.Agent.
func (k *Kernel) findAgent(name string) (tool.Agent, bool) {
	k.mu.RLock()
	defer k.mu.RUnlock()

	agent, ok := k.subAgents[name]
	if !ok {
		return nil, false
	}

	// Use the checked form of the assertion: an unchecked agent.(tool.Agent)
	// panics if the stored value is nil or does not implement tool.Agent,
	// which would crash the caller instead of reporting "not found".
	callable, ok := agent.(tool.Agent)
	if !ok {
		return nil, false
	}
	return callable, true
}
// createLLMBackend builds the LLM client selected by the configured provider.
//
// DeepSeek is used when the provider is config.ProviderDeepSeek; every other
// value falls back to Ollama.
func (k *Kernel) createLLMBackend() llm.LLM {
	if k.config.Provider == config.ProviderDeepSeek {
		return k.createDeepSeekBackend()
	}
	return k.createOllamaBackend()
}
// createOllamaBackend builds an Ollama client from the Ollama section of the
// configuration (base URL, model and timeout) and logs the model and URL.
func (k *Kernel) createOllamaBackend() llm.LLM {
	settings := k.config.Ollama

	backend := llm.NewOllamaClient(
		llm.WithBaseURL(settings.BaseURL),
		llm.WithModel(settings.Model),
		llm.WithTimeout(settings.Timeout),
	)

	log.Printf("kernel: created Ollama client (model=%s, url=%s)", settings.Model, settings.BaseURL)
	return backend
}
// createDeepSeekBackend builds a DeepSeek client from the DeepSeek section of
// the configuration (base URL, model, API key and timeout). Only the model
// name is logged.
func (k *Kernel) createDeepSeekBackend() llm.LLM {
	settings := k.config.DeepSeek

	backend := llm.NewDeepSeekClient(
		llm.WithDeepSeekBaseURL(settings.BaseURL),
		llm.WithDeepSeekModel(settings.Model),
		llm.WithDeepSeekAPIKey(settings.APIKey),
		llm.WithDeepSeekTimeout(settings.Timeout),
	)

	log.Printf("kernel: created DeepSeek client (model=%s)", settings.Model)
	return backend
}
// Bus returns the kernel's message bus.
func (k *Kernel) Bus() bus.MessageBus {
return k.mb
}
// Registry returns the plugin registry.
func (k *Kernel) Registry() *plugin.Registry {
return k.registry
}
// SessionManager returns the session manager.
//
// The result is nil when the session store could not be created in
// NewWithConfig.
func (k *Kernel) SessionManager() *session.Manager {
return k.sessionMgr
}
// ToolManager returns the tool manager.
func (k *Kernel) ToolManager() *tool.Manager {
return k.toolMgr
}
// SkillManager returns the skill manager.
func (k *Kernel) SkillManager() *skill.Manager {
return k.skillMgr
}
// ActorSystem returns the actor system.
func (k *Kernel) ActorSystem() *actor.System {
return k.actorSystem
}
// Orchestrator returns the orchestrator agent.
//
// The result is nil when the orchestrator could not be created during
// initializeActorSystem.
func (k *Kernel) Orchestrator() *actor.Orchestrator {
return k.orch
}
// LLMAgent returns the main LLM agent.
//
// The result is nil when initializeActorSystem returned early because the
// orchestrator or the tool worker could not be created.
func (k *Kernel) LLMAgent() *actor.LLMAgent {
return k.llmAgent
}
// SetStreamWriter sets the writer that receives streamed LLM output.
//
// The writer is installed on the main LLM agent (when present) and on every
// registered sub-agent that is an *actor.SubAgent.
func (k *Kernel) SetStreamWriter(w io.Writer) {
	if primary := k.llmAgent; primary != nil {
		primary.SetStreamWriter(w)
	}

	for _, candidate := range k.subAgents {
		sub, isSub := candidate.(*actor.SubAgent)
		if !isSub {
			continue
		}
		sub.SetStreamWriter(w)
	}
}
// SendMessage sends a message from a sender to the LLM agent.
//
// This is the main public API for interacting with the Orca system. It wraps
// the content in a task-request message and hands it to the orchestrator for
// processing.
//
// Parameters:
//   - from: sender identifier (for example "user" or "cli")
//   - to: recipient (use "llm" for the LLM agent)
//   - content: message content (plain text)
//
// It returns the response content as a string, or an error when the kernel is
// not running, the orchestrator is missing, or processing fails. Non-string
// response content is rendered with the %v verb.
func (k *Kernel) SendMessage(from, to, content string) (string, error) {
	switch {
	case !k.IsRunning():
		return "", fmt.Errorf("kernel: kernel is not running")
	case k.orch == nil:
		return "", fmt.Errorf("kernel: orchestrator not initialized")
	}

	// Build the task request.
	request := bus.Message{
		Type:    bus.MsgTypeTaskRequest,
		From:    from,
		To:      to,
		Content: content,
	}

	// Let the orchestrator route and process it.
	reply, err := k.orch.Process(context.Background(), request)
	if err != nil {
		return "", fmt.Errorf("kernel: orchestrator processing failed: %w", err)
	}

	// Return string content as-is; format anything else.
	if text, isText := reply.Content.(string); isText {
		return text, nil
	}
	return fmt.Sprintf("%v", reply.Content), nil
}
// InitPlugins loads and initializes skills through the skill manager.
//
// Loading is best-effort: errors reported by LoadAll are logged as a warning
// and the method still returns nil. It is a no-op when there is no skill
// manager.
func (k *Kernel) InitPlugins() error {
	mgr := k.skillMgr
	if mgr == nil {
		return nil
	}

	loaded, loadErr := mgr.LoadAll()
	if loadErr != nil {
		log.Printf("kernel: warning: skill loading had errors: %v", loadErr)
	}
	if loaded > 0 {
		log.Printf("kernel: loaded %d skills", loaded)
	}
	return nil
}
// GetPlugin returns the registered plugin with the given name; it delegates
// to the registry's Get.
func (k *Kernel) GetPlugin(name string) (plugin.Plugin, bool) {
return k.registry.Get(name)
}
// ListPlugins returns all currently registered plugins, as reported by the
// registry.
func (k *Kernel) ListPlugins() []plugin.Plugin {
return k.registry.List()
}
// RegisterPlugin registers a plugin without starting it.
//
// Registration is only allowed before Start; once the kernel is running an
// error is returned. Otherwise the result of the registry's Register is
// returned.
func (k *Kernel) RegisterPlugin(p plugin.Plugin) error {
	k.mu.Lock()
	defer k.mu.Unlock()

	if !k.started {
		return k.registry.Register(p)
	}
	return fmt.Errorf("kernel: cannot register plugin %q: kernel already started", p.Name())
}
// UnregisterPlugin removes a plugin from the registry.
//
// Unlike RegisterPlugin it does not check whether the kernel has been
// started; it only takes the kernel lock and delegates to the registry.
func (k *Kernel) UnregisterPlugin(name string) error {
k.mu.Lock()
defer k.mu.Unlock()
return k.registry.Unregister(name)
}
// Start initializes every registered plugin and marks the kernel as running.
//
// It returns an error if the kernel is already started. A plugin whose Init
// fails is logged, put in the error state and skipped; it does not abort
// startup. The kernel lock is held for the whole call, including while each
// plugin's Init runs.
func (k *Kernel) Start() error {
	k.mu.Lock()
	defer k.mu.Unlock()

	if k.started {
		return fmt.Errorf("kernel: already started")
	}
	k.started = true

	// Initialize plugins, remembering the ones that came up so Stop can shut
	// them down.
	registered := k.registry.List()
	k.plugins = make([]plugin.Plugin, 0, len(registered))

	for _, plug := range registered {
		k.registry.SetState(plug.Name(), plugin.StateInitialized)

		initErr := plug.Init(k)
		if initErr == nil {
			k.registry.SetState(plug.Name(), plugin.StateRunning)
			k.plugins = append(k.plugins, plug)
			log.Printf("kernel: plugin %q (%s) initialized", plug.Name(), plug.Version())
			continue
		}

		log.Printf("kernel: warning: failed to init plugin %q: %v", plug.Name(), initErr)
		k.registry.SetState(plug.Name(), plugin.StateError)
	}

	log.Printf("kernel: started (tools=%d)", k.toolMgr.Count())
	return nil
}
// Stop shuts the kernel down gracefully.
//
// It is a no-op when the kernel is not running. Otherwise it stops the actor
// system, shuts down the started plugins in reverse start order, clears the
// running flag and closes the message bus. Errors from the actor system and
// from individual plugins are logged only; the returned error is the result
// of closing the bus.
func (k *Kernel) Stop() error {
	k.mu.Lock()
	defer k.mu.Unlock()

	if !k.started {
		return nil
	}

	// The actor system goes down first.
	if sys := k.actorSystem; sys != nil {
		if err := sys.StopAll(); err != nil {
			log.Printf("kernel: warning: error stopping actor system: %v", err)
		}
	}

	// Plugins are shut down last-started-first.
	for remaining := len(k.plugins); remaining > 0; remaining-- {
		plug := k.plugins[remaining-1]
		k.registry.SetState(plug.Name(), plugin.StateStopped)

		if err := plug.Shutdown(); err == nil {
			log.Printf("kernel: plugin %q shut down", plug.Name())
		} else {
			log.Printf("kernel: warning: error shutting down plugin %q: %v", plug.Name(), err)
		}
	}

	k.plugins, k.started = nil, false
	return k.mb.Close()
}
// IsRunning reports whether the kernel has been started and not yet stopped.
// It reads the started flag under the read lock.
func (k *Kernel) IsRunning() bool {
k.mu.RLock()
defer k.mu.RUnlock()
return k.started
}