~emersion/soju-dev

This thread contains a patchset. You're looking at the original emails, but you may wish to use the patch review UI. Review patch
1

[PATCH v2 1/2] support msgtags in msgstore_fs

Details
Message ID
<20231011140644.156120-1-pounce@integraldoma.in>
DKIM signature
missing
Download raw message
Patch: +92 -5
test plan:
```
go test -v ./msgstore/znclog/
=== RUN   TestRoundtrip
=== RUN   TestRoundtrip/with_tags
=== RUN   TestRoundtrip/without_tags
--- PASS: TestRoundtrip (0.00s)
    --- PASS: TestRoundtrip/with_tags (0.00s)
    --- PASS: TestRoundtrip/without_tags (0.00s)
PASS
ok  	git.sr.ht/~emersion/soju/msgstore/znclog	0.003s
```
---
 msgstore/znclog/reader.go             | 18 +++++++++--
 msgstore/znclog/testdata/irc-tags.log |  3 ++
 msgstore/znclog/testdata/irc.log      |  6 ++++
 msgstore/znclog/writer.go             | 25 +++++++++++++--
 msgstore/znclog/znclog_test.go        | 45 +++++++++++++++++++++++++++
 5 files changed, 92 insertions(+), 5 deletions(-)
 create mode 100644 msgstore/znclog/testdata/irc-tags.log
 create mode 100644 msgstore/znclog/testdata/irc.log
 create mode 100644 msgstore/znclog/znclog_test.go

diff --git a/msgstore/znclog/reader.go b/msgstore/znclog/reader.go
index 5d7fe1f..ecf41f4 100644
--- a/msgstore/znclog/reader.go
+++ b/msgstore/znclog/reader.go
@@ -21,8 +21,18 @@ func UnmarshalLine(line string, user *database.User, network *database.Network,
	} else if len(line) < timestampPrefixLen {
		return nil, time.Time{}, fmt.Errorf("malformed timestamp prefix: too short")
	}
	tags := make(irc.Tags)
	line = line[timestampPrefixLen:]

	if strings.HasPrefix(line, "(") {
		parts := strings.SplitN(line[1:], ") ", 2)
		if len(parts) != 2 {
			return nil, time.Time{}, nil
		}
		tags = irc.ParseTags(parts[0])
		line = parts[1]
	}

	var cmd string
	var prefix *irc.Prefix
	var params []string
