~emersion/soju-dev

Add WHO cache v1 PROPOSED

delthas: 1
 Add WHO cache

 5 files changed, 367 insertions(+), 14 deletions(-)
Export patchset (mbox)
How do I use this?

Copy & paste the following snippet into your terminal to import this patchset into git:

curl -s https://lists.sr.ht/~emersion/soju-dev/patches/34864/mbox | git am -3
Learn more about email & git

[PATCH] Add WHO cache Export this patch

From: Simon Ser <contact@emersion.fr>

This adds a new field to upstreams, users, which is a casemapped map
of upstream users known to soju. The upstream users known to soju
are: self, any monitored user, and any user with whom we share a
channel.

The information stored for each upstream user corresponds to the info
that can be returned by a WHO/WHOX command.

We build the upstream user information both incrementally, capturing
information contained in JOIN and AWAY messages; and with the bulk user
information contained in WHO replies we receive.

This lets us build a user cache that can then be used to return
synthetic WHO responses to later WHO requests by downstreams.

This is useful because some networks (eg Libera) heavily throttle WHO
commands, and without this cache, any downstream connecting would send 1
WHO command per channel, so possibly more than a dozen WHO commands,
which soju then forwarded to the upstream as WHO commands.

With this cache, most WHO requests can be answered locally, which avoids
sending WHO commands to the upstream.

In order to cache the "flags" field, we synthesize the field from user
info we get from incremental messages: away status (H/G) and bot status
(B). This could result in incorrect values for proprietary user fields.
The server-operator status (*) is also not supported.

Of note is that it is difficult to obtain a user's "connected server"
field incrementally, so clients that want to maximize their WHO cache
hit ratio can use WHOX to only request the fields they need, and in
particular not include the server field flag.
---
This builds upon your wip who-cache branch, with a few fixups and the 
logic to prune users we are not interested in anymore, eg after they 
part our last common channel. Tested locally.

 downstream.go |  34 +++++--
 irc.go        |  14 +++
 upstream.go   | 261 +++++++++++++++++++++++++++++++++++++++++++++++++-
 user.go       |   1 +
 xirc/whox.go  |  71 +++++++++++++-
 5 files changed, 367 insertions(+), 14 deletions(-)

diff --git a/downstream.go b/downstream.go
index f9d1d38..a3230c6 100644
--- a/downstream.go
+++ b/downstream.go
@@ -2275,16 +2275,7 @@ func (dc *downstreamConn) handleMessageRegistered(ctx context.Context, msg *irc.
			options = msg.Params[1]
		}

		optionsParts := strings.SplitN(options, "%", 2)
		// TODO: add support for WHOX flags in optionsParts[0]
		var fields, whoxToken string
		if len(optionsParts) == 2 {
			optionsParts := strings.SplitN(optionsParts[1], ",", 2)
			fields = strings.ToLower(optionsParts[0])
			if len(optionsParts) == 2 && strings.Contains(fields, "t") {
				whoxToken = optionsParts[1]
			}
		}
		fields, whoxToken := xirc.ParseWHOXOptions(options)

		// TODO: support mixed bouncer/upstream WHO queries
		maskCM := casemapASCII(mask)
@@ -2353,6 +2344,29 @@ func (dc *downstreamConn) handleMessageRegistered(ctx context.Context, msg *irc.
			return nil
		}

		// Check if we have the reply cached
		if l, ok := uc.getCachedWHO(upstreamMask, fields); ok {
			for _, uu := range l {
				info := xirc.WHOXInfo{
					Token:    whoxToken,
					Username: uu.Username,
					Hostname: uu.Hostname,
					Server:   uu.Server,
					Nickname: uu.Nickname,
					Flags:    uu.Flags,
					Account:  uu.Account,
					Realname: uu.Realname,
				}
				dc.SendMessage(xirc.GenerateWHOXReply(dc.srv.prefix(), dc.nick, fields, &info))
			}
			dc.SendMessage(&irc.Message{
				Prefix:  dc.srv.prefix(),
				Command: irc.RPL_ENDOFWHO,
				Params:  []string{dc.nick, endOfWhoToken, "End of /WHO list"},
			})
			return nil
		}

		params := []string{upstreamMask}
		if options != "" {
			params = append(params, options)
diff --git a/irc.go b/irc.go
index 2afd8ad..732bd9b 100644
--- a/irc.go
+++ b/irc.go
@@ -384,6 +384,20 @@ func (cm *upstreamChannelCasemapMap) ForEach(f func(*upstreamChannel)) {
	}
}

