orca.ai/pkg/bus/bus.go
大森 6b94476347 Initial commit: Orca Agent Framework
Core features:
- Microkernel architecture with Actor model
- Session management with JSONL persistence
- Tool system (5 built-in tools)
- Skill system with SKILL.md parsing
- Sandbox security execution
- Ollama integration with gemma4:e4b
- Prompt-based tool calling (compatible with native function calling)
- REPL interface

11 packages, all tests passing
2026-05-08 00:55:48 +08:00

165 lines
3.6 KiB
Go

package bus
import (
"errors"
"fmt"
"sync"
"sync/atomic"
)
// Handler is the callback a subscriber supplies when subscribing. The bus
// invokes it once for each Message delivered to that subscription.
type Handler func(Message)
// Subscription is the handle returned to a caller that has subscribed to a
// topic. It identifies the registration and lets the caller cancel it.
type Subscription interface {
	// ID returns the identifier assigned to this subscription.
	ID() string

	// Topic returns the name of the topic this subscription listens on.
	Topic() string

	// Unsubscribe cancels the subscription.
	Unsubscribe()
}
// MessageBus is the central communication hub of the Orca framework.
//
// It follows the publish/subscribe pattern: components publish messages to
// named topics, and every subscriber registered on a topic is handed the
// message asynchronously.
type MessageBus interface {
	// Publish hands msg to every active subscriber of topic.
	Publish(topic string, msg Message) error

	// Subscribe registers handler for topic and returns a handle that can
	// later be used to cancel the registration.
	Subscribe(topic string, handler Handler) (Subscription, error)

	// Close shuts the bus down and releases all of its subscriptions.
	Close() error
}
// subscription is the bus-internal implementation of Subscription. One
// value exists per Subscribe call.
type subscription struct {
	// id is the identifier reported by ID.
	id string
	// topic is the topic name reported by Topic.
	topic string
	// ch carries published messages to the delivery goroutine.
	ch chan Message
	// bus is the owning bus, used by Unsubscribe to remove this entry.
	bus *messageBus
	// active is true while the subscription should receive messages; it is
	// cleared on unsubscribe and when the bus is closed.
	active *atomic.Bool
}
// ID returns the identifier assigned to this subscription.
func (s *subscription) ID() string {
	return s.id
}
// Topic returns the name of the topic this subscription is registered on.
func (s *subscription) Topic() string {
	return s.topic
}
// Unsubscribe cancels the subscription by asking the owning bus to remove it.
func (s *subscription) Unsubscribe() {
	owner := s.bus
	owner.unsubscribe(s)
}
// messageBus is the channel-based implementation of MessageBus.
type messageBus struct {
	// mu guards topics and closed.
	mu sync.RWMutex
	// topics maps a topic name to the subscriptions registered on it.
	topics map[string][]*subscription
	// nextID is the counter from which subscription IDs are generated.
	nextID int64
	// closed is set once Close has run; Publish and Subscribe then fail.
	closed bool
}
// New returns an open message bus with no topics and no subscribers.
func New() MessageBus {
	mb := new(messageBus)
	mb.topics = make(map[string][]*subscription)
	return mb
}
// Publish offers msg to every active subscriber of topic.
//
// Delivery never blocks the publisher: when a subscriber's channel buffer
// is already full, the message is silently dropped for that subscriber.
// Publishing to a topic without subscribers is a no-op. An error is
// returned only when the bus has been closed.
func (mb *messageBus) Publish(topic string, msg Message) error {
	mb.mu.RLock()
	defer mb.mu.RUnlock()

	if mb.closed {
		return errors.New("message bus is closed")
	}

	// An unknown topic yields a nil slice, so the loop simply does not run.
	for _, target := range mb.topics[topic] {
		if !target.active.Load() {
			continue
		}
		select {
		case target.ch <- msg:
		default:
			// Buffer full: drop the message rather than stall the publisher.
		}
	}
	return nil
}
// Subscribe registers handler for topic and starts a goroutine that feeds
// it the messages published to that topic.
//
// It returns a Subscription handle that can be used to cancel the
// registration. An error is returned when the bus has already been closed
// or when handler is nil.
func (mb *messageBus) Subscribe(topic string, handler Handler) (Subscription, error) {
	mb.mu.Lock()
	defer mb.mu.Unlock()

	if mb.closed {
		return nil, errors.New("message bus is closed")
	}
	// Reject a nil handler here: otherwise the delivery goroutine would call
	// a nil function on the first message and the unrecoverable panic would
	// take down the whole process.
	if handler == nil {
		return nil, errors.New("handler must not be nil")
	}

	id := fmt.Sprintf("sub-%d", atomic.AddInt64(&mb.nextID, 1))
	sub := &subscription{
		id:     id,
		topic:  topic,
		ch:     make(chan Message, 64), // buffered so Publish can stay non-blocking
		bus:    mb,
		active: &atomic.Bool{},
	}
	sub.active.Store(true)

	mb.topics[topic] = append(mb.topics[topic], sub)

	// The goroutine ends when the channel is closed by unsubscribe or Close.
	go sub.deliver(handler)
	return sub, nil
}
// deliver is the body of the per-subscription goroutine. It receives
// messages from the subscription channel and passes each one to handler,
// stopping as soon as the channel is closed or the subscription has been
// marked inactive.
func (s *subscription) deliver(handler Handler) {
	for {
		msg, open := <-s.ch
		if !open || !s.active.Load() {
			return
		}
		handler(msg)
	}
}
// unsubscribe marks sub inactive, removes it from its topic's subscriber
// list and closes its channel so the delivery goroutine terminates.
//
// Calling it for a subscription that is no longer registered (already
// unsubscribed, or the bus was closed) only clears the active flag; the
// channel is never closed twice.
func (mb *messageBus) unsubscribe(sub *subscription) {
	mb.mu.Lock()
	defer mb.mu.Unlock()

	sub.active.Store(false)

	subs, ok := mb.topics[sub.topic]
	if !ok {
		return
	}
	for i, s := range subs {
		if s != sub {
			continue
		}

		// Shift the tail down and nil out the vacated slot so the backing
		// array does not keep the removed subscription reachable.
		last := len(subs) - 1
		copy(subs[i:], subs[i+1:])
		subs[last] = nil
		subs = subs[:last]

		// Drop the map entry with the last subscriber; otherwise
		// short-lived topic names would accumulate forever.
		if len(subs) == 0 {
			delete(mb.topics, sub.topic)
		} else {
			mb.topics[sub.topic] = subs
		}

		close(s.ch)
		return
	}
}
// Close shuts the bus down. Every registered subscription is marked
// inactive and its channel is closed, and all topics are removed.
// Calling Close on an already closed bus does nothing. The returned error
// is always nil.
func (mb *messageBus) Close() error {
	mb.mu.Lock()
	defer mb.mu.Unlock()

	if !mb.closed {
		mb.closed = true
		for name, group := range mb.topics {
			for i := range group {
				group[i].active.Store(false)
				close(group[i].ch)
			}
			delete(mb.topics, name)
		}
	}
	return nil
}