208 lines
4.2 KiB
Go
208 lines
4.2 KiB
Go
package web
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"net/http"
|
|
"strings"
|
|
"sync"
|
|
|
|
"github.com/orca/orca/pkg/actor"
|
|
"github.com/orca/orca/pkg/kernel"
|
|
)
|
|
|
|
type Server struct {
|
|
kernel *kernel.Kernel
|
|
port int
|
|
clients map[string]*SSEClient
|
|
clientsMu sync.RWMutex
|
|
msgCounter int
|
|
}
|
|
|
|
// SSEClient describes one connected server-sent-events consumer.
type SSEClient struct {
	// ID is the identifier the client is registered under.
	ID string
	// Writer is the response writer of the client's open stream request.
	Writer http.ResponseWriter
	// Flusher pushes data buffered on Writer out to the client.
	Flusher http.Flusher
	// Done is closed when the client's stream request has ended.
	Done chan bool
}
|
|
|
|
type sseWriter struct {
|
|
client *SSEClient
|
|
mu sync.Mutex
|
|
}
|
|
|
|
func (w *sseWriter) Write(p []byte) (n int, err error) {
|
|
w.mu.Lock()
|
|
defer w.mu.Unlock()
|
|
|
|
select {
|
|
case <-w.client.Done:
|
|
return len(p), nil
|
|
default:
|
|
}
|
|
|
|
data := string(p)
|
|
if strings.TrimSpace(data) == "" {
|
|
return len(p), nil
|
|
}
|
|
|
|
lines := strings.Split(data, "\n")
|
|
for _, line := range lines {
|
|
fmt.Fprintf(w.client.Writer, "data: %s\n", line)
|
|
}
|
|
fmt.Fprint(w.client.Writer, "\n")
|
|
w.client.Flusher.Flush()
|
|
return len(p), nil
|
|
}
|
|
|
|
func NewServer(k *kernel.Kernel, port int) *Server {
|
|
if port <= 0 {
|
|
port = 8081
|
|
}
|
|
return &Server{
|
|
kernel: k,
|
|
port: port,
|
|
clients: make(map[string]*SSEClient),
|
|
}
|
|
}
|
|
|
|
func (s *Server) Start() error {
|
|
mux := http.NewServeMux()
|
|
|
|
mux.HandleFunc("/api/stream", s.handleStream)
|
|
mux.HandleFunc("/api/chat", s.handleChat)
|
|
mux.HandleFunc("/api/stats", s.handleStats)
|
|
mux.HandleFunc("/api/agents", s.handleAgents)
|
|
mux.HandleFunc("/", s.handleIndex)
|
|
|
|
addr := fmt.Sprintf(":%d", s.port)
|
|
return http.ListenAndServe(addr, mux)
|
|
}
|
|
|
|
func (s *Server) handleStream(w http.ResponseWriter, r *http.Request) {
|
|
w.Header().Set("Content-Type", "text/event-stream")
|
|
w.Header().Set("Cache-Control", "no-cache")
|
|
w.Header().Set("Connection", "keep-alive")
|
|
w.Header().Set("Access-Control-Allow-Origin", "*")
|
|
|
|
flusher, ok := w.(http.Flusher)
|
|
if !ok {
|
|
http.Error(w, "Streaming unsupported", http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
s.msgCounter++
|
|
clientID := fmt.Sprintf("client-%d", s.msgCounter)
|
|
client := &SSEClient{
|
|
ID: clientID,
|
|
Writer: w,
|
|
Flusher: flusher,
|
|
Done: make(chan bool),
|
|
}
|
|
|
|
s.clientsMu.Lock()
|
|
s.clients[clientID] = client
|
|
s.clientsMu.Unlock()
|
|
|
|
fmt.Fprintf(w, "event: connected\ndata: %s\n\n", clientID)
|
|
flusher.Flush()
|
|
|
|
<-r.Context().Done()
|
|
|
|
s.clientsMu.Lock()
|
|
delete(s.clients, clientID)
|
|
s.clientsMu.Unlock()
|
|
close(client.Done)
|
|
}
|
|
|
|
func (s *Server) handleChat(w http.ResponseWriter, r *http.Request) {
|
|
if r.Method != http.MethodPost {
|
|
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
|
|
return
|
|
}
|
|
|
|
var req struct {
|
|
Message string `json:"message"`
|
|
}
|
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
|
http.Error(w, err.Error(), http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
s.clientsMu.RLock()
|
|
var client *SSEClient
|
|
for _, c := range s.clients {
|
|
client = c
|
|
break
|
|
}
|
|
s.clientsMu.RUnlock()
|
|
|
|
if client != nil {
|
|
writer := &sseWriter{client: client}
|
|
s.kernel.SetStreamWriter(writer)
|
|
}
|
|
|
|
resp, err := s.kernel.SendMessage("user", "llm", req.Message)
|
|
if err != nil {
|
|
json.NewEncoder(w).Encode(map[string]string{
|
|
"error": err.Error(),
|
|
})
|
|
return
|
|
}
|
|
|
|
json.NewEncoder(w).Encode(map[string]string{
|
|
"response": resp,
|
|
})
|
|
}
|
|
|
|
func (s *Server) handleStats(w http.ResponseWriter, r *http.Request) {
|
|
w.Header().Set("Content-Type", "application/json")
|
|
|
|
stats := map[string]interface{}{
|
|
"tools": 0,
|
|
"skills": 0,
|
|
"agents": 0,
|
|
}
|
|
|
|
if tm := s.kernel.ToolManager(); tm != nil {
|
|
stats["tools"] = tm.Count()
|
|
}
|
|
if sm := s.kernel.SkillManager(); sm != nil {
|
|
stats["skills"] = len(sm.ListSkills())
|
|
}
|
|
if as := s.kernel.ActorSystem(); as != nil {
|
|
stats["agents"] = as.AgentCount()
|
|
}
|
|
|
|
json.NewEncoder(w).Encode(stats)
|
|
}
|
|
|
|
func (s *Server) handleAgents(w http.ResponseWriter, r *http.Request) {
|
|
w.Header().Set("Content-Type", "application/json")
|
|
|
|
var agents []map[string]string
|
|
if as := s.kernel.ActorSystem(); as != nil {
|
|
for _, info := range as.AgentInfos() {
|
|
status := "idle"
|
|
if info.Status == actor.StatusProcessing {
|
|
status = "running"
|
|
}
|
|
agents = append(agents, map[string]string{
|
|
"id": info.ID,
|
|
"status": status,
|
|
})
|
|
}
|
|
}
|
|
|
|
json.NewEncoder(w).Encode(agents)
|
|
}
|
|
|
|
func (s *Server) handleIndex(w http.ResponseWriter, r *http.Request) {
|
|
if r.URL.Path != "/" {
|
|
http.NotFound(w, r)
|
|
return
|
|
}
|
|
w.Header().Set("Content-Type", "text/html; charset=utf-8")
|
|
w.Write([]byte(indexHTML))
|
|
}
|