~emersion/soju-dev

Add support for MONITOR v1 APPLIED

Simon Ser: 1
 Add support for MONITOR

 4 files changed, 234 insertions(+), 3 deletions(-)
Export patchset (mbox)
How do I use this?

Copy & paste the following snippet into your terminal to import this patchset into git:

curl -s https://lists.sr.ht/~emersion/soju-dev/patches/26341/mbox | git am -3
Learn more about email & git
View this thread in the archives

[PATCH] Add support for MONITOR Export this patch

Add support for MONITOR in single-upstream mode.

Each downstream has its own set of monitored targets. These sets
are merged together to compute the MONITOR commands to send to
upstream.

Each upstream has a set of monitored targets accepted by the server
along with their status (online/offline). This is used to reply
directly to a downstream that adds a target another downstream has
already added, and to answer MONITOR S[TATUS] requests.

Co-authored-by: delthas <delthas@dille.cc>
---

This is a reworked version of the GitHub pull request [1] sent by
delthas.

[1]: https://github.com/emersion/soju/pull/34

 downstream.go | 87 +++++++++++++++++++++++++++++++++++++++++++++++
 irc.go        | 40 ++++++++++++++++++++++
 upstream.go   | 94 +++++++++++++++++++++++++++++++++++++++++++++++++++
 user.go       | 16 +++++++--
 4 files changed, 234 insertions(+), 3 deletions(-)

