orca.ai/pkg/session/jsonl.go
大森 e18dde7c15 feat: implement TUI with bubbletea and multi-agent collaboration
- Add bubbletea/lipgloss/glamour dependencies for TUI
- Create internal/tui package with EventWriter, styles, and bubbletea Model
- Support streaming output display in conversation window
- Add right panel with statistics and active agent status
- Implement multi-agent collaboration with sub-agents
- Add AgentCallTool for delegating tasks to sub-agents
- Support parallel tool execution with goroutines
- Auto-discover sub-agents from ~/.orca/prompts/ directory
- Fix orchestrator routing based on msg.To field
- Add non-blocking event writer with timeout to prevent blocking
2026-05-10 14:28:17 +08:00

192 lines
5.3 KiB
Go

// Package session 为 Orca 框架提供对话会话管理功能。
package session
import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"strings"
"sync"
)
// JSONLStore 使用 JSONL 文件实现 Store 接口。
//
// 每个会话存储在单独的 {session_id}.jsonl 文件中,
// 位于配置的存储目录下。文件中的每一行都是一个
// JSON 编码的 SessionMessage。新消息以 O(1) 时间追加。
type JSONLStore struct {
storageDir string
mu sync.RWMutex
}
// NewJSONLStore 使用给定的存储目录创建新的 JSONLStore。
// 如果目录不存在,则创建它。
func NewJSONLStore(storageDir string) (*JSONLStore, error) {
if err := os.MkdirAll(storageDir, 0755); err != nil {
return nil, fmt.Errorf("failed to create session storage directory %q: %w", storageDir, err)
}
return &JSONLStore{storageDir: storageDir}, nil
}
// path 返回给定会话 ID 的完整文件路径。
func (s *JSONLStore) path(sessionID string) string {
return filepath.Join(s.storageDir, sessionID+".jsonl")
}
// archivePath 返回给定会话 ID 的归档文件路径。
func (s *JSONLStore) archivePath(sessionID string) string {
return filepath.Join(s.storageDir, sessionID+".jsonl.archived")
}
// Save 将消息追加到会话的 JSONL 文件。
// 如果文件不存在,则创建它。
// 这是一个 O(1) 追加操作。
func (s *JSONLStore) Save(sessionID string, msg SessionMessage) error {
s.mu.Lock()
defer s.mu.Unlock()
f, err := os.OpenFile(s.path(sessionID), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return fmt.Errorf("failed to open session file for %q: %w", sessionID, err)
}
defer f.Close()
data, err := json.Marshal(msg)
if err != nil {
return fmt.Errorf("failed to marshal session message: %w", err)
}
if _, err := f.Write(append(data, '\n')); err != nil {
return fmt.Errorf("failed to write session message: %w", err)
}
return nil
}
// Load 按时间顺序检索会话的所有消息。
// 如果会话文件不存在,则返回错误。
func (s *JSONLStore) Load(sessionID string) ([]SessionMessage, error) {
s.mu.RLock()
defer s.mu.RUnlock()
data, err := os.ReadFile(s.path(sessionID))
if err != nil {
if os.IsNotExist(err) {
// Check archive
archiveData, archiveErr := os.ReadFile(s.archivePath(sessionID))
if archiveErr != nil {
return nil, fmt.Errorf("session %q not found", sessionID)
}
data = archiveData
} else {
return nil, fmt.Errorf("failed to read session file for %q: %w", sessionID, err)
}
}
return parseJSONL(data)
}
// parseJSONL 将 JSONL 字节切片解析为 SessionMessage 切片。
func parseJSONL(data []byte) ([]SessionMessage, error) {
var messages []SessionMessage
trimmed := strings.TrimSpace(string(data))
if trimmed == "" {
return messages, nil
}
lines := strings.Split(trimmed, "\n")
for _, line := range lines {
line = strings.TrimSpace(line)
if line == "" {
continue
}
var msg SessionMessage
if err := json.Unmarshal([]byte(line), &msg); err != nil {
return nil, fmt.Errorf("failed to unmarshal session message: %w", err)
}
messages = append(messages, msg)
}
return messages, nil
}
// List 通过扫描存储目录返回所有会话 ID。
func (s *JSONLStore) List() ([]string, error) {
s.mu.RLock()
defer s.mu.RUnlock()
entries, err := os.ReadDir(s.storageDir)
if err != nil {
return nil, fmt.Errorf("failed to read storage directory %q: %w", s.storageDir, err)
}
var sessions []string
for _, entry := range entries {
name := entry.Name()
if strings.HasSuffix(name, ".jsonl") && !strings.HasSuffix(name, ".archived") {
sessions = append(sessions, strings.TrimSuffix(name, ".jsonl"))
}
}
return sessions, nil
}
// Exists 检查会话文件是否存在(活动或已归档)。
func (s *JSONLStore) Exists(sessionID string) (bool, error) {
s.mu.RLock()
defer s.mu.RUnlock()
if _, err := os.Stat(s.path(sessionID)); err == nil {
return true, nil
} else if !os.IsNotExist(err) {
return false, fmt.Errorf("failed to check session %q: %w", sessionID, err)
}
// Check archive
if _, err := os.Stat(s.archivePath(sessionID)); err == nil {
return true, nil
} else if !os.IsNotExist(err) {
return false, fmt.Errorf("failed to check archived session %q: %w", sessionID, err)
}
return false, nil
}
// Archive 通过重命名将会话文件移动到归档状态。
func (s *JSONLStore) Archive(sessionID string) error {
s.mu.Lock()
defer s.mu.Unlock()
if err := os.Rename(s.path(sessionID), s.archivePath(sessionID)); err != nil {
if os.IsNotExist(err) {
return fmt.Errorf("session %q not found", sessionID)
}
return fmt.Errorf("failed to archive session %q: %w", sessionID, err)
}
return nil
}
// Delete 永久移除会话文件及其归档。
func (s *JSONLStore) Delete(sessionID string) error {
s.mu.Lock()
defer s.mu.Unlock()
var lastErr error
// Remove active file
if err := os.Remove(s.path(sessionID)); err != nil && !os.IsNotExist(err) {
lastErr = fmt.Errorf("failed to delete session %q: %w", sessionID, err)
}
// Also remove archived file if it exists
if err := os.Remove(s.archivePath(sessionID)); err != nil && !os.IsNotExist(err) {
lastErr = fmt.Errorf("failed to delete archived session %q: %w", sessionID, err)
}
return lastErr
}
// StorageDir 返回存储目录路径。
func (s *JSONLStore) StorageDir() string {
return s.storageDir
}