Calvin Lee: 2 patches ("support msgtags in msgstore_fs"; "contrib: add migrate-fs"); 10 files changed, 273 insertions(+), 21 deletions(-)
Copy & paste the following snippet into your terminal to import this patchset into git:
curl -s https://lists.sr.ht/~emersion/soju-dev/patches/45571/mbox | git am -3
Learn more about email & git
test plan: ``` go test -v ./msgstore/znclog/ === RUN TestRoundtrip === RUN TestRoundtrip/with_tags === RUN TestRoundtrip/without_tags --- PASS: TestRoundtrip (0.00s) --- PASS: TestRoundtrip/with_tags (0.00s) --- PASS: TestRoundtrip/without_tags (0.00s) PASS ok git.sr.ht/~emersion/soju/msgstore/znclog 0.003s ``` --- msgstore/znclog/reader.go | 18 +++++++++-- msgstore/znclog/testdata/irc-tags.log | 3 ++ msgstore/znclog/testdata/irc.log | 6 ++++ msgstore/znclog/writer.go | 25 +++++++++++++-- msgstore/znclog/znclog_test.go | 45 +++++++++++++++++++++++++++ 5 files changed, 92 insertions(+), 5 deletions(-) create mode 100644 msgstore/znclog/testdata/irc-tags.log create mode 100644 msgstore/znclog/testdata/irc.log create mode 100644 msgstore/znclog/znclog_test.go diff --git a/msgstore/znclog/reader.go b/msgstore/znclog/reader.go index 5d7fe1f..ecf41f4 100644 --- a/msgstore/znclog/reader.go +++ b/msgstore/znclog/reader.go @@ -21,8 +21,18 @@ func UnmarshalLine(line string, user *database.User, network *database.Network, } else if len(line) < timestampPrefixLen { return nil, time.Time{}, fmt.Errorf("malformed timestamp prefix: too short") } + tags := make(irc.Tags) line = line[timestampPrefixLen:] + if strings.HasPrefix(line, "(") { + parts := strings.SplitN(line[1:], ") ", 2) + if len(parts) != 2 { + return nil, time.Time{}, nil + } + tags = irc.ParseTags(parts[0]) + line = parts[1] + } + var cmd string var prefix *irc.Prefix var params []string @@ -118,6 +128,9 @@ func UnmarshalLine(line string, user *database.User, network *database.Network, return nil, time.Time{}, nil } sender, text = parts[0], parts[1] + } else if strings.HasPrefix(line, ">") { + cmd = "TAGMSG" + sender, text = strings.TrimRight(line[1:], "< "), "" } else if strings.HasPrefix(line, "-") { cmd = "NOTICE" parts := strings.SplitN(line[1:], "- ", 2) @@ -150,10 +163,9 @@ func UnmarshalLine(line string, user *database.User, network *database.Network, year, month, day := ref.Date() t := time.Date(year, month, 
day, hour, minute, second, 0, time.Local) + tags["time"] = xirc.FormatServerTime(t) msg := &irc.Message{ - Tags: map[string]string{ - "time": xirc.FormatServerTime(t), - }, + Tags: tags, Prefix: prefix, Command: cmd, Params: params, diff --git a/msgstore/znclog/testdata/irc-tags.log b/msgstore/znclog/testdata/irc-tags.log new file mode 100644 index 0000000..e8752b9 --- /dev/null +++ b/msgstore/znclog/testdata/irc-tags.log @@ -0,0 +1,3 @@ +[16:25:15] (msgid=123) <newbie> hi +[16:27:21] (+draft/reply=123;+draft/react=👋) >friend< +[16:28:57] (+draft/reply=123) <friend> newbie: hi! diff --git a/msgstore/znclog/testdata/irc.log b/msgstore/znclog/testdata/irc.log new file mode 100644 index 0000000..4acc0f6 --- /dev/null +++ b/msgstore/znclog/testdata/irc.log @@ -0,0 +1,6 @@ +[05:46:00] *** Quits: user (~User@0.0.0.0) (Ping timeout: 255 seconds) +[05:46:30] *** user_ is now known as user +[05:47:00] <spammer> im a spammer +[05:47:30] *** mod sets mode: +b *!*spammer@1.1.1.* +[05:48:00] *** spammer was kicked by mod (spammer) +[05:48:30] -bot- im a bot diff --git a/msgstore/znclog/writer.go b/msgstore/znclog/writer.go index b955edf..c559768 100644 --- a/msgstore/znclog/writer.go +++ b/msgstore/znclog/writer.go @@ -11,11 +11,30 @@ import ( ) func MarshalLine(msg *irc.Message, t time.Time) string { - s := formatMessage(msg) + s, tags := formatMessage(msg), formatMessageTags(msg) if s == "" { return "" } - return fmt.Sprintf("[%02d:%02d:%02d] %s", t.Hour(), t.Minute(), t.Second(), s) + ret := fmt.Sprintf("[%02d:%02d:%02d] ", t.Hour(), t.Minute(), t.Second()) + + if tags != "" { + ret += tags + " " + } + + return ret + s +} +func formatMessageTags(msg *irc.Message) string { + tags := make(irc.Tags) + for k, v := range msg.Tags { + if k != "time" { + tags[k] = v + } + } + if encoding := tags.String(); encoding != "" { + return "(" + encoding + ")" + } + return "" + } // formatMessage formats a message log line. 
It assumes a well-formed IRC @@ -61,6 +80,8 @@ func formatMessage(msg *irc.Message) string { } else { return fmt.Sprintf("<%s> %s", msg.Prefix.Name, msg.Params[1]) } + case "TAGMSG": + return fmt.Sprintf(">%s<", msg.Prefix.Name) default: return "" } diff --git a/msgstore/znclog/znclog_test.go b/msgstore/znclog/znclog_test.go new file mode 100644 index 0000000..61e4589 --- /dev/null +++ b/msgstore/znclog/znclog_test.go @@ -0,0 +1,45 @@ +package znclog_test + +import ( + "bufio" + "os" + "testing" + "time" + + "git.sr.ht/~emersion/soju/msgstore/znclog" +) + +func testRoundtrip(t *testing.T, filename string) { + file, err := os.Open(filename) + if err != nil { + t.Fatal(err) + } + defer file.Close() + + // we pretend that the irc logs occured today + ref := time.Now() + entity := "#test" + + scanner := bufio.NewScanner(file) + scanner.Split(bufio.ScanLines) + // Count the words. + for scanner.Scan() { + originalLine := scanner.Text() + msg, time, err := znclog.UnmarshalLine(originalLine, nil, nil, entity, ref, true) + if err != nil { + t.Fatal(err) + } + roundtrip := znclog.MarshalLine(msg, time) + if originalLine != roundtrip { + t.Fatalf("not equal after round trip:\n\t%v\n\t%v", originalLine, roundtrip) + } + } + if err := scanner.Err(); err != nil { + t.Fatal(err) + } +} + +func TestRoundtrip(t *testing.T) { + t.Run("with tags", func(t *testing.T) { testRoundtrip(t, "testdata/irc-tags.log") }) + t.Run("without tags", func(t *testing.T) { testRoundtrip(t, "testdata/irc.log") }) +} -- 2.42.0
This commit adds a command for migrating messages to the filesystem backend from a database. Although the logic is rather generic, it shouldn't replace `migrate-logs` as the latter has less of a chance of losing data by walking the file tree manually. Unfortunately this command can be lossy, as CHATHISTORY with second-granularity can miss messages. In order to remedy this, `migrate-fs` tries to use database IDs of messages instead of time whenever possible. --- In this version, a formatting change has been fixed and parsing of times in the sqlite backend now works consistently. contrib/migrate-fs/main.go | 144 +++++++++++++++++++++++++++++++++++++ database/database.go | 10 ++- database/postgres.go | 12 ++-- database/sqlite.go | 15 ++-- msgstore/db.go | 16 +++-- 5 files changed, 181 insertions(+), 16 deletions(-) create mode 100644 contrib/migrate-fs/main.go diff --git a/contrib/migrate-fs/main.go b/contrib/migrate-fs/main.go new file mode 100644 index 0000000..954980e --- /dev/null +++ b/contrib/migrate-fs/main.go @@ -0,0 +1,144 @@ +package main + +import ( + "context" + "flag" + "fmt" + "log" + "strings" + "time" + + "git.sr.ht/~emersion/soju/database" + "git.sr.ht/~emersion/soju/msgstore" +) + +const usage = `usage: migrate-fs <source logs> <destination directory> + +Migrates existing Soju logs from the database to the filesystem. The source is +specified in the format of "driver:source" where driver is sqlite3 or postgres +and destination is specified in the format of "fs:path" as would be specified in +the 'message-store' option of the Soju config file. 
+ +Options: + + -help Show this help message +` +const ( + targetPageSize = 100 + messagePageSize = 1000 +) + +func init() { + flag.Usage = func() { + fmt.Fprint(flag.CommandLine.Output(), usage) + } +} + +func migrateTarget(ctx context.Context, db database.Database, user *database.User, network *database.Network, target string, fsStore msgstore.Store) error { + log.Printf("\t%s\n", target) + start := time.Unix(0, 0) + end := time.Now() + var lastId int64 + for { + opts := &database.MessageOptions{ + AfterID: lastId, + BeforeTime: end, + Limit: messagePageSize, + Events: true, + } + if lastId == 0 { + opts.AfterTime = start + } else { + opts.AfterID = lastId + } + + msgs, err := db.ListMessages(ctx, network.ID, target, opts) + + if err != nil { + return err + } + if len(msgs) == 0 { + break + } + for _, msg := range msgs { + lastId = msg.Id + if _, err = fsStore.Append(network, target, &msg.Message); err != nil { + return err + } + } + } + + return nil +} + +func migrateNetwork(ctx context.Context, db database.Database, user *database.User, network *database.Network, fsStore msgstore.Store, dbStore msgstore.ChatHistoryStore) error { + log.Printf("Migrating logs for network: %s\n", network.Name) + + start := time.Unix(0, 0) + end := time.Now() + for { + targets, err := dbStore.ListTargets(ctx, network, start, end, targetPageSize, true) + if err != nil { + return err + } + if len(targets) == 0 { + break + } + for _, target := range targets { + if start.Before(target.LatestMessage) { + start = target.LatestMessage + } + if err = migrateTarget(ctx, db, user, network, target.Name, fsStore); err != nil { + return err + } + } + } + return nil +} + +func main() { + flag.Parse() + + ctx := context.Background() + + dbParams := strings.Split(flag.Arg(0), ":") + fsparams := strings.Split(flag.Arg(1), ":") + + if len(dbParams) != 2 { + log.Fatalf("database not properly specified: %s", flag.Arg(1)) + } + if len(fsparams) != 2 || fsparams[0] != "fs" { + log.Fatalf("fs not 
properly specified: %s", flag.Arg(1)) + } + logRoot := fsparams[1] + + db, err := database.Open(dbParams[0], dbParams[1]) + if err != nil { + log.Fatalf("failed to open database: %v", err) + } + defer db.Close() + + users, err := db.ListUsers(ctx) + if err != nil { + log.Fatalf("unable to get users: %v", err) + } + + dbStore := msgstore.NewDBStore(db) + + for _, user := range users { + log.Printf("Migrating logs for user: %s\n", user.Username) + + fsStore := msgstore.NewFSStore(logRoot, &user) + + networks, err := db.ListNetworks(ctx, user.ID) + if err != nil { + log.Fatalf("unable to get networks for user: #%d %s", user.ID, user.Username) + } + + for _, network := range networks { + if err := migrateNetwork(ctx, db, &user, &network, fsStore, dbStore); err != nil { + log.Fatalf("migrating %v: %v", network.Name, err) + } + } + } +} diff --git a/database/database.go b/database/database.go index 15c04b9..965b323 100644 --- a/database/database.go +++ b/database/database.go @@ -29,6 +29,14 @@ type MessageOptions struct { TakeLast bool } +type Message struct { + irc.Message + Time time.Time + Id int64 +} + + + type Database interface { Close() error Stats(ctx context.Context) (*DatabaseStats, error) @@ -62,7 +70,7 @@ type Database interface { GetMessageLastID(ctx context.Context, networkID int64, name string) (int64, error) StoreMessage(ctx context.Context, networkID int64, name string, msg *irc.Message) (int64, error) ListMessageLastPerTarget(ctx context.Context, networkID int64, options *MessageOptions) ([]MessageTarget, error) - ListMessages(ctx context.Context, networkID int64, name string, options *MessageOptions) ([]*irc.Message, error) + ListMessages(ctx context.Context, networkID int64, name string, options *MessageOptions) ([]*Message, error) } type MetricsCollectorDatabase interface { diff --git a/database/postgres.go b/database/postgres.go index e18cc05..f1419c4 100644 --- a/database/postgres.go +++ b/database/postgres.go @@ -1050,7 +1050,7 @@ func (db 
*PostgresDB) ListMessageLastPerTarget(ctx context.Context, networkID in return l, nil } -func (db *PostgresDB) ListMessages(ctx context.Context, networkID int64, name string, options *MessageOptions) ([]*irc.Message, error) { +func (db *PostgresDB) ListMessages(ctx context.Context, networkID int64, name string, options *MessageOptions) ([]*Message, error) { ctx, cancel := context.WithTimeout(ctx, postgresQueryTimeout) defer cancel() @@ -1059,7 +1059,7 @@ func (db *PostgresDB) ListMessages(ctx context.Context, networkID int64, name st name, } query := ` - SELECT m.raw + SELECT m.id, m.raw, m.time FROM "Message" AS m, "MessageTarget" AS t WHERE m.target = t.id AND t.network = $1 AND t.target = $2 ` if options.AfterID > 0 { @@ -1101,10 +1101,11 @@ func (db *PostgresDB) ListMessages(ctx context.Context, networkID int64, name st } defer rows.Close() - var l []*irc.Message + var l []*Message for rows.Next() { var raw string - if err := rows.Scan(&raw); err != nil { + var message Message + if err := rows.Scan(&message.Id, &raw, &message.Time); err != nil { return nil, err } @@ -1112,8 +1113,9 @@ func (db *PostgresDB) ListMessages(ctx context.Context, networkID int64, name st if err != nil { return nil, err } + message.Message = *msg - l = append(l, msg) + l = append(l, &message) } if err := rows.Err(); err != nil { return nil, err diff --git a/database/sqlite.go b/database/sqlite.go index 99c2f05..35d9bc5 100644 --- a/database/sqlite.go +++ b/database/sqlite.go @@ -1321,12 +1321,12 @@ func (db *SqliteDB) ListMessageLastPerTarget(ctx context.Context, networkID int6 return l, nil } -func (db *SqliteDB) ListMessages(ctx context.Context, networkID int64, name string, options *MessageOptions) ([]*irc.Message, error) { +func (db *SqliteDB) ListMessages(ctx context.Context, networkID int64, name string, options *MessageOptions) ([]*Message, error) { ctx, cancel := context.WithTimeout(ctx, sqliteQueryTimeout) defer cancel() query := ` - SELECT m.raw + SELECT m.id, m.raw, m.time 
FROM Message AS m, MessageTarget AS t WHERE m.target = t.id AND t.network = :network AND t.target = :target ` if options.AfterID > 0 { @@ -1371,19 +1371,22 @@ func (db *SqliteDB) ListMessages(ctx context.Context, networkID int64, name stri } defer rows.Close() - var l []*irc.Message + var l []*Message for rows.Next() { var raw string - if err := rows.Scan(&raw); err != nil { + var message Message + var timestamp sqliteTime + if err := rows.Scan(&message.Id, &raw, &timestamp); err != nil { return nil, err } - + message.Time = timestamp.Time msg, err := irc.ParseMessage(raw) if err != nil { return nil, err } + message.Message = *msg - l = append(l, msg) + l = append(l, &message) } if err := rows.Err(); err != nil { return nil, err diff --git a/msgstore/db.go b/msgstore/db.go index 289253a..18a428a 100644 --- a/msgstore/db.go +++ b/msgstore/db.go @@ -17,6 +17,14 @@ func (dbMsgID) msgIDType() msgIDType { return msgIDDB } +func forgetDBSlice(l []*database.Message) []*irc.Message { + ret := make([]*irc.Message, len(l)) + for i, m := range l { + ret[i] = &m.Message + } + return ret +} + func parseDBMsgID(s string) (msgID int64, err error) { var id dbMsgID _, _, err = ParseMsgID(s, &id) @@ -77,7 +85,7 @@ func (ms *dbMessageStore) LoadLatestID(ctx context.Context, id string, options * if err != nil { return nil, err } - return l, nil + return forgetDBSlice(l), nil } func (ms *dbMessageStore) Append(network *database.Network, entity string, msg *irc.Message) (string, error) { @@ -119,7 +127,7 @@ func (ms *dbMessageStore) LoadBeforeTime(ctx context.Context, start, end time.Ti if err != nil { return nil, err } - return l, nil + return forgetDBSlice(l), nil } func (ms *dbMessageStore) LoadAfterTime(ctx context.Context, start, end time.Time, options *LoadMessageOptions) ([]*irc.Message, error) { @@ -132,7 +140,7 @@ func (ms *dbMessageStore) LoadAfterTime(ctx context.Context, start, end time.Tim if err != nil { return nil, err } - return l, nil + return forgetDBSlice(l), nil } func 
(ms *dbMessageStore) Search(ctx context.Context, network *database.Network, options *SearchMessageOptions) ([]*irc.Message, error) { @@ -147,5 +155,5 @@ func (ms *dbMessageStore) Search(ctx context.Context, network *database.Network, if err != nil { return nil, err } - return l, nil + return forgetDBSlice(l), nil } -- 2.42.0