@@ -118,6 +128,9 @@ func UnmarshalLine(line string, user *database.User, network *database.Network,
				return nil, time.Time{}, nil
			}
			sender, text = parts[0], parts[1]
		} else if strings.HasPrefix(line, ">") {
			cmd = "TAGMSG"
			sender, text = strings.TrimRight(line[1:], "< "), ""
		} else if strings.HasPrefix(line, "-") {
			cmd = "NOTICE"
			parts := strings.SplitN(line[1:], "- ", 2)
@@ -150,10 +163,9 @@ func UnmarshalLine(line string, user *database.User, network *database.Network,
	year, month, day := ref.Date()
	t := time.Date(year, month, day, hour, minute, second, 0, time.Local)

	tags["time"] = xirc.FormatServerTime(t)
	msg := &irc.Message{
		Tags: map[string]string{
			"time": xirc.FormatServerTime(t),
		},
		Tags:    tags,
		Prefix:  prefix,
		Command: cmd,
		Params:  params,
diff --git a/msgstore/znclog/testdata/irc-tags.log b/msgstore/znclog/testdata/irc-tags.log
new file mode 100644
index 0000000..e8752b9
--- /dev/null
+++ b/msgstore/znclog/testdata/irc-tags.log
@@ -0,0 +1,3 @@
[16:25:15] (msgid=123) <newbie> hi
[16:27:21] (+draft/reply=123;+draft/react=👋) >friend<
[16:28:57] (+draft/reply=123) <friend> newbie: hi!
diff --git a/msgstore/znclog/testdata/irc.log b/msgstore/znclog/testdata/irc.log
new file mode 100644
index 0000000..4acc0f6
--- /dev/null
+++ b/msgstore/znclog/testdata/irc.log
@@ -0,0 +1,6 @@
[05:46:00] *** Quits: user (~User@0.0.0.0) (Ping timeout: 255 seconds)
[05:46:30] *** user_ is now known as user
[05:47:00] <spammer> im a spammer
[05:47:30] *** mod sets mode: +b *!*spammer@1.1.1.*
[05:48:00] *** spammer was kicked by mod (spammer)
[05:48:30] -bot- im a bot
diff --git a/msgstore/znclog/writer.go b/msgstore/znclog/writer.go
index b955edf..c559768 100644
--- a/msgstore/znclog/writer.go
+++ b/msgstore/znclog/writer.go
@@ -11,11 +11,30 @@ import (
)

func MarshalLine(msg *irc.Message, t time.Time) string {
	s := formatMessage(msg)
	s, tags := formatMessage(msg), formatMessageTags(msg)
	if s == "" {
		return ""
	}
	return fmt.Sprintf("[%02d:%02d:%02d] %s", t.Hour(), t.Minute(), t.Second(), s)
	ret := fmt.Sprintf("[%02d:%02d:%02d] ", t.Hour(), t.Minute(), t.Second())

	if tags != "" {
		ret += tags + " "
	}

	return ret + s
}
// formatMessageTags renders the tags of msg, minus the "time" tag, as a
// parenthesised tag string for use in a log line. The "time" tag is skipped
// because the log line already starts with a timestamp prefix. It returns
// the empty string when no other tags remain.
func formatMessageTags(msg *irc.Message) string {
	filtered := make(irc.Tags, len(msg.Tags))
	for name, value := range msg.Tags {
		if name == "time" {
			continue
		}
		filtered[name] = value
	}

	encoded := filtered.String()
	if encoded == "" {
		return ""
	}
	return "(" + encoded + ")"
}

// formatMessage formats a message log line. It assumes a well-formed IRC
@@ -61,6 +80,8 @@ func formatMessage(msg *irc.Message) string {
		} else {
			return fmt.Sprintf("<%s> %s", msg.Prefix.Name, msg.Params[1])
		}
	case "TAGMSG":
		return fmt.Sprintf(">%s<", msg.Prefix.Name)
	default:
		return ""
	}
diff --git a/msgstore/znclog/znclog_test.go b/msgstore/znclog/znclog_test.go
new file mode 100644
index 0000000..61e4589
--- /dev/null
+++ b/msgstore/znclog/znclog_test.go
@@ -0,0 +1,45 @@
package znclog_test

import (
	"bufio"
	"os"
	"testing"
	"time"

	"git.sr.ht/~emersion/soju/msgstore/znclog"
)

// testRoundtrip parses every line of the log file at filename with
// znclog.UnmarshalLine, formats the result again with znclog.MarshalLine and
// fails the test unless the output is identical to the input line.
func testRoundtrip(t *testing.T, filename string) {
	file, err := os.Open(filename)
	if err != nil {
		t.Fatal(err)
	}
	defer file.Close()

	// We pretend that the IRC logs occurred today: the fixture lines only
	// carry a time of day, so the date comes from this reference time.
	ref := time.Now()
	entity := "#test"

	// bufio.Scanner splits its input into lines by default.
	scanner := bufio.NewScanner(file)
	for scanner.Scan() {
		originalLine := scanner.Text()
		// The timestamp is named ts so that it does not shadow the time
		// package inside the loop.
		msg, ts, err := znclog.UnmarshalLine(originalLine, nil, nil, entity, ref, true)
		if err != nil {
			t.Fatal(err)
		}
		// UnmarshalLine can return a nil message together with a nil error
		// for lines it does not understand; report that instead of handing
		// a nil message to MarshalLine.
		if msg == nil {
			t.Fatalf("line was not parsed into a message: %q", originalLine)
		}
		roundtrip := znclog.MarshalLine(msg, ts)
		if originalLine != roundtrip {
			t.Fatalf("not equal after round trip:\n\t%v\n\t%v", originalLine, roundtrip)
		}
	}
	if err := scanner.Err(); err != nil {
		t.Fatal(err)
	}
}

// TestRoundtrip runs the round-trip check once per log fixture: one file
// whose lines carry message tags and one whose lines do not.
func TestRoundtrip(t *testing.T) {
	fixtures := []struct {
		name string
		path string
	}{
		{"with tags", "testdata/irc-tags.log"},
		{"without tags", "testdata/irc.log"},
	}
	for _, fixture := range fixtures {
		path := fixture.path
		t.Run(fixture.name, func(t *testing.T) {
			testRoundtrip(t, path)
		})
	}
}
-- 
2.42.0

[PATCH v2 2/2] contrib: add migrate-fs

Details
Message ID
<20231011140644.156120-2-pounce@integraldoma.in>
In-Reply-To
<20231011140644.156120-1-pounce@integraldoma.in> (view parent)
DKIM signature
missing
Download raw message
Patch: +181 -16
This commit adds a command for migrating messages to the filesystem
backend from a database. Although the logic is rather generic, it
shouldn't replace `migrate-logs` as the latter has less of a chance of
losing data by walking the file tree manually.

Unfortunately this command can be lossy, as CHATHISTORY with
second-granularity can miss messages. In order to remedy this,
`migrate-fs` tries to use database IDs of messages instead of time
whenever possible.
---
In this version, a formatting change has been fixed and parsing of times
in the sqlite backend now works consistently.

 contrib/migrate-fs/main.go | 144 +++++++++++++++++++++++++++++++++++++
 database/database.go       |  10 ++-
 database/postgres.go       |  12 ++--
 database/sqlite.go         |  15 ++--
 msgstore/db.go             |  16 +++--
 5 files changed, 181 insertions(+), 16 deletions(-)
 create mode 100644 contrib/migrate-fs/main.go

diff --git a/contrib/migrate-fs/main.go b/contrib/migrate-fs/main.go
new file mode 100644
index 0000000..954980e
--- /dev/null
+++ b/contrib/migrate-fs/main.go
@@ -0,0 +1,144 @@
package main

import (
	"context"
	"flag"
	"fmt"
	"log"
	"strings"
	"time"

	"git.sr.ht/~emersion/soju/database"
	"git.sr.ht/~emersion/soju/msgstore"
)

const usage = `usage: migrate-fs <source logs> <destination directory>

Migrates existing Soju logs from the database to the filesystem. The source is
specified in the format of "driver:source" where driver is sqlite3 or postgres
and destination is specified in the format of "fs:path" as would be specified in
the 'message-store' option of the Soju config file.

Options:

  -help               Show this help message
`
const (
	targetPageSize  = 100
	messagePageSize = 1000
)

// init replaces the default flag usage function with one that prints the
// usage text of this command to the flag package's output writer.
func init() {
	flag.Usage = func() {
		out := flag.CommandLine.Output()
		fmt.Fprint(out, usage)
	}
}

// migrateTarget copies the stored messages for target on network from db
// into fsStore, one page of at most messagePageSize messages at a time.
//
// The first page is selected by time (everything after the Unix epoch and
// before the moment the migration of this target started). Every later page
// resumes after the database ID of the last message copied, rather than
// after its timestamp.
//
// The user parameter is currently unused.
//
// It returns the first error reported by db.ListMessages or fsStore.Append,
// wrapped with the name of the target.
func migrateTarget(ctx context.Context, db database.Database, user *database.User, network *database.Network, target string, fsStore msgstore.Store) error {
	log.Printf("\t%s\n", target)

	end := time.Now()
	var lastID int64
	for {
		opts := &database.MessageOptions{
			AfterID:    lastID,
			BeforeTime: end,
			Limit:      messagePageSize,
			Events:     true,
		}
		if lastID == 0 {
			// Nothing copied yet: there is no ID to resume from, so
			// start from the beginning of time instead.
			opts.AfterTime = time.Unix(0, 0)
		}

		msgs, err := db.ListMessages(ctx, network.ID, target, opts)
		if err != nil {
			return fmt.Errorf("listing messages for %q: %w", target, err)
		}
		if len(msgs) == 0 {
			// An empty page means everything has been copied.
			return nil
		}

		for _, msg := range msgs {
			lastID = msg.Id
			if _, err := fsStore.Append(network, target, &msg.Message); err != nil {
				return fmt.Errorf("appending message %d for %q: %w", msg.Id, target, err)
			}
		}
	}
}

// migrateNetwork migrates the logs of every target known to dbStore for
// network into fsStore.
//
// Targets are listed in pages of at most targetPageSize. After each page the
// lower time bound moves up to the most recent LatestMessage seen so far,
// and the listing is repeated until it comes back empty.
//
// It returns the first error reported by dbStore.ListTargets or
// migrateTarget.
func migrateNetwork(ctx context.Context, db database.Database, user *database.User, network *database.Network, fsStore msgstore.Store, dbStore msgstore.ChatHistoryStore) error {
	log.Printf("Migrating logs for network: %s\n", network.Name)

	start := time.Unix(0, 0)
	end := time.Now()
	for {
		targets, err := dbStore.ListTargets(ctx, network, start, end, targetPageSize, true)
		if err != nil {
			return fmt.Errorf("listing targets: %w", err)
		}
		if len(targets) == 0 {
			return nil
		}

		advanced := false
		for _, target := range targets {
			if start.Before(target.LatestMessage) {
				start = target.LatestMessage
				advanced = true
			}
			if err := migrateTarget(ctx, db, user, network, target.Name, fsStore); err != nil {
				return err
			}
		}

		// NOTE(review): this assumes ListTargets returns the same page when
		// called again with identical bounds — confirm against the store
		// implementation. If no target moved the lower bound forward, the
		// next call would repeat this one, so stop rather than loop forever.
		if !advanced {
			return nil
		}
	}
}

// main migrates the message logs of every user from a database to the
// filesystem message store.
//
// It expects two positional arguments: the source database as
// "driver:source" and the destination as "fs:path". Any failure is fatal and
// terminates the process through log.Fatal/log.Fatalf.
func main() {
	flag.Parse()

	if flag.NArg() < 2 {
		flag.Usage()
		log.Fatal("expected a source database and a destination")
	}

	ctx := context.Background()

	// Split on the first colon only: the database source and the filesystem
	// path may themselves contain colons.
	dbParams := strings.SplitN(flag.Arg(0), ":", 2)
	fsParams := strings.SplitN(flag.Arg(1), ":", 2)

	if len(dbParams) != 2 {
		log.Fatalf("database not properly specified: %s", flag.Arg(0))
	}
	if len(fsParams) != 2 || fsParams[0] != "fs" {
		log.Fatalf("fs not properly specified: %s", flag.Arg(1))
	}
	logRoot := fsParams[1]

	db, err := database.Open(dbParams[0], dbParams[1])
	if err != nil {
		log.Fatalf("failed to open database: %v", err)
	}
	defer db.Close()

	users, err := db.ListUsers(ctx)
	if err != nil {
		log.Fatalf("unable to get users: %v", err)
	}

	dbStore := msgstore.NewDBStore(db)

	for i := range users {
		// Point at the slice element so that the pointer handed to the
		// store does not alias a reused loop variable.
		user := &users[i]
		log.Printf("Migrating logs for user: %s\n", user.Username)

		fsStore := msgstore.NewFSStore(logRoot, user)

		networks, err := db.ListNetworks(ctx, user.ID)
		if err != nil {
			log.Fatalf("unable to get networks for user: #%d %s: %v", user.ID, user.Username, err)
		}

		for j := range networks {
			network := &networks[j]
			if err := migrateNetwork(ctx, db, user, network, fsStore, dbStore); err != nil {
				log.Fatalf("migrating %v: %v", network.Name, err)
			}
		}
	}
}
diff --git a/database/database.go b/database/database.go
index 15c04b9..965b323 100644
--- a/database/database.go
+++ b/database/database.go
@@ -29,6 +29,14 @@ type MessageOptions struct {
	TakeLast   bool
}

// Message is an IRC message as returned by Database.ListMessages, together
// with the metadata of the database row it was loaded from.
type Message struct {
	irc.Message

	// Time is the value of the row's time column.
	Time time.Time

	// Id is the value of the row's id column.
	Id int64
}



type Database interface {
	Close() error
	Stats(ctx context.Context) (*DatabaseStats, error)
@@ -62,7 +70,7 @@ type Database interface {
	GetMessageLastID(ctx context.Context, networkID int64, name string) (int64, error)
	StoreMessage(ctx context.Context, networkID int64, name string, msg *irc.Message) (int64, error)
	ListMessageLastPerTarget(ctx context.Context, networkID int64, options *MessageOptions) ([]MessageTarget, error)
	ListMessages(ctx context.Context, networkID int64, name string, options *MessageOptions) ([]*irc.Message, error)
	ListMessages(ctx context.Context, networkID int64, name string, options *MessageOptions) ([]*Message, error)
}

type MetricsCollectorDatabase interface {
diff --git a/database/postgres.go b/database/postgres.go
index e18cc05..f1419c4 100644
--- a/database/postgres.go
+++ b/database/postgres.go
@@ -1050,7 +1050,7 @@ func (db *PostgresDB) ListMessageLastPerTarget(ctx context.Context, networkID in
	return l, nil
}

func (db *PostgresDB) ListMessages(ctx context.Context, networkID int64, name string, options *MessageOptions) ([]*irc.Message, error) {
func (db *PostgresDB) ListMessages(ctx context.Context, networkID int64, name string, options *MessageOptions) ([]*Message, error) {
	ctx, cancel := context.WithTimeout(ctx, postgresQueryTimeout)
	defer cancel()

@@ -1059,7 +1059,7 @@ func (db *PostgresDB) ListMessages(ctx context.Context, networkID int64, name st
		name,
	}
	query := `
		SELECT m.raw
		SELECT m.id, m.raw, m.time
		FROM "Message" AS m, "MessageTarget" AS t
		WHERE m.target = t.id AND t.network = $1 AND t.target = $2 `
	if options.AfterID > 0 {
@@ -1101,10 +1101,11 @@ func (db *PostgresDB) ListMessages(ctx context.Context, networkID int64, name st
	}
	defer rows.Close()

	var l []*irc.Message
	var l []*Message
	for rows.Next() {
		var raw string
		if err := rows.Scan(&raw); err != nil {
		var message Message
		if err := rows.Scan(&message.Id, &raw, &message.Time); err != nil {
			return nil, err
		}

@@ -1112,8 +1113,9 @@ func (db *PostgresDB) ListMessages(ctx context.Context, networkID int64, name st
		if err != nil {
			return nil, err
		}
		message.Message = *msg

		l = append(l, msg)
		l = append(l, &message)
	}
	if err := rows.Err(); err != nil {
		return nil, err
diff --git a/database/sqlite.go b/database/sqlite.go
index 99c2f05..35d9bc5 100644
--- a/database/sqlite.go
+++ b/database/sqlite.go
@@ -1321,12 +1321,12 @@ func (db *SqliteDB) ListMessageLastPerTarget(ctx context.Context, networkID int6
	return l, nil
}

func (db *SqliteDB) ListMessages(ctx context.Context, networkID int64, name string, options *MessageOptions) ([]*irc.Message, error) {
func (db *SqliteDB) ListMessages(ctx context.Context, networkID int64, name string, options *MessageOptions) ([]*Message, error) {
	ctx, cancel := context.WithTimeout(ctx, sqliteQueryTimeout)
	defer cancel()

	query := `
		SELECT m.raw
		SELECT m.id, m.raw, m.time
		FROM Message AS m, MessageTarget AS t
		WHERE m.target = t.id AND t.network = :network AND t.target = :target `
	if options.AfterID > 0 {
@@ -1371,19 +1371,22 @@ func (db *SqliteDB) ListMessages(ctx context.Context, networkID int64, name stri
	}
	defer rows.Close()

	var l []*irc.Message
	var l []*Message
	for rows.Next() {
		var raw string
		if err := rows.Scan(&raw); err != nil {
		var message Message
		var timestamp sqliteTime
		if err := rows.Scan(&message.Id, &raw, &timestamp); err != nil {
			return nil, err
		}

		message.Time = timestamp.Time
		msg, err := irc.ParseMessage(raw)
		if err != nil {
			return nil, err
		}
		message.Message = *msg

		l = append(l, msg)
		l = append(l, &message)
	}
	if err := rows.Err(); err != nil {
		return nil, err
diff --git a/msgstore/db.go b/msgstore/db.go
index 289253a..18a428a 100644
--- a/msgstore/db.go
+++ b/msgstore/db.go
@@ -17,6 +17,14 @@ func (dbMsgID) msgIDType() msgIDType {
	return msgIDDB
}

// forgetDBSlice converts a slice of database messages into a slice of plain
// IRC messages, dropping the database-specific fields. Each returned pointer
// refers to the irc.Message embedded in the corresponding input element, so
// nothing is copied.
func forgetDBSlice(l []*database.Message) []*irc.Message {
	msgs := make([]*irc.Message, 0, len(l))
	for _, dbMsg := range l {
		msgs = append(msgs, &dbMsg.Message)
	}
	return msgs
}

func parseDBMsgID(s string) (msgID int64, err error) {
	var id dbMsgID
	_, _, err = ParseMsgID(s, &id)
@@ -77,7 +85,7 @@ func (ms *dbMessageStore) LoadLatestID(ctx context.Context, id string, options *
	if err != nil {
		return nil, err
	}
	return l, nil
	return forgetDBSlice(l), nil
}

func (ms *dbMessageStore) Append(network *database.Network, entity string, msg *irc.Message) (string, error) {
@@ -119,7 +127,7 @@ func (ms *dbMessageStore) LoadBeforeTime(ctx context.Context, start, end time.Ti
	if err != nil {
		return nil, err
	}
	return l, nil
	return forgetDBSlice(l), nil
}

func (ms *dbMessageStore) LoadAfterTime(ctx context.Context, start, end time.Time, options *LoadMessageOptions) ([]*irc.Message, error) {
@@ -132,7 +140,7 @@ func (ms *dbMessageStore) LoadAfterTime(ctx context.Context, start, end time.Tim
	if err != nil {
		return nil, err
	}
	return l, nil
	return forgetDBSlice(l), nil
}

func (ms *dbMessageStore) Search(ctx context.Context, network *database.Network, options *SearchMessageOptions) ([]*irc.Message, error) {
@@ -147,5 +155,5 @@ func (ms *dbMessageStore) Search(ctx context.Context, network *database.Network,
	if err != nil {
		return nil, err
	}
	return l, nil
	return forgetDBSlice(l), nil
}
-- 
2.42.0
Reply to thread Export thread (mbox)