// upstreamUserCasemapMap is a casemapMap holding *upstreamUser values, keyed
// by the user's nickname.
type upstreamUserCasemapMap struct{ casemapMap }

// Get returns the user stored under name, or nil if there is no such entry.
func (cm *upstreamUserCasemapMap) Get(name string) *upstreamUser {
	v := cm.get(name)
	if v == nil {
		return nil
	}
	return v.(*upstreamUser)
}

// Set stores u in the map, using u.Nickname as the key.
func (cm *upstreamUserCasemapMap) Set(u *upstreamUser) {
	cm.set(u.Nickname, u)
}

type channelCasemapMap struct{ casemapMap }

func (cm *channelCasemapMap) Get(name string) *database.Channel {
diff --git a/upstream.go b/upstream.go
index 3c22173..bf522c9 100644
--- a/upstream.go
+++ b/upstream.go
@@ -110,6 +110,69 @@ type upstreamBatch struct {
	Label  string
}

// upstreamUser holds the information cached about a user of an upstream
// network. Each field maps to one WHOX field letter (see hasWHOXFields).
//
// An empty string means the value is not known: hasWHOXFields treats empty
// fields as unavailable and updateFrom never copies them.
type upstreamUser struct {
	Nickname string // WHOX field 'n'
	Username string // WHOX field 'u'
	Hostname string // WHOX field 'h'
	Server   string // WHOX field 's'
	Flags    string // WHOX field 'f'
	Account  string // WHOX field 'a'
	Realname string // WHOX field 'r'
}

// hasWHOXFields reports whether every WHOX field letter listed in fields can
// be answered from the cached data in uu.
//
// A cached field counts as available when it is non-empty. Unknown field
// letters make the whole check fail. An empty fields string yields true.
func (uu *upstreamUser) hasWHOXFields(fields string) bool {
	for _, field := range []byte(fields) {
		var cached string
		switch field {
		case 't', 'c', 'i', 'd', 'l', 'o':
			// Static values are returned for these fields, so they are always
			// available regardless of what is cached.
			continue
		case 'n':
			cached = uu.Nickname
		case 'u':
			cached = uu.Username
		case 'h':
			cached = uu.Hostname
		case 's':
			cached = uu.Server
		case 'f':
			cached = uu.Flags
		case 'a':
			cached = uu.Account
		case 'r':
			cached = uu.Realname
		default:
			// Unrecognized field letter: we cannot serve it from the cache.
			return false
		}
		if cached == "" {
			return false
		}
	}
	return true
}

// updateFrom merges update into uu: every non-empty field of update
// overwrites the corresponding field of uu, while empty fields of update
// leave uu untouched. update itself is not modified.
func (uu *upstreamUser) updateFrom(update *upstreamUser) {
	// merge overwrites *dst only when the new value carries information.
	merge := func(dst *string, src string) {
		if src != "" {
			*dst = src
		}
	}

	merge(&uu.Nickname, update.Nickname)
	merge(&uu.Username, update.Username)
	merge(&uu.Hostname, update.Hostname)
	merge(&uu.Server, update.Server)
	merge(&uu.Flags, update.Flags)
	merge(&uu.Account, update.Account)
	merge(&uu.Realname, update.Realname)
}

type pendingUpstreamCommand struct {
	downstreamID uint64
	msg          *irc.Message
@@ -137,6 +200,7 @@ type upstreamConn struct {
	hostname    string
	modes       userModes
	channels    upstreamChannelCasemapMap
	users       upstreamUserCasemapMap
	caps        xirc.CapRegistry
	batches     map[string]upstreamBatch
	away        bool
@@ -260,6 +324,7 @@ func connectToUpstream(ctx context.Context, network *network) (*upstreamConn, er
		network:               network,
		user:                  network.user,
		channels:              upstreamChannelCasemapMap{newCasemapMap()},
		users:                 upstreamUserCasemapMap{newCasemapMap()},
		caps:                  xirc.NewCapRegistry(),
		batches:               make(map[string]upstreamBatch),
		serverPrefix:          &irc.Prefix{Name: "*"},
@@ -959,6 +1024,10 @@ func (uc *upstreamConn) handleMessage(ctx context.Context, msg *irc.Message) err
			}
		})

		uc.cacheUserInfo(msg.Prefix.Name, &upstreamUser{
			Nickname: newNick,
		})

		if !me {
			uc.forEachDownstream(func(dc *downstreamConn) {
				dc.SendMessage(dc.marshalMessage(msg, uc.network))
@@ -975,6 +1044,10 @@ func (uc *upstreamConn) handleMessage(ctx context.Context, msg *irc.Message) err
			return err
		}

		uc.cacheUserInfo(msg.Prefix.Name, &upstreamUser{
			Realname: newRealname,
		})

		// TODO: consider appending this message to logs

		if uc.isOurNick(msg.Prefix.Name) {
@@ -1021,6 +1094,30 @@ func (uc *upstreamConn) handleMessage(ctx context.Context, msg *irc.Message) err
			return err
		}

		uu := &upstreamUser{
			Username: msg.Prefix.User,
			Hostname: msg.Prefix.Host,
		}
		if uc.caps.IsEnabled("away-notify") {
			// we have enough info to build the user flags in a best-effort manner:
			// - the H/G flag is set to Here first, will be replaced by Gone later if the user is AWAY
			uu.Flags = "H"
			// - the B (bot mode) flag is set if the JOIN comes from a bot
			//   note: we have no way to track the user bot mode after they have joined
			//         (we are not notified of the bot mode updates), but this is good enough.
			if _, ok := msg.Tags["bot"]; ok {
				if bot := uc.isupport["BOT"]; bot != nil {
					uu.Flags += *bot
				}
			}
			// TODO: add the server operator flag (`*`) if the message has an oper-tag
		}
		if len(msg.Params) > 2 { // extended-join
			uu.Account = msg.Params[1]
			uu.Realname = msg.Params[2]
		}
		uc.cacheUserInfo(msg.Prefix.Name, uu)

		for _, ch := range strings.Split(channels, ",") {
			if uc.isOurNick(msg.Prefix.Name) {
				uc.logger.Printf("joined channel %q", ch)
@@ -1061,6 +1158,11 @@ func (uc *upstreamConn) handleMessage(ctx context.Context, msg *irc.Message) err
				if uch := uc.channels.Get(ch); uch != nil {
					uc.channels.Del(ch)
					uch.updateAutoDetach(0)
					uch.Members.ForEach(func(nick string, memberships *xirc.MembershipSet) {
						if !uc.shouldCacheUserInfo(nick) {
							uc.users.Del(nick)
						}
					})
				}
			} else {
				ch, err := uc.getChannel(ch)
@@ -1068,6 +1170,9 @@ func (uc *upstreamConn) handleMessage(ctx context.Context, msg *irc.Message) err
					return err
				}
				ch.Members.Del(msg.Prefix.Name)
				if !uc.shouldCacheUserInfo(msg.Prefix.Name) {
					uc.users.Del(msg.Prefix.Name)
				}
			}

			chMsg := msg.Copy()
@@ -1082,13 +1187,23 @@ func (uc *upstreamConn) handleMessage(ctx context.Context, msg *irc.Message) err

		if uc.isOurNick(user) {
			uc.logger.Printf("kicked from channel %q by %s", channel, msg.Prefix.Name)
			uc.channels.Del(channel)
			if uch := uc.channels.Get(channel); uch != nil {
				uc.channels.Del(channel)
				uch.Members.ForEach(func(nick string, memberships *xirc.MembershipSet) {
					if !uc.shouldCacheUserInfo(nick) {
						uc.users.Del(nick)
					}
				})
			}
		} else {
			ch, err := uc.getChannel(channel)
			if err != nil {
				return err
			}
			ch.Members.Del(user)
			if !uc.shouldCacheUserInfo(user) {
				uc.users.Del(user)
			}
		}

		uc.produce(channel, msg, 0)
@@ -1104,6 +1219,8 @@ func (uc *upstreamConn) handleMessage(ctx context.Context, msg *irc.Message) err
			}
		})

		uc.users.Del(msg.Prefix.Name)

		if msg.Prefix.Name != uc.nick {
			uc.forEachDownstream(func(dc *downstreamConn) {
				dc.SendMessage(dc.marshalMessage(msg, uc.network))
@@ -1426,6 +1543,12 @@ func (uc *upstreamConn) handleMessage(ctx context.Context, msg *irc.Message) err
			return nil
		}

		parts := strings.SplitN(trailing, " ", 2)
		if len(parts) != 2 {
			return fmt.Errorf("malformed RPL_WHOREPLY: failed to parse real name")
		}
		realname := parts[1]

		if channel != "*" {
			channel = dc.marshalEntity(uc.network, channel)
		}
@@ -1435,6 +1558,17 @@ func (uc *upstreamConn) handleMessage(ctx context.Context, msg *irc.Message) err
			Command: irc.RPL_WHOREPLY,
			Params:  []string{dc.nick, channel, username, host, server, nick, flags, trailing},
		})

		if uc.shouldCacheUserInfo(nick) {
			uc.cacheUserInfo(nick, &upstreamUser{
				Username: username,
				Hostname: host,
				Server:   server,
				Nickname: nick,
				Flags:    flags,
				Realname: realname,
			})
		}
	case xirc.RPL_WHOSPCRPL:
		dc, cmd := uc.currentPendingCommand("WHO")
		if cmd == nil {
@@ -1445,6 +1579,28 @@ func (uc *upstreamConn) handleMessage(ctx context.Context, msg *irc.Message) err

		// Only supported in single-upstream mode, so forward as-is
		dc.SendMessage(msg)

		if len(cmd.Params) > 1 {
			fields, _ := xirc.ParseWHOXOptions(cmd.Params[1])
			if strings.IndexByte(fields, 'n') < 0 {
				return nil
			}
			info, err := xirc.ParseWHOXReply(msg, fields)
			if err != nil {
				return err
			}
			if uc.shouldCacheUserInfo(info.Nickname) {
				uc.cacheUserInfo(info.Nickname, &upstreamUser{
					Nickname: info.Nickname,
					Username: info.Username,
					Hostname: info.Hostname,
					Server:   info.Server,
					Flags:    info.Flags,
					Account:  info.Account,
					Realname: info.Realname,
				})
			}
		}
	case irc.RPL_ENDOFWHO:
		var name string
		if err := parseMessageParams(msg, nil, &name); err != nil {
@@ -1642,7 +1798,29 @@ func (uc *upstreamConn) handleMessage(ctx context.Context, msg *irc.Message) err
				Params:  []string{dc.nick, dc.marshalEntity(uc.network, nick), reason},
			})
		})
	case "AWAY", "ACCOUNT":
	case "AWAY":
		// Update user flags, if we already have the flags cached
		uu := uc.users.Get(msg.Prefix.Name)
		if uu != nil && uu.Flags != "" {
			flags := uu.Flags
			if isAway := len(msg.Params) > 0; isAway {
				flags = strings.ReplaceAll(flags, "H", "G")
			} else {
				flags = strings.ReplaceAll(flags, "G", "H")
			}
			uc.cacheUserInfo(msg.Prefix.Name, &upstreamUser{
				Flags: flags,
			})
		}

		uc.forEachDownstream(func(dc *downstreamConn) {
			dc.SendMessage(&irc.Message{
				Prefix:  dc.marshalUserPrefix(uc.network, msg.Prefix),
				Command: msg.Command,
				Params:  msg.Params,
			})
		})
	case "ACCOUNT":
		uc.forEachDownstream(func(dc *downstreamConn) {
			dc.SendMessage(&irc.Message{
				Prefix:  dc.marshalUserPrefix(uc.network, msg.Prefix),
@@ -2272,6 +2450,9 @@ func (uc *upstreamConn) updateMonitor() {

	for _, target := range removeList {
		uc.monitored.Del(target)
		if !uc.shouldCacheUserInfo(target) {
			uc.users.Del(target)
		}
	}
}

@@ -2338,3 +2519,79 @@ func (uc *upstreamConn) tryRegainNick(nick string) {
	})
	uc.pendingRegainNick = wantNick
}

// getCachedWHO tries to answer a WHO query for mask from the user cache.
//
// fields is the list of requested WHOX field letters; an empty string stands
// for a non-extended WHO query. mask is looked up first as a nickname, then as
// a channel name.
//
// On success it returns the users to reply with and ok set to true. It returns
// (nil, false) when the query cannot be served from the cache: a required
// capability is not enabled, mask matches neither a cached user nor a known
// channel, or any matched user lacks one of the requested fields.
func (uc *upstreamConn) getCachedWHO(mask, fields string) (l []*upstreamUser, ok bool) {
	if fields == "" {
		// Fields carried by a non-extended WHO reply.
		fields = "cuhsnfdr"
	}

	// Some extensions are required to keep our cached state in sync. We could
	// require setname for 'r' and chghost for 'h'/'s', but servers usually
	// implement a QUIT/JOIN fallback, so let's not bother.
	if strings.Contains(fields, "a") && !uc.caps.IsEnabled("account-notify") {
		return nil, false
	}
	if strings.Contains(fields, "f") && !uc.caps.IsEnabled("away-notify") {
		return nil, false
	}

	// A mask naming a cached user takes precedence over a channel lookup.
	if uu := uc.users.Get(mask); uu != nil {
		if !uu.hasWHOXFields(fields) {
			return nil, false
		}
		return []*upstreamUser{uu}, true
	}

	uch := uc.channels.Get(mask)
	if uch == nil {
		return nil, false
	}

	// The reply is only usable if every single member can be served from the
	// cache; one incomplete member invalidates the whole list.
	members := make([]*upstreamUser, 0, uch.Members.Len())
	complete := true
	uch.Members.ForEach(func(nick string, _ *xirc.MembershipSet) {
		if !complete {
			return
		}
		uu := uc.users.Get(nick)
		if uu == nil || !uu.hasWHOXFields(fields) {
			complete = false
			return
		}
		members = append(members, uu)
	})
	if !complete {
		return nil, false
	}
	return members, true
}

// cacheUserInfo records info about the user currently known as nick.
//
// If no entry exists for nick, info itself is stored as the new entry; its
// Nickname is set to nick when it was left empty. Otherwise the non-empty
// fields of info are merged into the existing entry, and if info carries a
// different Nickname the entry is re-keyed under that new nickname.
//
// It panics if nick is empty, since that indicates a bug in the caller.
func (uc *upstreamConn) cacheUserInfo(nick string, info *upstreamUser) {
	if nick == "" {
		panic("cacheUserInfo called with empty nickname")
	}

	uu := uc.users.Get(nick)
	if uu == nil {
		// Set keys the entry by info.Nickname, so make sure it is populated.
		if info.Nickname == "" {
			info.Nickname = nick
		}
		uc.users.Set(info)
		return
	}

	uu.updateFrom(info)
	if info.Nickname != "" && nick != info.Nickname {
		// The nickname changed: drop the entry stored under the old name and
		// store it again under the updated uu.Nickname.
		uc.users.Del(nick)
		uc.users.Set(uu)
	}
}

// shouldCacheUserInfo reports whether information about nick should be kept
// in the user cache: this is the case for our own nick, for any user we
// MONITOR, and for any user who is a member of one of our channels.
func (uc *upstreamConn) shouldCacheUserInfo(nick string) bool {
	if uc.isOurNick(nick) || uc.monitored.Has(nick) {
		return true
	}

	// Otherwise keep the user only if we share at least one channel with them.
	shared := false
	uc.channels.ForEach(func(ch *upstreamChannel) {
		if !shared && ch.Members.Has(nick) {
			shared = true
		}
	})
	return shared
}
diff --git a/user.go b/user.go
index 6d93bc2..28753ef 100644
--- a/user.go
+++ b/user.go
@@ -390,6 +390,7 @@ func (net *network) updateCasemapping(newCasemap casemapping) {
		uc.channels.ForEach(func(uch *upstreamChannel) {
			uch.Members.SetCasemapping(newCasemap)
		})
		uc.users.SetCasemapping(newCasemap)
		uc.monitored.SetCasemapping(newCasemap)
	}
	net.forEachDownstream(func(dc *downstreamConn) {
diff --git a/xirc/whox.go b/xirc/whox.go
index 7fb06b4..562b6cb 100644
--- a/xirc/whox.go
+++ b/xirc/whox.go
@@ -2,6 +2,9 @@ package xirc

import (
	"gopkg.in/irc.v3"

	"fmt"
	"strings"
)

// whoxFields is the list of all WHOX field letters, by order of appearance in
@@ -19,8 +22,8 @@ type WHOXInfo struct {
	Realname string
}

func (info *WHOXInfo) get(field byte) string {
	switch field {
func (info *WHOXInfo) get(k byte) string {
	switch k {
	case 't':
		return info.Token
	case 'c':
@@ -55,6 +58,27 @@ func (info *WHOXInfo) get(field byte) string {
	return ""
}

// set stores v into the field of info identified by the WHOX field letter k.
// Letters without a matching field handled here are silently ignored.
func (info *WHOXInfo) set(k byte, v string) {
	var dst *string
	switch k {
	case 't':
		dst = &info.Token
	case 'u':
		dst = &info.Username
	case 'h':
		dst = &info.Hostname
	case 's':
		dst = &info.Server
	case 'n':
		dst = &info.Nickname
	case 'f':
		dst = &info.Flags
	case 'a':
		dst = &info.Account
	case 'r':
		dst = &info.Realname
	default:
		return
	}
	*dst = v
}

func GenerateWHOXReply(prefix *irc.Prefix, nick, fields string, info *WHOXInfo) *irc.Message {
	if fields == "" {
		return &irc.Message{
@@ -83,3 +107,46 @@ func GenerateWHOXReply(prefix *irc.Prefix, nick, fields string, info *WHOXInfo)
		Params:  append([]string{nick}, values...),
	}
}

// ParseWHOXOptions parses the options parameter of a WHO command.
//
// Everything after the first '%' is the WHOX field specification: the part up
// to the first ',' is the list of field letters, returned lowercased, and the
// remainder is the query token. The token is only returned when the field
// list contains 't'. If options contains no '%', both results are empty.
func ParseWHOXOptions(options string) (fields, whoxToken string) {
	percent := strings.IndexByte(options, '%')
	if percent < 0 {
		return "", ""
	}
	// TODO: add support for WHOX flags in options[:percent]
	spec := options[percent+1:]

	comma := strings.IndexByte(spec, ',')
	if comma < 0 {
		return strings.ToLower(spec), ""
	}

	fields = strings.ToLower(spec[:comma])
	if strings.IndexByte(fields, 't') >= 0 {
		whoxToken = spec[comma+1:]
	}
	return fields, whoxToken
}

// ParseWHOXReply decodes an RPL_WHOSPCRPL message into a WHOXInfo.
//
// fields lists the WHOX field letters that were requested. The first message
// parameter is skipped; the following ones are matched against the requested
// fields in the order given by whoxFields.
//
// An error is returned if msg is not an RPL_WHOSPCRPL, has no parameters, or
// has fewer values than requested fields.
func ParseWHOXReply(msg *irc.Message, fields string) (*WHOXInfo, error) {
	if msg.Command != RPL_WHOSPCRPL {
		return nil, fmt.Errorf("invalid WHOX reply %q", msg.Command)
	}
	if len(msg.Params) == 0 {
		return nil, fmt.Errorf("invalid RPL_WHOSPCRPL: no params")
	}

	info := new(WHOXInfo)
	next := 1 // index of the next unread value in msg.Params
	for _, field := range whoxFields {
		if strings.IndexByte(fields, field) < 0 {
			continue
		}
		if next >= len(msg.Params) {
			return nil, fmt.Errorf("invalid RPL_WHOSPCRPL: missing value for field %q", string(field))
		}
		info.set(field, msg.Params[next])
		next++
	}

	return info, nil
}

base-commit: 5e56cc30c538eb88c21cce732e2e357e80098164
-- 
2.17.1