orca.ai/internal/web/server.go

208 lines
4.2 KiB
Go

package web
import (
"encoding/json"
"fmt"
"net/http"
"strings"
"sync"
"github.com/orca/orca/pkg/actor"
"github.com/orca/orca/pkg/kernel"
)
type Server struct {
kernel *kernel.Kernel
port int
clients map[string]*SSEClient
clientsMu sync.RWMutex
msgCounter int
}
type SSEClient struct {
ID string
Writer http.ResponseWriter
Flusher http.Flusher
Done chan bool
}
type sseWriter struct {
client *SSEClient
mu sync.Mutex
}
func (w *sseWriter) Write(p []byte) (n int, err error) {
w.mu.Lock()
defer w.mu.Unlock()
select {
case <-w.client.Done:
return len(p), nil
default:
}
data := string(p)
if strings.TrimSpace(data) == "" {
return len(p), nil
}
lines := strings.Split(data, "\n")
for _, line := range lines {
fmt.Fprintf(w.client.Writer, "data: %s\n", line)
}
fmt.Fprint(w.client.Writer, "\n")
w.client.Flusher.Flush()
return len(p), nil
}
func NewServer(k *kernel.Kernel, port int) *Server {
if port <= 0 {
port = 8081
}
return &Server{
kernel: k,
port: port,
clients: make(map[string]*SSEClient),
}
}
func (s *Server) Start() error {
mux := http.NewServeMux()
mux.HandleFunc("/api/stream", s.handleStream)
mux.HandleFunc("/api/chat", s.handleChat)
mux.HandleFunc("/api/stats", s.handleStats)
mux.HandleFunc("/api/agents", s.handleAgents)
mux.HandleFunc("/", s.handleIndex)
addr := fmt.Sprintf(":%d", s.port)
return http.ListenAndServe(addr, mux)
}
func (s *Server) handleStream(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Access-Control-Allow-Origin", "*")
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "Streaming unsupported", http.StatusInternalServerError)
return
}
s.msgCounter++
clientID := fmt.Sprintf("client-%d", s.msgCounter)
client := &SSEClient{
ID: clientID,
Writer: w,
Flusher: flusher,
Done: make(chan bool),
}
s.clientsMu.Lock()
s.clients[clientID] = client
s.clientsMu.Unlock()
fmt.Fprintf(w, "event: connected\ndata: %s\n\n", clientID)
flusher.Flush()
<-r.Context().Done()
s.clientsMu.Lock()
delete(s.clients, clientID)
s.clientsMu.Unlock()
close(client.Done)
}
func (s *Server) handleChat(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
var req struct {
Message string `json:"message"`
}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
s.clientsMu.RLock()
var client *SSEClient
for _, c := range s.clients {
client = c
break
}
s.clientsMu.RUnlock()
if client != nil {
writer := &sseWriter{client: client}
s.kernel.SetStreamWriter(writer)
}
resp, err := s.kernel.SendMessage("user", "llm", req.Message)
if err != nil {
json.NewEncoder(w).Encode(map[string]string{
"error": err.Error(),
})
return
}
json.NewEncoder(w).Encode(map[string]string{
"response": resp,
})
}
func (s *Server) handleStats(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
stats := map[string]interface{}{
"tools": 0,
"skills": 0,
"agents": 0,
}
if tm := s.kernel.ToolManager(); tm != nil {
stats["tools"] = tm.Count()
}
if sm := s.kernel.SkillManager(); sm != nil {
stats["skills"] = len(sm.ListSkills())
}
if as := s.kernel.ActorSystem(); as != nil {
stats["agents"] = as.AgentCount()
}
json.NewEncoder(w).Encode(stats)
}
func (s *Server) handleAgents(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
var agents []map[string]string
if as := s.kernel.ActorSystem(); as != nil {
for _, info := range as.AgentInfos() {
status := "idle"
if info.Status == actor.StatusProcessing {
status = "running"
}
agents = append(agents, map[string]string{
"id": info.ID,
"status": status,
})
}
}
json.NewEncoder(w).Encode(agents)
}
func (s *Server) handleIndex(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/" {
http.NotFound(w, r)
return
}
w.Header().Set("Content-Type", "text/html; charset=utf-8")
w.Write([]byte(indexHTML))
}