~emersion/soju-dev

support msgtags in msgstore_fs v1 SUPERSEDED

Calvin Lee: 2
 support msgtags in msgstore_fs
 contrib: add migrate-fs

 10 files changed, 277 insertions(+), 20 deletions(-)
Export patchset (mbox)
How do I use this?

Copy & paste the following snippet into your terminal to import this patchset into git:

curl -s https://lists.sr.ht/~emersion/soju-dev/patches/45565/mbox | git am -3
Learn more about email & git

[PATCH 1/2] support msgtags in msgstore_fs Export this patch

test plan:
```
go test -v ./msgstore/znclog/
=== RUN   TestRoundtrip
=== RUN   TestRoundtrip/with_tags
=== RUN   TestRoundtrip/without_tags
--- PASS: TestRoundtrip (0.00s)
    --- PASS: TestRoundtrip/with_tags (0.00s)
    --- PASS: TestRoundtrip/without_tags (0.00s)
PASS
ok  	git.sr.ht/~emersion/soju/msgstore/znclog	0.003s
```
---
 msgstore/znclog/reader.go             | 18 +++++++++--
 msgstore/znclog/testdata/irc-tags.log |  3 ++
 msgstore/znclog/testdata/irc.log      |  6 ++++
 msgstore/znclog/writer.go             | 25 +++++++++++++--
 msgstore/znclog/znclog_test.go        | 45 +++++++++++++++++++++++++++
 5 files changed, 92 insertions(+), 5 deletions(-)
 create mode 100644 msgstore/znclog/testdata/irc-tags.log
 create mode 100644 msgstore/znclog/testdata/irc.log
 create mode 100644 msgstore/znclog/znclog_test.go

diff --git a/msgstore/znclog/reader.go b/msgstore/znclog/reader.go
index 5d7fe1f..ecf41f4 100644
--- a/msgstore/znclog/reader.go
+++ b/msgstore/znclog/reader.go
@@ -21,8 +21,18 @@ func UnmarshalLine(line string, user *database.User, network *database.Network,
	} else if len(line) < timestampPrefixLen {
		return nil, time.Time{}, fmt.Errorf("malformed timestamp prefix: too short")
	}
	tags := make(irc.Tags)
	line = line[timestampPrefixLen:]

	if strings.HasPrefix(line, "(") {
		parts := strings.SplitN(line[1:], ") ", 2)
		if len(parts) != 2 {
			return nil, time.Time{}, nil
		}
		tags = irc.ParseTags(parts[0])
		line = parts[1]
	}

	var cmd string
	var prefix *irc.Prefix
	var params []string
