// Package session 为 Orca 框架提供对话会话管理功能。 package session import ( "encoding/json" "fmt" "os" "path/filepath" "strings" "sync" ) // JSONLStore 使用 JSONL 文件实现 Store 接口。 // // 每个会话存储在单独的 {session_id}.jsonl 文件中, // 位于配置的存储目录下。文件中的每一行都是一个 // JSON 编码的 SessionMessage。新消息以 O(1) 时间追加。 type JSONLStore struct { storageDir string mu sync.RWMutex } // NewJSONLStore 使用给定的存储目录创建新的 JSONLStore。 // 如果目录不存在,则创建它。 func NewJSONLStore(storageDir string) (*JSONLStore, error) { if err := os.MkdirAll(storageDir, 0755); err != nil { return nil, fmt.Errorf("failed to create session storage directory %q: %w", storageDir, err) } return &JSONLStore{storageDir: storageDir}, nil } // path 返回给定会话 ID 的完整文件路径。 func (s *JSONLStore) path(sessionID string) string { return filepath.Join(s.storageDir, sessionID+".jsonl") } // archivePath 返回给定会话 ID 的归档文件路径。 func (s *JSONLStore) archivePath(sessionID string) string { return filepath.Join(s.storageDir, sessionID+".jsonl.archived") } // Save 将消息追加到会话的 JSONL 文件。 // 如果文件不存在,则创建它。 // 这是一个 O(1) 追加操作。 func (s *JSONLStore) Save(sessionID string, msg SessionMessage) error { s.mu.Lock() defer s.mu.Unlock() f, err := os.OpenFile(s.path(sessionID), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { return fmt.Errorf("failed to open session file for %q: %w", sessionID, err) } defer f.Close() data, err := json.Marshal(msg) if err != nil { return fmt.Errorf("failed to marshal session message: %w", err) } if _, err := f.Write(append(data, '\n')); err != nil { return fmt.Errorf("failed to write session message: %w", err) } return nil } // Load 按时间顺序检索会话的所有消息。 // 如果会话文件不存在,则返回错误。 func (s *JSONLStore) Load(sessionID string) ([]SessionMessage, error) { s.mu.RLock() defer s.mu.RUnlock() data, err := os.ReadFile(s.path(sessionID)) if err != nil { if os.IsNotExist(err) { // Check archive archiveData, archiveErr := os.ReadFile(s.archivePath(sessionID)) if archiveErr != nil { return nil, fmt.Errorf("session %q not found", sessionID) } data = archiveData } else { return nil, fmt.Errorf("failed to read session file for %q: %w", sessionID, err) } } return parseJSONL(data) } // parseJSONL 将 JSONL 字节切片解析为 SessionMessage 切片。 func parseJSONL(data []byte) ([]SessionMessage, error) { var messages []SessionMessage trimmed := strings.TrimSpace(string(data)) if trimmed == "" { return messages, nil } lines := strings.Split(trimmed, "\n") for _, line := range lines { line = strings.TrimSpace(line) if line == "" { continue } var msg SessionMessage if err := json.Unmarshal([]byte(line), &msg); err != nil { return nil, fmt.Errorf("failed to unmarshal session message: %w", err) } messages = append(messages, msg) } return messages, nil } // List 通过扫描存储目录返回所有会话 ID。 func (s *JSONLStore) List() ([]string, error) { s.mu.RLock() defer s.mu.RUnlock() entries, err := os.ReadDir(s.storageDir) if err != nil { return nil, fmt.Errorf("failed to read storage directory %q: %w", s.storageDir, err) } var sessions []string for _, entry := range entries { name := entry.Name() if strings.HasSuffix(name, ".jsonl") && !strings.HasSuffix(name, ".archived") { sessions = append(sessions, strings.TrimSuffix(name, ".jsonl")) } } return sessions, nil } // Exists 检查会话文件是否存在(活动或已归档)。 func (s *JSONLStore) Exists(sessionID string) (bool, error) { s.mu.RLock() defer s.mu.RUnlock() if _, err := os.Stat(s.path(sessionID)); err == nil { return true, nil } else if !os.IsNotExist(err) { return false, fmt.Errorf("failed to check session %q: %w", sessionID, err) } // Check archive if _, err := os.Stat(s.archivePath(sessionID)); err == nil { return true, nil } else if !os.IsNotExist(err) { return false, fmt.Errorf("failed to check archived session %q: %w", sessionID, err) } return false, nil } // Archive 通过重命名将会话文件移动到归档状态。 func (s *JSONLStore) Archive(sessionID string) error { s.mu.Lock() defer s.mu.Unlock() if err := os.Rename(s.path(sessionID), s.archivePath(sessionID)); err != nil { if os.IsNotExist(err) { return fmt.Errorf("session %q not found", sessionID) } return fmt.Errorf("failed to archive session %q: %w", sessionID, err) } return nil } // Delete 永久移除会话文件及其归档。 func (s *JSONLStore) Delete(sessionID string) error { s.mu.Lock() defer s.mu.Unlock() var lastErr error // Remove active file if err := os.Remove(s.path(sessionID)); err != nil && !os.IsNotExist(err) { lastErr = fmt.Errorf("failed to delete session %q: %w", sessionID, err) } // Also remove archived file if it exists if err := os.Remove(s.archivePath(sessionID)); err != nil && !os.IsNotExist(err) { lastErr = fmt.Errorf("failed to delete archived session %q: %w", sessionID, err) } return lastErr } // StorageDir 返回存储目录路径。 func (s *JSONLStore) StorageDir() string { return s.storageDir }