orca.ai/pkg/session/manager.go
大森 e18dde7c15 feat: implement TUI with bubbletea and multi-agent collaboration
- Add bubbletea/lipgloss/glamour dependencies for TUI
- Create internal/tui package with EventWriter, styles, and bubbletea Model
- Support streaming output display in conversation window
- Add right panel with statistics and active agent status
- Implement multi-agent collaboration with sub-agents
- Add AgentCallTool for delegating tasks to sub-agents
- Support parallel tool execution with goroutines
- Auto-discover sub-agents from ~/.orca/prompts/ directory
- Fix orchestrator routing based on msg.To field
- Add non-blocking event writer with timeout to prevent blocking
2026-05-10 14:28:17 +08:00

203 lines
4.7 KiB
Go

// Package session 为 Orca 框架提供对话会话管理功能。
//
// 会话持久化对话历史记录,并为 LLM 交互提供基于上下文窗口的
// 检索功能。默认存储后端使用 JSONL 文件,支持 O(1) 追加写入。
package session
import (
"fmt"
"sync"
"time"
"github.com/orca/orca/pkg/bus"
)
// Manager provides high-level session lifecycle operations.
//
// It wraps a Store with an in-memory session cache, context-window
// retrieval, and event publishing on the message bus.
type Manager struct {
	// store is the persistence backend used to load, save, archive, delete
	// and list sessions.
	store Store
	// bus receives session lifecycle events; it may be nil, in which case no
	// events are published.
	bus bus.MessageBus
	// cache holds sessions keyed by session ID.
	cache map[string]*Session
	// mu guards cache and the mutations made to cached sessions.
	mu sync.RWMutex
}
// NewManager builds a session manager on top of the given store.
//
// mb is the message bus used for lifecycle events; the value is stored as-is
// together with an empty session cache.
func NewManager(store Store, mb bus.MessageBus) *Manager {
	mgr := new(Manager)
	mgr.store = store
	mgr.bus = mb
	mgr.cache = map[string]*Session{}
	return mgr
}
// CreateSession registers a new active session under id with the given
// optional metadata and returns it.
//
// Only the in-memory cache is consulted for the duplicate check; the store is
// not touched. An error is returned if a session with the same ID is already
// cached. On success, a "session.created" event is published when a message
// bus is configured.
func (m *Manager) CreateSession(id string, metadata map[string]string) (*Session, error) {
	now := time.Now()
	session := &Session{
		ID:        id,
		Status:    SessionActive,
		Messages:  make([]SessionMessage, 0),
		CreatedAt: now,
		UpdatedAt: now,
		Metadata:  metadata,
	}

	m.mu.Lock()
	if _, exists := m.cache[id]; exists {
		m.mu.Unlock()
		return nil, fmt.Errorf("session %q already exists", id)
	}
	m.cache[id] = session
	m.mu.Unlock()

	// Publish only after the lock is released: a subscriber that blocks, or
	// that calls back into the Manager, must not be able to stall or deadlock
	// every other session operation.
	if m.bus != nil {
		m.bus.Publish("session.created", bus.Message{
			ID:      "session-" + id,
			Type:    bus.MsgTypeSystem,
			From:    "session.manager",
			Content: map[string]interface{}{"session_id": id},
		})
	}
	return session, nil
}
// GetSession returns the session with the given ID, consulting the cache
// first and falling back to the store.
//
// A session loaded from the store is marked active. Its CreatedAt and
// UpdatedAt are taken from the first and last message timestamps; when those
// are missing or zero, the current time is used instead. The loaded session
// is cached before being returned. An error is returned if the store fails to
// load the session.
func (m *Manager) GetSession(id string) (*Session, error) {
	m.mu.RLock()
	session, ok := m.cache[id]
	m.mu.RUnlock()
	if ok {
		return session, nil
	}

	// Cache miss: load the message history from the store without holding
	// the lock.
	messages, err := m.store.Load(id)
	if err != nil {
		return nil, fmt.Errorf("failed to load session %q: %w", id, err)
	}

	// Derive timestamps from the messages where possible, defaulting both to
	// the same instant otherwise.
	now := time.Now()
	createdAt, updatedAt := now, now
	if len(messages) > 0 {
		if ts := messages[0].Timestamp; !ts.IsZero() {
			createdAt = ts
		}
		if ts := messages[len(messages)-1].Timestamp; !ts.IsZero() {
			updatedAt = ts
		}
	}
	loaded := &Session{
		ID:        id,
		Status:    SessionActive,
		Messages:  messages,
		CreatedAt: createdAt,
		UpdatedAt: updatedAt,
	}

	m.mu.Lock()
	defer m.mu.Unlock()
	// Another goroutine may have cached this session while the lock was
	// released. Keep its entry so every caller shares one *Session and no
	// updates already applied to it are overwritten.
	if existing, ok := m.cache[id]; ok {
		return existing, nil
	}
	m.cache[id] = loaded
	return loaded, nil
}
// AddMessage appends a message to the session and persists it.
//
// The message is stamped with the current time and written to the store
// first; an error is returned if that write fails. If the session is already
// cached, the cached copy is extended and its UpdatedAt is advanced. If it is
// not cached, the cache is deliberately left untouched so that the next
// GetSession loads the complete history from the store instead of a fragment
// holding only this message.
func (m *Manager) AddMessage(sessionID string, role MessageRole, content string, metadata map[string]string) (*SessionMessage, error) {
	msg := SessionMessage{
		Role:      role,
		Content:   content,
		Timestamp: time.Now(),
		Metadata:  metadata,
	}
	if err := m.store.Save(sessionID, msg); err != nil {
		return nil, fmt.Errorf("failed to save message to session %q: %w", sessionID, err)
	}

	m.mu.Lock()
	// Update the cache only on a hit. Creating an entry on a miss would hide
	// any earlier messages the store already holds for this session.
	if session, ok := m.cache[sessionID]; ok {
		session.Messages = append(session.Messages, msg)
		session.UpdatedAt = msg.Timestamp
	}
	m.mu.Unlock()
	return &msg, nil
}
// GetContext returns the most recent windowSize messages of the session.
//
// If windowSize <= 0 or windowSize >= the number of messages, all messages
// are returned. The result is a copy, so callers may modify or append to it
// without affecting the cached session. An error is returned if the session
// cannot be retrieved.
func (m *Manager) GetContext(sessionID string, windowSize int) ([]SessionMessage, error) {
	session, err := m.GetSession(sessionID)
	if err != nil {
		return nil, err
	}

	// AddMessage mutates session.Messages under m.mu, so read it under the
	// same lock.
	m.mu.RLock()
	defer m.mu.RUnlock()

	window := session.Messages
	if windowSize > 0 && windowSize < len(window) {
		window = window[len(window)-windowSize:]
	}
	if window == nil {
		return nil, nil
	}
	// Copy so the caller does not share the cached backing array.
	out := make([]SessionMessage, len(window))
	copy(out, window)
	return out, nil
}
// ArchiveSession archives the session with the given ID.
//
// The store is archived first; if that fails, the error is returned and the
// cached session is left unchanged. On success, a cached session (if any) is
// marked SessionArchived and a "session.archived" event is published when a
// message bus is configured.
func (m *Manager) ArchiveSession(id string) error {
	// Archive the store before touching the cache so a store failure cannot
	// leave the cache reporting a state the store does not have.
	if err := m.store.Archive(id); err != nil {
		return err
	}

	m.mu.Lock()
	if session, ok := m.cache[id]; ok {
		session.Status = SessionArchived
	}
	m.mu.Unlock()

	// Publish outside the lock so a blocking or re-entrant subscriber cannot
	// stall other session operations.
	if m.bus != nil {
		m.bus.Publish("session.archived", bus.Message{
			ID:      "session-" + id,
			Type:    bus.MsgTypeSystem,
			From:    "session.manager",
			Content: map[string]interface{}{"session_id": id},
		})
	}
	return nil
}
// DeleteSession permanently removes a session.
//
// The cached entry is evicted first, then the store is asked to delete the
// session; the store's result is returned unchanged.
func (m *Manager) DeleteSession(id string) error {
	// Evict under the lock; the closure scopes the critical section so the
	// store call below runs without holding it.
	func() {
		m.mu.Lock()
		defer m.mu.Unlock()
		delete(m.cache, id)
	}()

	return m.store.Delete(id)
}
// ListSessions returns the session IDs reported by the underlying store.
//
// Only the store is queried; the in-memory cache is not consulted.
func (m *Manager) ListSessions() ([]string, error) {
	return m.store.List()
}
// Store returns the underlying Store this manager was constructed with.
func (m *Manager) Store() Store {
	return m.store
}