@@ -118,6 +128,9 @@ func UnmarshalLine(line string, user *database.User, network *database.Network,
				return nil, time.Time{}, nil
			}
			sender, text = parts[0], parts[1]
		} else if strings.HasPrefix(line, ">") {
			cmd = "TAGMSG"
			sender, text = strings.TrimRight(line[1:], "< "), ""
		} else if strings.HasPrefix(line, "-") {
			cmd = "NOTICE"
			parts := strings.SplitN(line[1:], "- ", 2)
@@ -150,10 +163,9 @@ func UnmarshalLine(line string, user *database.User, network *database.Network,
	year, month, day := ref.Date()
	t := time.Date(year, month, day, hour, minute, second, 0, time.Local)

	tags["time"] = xirc.FormatServerTime(t)
	msg := &irc.Message{
		Tags: map[string]string{
			"time": xirc.FormatServerTime(t),
		},
		Tags:    tags,
		Prefix:  prefix,
		Command: cmd,
		Params:  params,
diff --git a/msgstore/znclog/testdata/irc-tags.log b/msgstore/znclog/testdata/irc-tags.log
new file mode 100644
index 0000000..e8752b9
--- /dev/null
+++ b/msgstore/znclog/testdata/irc-tags.log
@@ -0,0 +1,3 @@
[16:25:15] (msgid=123) <newbie> hi
[16:27:21] (+draft/reply=123;+draft/react=👋) >friend<
[16:28:57] (+draft/reply=123) <friend> newbie: hi!
diff --git a/msgstore/znclog/testdata/irc.log b/msgstore/znclog/testdata/irc.log
new file mode 100644
index 0000000..4acc0f6
--- /dev/null
+++ b/msgstore/znclog/testdata/irc.log
@@ -0,0 +1,6 @@
[05:46:00] *** Quits: user (~User@0.0.0.0) (Ping timeout: 255 seconds)
[05:46:30] *** user_ is now known as user
[05:47:00] <spammer> im a spammer
[05:47:30] *** mod sets mode: +b *!*spammer@1.1.1.*
[05:48:00] *** spammer was kicked by mod (spammer)
[05:48:30] -bot- im a bot
diff --git a/msgstore/znclog/writer.go b/msgstore/znclog/writer.go
index b955edf..c559768 100644
--- a/msgstore/znclog/writer.go
+++ b/msgstore/znclog/writer.go
@@ -11,11 +11,30 @@ import (
)

// MarshalLine formats msg as a single log line prefixed with the hour, minute
// and second of t in the form "[HH:MM:SS] ". If the message carries tags other
// than "time", they are written in parentheses between the timestamp and the
// message text.
//
// It returns the empty string when formatMessage produces no text for msg.
func MarshalLine(msg *irc.Message, t time.Time) string {
	s := formatMessage(msg)
	if s == "" {
		return ""
	}

	ret := fmt.Sprintf("[%02d:%02d:%02d] ", t.Hour(), t.Minute(), t.Second())
	if tags := formatMessageTags(msg); tags != "" {
		ret += tags + " "
	}
	return ret + s
}
// formatMessageTags encodes the tags of msg, except for "time", wrapped in
// parentheses. It returns the empty string when the encoding of the remaining
// tags is empty.
//
// NOTE(review): the order of the encoded tags is whatever irc.Tags.String()
// produces; confirm it is deterministic if log lines must be reproducible.
func formatMessageTags(msg *irc.Message) string {
	kept := make(irc.Tags, len(msg.Tags))
	for name, value := range msg.Tags {
		// The timestamp is not stored as a tag in the log line.
		if name == "time" {
			continue
		}
		kept[name] = value
	}

	encoded := kept.String()
	if encoded == "" {
		return ""
	}
	return "(" + encoded + ")"
}

// formatMessage formats a message log line. It assumes a well-formed IRC
@@ -61,6 +80,8 @@ func formatMessage(msg *irc.Message) string {
		} else {
			return fmt.Sprintf("<%s> %s", msg.Prefix.Name, msg.Params[1])
		}
	case "TAGMSG":
		return fmt.Sprintf(">%s<", msg.Prefix.Name)
	default:
		return ""
	}
diff --git a/msgstore/znclog/znclog_test.go b/msgstore/znclog/znclog_test.go
new file mode 100644
index 0000000..61e4589
--- /dev/null
+++ b/msgstore/znclog/znclog_test.go
@@ -0,0 +1,45 @@
package znclog_test

import (
	"bufio"
	"os"
	"testing"
	"time"

	"git.sr.ht/~emersion/soju/msgstore/znclog"
)

// testRoundtrip parses every line of the log file at filename with
// znclog.UnmarshalLine, serializes the result again with znclog.MarshalLine,
// and fails the test unless each line is reproduced byte for byte.
func testRoundtrip(t *testing.T, filename string) {
	file, err := os.Open(filename)
	if err != nil {
		t.Fatal(err)
	}
	defer file.Close()

	// We pretend that the IRC logs occurred today.
	ref := time.Now()
	entity := "#test"

	// bufio.Scanner splits on lines by default.
	scanner := bufio.NewScanner(file)
	for scanner.Scan() {
		originalLine := scanner.Text()
		// Named msgTime rather than time so the time package is not shadowed.
		msg, msgTime, err := znclog.UnmarshalLine(originalLine, nil, nil, entity, ref, true)
		if err != nil {
			t.Fatalf("unmarshaling %q: %v", originalLine, err)
		}
		// UnmarshalLine reports unrecognized lines as a nil message without
		// an error; fail explicitly instead of marshaling a nil message.
		if msg == nil {
			t.Fatalf("line was not recognized: %q", originalLine)
		}
		roundtrip := znclog.MarshalLine(msg, msgTime)
		if originalLine != roundtrip {
			t.Fatalf("not equal after round trip:\n\t%v\n\t%v", originalLine, roundtrip)
		}
	}
	if err := scanner.Err(); err != nil {
		t.Fatal(err)
	}
}

// TestRoundtrip runs the unmarshal/marshal round trip over each log fixture
// as its own subtest.
func TestRoundtrip(t *testing.T) {
	fixtures := []struct {
		name string
		path string
	}{
		{name: "with tags", path: "testdata/irc-tags.log"},
		{name: "without tags", path: "testdata/irc.log"},
	}

	for _, fixture := range fixtures {
		fixture := fixture // own copy for the closure below
		t.Run(fixture.name, func(t *testing.T) {
			testRoundtrip(t, fixture.path)
		})
	}
}
-- 
2.42.0
Sorry, but I don't think this is desirable. The fs message store is
specifically designed to be compatible with ZNC; breaking that compatibility
would prevent other tools which also support this format from continuing to
work correctly.

[PATCH 2/2] contrib: add migrate-fs Export this patch

This commit adds a command for migrating messages to the filesystem
backend from a database. Although the logic is rather generic, it
shouldn't replace `migrate-logs` as the latter has less of a chance of
losing data by walking the file tree manually.

Unfortunately this command can be lossy, as CHATHISTORY with
second-granularity can miss messages. In order to remedy this,
`migrate-fs` tries to use database IDs of messages instead of time
whenever possible.
---
 contrib/migrate-fs/main.go | 150 +++++++++++++++++++++++++++++++++++++
 database/database.go       |  10 ++-
 database/postgres.go       |  12 +--
 database/sqlite.go         |  12 +--
 msgstore/db.go             |  16 +++-
 5 files changed, 185 insertions(+), 15 deletions(-)
 create mode 100644 contrib/migrate-fs/main.go

diff --git a/contrib/migrate-fs/main.go b/contrib/migrate-fs/main.go
new file mode 100644
index 0000000..aebdf41
--- /dev/null
+++ b/contrib/migrate-fs/main.go
@@ -0,0 +1,150 @@
package main

import (
	"context"
	"flag"
	"fmt"
	"log"
	"strings"
	"time"

	"git.sr.ht/~emersion/soju/database"
	"git.sr.ht/~emersion/soju/msgstore"
)

// usage is the help text written by the flag.Usage function installed in
// init.
const usage = `usage: migrate-fs <source logs> <destination directory>

Migrates existing Soju logs from the database to the filesystem. The source is
specified in the format of "driver:source" where driver is sqlite3 or postgres
and destination is specified in the format of "fs:path" as would be specified in
the 'message-store' option of the Soju config file.

Options:

  -help               Show this help message
`
const (
	// targetPageSize is the limit passed to each ListTargets call in
	// migrateNetwork.
	targetPageSize  = 100
	// messagePageSize is the limit passed to each ListMessages call in
	// migrateTarget.
	messagePageSize = 1000
)

// init replaces the default flag usage output with the program's own usage
// text.
func init() {
	printUsage := func() {
		out := flag.CommandLine.Output()
		fmt.Fprint(out, usage)
	}
	flag.Usage = printUsage
}

// migrateTarget copies the stored messages for target on network from db into
// fsStore, reading them in pages of messagePageSize.
//
// The first page is selected by time (everything after the Unix epoch and
// before the moment the function started). Every later page is selected by
// the database ID of the last message already copied, so that messages
// sharing a timestamp are not skipped.
//
// user and dbStore are accepted for signature compatibility with callers but
// are not used here. The first error from the database or from fsStore is
// returned with context.
func migrateTarget(ctx context.Context, db database.Database, user *database.User, network *database.Network, target string, fsStore msgstore.Store, dbStore msgstore.ChatHistoryStore) error {
	log.Printf("\t%s\n", target)

	end := time.Now()
	var lastID int64
	for {
		opts := &database.MessageOptions{
			BeforeTime: end,
			Limit:      messagePageSize,
			Events:     true,
		}
		if lastID == 0 {
			// Nothing copied yet: start from the beginning of time.
			opts.AfterTime = time.Unix(0, 0)
		} else {
			opts.AfterID = lastID
		}

		msgs, err := db.ListMessages(ctx, network.ID, target, opts)
		if err != nil {
			return fmt.Errorf("listing messages for %q: %w", target, err)
		}
		if len(msgs) == 0 {
			return nil
		}

		for _, msg := range msgs {
			lastID = msg.Id
			if _, err := fsStore.Append(network, target, &msg.Message); err != nil {
				return fmt.Errorf("appending message %d for %q: %w", msg.Id, target, err)
			}
		}
	}
}

// migrateNetwork migrates every target of network by listing targets from
// dbStore in pages of targetPageSize and calling migrateTarget on each one.
//
// After each page the lower time bound moves to the newest LatestMessage seen
// so far; the loop ends when ListTargets returns no targets. The first error
// encountered is returned unchanged.
func migrateNetwork(ctx context.Context, db database.Database, user *database.User, network *database.Network, fsStore msgstore.Store, dbStore msgstore.ChatHistoryStore) error {
	log.Printf("Migrating logs for network: %s\n", network.Name)

	since := time.Unix(0, 0)
	until := time.Now()
	for {
		page, err := dbStore.ListTargets(ctx, network, since, until, targetPageSize, true)
		if err != nil {
			return err
		}
		if len(page) == 0 {
			return nil
		}

		for _, entry := range page {
			// Advance the window so the next page starts after this target.
			if entry.LatestMessage.After(since) {
				since = entry.LatestMessage
			}
			err = migrateTarget(ctx, db, user, network, entry.Name, fsStore, dbStore)
			if err != nil {
				return err
			}
		}
	}
}

// main migrates the stored messages of every user and network from the
// database named by the first argument ("driver:source") into the filesystem
// message store named by the second argument ("fs:path"). Any failure is
// fatal.
func main() {
	flag.Parse()

	if flag.NArg() != 2 {
		flag.Usage()
		log.Fatalf("expected 2 arguments, got %d", flag.NArg())
	}

	ctx := context.Background()

	// Split on the first colon only: database sources and filesystem paths
	// may themselves contain colons.
	dbParams := strings.SplitN(flag.Arg(0), ":", 2)
	fsParams := strings.SplitN(flag.Arg(1), ":", 2)

	if len(dbParams) != 2 {
		log.Fatalf("database not properly specified: %s", flag.Arg(0))
	}
	if len(fsParams) != 2 || fsParams[0] != "fs" {
		log.Fatalf("fs not properly specified: %s", flag.Arg(1))
	}
	logRoot := fsParams[1]

	db, err := database.Open(dbParams[0], dbParams[1])
	if err != nil {
		log.Fatalf("failed to open database: %v", err)
	}
	defer db.Close()

	users, err := db.ListUsers(ctx)
	if err != nil {
		log.Fatalf("unable to get users: %v", err)
	}

	dbStore := msgstore.NewDBStore(db)

	// Index into the slices rather than taking the address of the range
	// variable, so each pointer refers to a distinct element.
	for i := range users {
		user := &users[i]
		log.Printf("Migrating logs for user: %s\n", user.Username)

		fsStore := msgstore.NewFSStore(logRoot, user)

		networks, err := db.ListNetworks(ctx, user.ID)
		if err != nil {
			log.Fatalf("unable to get networks for user: #%d %s: %v", user.ID, user.Username, err)
		}

		for j := range networks {
			network := &networks[j]
			if err := migrateNetwork(ctx, db, user, network, fsStore, dbStore); err != nil {
				log.Fatalf("migrating %v: %v", network.Name, err)
			}
		}
	}
}
diff --git a/database/database.go b/database/database.go
index 15c04b9..965b323 100644
--- a/database/database.go
+++ b/database/database.go
@@ -29,6 +29,14 @@ type MessageOptions struct {
	TakeLast   bool
}

// Message is a stored IRC message together with the database metadata
// returned alongside it by ListMessages.
type Message struct {
	// Message is the IRC message parsed from the stored raw line.
	irc.Message
	// Time is the value of the message's time column.
	Time time.Time
	// Id is the value of the message's id column. The name is kept as Id
	// (rather than ID) because callers reference the field by this name.
	Id int64
}



type Database interface {
	Close() error
	Stats(ctx context.Context) (*DatabaseStats, error)
@@ -62,7 +70,7 @@ type Database interface {
	GetMessageLastID(ctx context.Context, networkID int64, name string) (int64, error)
	StoreMessage(ctx context.Context, networkID int64, name string, msg *irc.Message) (int64, error)
	ListMessageLastPerTarget(ctx context.Context, networkID int64, options *MessageOptions) ([]MessageTarget, error)
	ListMessages(ctx context.Context, networkID int64, name string, options *MessageOptions) ([]*irc.Message, error)
	ListMessages(ctx context.Context, networkID int64, name string, options *MessageOptions) ([]*Message, error)
}

type MetricsCollectorDatabase interface {
diff --git a/database/postgres.go b/database/postgres.go
index e18cc05..f1419c4 100644
--- a/database/postgres.go
+++ b/database/postgres.go
@@ -1050,7 +1050,7 @@ func (db *PostgresDB) ListMessageLastPerTarget(ctx context.Context, networkID in
	return l, nil
}

func (db *PostgresDB) ListMessages(ctx context.Context, networkID int64, name string, options *MessageOptions) ([]*irc.Message, error) {
func (db *PostgresDB) ListMessages(ctx context.Context, networkID int64, name string, options *MessageOptions) ([]*Message, error) {
	ctx, cancel := context.WithTimeout(ctx, postgresQueryTimeout)
	defer cancel()

@@ -1059,7 +1059,7 @@ func (db *PostgresDB) ListMessages(ctx context.Context, networkID int64, name st
		name,
	}
	query := `
		SELECT m.raw
		SELECT m.id, m.raw, m.time
		FROM "Message" AS m, "MessageTarget" AS t
		WHERE m.target = t.id AND t.network = $1 AND t.target = $2 `
	if options.AfterID > 0 {
@@ -1101,10 +1101,11 @@ func (db *PostgresDB) ListMessages(ctx context.Context, networkID int64, name st
	}
	defer rows.Close()

	var l []*irc.Message
	var l []*Message
	for rows.Next() {
		var raw string
		if err := rows.Scan(&raw); err != nil {
		var message Message
		if err := rows.Scan(&message.Id, &raw, &message.Time); err != nil {
			return nil, err
		}

@@ -1112,8 +1113,9 @@ func (db *PostgresDB) ListMessages(ctx context.Context, networkID int64, name st
		if err != nil {
			return nil, err
		}
		message.Message = *msg

		l = append(l, msg)
		l = append(l, &message)
	}
	if err := rows.Err(); err != nil {
		return nil, err
diff --git a/database/sqlite.go b/database/sqlite.go
index 99c2f05..d5ae2cb 100644
--- a/database/sqlite.go
+++ b/database/sqlite.go
@@ -1321,12 +1321,12 @@ func (db *SqliteDB) ListMessageLastPerTarget(ctx context.Context, networkID int6
	return l, nil
}

func (db *SqliteDB) ListMessages(ctx context.Context, networkID int64, name string, options *MessageOptions) ([]*irc.Message, error) {
func (db *SqliteDB) ListMessages(ctx context.Context, networkID int64, name string, options *MessageOptions) ([]*Message, error) {
	ctx, cancel := context.WithTimeout(ctx, sqliteQueryTimeout)
	defer cancel()

	query := `
		SELECT m.raw
		SELECT m.id, m.raw, m.time
		FROM Message AS m, MessageTarget AS t
		WHERE m.target = t.id AND t.network = :network AND t.target = :target `
	if options.AfterID > 0 {
@@ -1371,10 +1371,11 @@ func (db *SqliteDB) ListMessages(ctx context.Context, networkID int64, name stri
	}
	defer rows.Close()

	var l []*irc.Message
	var l []*Message
	for rows.Next() {
		var raw string
		if err := rows.Scan(&raw); err != nil {
		var message Message
		if err := rows.Scan(&message.Id, &raw, &message.Time); err != nil {
			return nil, err
		}

@@ -1382,8 +1383,9 @@ func (db *SqliteDB) ListMessages(ctx context.Context, networkID int64, name stri
		if err != nil {
			return nil, err
		}
		message.Message = *msg

		l = append(l, msg)
		l = append(l, &message)
	}
	if err := rows.Err(); err != nil {
		return nil, err
diff --git a/msgstore/db.go b/msgstore/db.go
index 289253a..18a428a 100644
--- a/msgstore/db.go
+++ b/msgstore/db.go
@@ -17,6 +17,14 @@ func (dbMsgID) msgIDType() msgIDType {
	return msgIDDB
}

// forgetDBSlice drops the database metadata from each entry of l, returning
// pointers to the embedded IRC messages in the same order. The result always
// has the same length as l.
func forgetDBSlice(l []*database.Message) []*irc.Message {
	msgs := make([]*irc.Message, 0, len(l))
	for _, entry := range l {
		msgs = append(msgs, &entry.Message)
	}
	return msgs
}

func parseDBMsgID(s string) (msgID int64, err error) {
	var id dbMsgID
	_, _, err = ParseMsgID(s, &id)
@@ -77,7 +85,7 @@ func (ms *dbMessageStore) LoadLatestID(ctx context.Context, id string, options *
	if err != nil {
		return nil, err
	}
	return l, nil
	return forgetDBSlice(l), nil
}

func (ms *dbMessageStore) Append(network *database.Network, entity string, msg *irc.Message) (string, error) {
@@ -119,7 +127,7 @@ func (ms *dbMessageStore) LoadBeforeTime(ctx context.Context, start, end time.Ti
	if err != nil {
		return nil, err
	}
	return l, nil
	return forgetDBSlice(l), nil
}

func (ms *dbMessageStore) LoadAfterTime(ctx context.Context, start, end time.Time, options *LoadMessageOptions) ([]*irc.Message, error) {
@@ -132,7 +140,7 @@ func (ms *dbMessageStore) LoadAfterTime(ctx context.Context, start, end time.Tim
	if err != nil {
		return nil, err
	}
	return l, nil
	return forgetDBSlice(l), nil
}

func (ms *dbMessageStore) Search(ctx context.Context, network *database.Network, options *SearchMessageOptions) ([]*irc.Message, error) {
@@ -147,5 +155,5 @@ func (ms *dbMessageStore) Search(ctx context.Context, network *database.Network,
	if err != nil {
		return nil, err
	}
	return l, nil
	return forgetDBSlice(l), nil
}
-- 
2.42.0