Calvin Lee: 2 support msgtags in msgstore_fs contrib: add migrate-fs 10 files changed, 277 insertions(+), 20 deletions(-)
Copy & paste the following snippet into your terminal to import this patchset into git:
curl -s https://lists.sr.ht/~emersion/soju-dev/patches/45565/mbox | git am -3Learn more about email & git
test plan: ``` go test -v ./msgstore/znclog/ === RUN TestRoundtrip === RUN TestRoundtrip/with_tags === RUN TestRoundtrip/without_tags --- PASS: TestRoundtrip (0.00s) --- PASS: TestRoundtrip/with_tags (0.00s) --- PASS: TestRoundtrip/without_tags (0.00s) PASS ok git.sr.ht/~emersion/soju/msgstore/znclog 0.003s ``` --- msgstore/znclog/reader.go | 18 +++++++++-- msgstore/znclog/testdata/irc-tags.log | 3 ++ msgstore/znclog/testdata/irc.log | 6 ++++ msgstore/znclog/writer.go | 25 +++++++++++++-- msgstore/znclog/znclog_test.go | 45 +++++++++++++++++++++++++++ 5 files changed, 92 insertions(+), 5 deletions(-) create mode 100644 msgstore/znclog/testdata/irc-tags.log create mode 100644 msgstore/znclog/testdata/irc.log create mode 100644 msgstore/znclog/znclog_test.go diff --git a/msgstore/znclog/reader.go b/msgstore/znclog/reader.go index 5d7fe1f..ecf41f4 100644 --- a/msgstore/znclog/reader.go +++ b/msgstore/znclog/reader.go @@ -21,8 +21,18 @@ func UnmarshalLine(line string, user *database.User, network *database.Network, } else if len(line) < timestampPrefixLen { return nil, time.Time{}, fmt.Errorf("malformed timestamp prefix: too short") } + tags := make(irc.Tags) line = line[timestampPrefixLen:] + if strings.HasPrefix(line, "(") { + parts := strings.SplitN(line[1:], ") ", 2) + if len(parts) != 2 { + return nil, time.Time{}, nil + } + tags = irc.ParseTags(parts[0]) + line = parts[1] + } + var cmd string var prefix *irc.Prefix var params []string @@ -118,6 +128,9 @@ func UnmarshalLine(line string, user *database.User, network *database.Network, return nil, time.Time{}, nil } sender, text = parts[0], parts[1] + } else if strings.HasPrefix(line, ">") { + cmd = "TAGMSG" + sender, text = strings.TrimRight(line[1:], "< "), "" } else if strings.HasPrefix(line, "-") { cmd = "NOTICE" parts := strings.SplitN(line[1:], "- ", 2) @@ -150,10 +163,9 @@ func UnmarshalLine(line string, user *database.User, network *database.Network, year, month, day := ref.Date() t := time.Date(year, month, day, hour, minute, second, 0, time.Local) + tags["time"] = xirc.FormatServerTime(t) msg := &irc.Message{ - Tags: map[string]string{ - "time": xirc.FormatServerTime(t), - }, + Tags: tags, Prefix: prefix, Command: cmd, Params: params, diff --git a/msgstore/znclog/testdata/irc-tags.log b/msgstore/znclog/testdata/irc-tags.log new file mode 100644 index 0000000..e8752b9 --- /dev/null +++ b/msgstore/znclog/testdata/irc-tags.log @@ -0,0 +1,3 @@ +[16:25:15] (msgid=123) <newbie> hi +[16:27:21] (+draft/reply=123;+draft/react=👋) >friend< +[16:28:57] (+draft/reply=123) <friend> newbie: hi! diff --git a/msgstore/znclog/testdata/irc.log b/msgstore/znclog/testdata/irc.log new file mode 100644 index 0000000..4acc0f6 --- /dev/null +++ b/msgstore/znclog/testdata/irc.log @@ -0,0 +1,6 @@ +[05:46:00] *** Quits: user (~User@0.0.0.0) (Ping timeout: 255 seconds) +[05:46:30] *** user_ is now known as user +[05:47:00] <spammer> im a spammer +[05:47:30] *** mod sets mode: +b *!*spammer@1.1.1.* +[05:48:00] *** spammer was kicked by mod (spammer) +[05:48:30] -bot- im a bot diff --git a/msgstore/znclog/writer.go b/msgstore/znclog/writer.go index b955edf..c559768 100644 --- a/msgstore/znclog/writer.go +++ b/msgstore/znclog/writer.go @@ -11,11 +11,30 @@ import ( ) func MarshalLine(msg *irc.Message, t time.Time) string { - s := formatMessage(msg) + s, tags := formatMessage(msg), formatMessageTags(msg) if s == "" { return "" } - return fmt.Sprintf("[%02d:%02d:%02d] %s", t.Hour(), t.Minute(), t.Second(), s) + ret := fmt.Sprintf("[%02d:%02d:%02d] ", t.Hour(), t.Minute(), t.Second()) + + if tags != "" { + ret += tags + " " + } + + return ret + s +} +func formatMessageTags(msg *irc.Message) string { + tags := make(irc.Tags) + for k, v := range msg.Tags { + if k != "time" { + tags[k] = v + } + } + if encoding := tags.String(); encoding != "" { + return "(" + encoding + ")" + } + return "" + } // formatMessage formats a message log line. It assumes a well-formed IRC @@ -61,6 +80,8 @@ func formatMessage(msg *irc.Message) string { } else { return fmt.Sprintf("<%s> %s", msg.Prefix.Name, msg.Params[1]) } + case "TAGMSG": + return fmt.Sprintf(">%s<", msg.Prefix.Name) default: return "" } diff --git a/msgstore/znclog/znclog_test.go b/msgstore/znclog/znclog_test.go new file mode 100644 index 0000000..61e4589 --- /dev/null +++ b/msgstore/znclog/znclog_test.go @@ -0,0 +1,45 @@ +package znclog_test + +import ( + "bufio" + "os" + "testing" + "time" + + "git.sr.ht/~emersion/soju/msgstore/znclog" +) + +func testRoundtrip(t *testing.T, filename string) { + file, err := os.Open(filename) + if err != nil { + t.Fatal(err) + } + defer file.Close() + + // we pretend that the irc logs occured today + ref := time.Now() + entity := "#test" + + scanner := bufio.NewScanner(file) + scanner.Split(bufio.ScanLines) + // Count the words. + for scanner.Scan() { + originalLine := scanner.Text() + msg, time, err := znclog.UnmarshalLine(originalLine, nil, nil, entity, ref, true) + if err != nil { + t.Fatal(err) + } + roundtrip := znclog.MarshalLine(msg, time) + if originalLine != roundtrip { + t.Fatalf("not equal after round trip:\n\t%v\n\t%v", originalLine, roundtrip) + } + } + if err := scanner.Err(); err != nil { + t.Fatal(err) + } +} + +func TestRoundtrip(t *testing.T) { + t.Run("with tags", func(t *testing.T) { testRoundtrip(t, "testdata/irc-tags.log") }) + t.Run("without tags", func(t *testing.T) { testRoundtrip(t, "testdata/irc.log") }) +} -- 2.42.0
Sorry, but I don't think this is desirable. The fs message store is specifically designed to be compatible with ZNC, breaking it would not allow other tools which also support this format to continue to work correctly.
This commit adds a command for migrating messages to the filesystem backend from a database. Although the logic is rather generic, it shouldn't replace `migrate-logs` as the latter has less of a chance of loosing data by walking the file tree manually. Unfortunately this command can be lossy, as CHATHISTORY with second-granularity can miss messages. In order to remedy this, `migrate-fs` tries to use database IDs of messages instead of time whenever possible. --- contrib/migrate-fs/main.go | 150 +++++++++++++++++++++++++++++++++++++ database/database.go | 10 ++- database/postgres.go | 12 +-- database/sqlite.go | 12 +-- msgstore/db.go | 16 +++- 5 files changed, 185 insertions(+), 15 deletions(-) create mode 100644 contrib/migrate-fs/main.go diff --git a/contrib/migrate-fs/main.go b/contrib/migrate-fs/main.go new file mode 100644 index 0000000..aebdf41 --- /dev/null +++ b/contrib/migrate-fs/main.go @@ -0,0 +1,150 @@ +package main + +import ( + "context" + "flag" + "fmt" + "log" + "strings" + "time" + + "git.sr.ht/~emersion/soju/database" + "git.sr.ht/~emersion/soju/msgstore" +) + +const usage = `usage: migrate-fs <source logs> <destination directory> + +Migrates existing Soju logs from the database to the filesystem. The source is +specified in the format of "driver:source" where driver is sqlite3 or postgres +and destination is specified in the format of "fs:path" as would be specified in +the 'message-store' option of the Soju config file. + +Options: + + -help Show this help message +` +const ( + targetPageSize = 100 + messagePageSize = 1000 +) + +func init() { + flag.Usage = func() { + fmt.Fprint(flag.CommandLine.Output(), usage) + } +} + +func migrateTarget(ctx context.Context, db database.Database, user *database.User, network *database.Network, target string, fsStore msgstore.Store, dbStore msgstore.ChatHistoryStore) error { + log.Printf("\t%s\n", target) + start := time.Unix(0, 0) + end := time.Now() + opts := &msgstore.LoadMessageOptions{ + Network: network, + Entity: target, + Limit: messagePageSize, + Events: true, + } + var lastId int64 + for { + opts := &database.MessageOptions{ + AfterID: lastId, + BeforeTime: end, + Limit: messagePageSize, + Events: true, + } + if lastId == 0 { + opts.AfterTime = start + } else { + opts.AfterID = lastId + } + + msgs, err := db.ListMessages(ctx, network.ID, target, opts) + + if err != nil { + return err + } + if len(msgs) == 0 { + break + } + for _, msg := range msgs { + lastId = msg.Id + if _, err = fsStore.Append(network, target, &msg.Message); err != nil { + return err + } + } + } + + return nil +} + +func migrateNetwork(ctx context.Context, db database.Database, user *database.User, network *database.Network, fsStore msgstore.Store, dbStore msgstore.ChatHistoryStore) error { + log.Printf("Migrating logs for network: %s\n", network.Name) + + start := time.Unix(0, 0) + end := time.Now() + for { + targets, err := dbStore.ListTargets(ctx, network, start, end, targetPageSize, true) + if err != nil { + return err + } + if len(targets) == 0 { + break + } + for _, target := range targets { + if start.Before(target.LatestMessage) { + start = target.LatestMessage + } + if err = migrateTarget(ctx, db, user, network, target.Name, fsStore, dbStore); err != nil { + return err + } + } + } + return nil +} + +func main() { + flag.Parse() + + ctx := context.Background() + + dbParams := strings.Split(flag.Arg(0), ":") + fsparams := strings.Split(flag.Arg(1), ":") + + if len(dbParams) != 2 { + log.Fatalf("database not properly specified: %s", flag.Arg(1)) + } + if len(fsparams) != 2 || fsparams[0] != "fs" { + log.Fatalf("fs not properly specified: %s", flag.Arg(1)) + } + logRoot := fsparams[1] + + db, err := database.Open(dbParams[0], dbParams[1]) + if err != nil { + log.Fatalf("failed to open database: %v", err) + } + defer db.Close() + + users, err := db.ListUsers(ctx) + if err != nil { + log.Fatalf("unable to get users: %v", err) + } + + dbStore := msgstore.NewDBStore(db) + + for _, user := range users { + log.Printf("Migrating logs for user: %s\n", user.Username) + + fsStore := msgstore.NewFSStore(logRoot, &user) + + networks, err := db.ListNetworks(ctx, user.ID) + if err != nil { + log.Fatalf("unable to get networks for user: #%d %s", user.ID, user.Username) + } + + for _, network := range networks { + if err := migrateNetwork(ctx, db, &user, &network, fsStore, dbStore); err != nil { + log.Fatalf("migrating %v: %v", network.Name, err) + } + } + } +} diff --git a/database/database.go b/database/database.go index 15c04b9..965b323 100644 --- a/database/database.go +++ b/database/database.go @@ -29,6 +29,14 @@ type MessageOptions struct { TakeLast bool } +type Message struct { + irc.Message + Time time.Time + Id int64 +} + + + type Database interface { Close() error Stats(ctx context.Context) (*DatabaseStats, error) @@ -62,7 +70,7 @@ type Database interface { GetMessageLastID(ctx context.Context, networkID int64, name string) (int64, error) StoreMessage(ctx context.Context, networkID int64, name string, msg *irc.Message) (int64, error) ListMessageLastPerTarget(ctx context.Context, networkID int64, options *MessageOptions) ([]MessageTarget, error) - ListMessages(ctx context.Context, networkID int64, name string, options *MessageOptions) ([]*irc.Message, error) + ListMessages(ctx context.Context, networkID int64, name string, options *MessageOptions) ([]*Message, error) } type MetricsCollectorDatabase interface { diff --git a/database/postgres.go b/database/postgres.go index e18cc05..f1419c4 100644 --- a/database/postgres.go +++ b/database/postgres.go @@ -1050,7 +1050,7 @@ func (db *PostgresDB) ListMessageLastPerTarget(ctx context.Context, networkID in return l, nil } -func (db *PostgresDB) ListMessages(ctx context.Context, networkID int64, name string, options *MessageOptions) ([]*irc.Message, error) { +func (db *PostgresDB) ListMessages(ctx context.Context, networkID int64, name string, options *MessageOptions) ([]*Message, error) { ctx, cancel := context.WithTimeout(ctx, postgresQueryTimeout) defer cancel() @@ -1059,7 +1059,7 @@ func (db *PostgresDB) ListMessages(ctx context.Context, networkID int64, name st name, } query := ` - SELECT m.raw + SELECT m.id, m.raw, m.time FROM "Message" AS m, "MessageTarget" AS t WHERE m.target = t.id AND t.network = $1 AND t.target = $2 ` if options.AfterID > 0 { @@ -1101,10 +1101,11 @@ func (db *PostgresDB) ListMessages(ctx context.Context, networkID int64, name st } defer rows.Close() - var l []*irc.Message + var l []*Message for rows.Next() { var raw string - if err := rows.Scan(&raw); err != nil { + var message Message + if err := rows.Scan(&message.Id, &raw, &message.Time); err != nil { return nil, err } @@ -1112,8 +1113,9 @@ func (db *PostgresDB) ListMessages(ctx context.Context, networkID int64, name st if err != nil { return nil, err } + message.Message = *msg - l = append(l, msg) + l = append(l, &message) } if err := rows.Err(); err != nil { return nil, err diff --git a/database/sqlite.go b/database/sqlite.go index 99c2f05..d5ae2cb 100644 --- a/database/sqlite.go +++ b/database/sqlite.go @@ -1321,12 +1321,12 @@ func (db *SqliteDB) ListMessageLastPerTarget(ctx context.Context, networkID int6 return l, nil } -func (db *SqliteDB) ListMessages(ctx context.Context, networkID int64, name string, options *MessageOptions) ([]*irc.Message, error) { +func (db *SqliteDB) ListMessages(ctx context.Context, networkID int64, name string, options *MessageOptions) ([]*Message, error) { ctx, cancel := context.WithTimeout(ctx, sqliteQueryTimeout) defer cancel() query := ` - SELECT m.raw + SELECT m.id, m.raw, m.time FROM Message AS m, MessageTarget AS t WHERE m.target = t.id AND t.network = :network AND t.target = :target ` if options.AfterID > 0 { @@ -1371,10 +1371,11 @@ func (db *SqliteDB) ListMessages(ctx context.Context, networkID int64, name stri } defer rows.Close() - var l []*irc.Message + var l []*Message for rows.Next() { var raw string - if err := rows.Scan(&raw); err != nil { + var message Message + if err := rows.Scan(&message.Id, &raw, &message.Time); err != nil { return nil, err } @@ -1382,8 +1383,9 @@ func (db *SqliteDB) ListMessages(ctx context.Context, networkID int64, name stri if err != nil { return nil, err } + message.Message = *msg - l = append(l, msg) + l = append(l, &message) } if err := rows.Err(); err != nil { return nil, err diff --git a/msgstore/db.go b/msgstore/db.go index 289253a..18a428a 100644 --- a/msgstore/db.go +++ b/msgstore/db.go @@ -17,6 +17,14 @@ func (dbMsgID) msgIDType() msgIDType { return msgIDDB } +func forgetDBSlice(l []*database.Message) []*irc.Message { + ret := make([]*irc.Message, len(l)) + for i, m := range l { + ret[i] = &m.Message + } + return ret +} + func parseDBMsgID(s string) (msgID int64, err error) { var id dbMsgID _, _, err = ParseMsgID(s, &id) @@ -77,7 +85,7 @@ func (ms *dbMessageStore) LoadLatestID(ctx context.Context, id string, options * if err != nil { return nil, err } - return l, nil + return forgetDBSlice(l), nil } func (ms *dbMessageStore) Append(network *database.Network, entity string, msg *irc.Message) (string, error) { @@ -119,7 +127,7 @@ func (ms *dbMessageStore) LoadBeforeTime(ctx context.Context, start, end time.Ti if err != nil { return nil, err } - return l, nil + return forgetDBSlice(l), nil } func (ms *dbMessageStore) LoadAfterTime(ctx context.Context, start, end time.Time, options *LoadMessageOptions) ([]*irc.Message, error) { @@ -132,7 +140,7 @@ func (ms *dbMessageStore) LoadAfterTime(ctx context.Context, start, end time.Tim if err != nil { return nil, err } - return l, nil + return forgetDBSlice(l), nil } func (ms *dbMessageStore) Search(ctx context.Context, network *database.Network, options *SearchMessageOptions) ([]*irc.Message, error) { @@ -147,5 +155,5 @@ func (ms *dbMessageStore) Search(ctx context.Context, network *database.Network, if err != nil { return nil, err } - return l, nil + return forgetDBSlice(l), nil } -- 2.42.0