broker.gno
3.00 Kb · 127 lines
1package message
2
3import (
4 "errors"
5 "strings"
6
7 "gno.land/p/moul/ulist"
8 "gno.land/p/nt/avl"
9)
10
// Sentinel errors returned by the broker methods.
var (
	// ErrInvalidTopic is returned when a topic is not valid for the
	// requested operation.
	ErrInvalidTopic = errors.New("invalid topic")

	// ErrRequiredCallback is returned when subscribing without a callback.
	ErrRequiredCallback = errors.New("message callback is required")

	// ErrRequiredSubscriptionID is returned when unsubscribing without an ID.
	ErrRequiredSubscriptionID = errors.New("message subscription ID is required")

	// ErrRequiredTopic is returned when (un)subscribing or publishing
	// without a topic.
	ErrRequiredTopic = errors.New("message topic is required")
)
24
25// NewBroker creates a new message broker.
26func NewBroker() *Broker {
27 return &Broker{}
28}
29
// Broker is a message broker that handles subscriptions and message publishing.
// Subscribed callbacks are stored per topic.
type Broker struct {
	// callbacks is keyed by the topic name; each value is the list of
	// callbacks subscribed to that topic.
	callbacks avl.Tree // string(topic) -> *ulist.List(Callback)
}
34
35// Topics returns the list of current subscription topics.
36func (b Broker) Topics() []Topic {
37 var topics []Topic
38 b.callbacks.Iterate("", "", func(k string, _ any) bool {
39 topic := Topic(k)
40 if topic == TopicAll {
41 // Skip catchall topic from the list
42 return false
43 }
44
45 topics = append(topics, topic)
46 return false
47 })
48 return topics
49}
50
51// Subscribe subscribes to messages published for a topic.
52// It returns the callback ID within the topic.
53func (b *Broker) Subscribe(topic Topic, cb Callback) (id int, _ error) {
54 key := strings.TrimSpace(string(topic))
55 if key == "" {
56 return 0, ErrRequiredTopic
57 }
58
59 if cb == nil {
60 return 0, ErrRequiredCallback
61 }
62
63 v, _ := b.callbacks.Get(key)
64 callbacks, _ := v.(*ulist.List)
65 if callbacks == nil {
66 callbacks = ulist.New()
67 }
68
69 callbacks.Append(cb)
70 b.callbacks.Set(key, callbacks)
71 return callbacks.TotalSize(), nil
72}
73
74// Unsubscribe unsubscribes a callback from a message topic.
75// ID is the callback ID within the topic, returned on subscription.
76func (b *Broker) Unsubscribe(topic Topic, id int) (unsubscribed bool, _ error) {
77 key := strings.TrimSpace(string(topic))
78 if key == "" {
79 return false, ErrRequiredTopic
80 }
81
82 if id == 0 {
83 return false, ErrRequiredSubscriptionID
84 }
85
86 v, found := b.callbacks.Get(key)
87 if !found {
88 return false, errors.New("message topic not found: " + key)
89 }
90
91 callbacks := v.(*ulist.List)
92 i := id - 1
93 return callbacks.Delete(i) == nil, nil
94}
95
96// Publish publishes a message for a topic.
97func (b Broker) Publish(topic Topic, data any) error {
98 if topic == TopicAll {
99 return ErrInvalidTopic
100 }
101
102 key := strings.TrimSpace(string(topic))
103 if key == "" {
104 return ErrRequiredTopic
105 }
106
107 iterCb := func(_ int, v any) bool {
108 cb := v.(Callback)
109 cb(Message{topic, data})
110 return false
111 }
112
113 // Trigger callbacks subscribed to current topic
114 v, found := b.callbacks.Get(key)
115 if found {
116 callbacks := v.(*ulist.List)
117 callbacks.Iterator(0, callbacks.Size(), iterCb)
118 }
119
120 // Trigger callbacks subscribed to all topics
121 v, found = b.callbacks.Get(string(TopicAll))
122 if found {
123 callbacks := v.(*ulist.List)
124 callbacks.Iterator(0, callbacks.Size(), iterCb)
125 }
126 return nil
127}