orca.ai/pkg/kernel/kernel.go
2026-05-12 00:09:01 +08:00

539 lines
14 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Package kernel 实现了 Orca 框架的微内核核心。
//
// 内核是一个最小化运行时,负责管理插件生命周期、
// 消息路由和组件间通信。
package kernel
import (
"context"
"fmt"
"io"
"log"
"os"
"path/filepath"
"strings"
"sync"
"time"
"github.com/orca/orca/internal/config"
"github.com/orca/orca/pkg/actor"
"github.com/orca/orca/pkg/bus"
"github.com/orca/orca/pkg/embedding"
"github.com/orca/orca/pkg/llm"
"github.com/orca/orca/pkg/plugin"
"github.com/orca/orca/pkg/session"
"github.com/orca/orca/pkg/skill"
"github.com/orca/orca/pkg/tool"
)
// Kernel is the central runtime object of the package. It owns the message
// bus, the plugin registry, the session/memory/tool/skill managers and the
// actor system, and exposes them through accessor methods.
type Kernel struct {
// mu guards started and plugins in Start/Stop/IsRunning, and is also held
// around registry mutations (RegisterPlugin/UnregisterPlugin) and sub-agent
// lookups (findAgent).
mu sync.RWMutex
// mb is the message bus created in NewWithConfig and closed by Stop.
mb bus.MessageBus
// registry holds every registered plugin and its state.
registry *plugin.Registry
// plugins lists the plugins whose Init succeeded during Start, in
// initialization order; Stop shuts them down in reverse and resets it to nil.
plugins []plugin.Plugin
// started is true between a successful Start and the next Stop.
started bool
// config is the effective configuration; NewWithConfig never leaves it nil.
config *config.Config
// sessionMgr and sessionStore are only set when the SQLite session store
// could be opened in NewWithConfig; otherwise both stay nil.
sessionMgr *session.Manager
sessionStore *session.SQLiteStore
// memoryManager is only created when a SiliconFlow API key is configured and
// the session store is available.
memoryManager *session.MemoryManager
// toolMgr holds the built-in tools plus the agent_call tool.
toolMgr *tool.Manager
// skillMgr is the skill manager used by InitPlugins and the LLM agent.
skillMgr *skill.Manager
// actorSystem creates the orchestrator and tool worker and is stopped in Stop.
actorSystem *actor.System
// orch, llmAgent and toolWorker are populated by initializeActorSystem; they
// remain nil if that function bails out early on an error.
orch *actor.Orchestrator
llmAgent *actor.LLMAgent
toolWorker *actor.ToolWorker
// subAgents maps a sub-agent name (prompt file name without ".md") to the
// agent built from that prompt file.
subAgents map[string]actor.Agent
}
// New builds a Kernel from the configuration returned by config.LoadConfig.
//
// When loading fails, a warning is logged and the kernel is built from
// config.DefaultConfig instead, so New never returns nil because of a bad or
// missing configuration file.
//
// NOTE(review): the original comment states that LoadConfig reads
// ./config.toml or ~/.orca/config.toml; that lookup lives in the config
// package and is not visible from this file.
func New() *Kernel {
	loaded, loadErr := config.LoadConfig()
	if loadErr == nil {
		return NewWithConfig(loaded)
	}
	log.Printf("kernel: warning: failed to load config: %v, using defaults", loadErr)
	return NewWithConfig(config.DefaultConfig())
}
// NewWithConfig builds a Kernel from cfg. A nil cfg is replaced by
// config.DefaultConfig.
//
// Construction is best-effort: failures to open the session store or to create
// the memory manager are logged as warnings and the kernel is still returned,
// with the corresponding components left unset.
//
// Steps, in order:
//  1. create the bus, plugin registry and actor system;
//  2. open the SQLite session store at <Session.StorageDir>/orcasession.db and,
//     on success, create the session manager on top of it;
//  3. if a SiliconFlow API key is configured and the store is available, create
//     the memory manager sharing that store;
//  4. create the tool manager and register the built-in tools;
//  5. create the skill manager;
//  6. wire up the actor system (orchestrator, tool worker, LLM agent).
func NewWithConfig(cfg *config.Config) *Kernel {
	if cfg == nil {
		cfg = config.DefaultConfig()
	}

	k := &Kernel{
		mb:          bus.New(),
		registry:    plugin.NewRegistry(),
		config:      cfg,
		actorSystem: actor.NewSystem(),
		subAgents:   make(map[string]actor.Agent),
	}

	// Session storage: a leading "~" in the configured directory is expanded first.
	dbPath := filepath.Join(expandHomeDir(cfg.Session.StorageDir), "orcasession.db")
	store, err := session.NewSQLiteStore(dbPath)
	switch {
	case err != nil:
		log.Printf("kernel: warning: failed to create session store: %v", err)
	default:
		k.sessionStore = store
		k.sessionMgr = session.NewManager(store, k.mb)
		log.Printf("kernel: session manager initialized with SQLite storage")
	}

	// Memory manager: needs both an embedding API key and a usable store.
	if apiKey := cfg.SiliconFlow.APIKey; apiKey != "" && store != nil {
		embedCfg := embedding.Config{
			APIKey:  apiKey,
			BaseURL: cfg.SiliconFlow.BaseURL,
			Model:   cfg.Embedding.Model,
			Timeout: 5 * time.Second,
		}
		memoryCfg := session.MemoryConfig{
			DBPath:      dbPath,
			ModelWindow: cfg.Embedding.MaxCtx,
			EmbedConfig: embedCfg,
		}
		k.memoryManager, err = session.NewMemoryManagerWithStore(memoryCfg, store)
		if err == nil {
			log.Printf("kernel: memory manager initialized with %s embedding (shared storage)", cfg.Embedding.Model)
		} else {
			log.Printf("kernel: warning: failed to create memory manager: %v", err)
		}
	}

	// Tools must exist before the actor system is wired, because the tool
	// worker and the LLM agent both receive the tool manager.
	k.toolMgr = tool.NewManager()
	k.registerBuiltinTools()

	// NOTE(review): the skills path is passed verbatim, without expandHomeDir;
	// presumably skill.NewManager expands "~" itself — confirm.
	k.skillMgr = skill.NewManager("~/.agents/skills")

	k.initializeActorSystem()
	return k
}
// registerBuiltinTools registers the built-in tools (exec, read_file,
// write_file, list_dir, search_files) with the tool manager.
//
// A registration failure is logged as a warning and does not stop the
// remaining tools from being registered.
func (k *Kernel) registerBuiltinTools() {
	builtins := []tool.Tool{
		tool.NewExecTool(nil),      // exec: shell commands
		tool.NewReadFileTool(),     // read_file
		tool.NewWriteFileTool(),    // write_file
		tool.NewListDirTool(),      // list_dir
		tool.NewSearchFilesTool(),  // search_files
	}

	for i := range builtins {
		candidate := builtins[i]
		err := k.toolMgr.Register(candidate)
		if err == nil {
			continue
		}
		log.Printf("kernel: warning: failed to register tool %q: %v", candidate.Name(), err)
	}
}
// initializeActorSystem wires the orchestrator, the tool worker, the
// sub-agents and the main LLM agent together.
//
// If the orchestrator or the tool worker cannot be created, a warning is
// logged and the function returns early, leaving the remaining actor fields
// (including llmAgent) unset.
//
// The order of steps matters: sub-agents are created before the agent_call
// tool is registered, and the tool manager is fully populated before the LLM
// agent is constructed with it.
func (k *Kernel) initializeActorSystem() {
	orchestrator, err := k.actorSystem.CreateOrchestrator(k)
	if err != nil {
		log.Printf("kernel: warning: failed to create orchestrator: %v", err)
		return
	}
	k.orch = orchestrator

	worker, err := k.actorSystem.CreateToolWorker(k.toolMgr)
	if err != nil {
		log.Printf("kernel: warning: failed to create tool worker: %v", err)
		return
	}
	k.toolWorker = worker

	// One backend instance is shared by the sub-agents, the main LLM agent
	// and (when present) the memory manager.
	backend := k.createLLMBackend()
	k.createSubAgents(backend)

	// agent_call lets the LLM delegate to a sub-agent looked up by name.
	type eventBusSetter interface{ SetEventBus(bus.MessageBus) }
	agentCall := tool.NewAgentCallTool(k.findAgent)
	if setter, ok := agentCall.(eventBusSetter); ok {
		setter.SetEventBus(k.mb)
	}
	if regErr := k.toolMgr.Register(agentCall); regErr != nil {
		log.Printf("kernel: warning: failed to register agent_call tool: %v", regErr)
	}

	agentID := fmt.Sprintf("llm-%d", len(k.actorSystem.ListAgents())+1)
	opts := []actor.LLMAgentOption{
		actor.WithToolManager(k.toolMgr),
		actor.WithToolWorker(k.toolWorker),
		actor.WithWindowSize(k.config.Session.MaxHistory),
	}

	// System prompt: a non-empty built-in prompt file wins; otherwise fall
	// back to the configured prompt, if any.
	promptData, readErr := os.ReadFile(expandHomeDir("~/.orca/agents/_builtin/assistant.md"))
	switch {
	case readErr == nil && len(promptData) > 0:
		opts = append(opts, actor.WithSystemPrompt(string(promptData)))
	default:
		if fallback := k.config.GetSystemPrompt(); fallback != "" {
			opts = append(opts, actor.WithSystemPrompt(fallback))
		}
	}

	if k.skillMgr != nil {
		opts = append(opts, actor.WithSkillManager(k.skillMgr))
	}

	if k.sessionMgr != nil {
		defaultSession := "default"
		// Create the default session when looking it up fails.
		if _, lookupErr := k.sessionMgr.GetSession(defaultSession); lookupErr != nil {
			k.sessionMgr.CreateSession(defaultSession, map[string]string{
				"source": "kernel",
			})
		}
		opts = append(opts,
			actor.WithSessionManager(k.sessionMgr),
			actor.WithSessionID(defaultSession),
		)
	}

	if k.memoryManager != nil {
		opts = append(opts, actor.WithMemoryManager(k.memoryManager))
	}

	// Advertise each sub-agent to the LLM agent by name, using the
	// sub-agent's system prompt as its description.
	if len(k.subAgents) > 0 {
		descriptions := make(map[string]string)
		for agentName, candidate := range k.subAgents {
			sub, isSub := candidate.(*actor.SubAgent)
			if !isSub {
				continue
			}
			descriptions[agentName] = sub.SystemPrompt()
		}
		opts = append(opts, actor.WithSubAgents(descriptions))
	}

	mainAgent := actor.NewLLMAgent(agentID, backend, opts...)
	k.llmAgent = mainAgent

	if k.memoryManager != nil {
		k.memoryManager.SetLLM(backend)
	}

	k.orch.AddWorker(mainAgent)
	k.orch.AddWorker(worker)
	k.orch.SetDefaultWorker(mainAgent)
}
// createSubAgents builds one sub-agent per "*.md" file found directly inside
// ~/.orca/agents and registers each with the orchestrator.
//
// The file name without its ".md" suffix becomes both the agent name and its
// role; the file content becomes its system prompt. Directories, files with
// other extensions, unreadable files and files containing only whitespace are
// skipped (the last two with a warning). If the directory itself cannot be
// read, a warning is logged and no sub-agents are created.
//
// llmBackend is the LLM backend handed to every sub-agent.
func (k *Kernel) createSubAgents(llmBackend llm.LLM) {
	dir := expandHomeDir("~/.orca/agents")
	files, err := os.ReadDir(dir)
	if err != nil {
		log.Printf("kernel: warning: cannot read agents dir %s: %v", dir, err)
		return
	}

	for _, f := range files {
		fileName := f.Name()
		if f.IsDir() || !strings.HasSuffix(fileName, ".md") {
			continue
		}

		promptPath := filepath.Join(dir, fileName)
		raw, readErr := os.ReadFile(promptPath)
		if readErr != nil {
			log.Printf("kernel: warning: failed to read agent prompt %s: %v", promptPath, readErr)
			continue
		}

		systemPrompt := string(raw)
		if len(strings.TrimSpace(systemPrompt)) == 0 {
			log.Printf("kernel: warning: empty agent prompt file %s, skipping", promptPath)
			continue
		}

		role := strings.TrimSuffix(fileName, ".md")
		sub := actor.NewSubAgent(role, llmBackend,
			actor.WithSubAgentRole(role),
			actor.WithSubAgentSystemPrompt(systemPrompt),
			actor.WithSubAgentStore(k.sessionStore),
			actor.WithSubAgentParentSessionID("default"),
		)
		k.subAgents[role] = sub
		k.orch.AddWorker(sub)
		log.Printf("kernel: created sub-agent %q from %s", role, promptPath)
	}

	log.Printf("kernel: created %d sub-agents", len(k.subAgents))
}
// expandHomeDir replaces a leading "~" that denotes the current user's home
// directory with the directory reported by os.UserHomeDir.
//
// Only a bare "~" or a "~" immediately followed by a path separator is
// expanded. Every other input is returned unchanged, including:
//   - the empty string and paths that do not start with "~";
//   - "~user/..." forms and file names that merely begin with "~"
//     (expanding those would splice the home directory onto unrelated text);
//   - any path when the home directory cannot be determined.
func expandHomeDir(path string) string {
	if path == "" || path[0] != '~' {
		return path
	}
	// "~alice/x" or "~backup" do not refer to the current user's home.
	if len(path) > 1 && !os.IsPathSeparator(path[1]) {
		return path
	}
	home, err := os.UserHomeDir()
	if err != nil {
		return path
	}
	return home + path[1:]
}
// findAgent looks up a sub-agent by name for the agent_call tool.
//
// It returns (agent, true) when a sub-agent with that name exists and
// satisfies tool.Agent, and (nil, false) otherwise. The type assertion uses
// the comma-ok form so that an entry which does not implement tool.Agent is
// reported as "not found" instead of panicking.
//
// The lookup is performed under k.mu's read lock.
func (k *Kernel) findAgent(name string) (tool.Agent, bool) {
	k.mu.RLock()
	defer k.mu.RUnlock()

	registered, found := k.subAgents[name]
	if !found {
		return nil, false
	}
	callable, ok := registered.(tool.Agent)
	if !ok {
		return nil, false
	}
	return callable, true
}
// createLLMBackend returns the LLM backend selected by the configured
// provider: DeepSeek when the provider is config.ProviderDeepSeek, and Ollama
// for every other value.
func (k *Kernel) createLLMBackend() llm.LLM {
	if k.config.Provider == config.ProviderDeepSeek {
		return k.createDeepSeekBackend()
	}
	return k.createOllamaBackend()
}
// createOllamaBackend builds an Ollama client from the Ollama section of the
// configuration (base URL, model, timeout) and logs the model and URL in use.
func (k *Kernel) createOllamaBackend() llm.LLM {
	settings := k.config.Ollama

	backend := llm.NewOllamaClient(
		llm.WithBaseURL(settings.BaseURL),
		llm.WithModel(settings.Model),
		llm.WithTimeout(settings.Timeout),
	)

	log.Printf("kernel: created Ollama client (model=%s, url=%s)", settings.Model, settings.BaseURL)
	return backend
}
// createDeepSeekBackend builds a DeepSeek client from the DeepSeek section of
// the configuration (base URL, model, API key, timeout). Only the model name
// is logged; the API key is never written to the log.
func (k *Kernel) createDeepSeekBackend() llm.LLM {
	settings := k.config.DeepSeek

	backend := llm.NewDeepSeekClient(
		llm.WithDeepSeekBaseURL(settings.BaseURL),
		llm.WithDeepSeekModel(settings.Model),
		llm.WithDeepSeekAPIKey(settings.APIKey),
		llm.WithDeepSeekTimeout(settings.Timeout),
	)

	log.Printf("kernel: created DeepSeek client (model=%s)", settings.Model)
	return backend
}
// Bus returns the kernel's message bus.
func (k *Kernel) Bus() bus.MessageBus {
return k.mb
}
// Registry returns the kernel's plugin registry.
func (k *Kernel) Registry() *plugin.Registry {
return k.registry
}
// SessionManager returns the session manager. The result is nil when the
// session store could not be created during kernel construction.
func (k *Kernel) SessionManager() *session.Manager {
return k.sessionMgr
}
// ToolManager returns the kernel's tool manager.
func (k *Kernel) ToolManager() *tool.Manager {
return k.toolMgr
}
// SkillManager returns the kernel's skill manager.
func (k *Kernel) SkillManager() *skill.Manager {
return k.skillMgr
}
// ActorSystem returns the kernel's actor system.
func (k *Kernel) ActorSystem() *actor.System {
return k.actorSystem
}
// Orchestrator returns the orchestrator agent. The result is nil when the
// orchestrator could not be created during kernel construction.
func (k *Kernel) Orchestrator() *actor.Orchestrator {
return k.orch
}
// LLMAgent returns the main LLM agent. The result is nil when actor-system
// initialization stopped before the agent was created.
func (k *Kernel) LLMAgent() *actor.LLMAgent {
return k.llmAgent
}
// MemoryManager returns the memory manager. It is only set up when a
// SiliconFlow API key is configured and the session store is available, so
// callers should expect nil.
func (k *Kernel) MemoryManager() *session.MemoryManager {
return k.memoryManager
}
// SetStreamWriter installs w as the stream writer on the main LLM agent (when
// one exists) and on every registered sub-agent whose concrete type is
// *actor.SubAgent. Other agent implementations are left untouched.
func (k *Kernel) SetStreamWriter(w io.Writer) {
	if primary := k.llmAgent; primary != nil {
		primary.SetStreamWriter(w)
	}

	for name := range k.subAgents {
		sub, isSub := k.subAgents[name].(*actor.SubAgent)
		if !isSub {
			continue
		}
		sub.SetStreamWriter(w)
	}
}
// SendMessage submits a task request to the orchestrator and returns the
// response content as a string. It is the main public entry point for
// talking to the system.
//
// Parameters:
//   - from: identifier of the sender (for example "user" or "cli").
//   - to: identifier of the recipient. NOTE(review): the original comment says
//     to use "llm" for the LLM agent; routing is decided by the orchestrator
//     and is not visible from this file.
//   - content: the message text.
//
// It returns an error when the kernel is not running, when the orchestrator
// was never initialized, or when the orchestrator fails to process the
// message (the underlying error is wrapped). A response whose content is a
// string is returned as is; any other content is formatted with "%v".
//
// The request runs with context.Background(), so it carries no deadline or
// cancellation from the caller.
func (k *Kernel) SendMessage(from, to, content string) (string, error) {
	switch {
	case !k.IsRunning():
		return "", fmt.Errorf("kernel: kernel is not running")
	case k.orch == nil:
		return "", fmt.Errorf("kernel: orchestrator not initialized")
	}

	request := bus.Message{
		Type:    bus.MsgTypeTaskRequest,
		From:    from,
		To:      to,
		Content: content,
	}

	reply, err := k.orch.Process(context.Background(), request)
	if err != nil {
		return "", fmt.Errorf("kernel: orchestrator processing failed: %w", err)
	}

	if text, isString := reply.Content.(string); isString {
		return text, nil
	}
	return fmt.Sprintf("%v", reply.Content), nil
}
// InitPlugins asks the skill manager to load all skills.
//
// Loading is best-effort: an error from LoadAll is logged as a warning, the
// number of loaded skills is logged when it is positive, and the method
// always returns nil. It is a no-op when no skill manager is configured.
func (k *Kernel) InitPlugins() error {
	if mgr := k.skillMgr; mgr != nil {
		loaded, err := mgr.LoadAll()
		if err != nil {
			log.Printf("kernel: warning: skill loading had errors: %v", err)
		}
		if loaded > 0 {
			log.Printf("kernel: loaded %d skills", loaded)
		}
	}
	return nil
}
// GetPlugin looks up a registered plugin by name by delegating to the plugin
// registry. The boolean result is whatever the registry's Get reports.
func (k *Kernel) GetPlugin(name string) (plugin.Plugin, bool) {
return k.registry.Get(name)
}
// ListPlugins returns the plugins currently held by the plugin registry, as
// reported by the registry's List.
func (k *Kernel) ListPlugins() []plugin.Plugin {
return k.registry.List()
}
// RegisterPlugin adds p to the plugin registry without initializing it;
// initialization happens later in Start.
//
// Registration is rejected with an error once the kernel has been started.
// Otherwise the result of the registry's Register is returned.
func (k *Kernel) RegisterPlugin(p plugin.Plugin) error {
	k.mu.Lock()
	defer k.mu.Unlock()

	if !k.started {
		return k.registry.Register(p)
	}
	return fmt.Errorf("kernel: cannot register plugin %q: kernel already started", p.Name())
}
// UnregisterPlugin removes the named plugin from the registry while holding
// k.mu, and returns the registry's result. Unlike RegisterPlugin it does not
// check whether the kernel has already been started.
func (k *Kernel) UnregisterPlugin(name string) error {
k.mu.Lock()
defer k.mu.Unlock()
return k.registry.Unregister(name)
}
// Start marks the kernel as running and initializes every registered plugin.
//
// It returns an error if the kernel is already started. A plugin whose Init
// fails is logged, put into plugin.StateError and skipped; it does not abort
// startup. Plugins that initialize successfully are recorded, in order, so
// Stop can shut them down in reverse.
//
// k.mu is held for the whole call, including while each plugin's Init runs.
// Because sync.RWMutex is not reentrant, a plugin's Init must not call kernel
// methods that take k.mu (IsRunning, SendMessage, RegisterPlugin,
// UnregisterPlugin, Start, Stop).
func (k *Kernel) Start() error {
	k.mu.Lock()
	defer k.mu.Unlock()

	if k.started {
		return fmt.Errorf("kernel: already started")
	}
	k.started = true

	registered := k.registry.List()
	k.plugins = make([]plugin.Plugin, 0, len(registered))

	for idx := range registered {
		p := registered[idx]
		k.registry.SetState(p.Name(), plugin.StateInitialized)

		initErr := p.Init(k)
		if initErr == nil {
			k.registry.SetState(p.Name(), plugin.StateRunning)
			k.plugins = append(k.plugins, p)
			log.Printf("kernel: plugin %q (%s) initialized", p.Name(), p.Version())
			continue
		}

		log.Printf("kernel: warning: failed to init plugin %q: %v", p.Name(), initErr)
		k.registry.SetState(p.Name(), plugin.StateError)
	}

	log.Printf("kernel: started (tools=%d)", k.toolMgr.Count())
	return nil
}
// Stop shuts the kernel down. It is a no-op returning nil when the kernel is
// not running.
//
// Shutdown order: the actor system is stopped first, then the plugins that
// were initialized by Start are shut down in reverse order, and finally the
// message bus is closed. Errors from the actor system and from individual
// plugins are logged as warnings only; the returned error is the result of
// closing the message bus.
//
// k.mu is held for the whole call, including while each plugin's Shutdown
// runs, so Shutdown implementations must not call kernel methods that take
// that lock.
//
// NOTE(review): the session store and memory manager are not closed here —
// confirm whether they hold resources that need releasing.
func (k *Kernel) Stop() error {
	k.mu.Lock()
	defer k.mu.Unlock()

	if !k.started {
		return nil
	}

	if sys := k.actorSystem; sys != nil {
		if err := sys.StopAll(); err != nil {
			log.Printf("kernel: warning: error stopping actor system: %v", err)
		}
	}

	// Walk the initialized plugins from last to first.
	for remaining := len(k.plugins); remaining > 0; remaining-- {
		p := k.plugins[remaining-1]
		k.registry.SetState(p.Name(), plugin.StateStopped)

		err := p.Shutdown()
		if err == nil {
			log.Printf("kernel: plugin %q shut down", p.Name())
			continue
		}
		log.Printf("kernel: warning: error shutting down plugin %q: %v", p.Name(), err)
	}

	k.plugins, k.started = nil, false
	return k.mb.Close()
}
// IsRunning reports whether the kernel has been started and not yet stopped.
// The flag is read under k.mu's read lock.
func (k *Kernel) IsRunning() bool {
	k.mu.RLock()
	running := k.started
	k.mu.RUnlock()
	return running
}