369 lines
9.7 KiB
Go
369 lines
9.7 KiB
Go
package session
|
|
|
|
import (
|
|
"database/sql"
|
|
"fmt"
|
|
"os"
|
|
"path/filepath"
|
|
"sync"
|
|
"time"
|
|
|
|
_ "modernc.org/sqlite"
|
|
_ "modernc.org/sqlite/vec"
|
|
)
|
|
|
|
type SQLiteStore struct {
|
|
dbPath string
|
|
db *sql.DB
|
|
mu sync.RWMutex
|
|
}
|
|
|
|
func NewSQLiteStore(dbPath string) (*SQLiteStore, error) {
|
|
dir := filepath.Dir(dbPath)
|
|
if err := os.MkdirAll(dir, 0755); err != nil {
|
|
return nil, fmt.Errorf("failed to create storage directory: %w", err)
|
|
}
|
|
|
|
db, err := sql.Open("sqlite", dbPath+"?_journal_mode=WAL&_busy_timeout=5000")
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to open SQLite database: %w", err)
|
|
}
|
|
|
|
// 限制连接池为单连接,避免 SQLite 并发冲突
|
|
// WAL 模式下支持并发读,但写操作仍需串行化
|
|
db.SetMaxOpenConns(1)
|
|
db.SetMaxIdleConns(1)
|
|
db.SetConnMaxLifetime(0)
|
|
|
|
store := &SQLiteStore{
|
|
dbPath: dbPath,
|
|
db: db,
|
|
}
|
|
|
|
if err := store.initSchema(); err != nil {
|
|
db.Close()
|
|
return nil, fmt.Errorf("failed to initialize schema: %w", err)
|
|
}
|
|
|
|
return store, nil
|
|
}
|
|
|
|
func (s *SQLiteStore) initSchema() error {
|
|
schema := `
|
|
CREATE TABLE IF NOT EXISTS main_messages (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
session_id TEXT NOT NULL,
|
|
role TEXT NOT NULL CHECK(role IN ('user', 'assistant', 'system', 'tool')),
|
|
content TEXT NOT NULL,
|
|
msg_type TEXT DEFAULT 'normal' CHECK(msg_type IN ('normal', 'fact', 'todo', 'decision', 'preference', 'error')),
|
|
token_count INTEGER DEFAULT 0,
|
|
has_embedding BOOLEAN DEFAULT FALSE,
|
|
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
|
|
metadata TEXT
|
|
);
|
|
|
|
CREATE INDEX IF NOT EXISTS idx_main_session_time ON main_messages(session_id, timestamp DESC);
|
|
CREATE INDEX IF NOT EXISTS idx_main_role ON main_messages(role) WHERE role IN ('user', 'assistant');
|
|
|
|
CREATE TABLE IF NOT EXISTS sessions (
|
|
id TEXT PRIMARY KEY,
|
|
status TEXT DEFAULT 'active' CHECK(status IN ('active', 'archived')),
|
|
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
|
|
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP,
|
|
metadata TEXT
|
|
);
|
|
|
|
CREATE INDEX IF NOT EXISTS idx_sessions_status ON sessions(status);
|
|
|
|
CREATE TABLE IF NOT EXISTS short_term_memories (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
session_id TEXT NOT NULL,
|
|
content TEXT NOT NULL,
|
|
source_count INTEGER DEFAULT 1,
|
|
confidence REAL DEFAULT 0.8 CHECK(confidence BETWEEN 0 AND 1),
|
|
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
|
|
updated_at DATETIME,
|
|
UNIQUE(session_id, content)
|
|
);
|
|
|
|
CREATE INDEX IF NOT EXISTS idx_stm_session ON short_term_memories(session_id, updated_at DESC);
|
|
|
|
CREATE TABLE IF NOT EXISTS long_term_memories (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
content TEXT NOT NULL UNIQUE,
|
|
memory_type TEXT NOT NULL DEFAULT 'fact' CHECK(memory_type IN ('preference', 'fact', 'decision', 'project')),
|
|
source_session TEXT,
|
|
confidence REAL NOT NULL DEFAULT 0.8,
|
|
weight REAL NOT NULL DEFAULT 1.0,
|
|
tags TEXT,
|
|
access_count INTEGER NOT NULL DEFAULT 0,
|
|
last_accessed DATETIME,
|
|
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
|
archived INTEGER NOT NULL DEFAULT 0
|
|
);
|
|
|
|
CREATE INDEX IF NOT EXISTS idx_ltm_type ON long_term_memories(memory_type, confidence DESC);
|
|
CREATE INDEX IF NOT EXISTS idx_ltm_weight ON long_term_memories(weight DESC);
|
|
CREATE INDEX IF NOT EXISTS idx_ltm_archived ON long_term_memories(archived);
|
|
|
|
CREATE VIRTUAL TABLE IF NOT EXISTS vec_long_term_memories USING vec0(
|
|
memory_id INTEGER PRIMARY KEY,
|
|
embedding FLOAT[1024]
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS memory_usage_log (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
memory_id INTEGER NOT NULL,
|
|
session_id TEXT NOT NULL,
|
|
query TEXT NOT NULL,
|
|
was_referenced INTEGER NOT NULL DEFAULT 0,
|
|
used_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
|
|
);
|
|
|
|
CREATE INDEX IF NOT EXISTS idx_usage_memory ON memory_usage_log(memory_id);
|
|
CREATE INDEX IF NOT EXISTS idx_usage_session ON memory_usage_log(session_id);
|
|
|
|
CREATE TABLE IF NOT EXISTS dialogue_buffer (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
session_id TEXT NOT NULL,
|
|
user_query TEXT NOT NULL,
|
|
assistant_response TEXT NOT NULL,
|
|
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
|
|
);
|
|
|
|
CREATE INDEX IF NOT EXISTS idx_buffer_session ON dialogue_buffer(session_id, created_at DESC);
|
|
|
|
CREATE TABLE IF NOT EXISTS subagent_messages (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
parent_session_id TEXT NOT NULL,
|
|
session_id TEXT NOT NULL,
|
|
agent_name TEXT NOT NULL,
|
|
role TEXT NOT NULL CHECK(role IN ('assistant', 'system', 'user')),
|
|
content TEXT NOT NULL,
|
|
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
|
|
);
|
|
|
|
CREATE INDEX IF NOT EXISTS idx_subagent_parent ON subagent_messages(parent_session_id);
|
|
CREATE INDEX IF NOT EXISTS idx_subagent_session ON subagent_messages(session_id);
|
|
`
|
|
|
|
_, err := s.db.Exec(schema)
|
|
return err
|
|
}
|
|
|
|
func (s *SQLiteStore) Save(sessionID string, msg SessionMessage) error {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
tx, err := s.db.Begin()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to begin transaction: %w", err)
|
|
}
|
|
defer tx.Rollback()
|
|
|
|
_, err = tx.Exec(
|
|
`INSERT INTO sessions (id, updated_at) VALUES (?, ?)
|
|
ON CONFLICT(id) DO UPDATE SET updated_at = ?`,
|
|
sessionID, msg.Timestamp, msg.Timestamp,
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to upsert session: %w", err)
|
|
}
|
|
|
|
metadataJSON := "{}"
|
|
if len(msg.Metadata) > 0 {
|
|
metadataJSON = fmt.Sprintf("%v", msg.Metadata)
|
|
}
|
|
|
|
_, err = tx.Exec(
|
|
`INSERT INTO main_messages (session_id, role, content, timestamp, metadata)
|
|
VALUES (?, ?, ?, ?, ?)`,
|
|
sessionID, string(msg.Role), msg.Content, msg.Timestamp, metadataJSON,
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to insert message: %w", err)
|
|
}
|
|
|
|
return tx.Commit()
|
|
}
|
|
|
|
func (s *SQLiteStore) Load(sessionID string) ([]SessionMessage, error) {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
|
|
var status string
|
|
err := s.db.QueryRow(
|
|
"SELECT status FROM sessions WHERE id = ?",
|
|
sessionID,
|
|
).Scan(&status)
|
|
if err != nil {
|
|
if err == sql.ErrNoRows {
|
|
return nil, fmt.Errorf("session %q not found", sessionID)
|
|
}
|
|
return nil, fmt.Errorf("failed to query session: %w", err)
|
|
}
|
|
|
|
rows, err := s.db.Query(
|
|
`SELECT role, content, timestamp, metadata FROM main_messages
|
|
WHERE session_id = ?
|
|
ORDER BY timestamp ASC, id ASC`,
|
|
sessionID,
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to query messages: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
var messages []SessionMessage
|
|
for rows.Next() {
|
|
var msg SessionMessage
|
|
var timestampStr string
|
|
var metadataStr string
|
|
|
|
if err := rows.Scan(&msg.Role, &msg.Content, ×tampStr, &metadataStr); err != nil {
|
|
return nil, fmt.Errorf("failed to scan message: %w", err)
|
|
}
|
|
|
|
msg.Timestamp, _ = time.Parse(time.RFC3339, timestampStr)
|
|
messages = append(messages, msg)
|
|
}
|
|
|
|
return messages, rows.Err()
|
|
}
|
|
|
|
func (s *SQLiteStore) List() ([]string, error) {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
|
|
rows, err := s.db.Query(
|
|
"SELECT id FROM sessions WHERE status = 'active' ORDER BY updated_at DESC",
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to query sessions: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
var sessions []string
|
|
for rows.Next() {
|
|
var id string
|
|
if err := rows.Scan(&id); err != nil {
|
|
return nil, fmt.Errorf("failed to scan session: %w", err)
|
|
}
|
|
sessions = append(sessions, id)
|
|
}
|
|
|
|
return sessions, rows.Err()
|
|
}
|
|
|
|
func (s *SQLiteStore) Exists(sessionID string) (bool, error) {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
|
|
var count int
|
|
err := s.db.QueryRow(
|
|
"SELECT COUNT(*) FROM sessions WHERE id = ?",
|
|
sessionID,
|
|
).Scan(&count)
|
|
if err != nil {
|
|
return false, fmt.Errorf("failed to check session: %w", err)
|
|
}
|
|
|
|
return count > 0, nil
|
|
}
|
|
|
|
func (s *SQLiteStore) Archive(sessionID string) error {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
result, err := s.db.Exec(
|
|
"UPDATE sessions SET status = 'archived' WHERE id = ?",
|
|
sessionID,
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to archive session: %w", err)
|
|
}
|
|
|
|
rowsAffected, _ := result.RowsAffected()
|
|
if rowsAffected == 0 {
|
|
return fmt.Errorf("session %q not found", sessionID)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *SQLiteStore) Delete(sessionID string) error {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
tx, err := s.db.Begin()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to begin transaction: %w", err)
|
|
}
|
|
defer tx.Rollback()
|
|
|
|
_, err = tx.Exec("DELETE FROM main_messages WHERE session_id = ?", sessionID)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to delete messages: %w", err)
|
|
}
|
|
|
|
_, err = tx.Exec("DELETE FROM sessions WHERE id = ?", sessionID)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to delete session: %w", err)
|
|
}
|
|
|
|
return tx.Commit()
|
|
}
|
|
|
|
func (s *SQLiteStore) SaveSubAgentMessage(parentSessionID, sessionID, agentName string, msg SessionMessage) error {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
_, err := s.db.Exec(
|
|
`INSERT INTO subagent_messages (parent_session_id, session_id, agent_name, role, content, timestamp)
|
|
VALUES (?, ?, ?, ?, ?, ?)`,
|
|
parentSessionID, sessionID, agentName, string(msg.Role), msg.Content, msg.Timestamp,
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to save subagent message: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *SQLiteStore) LoadSubAgentMessages(sessionID string) ([]SessionMessage, error) {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
|
|
rows, err := s.db.Query(
|
|
`SELECT role, content, timestamp FROM subagent_messages
|
|
WHERE session_id = ?
|
|
ORDER BY timestamp ASC, id ASC`,
|
|
sessionID,
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to query subagent messages: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
var messages []SessionMessage
|
|
for rows.Next() {
|
|
var msg SessionMessage
|
|
var timestampStr string
|
|
if err := rows.Scan(&msg.Role, &msg.Content, ×tampStr); err != nil {
|
|
return nil, fmt.Errorf("failed to scan subagent message: %w", err)
|
|
}
|
|
msg.Timestamp, _ = time.Parse(time.RFC3339, timestampStr)
|
|
messages = append(messages, msg)
|
|
}
|
|
return messages, rows.Err()
|
|
}
|
|
|
|
func (s *SQLiteStore) Close() error {
|
|
return s.db.Close()
|
|
}
|
|
|
|
func (s *SQLiteStore) DB() *sql.DB {
|
|
return s.db
|
|
}
|
|
|
|
|