package session import ( "database/sql" "fmt" "os" "path/filepath" "sync" "time" _ "modernc.org/sqlite" _ "modernc.org/sqlite/vec" ) type SQLiteStore struct { dbPath string db *sql.DB mu sync.RWMutex } func NewSQLiteStore(dbPath string) (*SQLiteStore, error) { dir := filepath.Dir(dbPath) if err := os.MkdirAll(dir, 0755); err != nil { return nil, fmt.Errorf("failed to create storage directory: %w", err) } db, err := sql.Open("sqlite", dbPath+"?_journal_mode=WAL&_busy_timeout=5000") if err != nil { return nil, fmt.Errorf("failed to open SQLite database: %w", err) } // 限制连接池为单连接,避免 SQLite 并发冲突 // WAL 模式下支持并发读,但写操作仍需串行化 db.SetMaxOpenConns(1) db.SetMaxIdleConns(1) db.SetConnMaxLifetime(0) store := &SQLiteStore{ dbPath: dbPath, db: db, } if err := store.initSchema(); err != nil { db.Close() return nil, fmt.Errorf("failed to initialize schema: %w", err) } return store, nil } func (s *SQLiteStore) initSchema() error { schema := ` CREATE TABLE IF NOT EXISTS main_messages ( id INTEGER PRIMARY KEY AUTOINCREMENT, session_id TEXT NOT NULL, role TEXT NOT NULL CHECK(role IN ('user', 'assistant', 'system', 'tool')), content TEXT NOT NULL, msg_type TEXT DEFAULT 'normal' CHECK(msg_type IN ('normal', 'fact', 'todo', 'decision', 'preference', 'error')), token_count INTEGER DEFAULT 0, has_embedding BOOLEAN DEFAULT FALSE, timestamp DATETIME DEFAULT CURRENT_TIMESTAMP, metadata TEXT ); CREATE INDEX IF NOT EXISTS idx_main_session_time ON main_messages(session_id, timestamp DESC); CREATE INDEX IF NOT EXISTS idx_main_role ON main_messages(role) WHERE role IN ('user', 'assistant'); CREATE TABLE IF NOT EXISTS sessions ( id TEXT PRIMARY KEY, status TEXT DEFAULT 'active' CHECK(status IN ('active', 'archived')), created_at DATETIME DEFAULT CURRENT_TIMESTAMP, updated_at DATETIME DEFAULT CURRENT_TIMESTAMP, metadata TEXT ); CREATE INDEX IF NOT EXISTS idx_sessions_status ON sessions(status); CREATE TABLE IF NOT EXISTS short_term_memories ( id INTEGER PRIMARY KEY AUTOINCREMENT, session_id TEXT NOT NULL, content TEXT NOT NULL, source_count INTEGER DEFAULT 1, confidence REAL DEFAULT 0.8 CHECK(confidence BETWEEN 0 AND 1), created_at DATETIME DEFAULT CURRENT_TIMESTAMP, updated_at DATETIME, UNIQUE(session_id, content) ); CREATE INDEX IF NOT EXISTS idx_stm_session ON short_term_memories(session_id, updated_at DESC); CREATE TABLE IF NOT EXISTS long_term_memories ( id INTEGER PRIMARY KEY AUTOINCREMENT, content TEXT NOT NULL UNIQUE, memory_type TEXT NOT NULL DEFAULT 'fact' CHECK(memory_type IN ('preference', 'fact', 'decision', 'project')), source_session TEXT, confidence REAL NOT NULL DEFAULT 0.8, weight REAL NOT NULL DEFAULT 1.0, tags TEXT, access_count INTEGER NOT NULL DEFAULT 0, last_accessed DATETIME, created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, archived INTEGER NOT NULL DEFAULT 0 ); CREATE INDEX IF NOT EXISTS idx_ltm_type ON long_term_memories(memory_type, confidence DESC); CREATE INDEX IF NOT EXISTS idx_ltm_weight ON long_term_memories(weight DESC); CREATE INDEX IF NOT EXISTS idx_ltm_archived ON long_term_memories(archived); CREATE VIRTUAL TABLE IF NOT EXISTS vec_long_term_memories USING vec0( memory_id INTEGER PRIMARY KEY, embedding FLOAT[1024] ); CREATE TABLE IF NOT EXISTS memory_usage_log ( id INTEGER PRIMARY KEY AUTOINCREMENT, memory_id INTEGER NOT NULL, session_id TEXT NOT NULL, query TEXT NOT NULL, was_referenced INTEGER NOT NULL DEFAULT 0, used_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ); CREATE INDEX IF NOT EXISTS idx_usage_memory ON memory_usage_log(memory_id); CREATE INDEX IF NOT EXISTS idx_usage_session ON memory_usage_log(session_id); CREATE TABLE IF NOT EXISTS dialogue_buffer ( id INTEGER PRIMARY KEY AUTOINCREMENT, session_id TEXT NOT NULL, user_query TEXT NOT NULL, assistant_response TEXT NOT NULL, created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ); CREATE INDEX IF NOT EXISTS idx_buffer_session ON dialogue_buffer(session_id, created_at DESC); CREATE TABLE IF NOT EXISTS subagent_messages ( id INTEGER PRIMARY KEY AUTOINCREMENT, parent_session_id TEXT NOT NULL, session_id TEXT NOT NULL, agent_name TEXT NOT NULL, role TEXT NOT NULL CHECK(role IN ('assistant', 'system', 'user')), content TEXT NOT NULL, timestamp DATETIME DEFAULT CURRENT_TIMESTAMP ); CREATE INDEX IF NOT EXISTS idx_subagent_parent ON subagent_messages(parent_session_id); CREATE INDEX IF NOT EXISTS idx_subagent_session ON subagent_messages(session_id); ` _, err := s.db.Exec(schema) return err } func (s *SQLiteStore) Save(sessionID string, msg SessionMessage) error { s.mu.Lock() defer s.mu.Unlock() tx, err := s.db.Begin() if err != nil { return fmt.Errorf("failed to begin transaction: %w", err) } defer tx.Rollback() _, err = tx.Exec( `INSERT INTO sessions (id, updated_at) VALUES (?, ?) ON CONFLICT(id) DO UPDATE SET updated_at = ?`, sessionID, msg.Timestamp, msg.Timestamp, ) if err != nil { return fmt.Errorf("failed to upsert session: %w", err) } metadataJSON := "{}" if len(msg.Metadata) > 0 { metadataJSON = fmt.Sprintf("%v", msg.Metadata) } _, err = tx.Exec( `INSERT INTO main_messages (session_id, role, content, timestamp, metadata) VALUES (?, ?, ?, ?, ?)`, sessionID, string(msg.Role), msg.Content, msg.Timestamp, metadataJSON, ) if err != nil { return fmt.Errorf("failed to insert message: %w", err) } return tx.Commit() } func (s *SQLiteStore) Load(sessionID string) ([]SessionMessage, error) { s.mu.RLock() defer s.mu.RUnlock() var status string err := s.db.QueryRow( "SELECT status FROM sessions WHERE id = ?", sessionID, ).Scan(&status) if err != nil { if err == sql.ErrNoRows { return nil, fmt.Errorf("session %q not found", sessionID) } return nil, fmt.Errorf("failed to query session: %w", err) } rows, err := s.db.Query( `SELECT role, content, timestamp, metadata FROM main_messages WHERE session_id = ? ORDER BY timestamp ASC, id ASC`, sessionID, ) if err != nil { return nil, fmt.Errorf("failed to query messages: %w", err) } defer rows.Close() var messages []SessionMessage for rows.Next() { var msg SessionMessage var timestampStr string var metadataStr string if err := rows.Scan(&msg.Role, &msg.Content, ×tampStr, &metadataStr); err != nil { return nil, fmt.Errorf("failed to scan message: %w", err) } msg.Timestamp, _ = time.Parse(time.RFC3339, timestampStr) messages = append(messages, msg) } return messages, rows.Err() } func (s *SQLiteStore) List() ([]string, error) { s.mu.RLock() defer s.mu.RUnlock() rows, err := s.db.Query( "SELECT id FROM sessions WHERE status = 'active' ORDER BY updated_at DESC", ) if err != nil { return nil, fmt.Errorf("failed to query sessions: %w", err) } defer rows.Close() var sessions []string for rows.Next() { var id string if err := rows.Scan(&id); err != nil { return nil, fmt.Errorf("failed to scan session: %w", err) } sessions = append(sessions, id) } return sessions, rows.Err() } func (s *SQLiteStore) Exists(sessionID string) (bool, error) { s.mu.RLock() defer s.mu.RUnlock() var count int err := s.db.QueryRow( "SELECT COUNT(*) FROM sessions WHERE id = ?", sessionID, ).Scan(&count) if err != nil { return false, fmt.Errorf("failed to check session: %w", err) } return count > 0, nil } func (s *SQLiteStore) Archive(sessionID string) error { s.mu.Lock() defer s.mu.Unlock() result, err := s.db.Exec( "UPDATE sessions SET status = 'archived' WHERE id = ?", sessionID, ) if err != nil { return fmt.Errorf("failed to archive session: %w", err) } rowsAffected, _ := result.RowsAffected() if rowsAffected == 0 { return fmt.Errorf("session %q not found", sessionID) } return nil } func (s *SQLiteStore) Delete(sessionID string) error { s.mu.Lock() defer s.mu.Unlock() tx, err := s.db.Begin() if err != nil { return fmt.Errorf("failed to begin transaction: %w", err) } defer tx.Rollback() _, err = tx.Exec("DELETE FROM main_messages WHERE session_id = ?", sessionID) if err != nil { return fmt.Errorf("failed to delete messages: %w", err) } _, err = tx.Exec("DELETE FROM sessions WHERE id = ?", sessionID) if err != nil { return fmt.Errorf("failed to delete session: %w", err) } return tx.Commit() } func (s *SQLiteStore) SaveSubAgentMessage(parentSessionID, sessionID, agentName string, msg SessionMessage) error { s.mu.Lock() defer s.mu.Unlock() _, err := s.db.Exec( `INSERT INTO subagent_messages (parent_session_id, session_id, agent_name, role, content, timestamp) VALUES (?, ?, ?, ?, ?, ?)`, parentSessionID, sessionID, agentName, string(msg.Role), msg.Content, msg.Timestamp, ) if err != nil { return fmt.Errorf("failed to save subagent message: %w", err) } return nil } func (s *SQLiteStore) LoadSubAgentMessages(sessionID string) ([]SessionMessage, error) { s.mu.RLock() defer s.mu.RUnlock() rows, err := s.db.Query( `SELECT role, content, timestamp FROM subagent_messages WHERE session_id = ? ORDER BY timestamp ASC, id ASC`, sessionID, ) if err != nil { return nil, fmt.Errorf("failed to query subagent messages: %w", err) } defer rows.Close() var messages []SessionMessage for rows.Next() { var msg SessionMessage var timestampStr string if err := rows.Scan(&msg.Role, &msg.Content, ×tampStr); err != nil { return nil, fmt.Errorf("failed to scan subagent message: %w", err) } msg.Timestamp, _ = time.Parse(time.RFC3339, timestampStr) messages = append(messages, msg) } return messages, rows.Err() } func (s *SQLiteStore) Close() error { return s.db.Close() } func (s *SQLiteStore) DB() *sql.DB { return s.db }