package bus import ( "errors" "fmt" "sync" "sync/atomic" ) // Handler is a callback function that processes a delivered message. type Handler func(Message) // Subscription represents an active subscription to a message bus topic. type Subscription interface { ID() string Topic() string Unsubscribe() } // MessageBus is the central communication hub of the Orca framework. // // It uses a publish/subscribe pattern built on Go channels. Components // publish messages to named topics, and all subscribers to that topic // receive the message asynchronously. type MessageBus interface { // Publish sends a message to all active subscribers of the given topic. Publish(topic string, msg Message) error // Subscribe registers a handler for the given topic. Subscribe(topic string, handler Handler) (Subscription, error) // Close gracefully shuts down the bus, cleaning up all subscriptions. Close() error } // subscription implements the Subscription interface. type subscription struct { id string topic string ch chan Message bus *messageBus active *atomic.Bool } func (s *subscription) ID() string { return s.id } func (s *subscription) Topic() string { return s.topic } func (s *subscription) Unsubscribe() { s.bus.unsubscribe(s) } // messageBus is the channel-based implementation of MessageBus. type messageBus struct { mu sync.RWMutex topics map[string][]*subscription nextID int64 closed bool } // New creates a new message bus instance. func New() MessageBus { return &messageBus{ topics: make(map[string][]*subscription), } } // Publish sends a message to all subscribers of the given topic. // The send is non-blocking: if a subscriber's channel buffer is full, // the message is dropped for that subscriber. func (mb *messageBus) Publish(topic string, msg Message) error { mb.mu.RLock() defer mb.mu.RUnlock() if mb.closed { return errors.New("message bus is closed") } subs, ok := mb.topics[topic] if !ok { return nil } for _, sub := range subs { if sub.active.Load() { select { case sub.ch <- msg: default: } } } return nil } // Subscribe adds a handler for the given topic. func (mb *messageBus) Subscribe(topic string, handler Handler) (Subscription, error) { mb.mu.Lock() defer mb.mu.Unlock() if mb.closed { return nil, errors.New("message bus is closed") } id := fmt.Sprintf("sub-%d", atomic.AddInt64(&mb.nextID, 1)) sub := &subscription{ id: id, topic: topic, ch: make(chan Message, 64), bus: mb, active: &atomic.Bool{}, } sub.active.Store(true) mb.topics[topic] = append(mb.topics[topic], sub) go sub.deliver(handler) return sub, nil } // deliver reads messages from the subscription channel and calls the handler. func (s *subscription) deliver(handler Handler) { for msg := range s.ch { if !s.active.Load() { return } handler(msg) } } // unsubscribe removes a subscription from the bus and closes its channel. func (mb *messageBus) unsubscribe(sub *subscription) { mb.mu.Lock() defer mb.mu.Unlock() sub.active.Store(false) subs, ok := mb.topics[sub.Topic()] if !ok { return } for i, s := range subs { if s.ID() == sub.ID() { mb.topics[sub.Topic()] = append(subs[:i], subs[i+1:]...) close(s.ch) return } } } // Close shuts down the bus, unsubscribing all active subscriptions. func (mb *messageBus) Close() error { mb.mu.Lock() defer mb.mu.Unlock() if mb.closed { return nil } mb.closed = true for topic, subs := range mb.topics { for _, sub := range subs { sub.active.Store(false) close(sub.ch) } delete(mb.topics, topic) } return nil }