diff --git a/downstream.go b/downstream.go
index 2d80d9df0b00..3d6eb3b26069 100644
--- a/downstream.go
+++ b/downstream.go
@@ -227,6 +227,7 @@ var passthroughIsupport = map[string]bool{
	"MAXLIST":       true,
	"MAXTARGETS":    true,
	"MODES":         true,
	"MONITOR":       true,
	"NAMELEN":       true,
	"NETWORK":       true,
	"NICKLEN":       true,
@@ -263,6 +264,8 @@ type downstreamConn struct {

	lastBatchRef uint64

	monitored casemapMap

	saslServer sasl.Server
}

@@ -275,6 +278,7 @@ func newDownstreamConn(srv *Server, ic ircConn, id uint64) *downstreamConn {
		id:            id,
		supportedCaps: make(map[string]string),
		caps:          make(map[string]bool),
		monitored:     newCasemapMap(0),
	}
	dc.hostname = remoteAddr
	if host, _, err := net.SplitHostPort(dc.hostname); err == nil {
@@ -2268,6 +2272,89 @@ func (dc *downstreamConn) handleMessageRegistered(msg *irc.Message) error {
			Command: "INVITE",
			Params:  []string{upstreamUser, upstreamChannel},
		})
	case "MONITOR":
		// MONITOR is unsupported in multi-upstream mode
		uc := dc.upstream()
		if uc == nil {
			return newUnknownCommandError(msg.Command)
		}

		var subcommand string
		if err := parseMessageParams(msg, &subcommand); err != nil {
			return err
		}

		switch strings.ToUpper(subcommand) {
		case "+", "-":
			var targets string
			if err := parseMessageParams(msg, nil, &targets); err != nil {
				return err
			}
			for _, target := range strings.Split(targets, ",") {
				if subcommand == "+" {
					// Hard limit, just to avoid having downstreams fill our map
					if len(dc.monitored.innerMap) >= 1000 {
						dc.SendMessage(&irc.Message{
							Prefix:  dc.srv.prefix(),
							Command: irc.ERR_MONLISTFULL,
							Params:  []string{dc.nick, "1000", target, "Bouncer monitor list is full"},
						})
						continue
					}

					dc.monitored.SetValue(target, nil)

					if uc.monitored.Has(target) {
						cmd := irc.RPL_MONOFFLINE
						if online := uc.monitored.Value(target); online {
							cmd = irc.RPL_MONONLINE
						}

						dc.SendMessage(&irc.Message{
							Prefix:  dc.srv.prefix(),
							Command: cmd,
							Params:  []string{dc.nick, target},
						})
					}
				} else {
					dc.monitored.Delete(target)
				}
			}
			uc.updateMonitor()
		case "C": // clear
			dc.monitored = newCasemapMap(0)
			uc.updateMonitor()
		case "L": // list
			// TODO: be less lazy and pack the list
			for _, entry := range dc.monitored.innerMap {
				dc.SendMessage(&irc.Message{
					Prefix:  dc.srv.prefix(),
					Command: irc.RPL_MONLIST,
					Params:  []string{dc.nick, entry.originalKey},
				})
			}
			dc.SendMessage(&irc.Message{
				Prefix:  dc.srv.prefix(),
				Command: irc.RPL_ENDOFMONLIST,
				Params:  []string{dc.nick, "End of MONITOR list"},
			})
		case "S": // status
			// TODO: be less lazy and pack the lists
			for _, entry := range dc.monitored.innerMap {
				target := entry.originalKey

				cmd := irc.RPL_MONOFFLINE
				if online := uc.monitored.Value(target); online {
					cmd = irc.RPL_MONONLINE
				}

				dc.SendMessage(&irc.Message{
					Prefix:  dc.srv.prefix(),
					Command: cmd,
					Params:  []string{dc.nick, target},
				})
			}
		}
	case "CHATHISTORY":
		var subcommand string
		if err := parseMessageParams(msg, &subcommand); err != nil {
diff --git a/irc.go b/irc.go
index ae3ff479315e..5355f1a490a2 100644
--- a/irc.go
+++ b/irc.go
@@ -408,6 +408,36 @@ func generateMOTD(prefix *irc.Prefix, nick string, motd string) []*irc.Message {
	return msgs
}

// generateMonitor packs the given targets into as few MONITOR messages
// as possible, keeping each message within the maximum IRC message
// length. subcmd is the MONITOR subcommand ("+" or "-") to use.
func generateMonitor(subcmd string, targets []string) []*irc.Message {
	maxLength := maxMessageLength - len("MONITOR "+subcmd+" ")

	var msgs []*irc.Message
	var buf []string
	n := 0 // length of the comma-separated list built from buf
	for _, target := range targets {
		// Flush the current batch if adding this target would overflow
		// it. Only flush a non-empty batch: a single target longer than
		// maxLength must not produce a "MONITOR <subcmd>" message with
		// an empty target list.
		if len(buf) > 0 && n+len(target)+1 > maxLength {
			msgs = append(msgs, &irc.Message{
				Command: "MONITOR",
				Params:  []string{subcmd, strings.Join(buf, ",")},
			})
			buf = buf[:0]
			n = 0
		}

		buf = append(buf, target)
		n += len(target) + 1 // +1 for the comma separator
	}

	// Flush the final partial batch, if any.
	if len(buf) > 0 {
		msgs = append(msgs, &irc.Message{
			Command: "MONITOR",
			Params:  []string{subcmd, strings.Join(buf, ",")},
		})
	}

	return msgs
}

type joinSorter struct {
	channels []string
	keys     []string
@@ -634,6 +664,16 @@ func (cm *deliveredCasemapMap) Value(name string) deliveredClientMap {
	return entry.value.(deliveredClientMap)
}

// monitorCasemapMap maps a monitored target name to its online status.
type monitorCasemapMap struct{ casemapMap }

// Value reports whether the given target is known to be online.
// Targets absent from the map are reported as offline.
func (cm *monitorCasemapMap) Value(name string) (online bool) {
	entry, ok := cm.innerMap[cm.casemap(name)]
	if ok {
		online = entry.value.(bool)
	}
	return online
}

func isWordBoundary(r rune) bool {
	switch r {
	case '-', '_', '|':
diff --git a/upstream.go b/upstream.go
index eaefff63d3bf..5a70fce469b1 100644
--- a/upstream.go
+++ b/upstream.go
@@ -98,6 +98,7 @@ type upstreamConn struct {
	away          bool
	account       string
	nextLabelID   uint64
	monitored     monitorCasemapMap

	saslClient  sasl.Client
	saslStarted bool
@@ -202,6 +203,7 @@ func connectToUpstream(network *network) (*upstreamConn, error) {
		availableMemberships:     stdMemberships,
		isupport:                 make(map[string]*string),
		pendingLISTDownstreamSet: make(map[uint64]struct{}),
		monitored:                monitorCasemapMap{newCasemapMap(0)},
	}
	return uc, nil
}
@@ -1359,6 +1361,49 @@ func (uc *upstreamConn) handleMessage(msg *irc.Message) error {
				Params:  []string{dc.nick, dc.marshalEntity(uc.network, nick), dc.marshalEntity(uc.network, channel)},
			})
		})
	case irc.RPL_MONONLINE, irc.RPL_MONOFFLINE:
		var targetsStr string
		if err := parseMessageParams(msg, nil, &targetsStr); err != nil {
			return err
		}
		targets := strings.Split(targetsStr, ",")

		online := msg.Command == irc.RPL_MONONLINE
		for _, target := range targets {
			prefix := irc.ParsePrefix(target)
			uc.monitored.SetValue(prefix.Name, online)
		}

		uc.forEachDownstream(func(dc *downstreamConn) {
			for _, target := range targets {
				prefix := irc.ParsePrefix(target)
				if dc.monitored.Has(prefix.Name) {
					dc.SendMessage(&irc.Message{
						Prefix:  dc.srv.prefix(),
						Command: msg.Command,
						Params:  []string{dc.nick, target},
					})
				}
			}
		})
	case irc.ERR_MONLISTFULL:
		var limit, targetsStr string
		if err := parseMessageParams(msg, nil, &limit, &targetsStr); err != nil {
			return err
		}

		targets := strings.Split(targetsStr, ",")
		uc.forEachDownstream(func(dc *downstreamConn) {
			for _, target := range targets {
				if dc.monitored.Has(target) {
					dc.SendMessage(&irc.Message{
						Prefix:  dc.srv.prefix(),
						Command: msg.Command,
						Params:  []string{dc.nick, limit, target},
					})
				}
			}
		})
	case irc.RPL_AWAY:
		var nick, reason string
		if err := parseMessageParams(msg, nil, &nick, &reason); err != nil {
@@ -1863,3 +1908,52 @@ func (uc *upstreamConn) updateChannelAutoDetach(name string) {
	}
	uch.updateAutoDetach(ch.DetachAfter)
}

// updateMonitor reconciles the upstream MONITOR list with the union of
// all downstream monitor sets: it sends MONITOR + for targets wanted by
// a downstream but not yet accepted upstream, and MONITOR - (or MONITOR
// C) for upstream targets no downstream wants anymore.
func (uc *upstreamConn) updateMonitor() {
	// Targets to add upstream; the map deduplicates across downstreams,
	// the slice preserves insertion order. Keys are casemapped.
	add := make(map[string]struct{})
	var addList []string
	// Upstream-monitored targets still wanted by at least one downstream.
	seen := make(map[string]struct{})
	uc.forEachDownstream(func(dc *downstreamConn) {
		for targetCM := range dc.monitored.innerMap {
			if !uc.monitored.Has(targetCM) {
				if _, ok := add[targetCM]; !ok {
					addList = append(addList, targetCM)
				}
				add[targetCM] = struct{}{}
			} else {
				seen[targetCM] = struct{}{}
			}
		}
	})

	// Anything monitored upstream but not in seen is no longer wanted.
	// removeAll tracks whether *every* upstream target is being dropped.
	removeAll := true
	var removeList []string
	for targetCM, entry := range uc.monitored.innerMap {
		if _, ok := seen[targetCM]; ok {
			removeAll = false
		} else {
			// Use the original (non-casemapped) spelling on the wire.
			removeList = append(removeList, entry.originalKey)
		}
	}

	// TODO: better handle the case where len(uc.monitored) + len(addList)
	// exceeds the limit, probably by immediately sending ERR_MONLISTFULL?

	if removeAll && len(addList) == 0 && len(removeList) > 0 {
		// Optimization when the last MONITOR-aware downstream disconnects
		uc.SendMessage(&irc.Message{
			Command: "MONITOR",
			Params:  []string{"C"},
		})
	} else {
		// Remove before add, batching targets into as few messages as
		// possible.
		msgs := generateMonitor("-", removeList)
		msgs = append(msgs, generateMonitor("+", addList)...)
		for _, msg := range msgs {
			uc.SendMessage(msg)
		}
	}

	// Drop removed targets from our cache only after the commands have
	// been sent.
	for _, target := range removeList {
		uc.monitored.Delete(target)
	}
}
diff --git a/user.go b/user.go
index f3db7ebfaadf..9e745f308801 100644
--- a/user.go
+++ b/user.go
@@ -342,13 +342,17 @@ func (net *network) updateCasemapping(newCasemap casemapping) {
	net.casemap = newCasemap
	net.channels.SetCasemapping(newCasemap)
	net.delivered.m.SetCasemapping(newCasemap)
	if net.conn != nil {
		net.conn.channels.SetCasemapping(newCasemap)
		for _, entry := range net.conn.channels.innerMap {
	if uc := net.conn; uc != nil {
		uc.channels.SetCasemapping(newCasemap)
		for _, entry := range uc.channels.innerMap {
			uch := entry.value.(*upstreamChannel)
			uch.Members.SetCasemapping(newCasemap)
		}
		uc.monitored.SetCasemapping(newCasemap)
	}
	net.forEachDownstream(func(dc *downstreamConn) {
		dc.monitored.SetCasemapping(newCasemap)
	})
}

func (net *network) storeClientDeliveryReceipts(clientName string) {
@@ -528,6 +532,7 @@ func (u *user) run() {
			uc.network.conn = uc

			uc.updateAway()
			uc.updateMonitor()

			netIDStr := fmt.Sprintf("%v", uc.network.ID)
			uc.forEachDownstream(func(dc *downstreamConn) {
@@ -597,6 +602,10 @@ func (u *user) run() {
		case eventDownstreamConnected:
			dc := e.dc

			if dc.network != nil {
				dc.monitored.SetCasemapping(dc.network.casemap)
			}

			if err := dc.welcome(); err != nil {
				dc.logger.Printf("failed to handle new registered connection: %v", err)
				break
@@ -629,6 +638,7 @@ func (u *user) run() {

			u.forEachUpstream(func(uc *upstreamConn) {
				uc.updateAway()
				uc.updateMonitor()
			})
		case eventDownstreamMessage:
			msg, dc := e.msg, e.dc

base-commit: d870efa6660b561daa55b806c9e0835597308733
-- 
2.33.1
lgtm!