Search Apps Documentation Source Content File Folder Download Copy Actions Download

broker.gno

3.00 Kb ยท 127 lines
  1package message
  2
  3import (
  4	"errors"
  5	"strings"
  6
  7	"gno.land/p/moul/ulist"
  8	"gno.land/p/nt/avl"
  9)
 10
 11var (
 12	// ErrInvalidTopic is triggered when an invalid topic is used.
 13	ErrInvalidTopic = errors.New("invalid topic")
 14
 15	// ErrRequiredCallback is triggered when subscribing without a callback.
 16	ErrRequiredCallback = errors.New("message callback is required")
 17
 18	// ErrRequiredSubscriptionID is triggered when unsubscribing without an ID.
 19	ErrRequiredSubscriptionID = errors.New("message sibscription ID is required")
 20
 21	// ErrRequiredTopic is triggered when (un)subscribing without a topic.
 22	ErrRequiredTopic = errors.New("message topic is required")
 23)
 24
 25// NewBroker creates a new message broker.
 26func NewBroker() *Broker {
 27	return &Broker{}
 28}
 29
 30// Broker is a message broker that handles subscriptions and message publishing.
 31type Broker struct {
 32	callbacks avl.Tree // string(topic) -> *ulist.List(Callback)
 33}
 34
 35// Topics returns the list of current subscription topics.
 36func (b Broker) Topics() []Topic {
 37	var topics []Topic
 38	b.callbacks.Iterate("", "", func(k string, _ any) bool {
 39		topic := Topic(k)
 40		if topic == TopicAll {
 41			// Skip catchall topic from the list
 42			return false
 43		}
 44
 45		topics = append(topics, topic)
 46		return false
 47	})
 48	return topics
 49}
 50
 51// Subscribe subscribes to messages published for a topic.
 52// It returns the callback ID within the topic.
 53func (b *Broker) Subscribe(topic Topic, cb Callback) (id int, _ error) {
 54	key := strings.TrimSpace(string(topic))
 55	if key == "" {
 56		return 0, ErrRequiredTopic
 57	}
 58
 59	if cb == nil {
 60		return 0, ErrRequiredCallback
 61	}
 62
 63	v, _ := b.callbacks.Get(key)
 64	callbacks, _ := v.(*ulist.List)
 65	if callbacks == nil {
 66		callbacks = ulist.New()
 67	}
 68
 69	callbacks.Append(cb)
 70	b.callbacks.Set(key, callbacks)
 71	return callbacks.TotalSize(), nil
 72}
 73
 74// Unsubscribe unsubscribes a callback from a message topic.
 75// ID is the callback ID within the topic, returned on subscription.
 76func (b *Broker) Unsubscribe(topic Topic, id int) (unsubscribed bool, _ error) {
 77	key := strings.TrimSpace(string(topic))
 78	if key == "" {
 79		return false, ErrRequiredTopic
 80	}
 81
 82	if id == 0 {
 83		return false, ErrRequiredSubscriptionID
 84	}
 85
 86	v, found := b.callbacks.Get(key)
 87	if !found {
 88		return false, errors.New("message topic not found: " + key)
 89	}
 90
 91	callbacks := v.(*ulist.List)
 92	i := id - 1
 93	return callbacks.Delete(i) == nil, nil
 94}
 95
 96// Publish publishes a message for a topic.
 97func (b Broker) Publish(topic Topic, data any) error {
 98	if topic == TopicAll {
 99		return ErrInvalidTopic
100	}
101
102	key := strings.TrimSpace(string(topic))
103	if key == "" {
104		return ErrRequiredTopic
105	}
106
107	iterCb := func(_ int, v any) bool {
108		cb := v.(Callback)
109		cb(Message{topic, data})
110		return false
111	}
112
113	// Trigger callbacks subscribed to current topic
114	v, found := b.callbacks.Get(key)
115	if found {
116		callbacks := v.(*ulist.List)
117		callbacks.Iterator(0, callbacks.Size(), iterCb)
118	}
119
120	// Trigger callbacks subscribed to all topics
121	v, found = b.callbacks.Get(string(TopicAll))
122	if found {
123		callbacks := v.(*ulist.List)
124		callbacks.Iterator(0, callbacks.Size(), iterCb)
125	}
126	return nil
127}