test plan:
```
go test -v ./msgstore/znclog/
=== RUN TestRoundtrip
=== RUN TestRoundtrip/with_tags
=== RUN TestRoundtrip/without_tags
--- PASS: TestRoundtrip (0.00s)
--- PASS: TestRoundtrip/with_tags (0.00s)
--- PASS: TestRoundtrip/without_tags (0.00s)
PASS
ok git.sr.ht/~emersion/soju/msgstore/znclog 0.003s
```
---
msgstore/znclog/reader.go | 18 +++++++++--
msgstore/znclog/testdata/irc-tags.log | 3 ++
msgstore/znclog/testdata/irc.log | 6 ++++
msgstore/znclog/writer.go | 25 +++++++++++++--
msgstore/znclog/znclog_test.go | 45 +++++++++++++++++++++++++++
5 files changed, 92 insertions(+), 5 deletions(-)
create mode 100644 msgstore/znclog/testdata/irc-tags.log
create mode 100644 msgstore/znclog/testdata/irc.log
create mode 100644 msgstore/znclog/znclog_test.go
diff --git a/msgstore/znclog/reader.go b/msgstore/znclog/reader.go
index 5d7fe1f..ecf41f4 100644
--- a/msgstore/znclog/reader.go
+++ b/msgstore/znclog/reader.go
@@ -21,8 +21,18 @@ func UnmarshalLine(line string, user *database.User, network *database.Network,
} else if len(line) < timestampPrefixLen {
return nil, time.Time{}, fmt.Errorf("malformed timestamp prefix: too short")
}
+ tags := make(irc.Tags)
line = line[timestampPrefixLen:]
+ if strings.HasPrefix(line, "(") {
+ parts := strings.SplitN(line[1:], ") ", 2)
+ if len(parts) != 2 {
+ return nil, time.Time{}, nil
+ }
+ tags = irc.ParseTags(parts[0])
+ line = parts[1]
+ }
+
var cmd string
var prefix *irc.Prefix
var params []string
@@ -118,6 +128,9 @@ func UnmarshalLine(line string, user *database.User, network *database.Network,
return nil, time.Time{}, nil
}
sender, text = parts[0], parts[1]
+ } else if strings.HasPrefix(line, ">") {
+ cmd = "TAGMSG"
+ sender, text = strings.TrimRight(line[1:], "< "), ""
} else if strings.HasPrefix(line, "-") {
cmd = "NOTICE"
parts := strings.SplitN(line[1:], "- ", 2)
@@ -150,10 +163,9 @@ func UnmarshalLine(line string, user *database.User, network *database.Network,
year, month, day := ref.Date()
t := time.Date(year, month, day, hour, minute, second, 0, time.Local)
+ tags["time"] = xirc.FormatServerTime(t)
msg := &irc.Message{
- Tags: map[string]string{
- "time": xirc.FormatServerTime(t),
- },
+ Tags: tags,
Prefix: prefix,
Command: cmd,
Params: params,
diff --git a/msgstore/znclog/testdata/irc-tags.log b/msgstore/znclog/testdata/irc-tags.log
new file mode 100644
index 0000000..e8752b9
--- /dev/null
+++ b/msgstore/znclog/testdata/irc-tags.log
@@ -0,0 +1,3 @@
+[16:25:15] (msgid=123) <newbie> hi
+[16:27:21] (+draft/reply=123;+draft/react=👋) >friend<
+[16:28:57] (+draft/reply=123) <friend> newbie: hi!
diff --git a/msgstore/znclog/testdata/irc.log b/msgstore/znclog/testdata/irc.log
new file mode 100644
index 0000000..4acc0f6
--- /dev/null
+++ b/msgstore/znclog/testdata/irc.log
@@ -0,0 +1,6 @@
+[05:46:00] *** Quits: user (~User@0.0.0.0) (Ping timeout: 255 seconds)
+[05:46:30] *** user_ is now known as user
+[05:47:00] <spammer> im a spammer
+[05:47:30] *** mod sets mode: +b *!*spammer@1.1.1.*
+[05:48:00] *** spammer was kicked by mod (spammer)
+[05:48:30] -bot- im a bot
diff --git a/msgstore/znclog/writer.go b/msgstore/znclog/writer.go
index b955edf..c559768 100644
--- a/msgstore/znclog/writer.go
+++ b/msgstore/znclog/writer.go
@@ -11,11 +11,30 @@ import (
)
func MarshalLine(msg *irc.Message, t time.Time) string {
- s := formatMessage(msg)
+ s, tags := formatMessage(msg), formatMessageTags(msg)
if s == "" {
return ""
}
- return fmt.Sprintf("[%02d:%02d:%02d] %s", t.Hour(), t.Minute(), t.Second(), s)
+ ret := fmt.Sprintf("[%02d:%02d:%02d] ", t.Hour(), t.Minute(), t.Second())
+
+ if tags != "" {
+ ret += tags + " "
+ }
+
+ return ret + s
+}
+func formatMessageTags(msg *irc.Message) string {
+ tags := make(irc.Tags)
+ for k, v := range msg.Tags {
+ if k != "time" {
+ tags[k] = v
+ }
+ }
+ if encoding := tags.String(); encoding != "" {
+ return "(" + encoding + ")"
+ }
+ return ""
+
}
// formatMessage formats a message log line. It assumes a well-formed IRC
@@ -61,6 +80,8 @@ func formatMessage(msg *irc.Message) string {
} else {
return fmt.Sprintf("<%s> %s", msg.Prefix.Name, msg.Params[1])
}
+ case "TAGMSG":
+ return fmt.Sprintf(">%s<", msg.Prefix.Name)
default:
return ""
}
diff --git a/msgstore/znclog/znclog_test.go b/msgstore/znclog/znclog_test.go
new file mode 100644
index 0000000..61e4589
--- /dev/null
+++ b/msgstore/znclog/znclog_test.go
@@ -0,0 +1,45 @@
+package znclog_test
+
+import (
+ "bufio"
+ "os"
+ "testing"
+ "time"
+
+ "git.sr.ht/~emersion/soju/msgstore/znclog"
+)
+
+func testRoundtrip(t *testing.T, filename string) {
+ file, err := os.Open(filename)
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer file.Close()
+
+ // we pretend that the irc logs occurred today
+ ref := time.Now()
+ entity := "#test"
+
+ scanner := bufio.NewScanner(file)
+ scanner.Split(bufio.ScanLines)
+ // Round-trip every line of the log file.
+ for scanner.Scan() {
+ originalLine := scanner.Text()
+ msg, time, err := znclog.UnmarshalLine(originalLine, nil, nil, entity, ref, true)
+ if err != nil {
+ t.Fatal(err)
+ }
+ roundtrip := znclog.MarshalLine(msg, time)
+ if originalLine != roundtrip {
+ t.Fatalf("not equal after round trip:\n\t%v\n\t%v", originalLine, roundtrip)
+ }
+ }
+ if err := scanner.Err(); err != nil {
+ t.Fatal(err)
+ }
+}
+
+func TestRoundtrip(t *testing.T) {
+ t.Run("with tags", func(t *testing.T) { testRoundtrip(t, "testdata/irc-tags.log") })
+ t.Run("without tags", func(t *testing.T) { testRoundtrip(t, "testdata/irc.log") })
+}
--
2.42.0
This commit adds a command for migrating messages to the filesystem
backend from a database. Although the logic is rather generic, it
shouldn't replace `migrate-logs` as the latter has less of a chance of
losing data by walking the file tree manually.
Unfortunately this command can be lossy, as CHATHISTORY with
second-granularity can miss messages. In order to remedy this,
`migrate-fs` tries to use database IDs of messages instead of time
whenever possible.
---
In this version, a formatting change has been fixed and parsing of times
in the sqlite backend now works consistently.
contrib/migrate-fs/main.go | 144 +++++++++++++++++++++++++++++++++++++
database/database.go | 10 ++-
database/postgres.go | 12 ++--
database/sqlite.go | 15 ++--
msgstore/db.go | 16 +++--
5 files changed, 181 insertions(+), 16 deletions(-)
create mode 100644 contrib/migrate-fs/main.go
diff --git a/contrib/migrate-fs/main.go b/contrib/migrate-fs/main.go
new file mode 100644
index 0000000..954980e
--- /dev/null
+++ b/contrib/migrate-fs/main.go
@@ -0,0 +1,144 @@
+package main
+
+import (
+ "context"
+ "flag"
+ "fmt"
+ "log"
+ "strings"
+ "time"
+
+ "git.sr.ht/~emersion/soju/database"
+ "git.sr.ht/~emersion/soju/msgstore"
+)
+
+const usage = `usage: migrate-fs <source logs> <destination directory>
+
+Migrates existing Soju logs from the database to the filesystem. The source is
+specified in the format of "driver:source" where driver is sqlite3 or postgres
+and destination is specified in the format of "fs:path" as would be specified in
+the 'message-store' option of the Soju config file.
+
+Options:
+
+ -help Show this help message
+`
+const (
+ targetPageSize = 100
+ messagePageSize = 1000
+)
+
+func init() {
+ flag.Usage = func() {
+ fmt.Fprint(flag.CommandLine.Output(), usage)
+ }
+}
+
+func migrateTarget(ctx context.Context, db database.Database, user *database.User, network *database.Network, target string, fsStore msgstore.Store) error {
+ log.Printf("\t%s\n", target)
+ start := time.Unix(0, 0)
+ end := time.Now()
+ var lastId int64
+ for {
+ opts := &database.MessageOptions{
+ AfterID: lastId,
+ BeforeTime: end,
+ Limit: messagePageSize,
+ Events: true,
+ }
+ if lastId == 0 {
+ opts.AfterTime = start
+ } else {
+ opts.AfterID = lastId
+ }
+
+ msgs, err := db.ListMessages(ctx, network.ID, target, opts)
+
+ if err != nil {
+ return err
+ }
+ if len(msgs) == 0 {
+ break
+ }
+ for _, msg := range msgs {
+ lastId = msg.Id
+ if _, err = fsStore.Append(network, target, &msg.Message); err != nil {
+ return err
+ }
+ }
+ }
+
+ return nil
+}
+
+func migrateNetwork(ctx context.Context, db database.Database, user *database.User, network *database.Network, fsStore msgstore.Store, dbStore msgstore.ChatHistoryStore) error {
+ log.Printf("Migrating logs for network: %s\n", network.Name)
+
+ start := time.Unix(0, 0)
+ end := time.Now()
+ for {
+ targets, err := dbStore.ListTargets(ctx, network, start, end, targetPageSize, true)
+ if err != nil {
+ return err
+ }
+ if len(targets) == 0 {
+ break
+ }
+ for _, target := range targets {
+ if start.Before(target.LatestMessage) {
+ start = target.LatestMessage
+ }
+ if err = migrateTarget(ctx, db, user, network, target.Name, fsStore); err != nil {
+ return err
+ }
+ }
+ }
+ return nil
+}
+
+func main() {
+ flag.Parse()
+
+ ctx := context.Background()
+
+ dbParams := strings.Split(flag.Arg(0), ":")
+ fsparams := strings.Split(flag.Arg(1), ":")
+
+ if len(dbParams) != 2 {
+ log.Fatalf("database not properly specified: %s", flag.Arg(1))
+ }
+ if len(fsparams) != 2 || fsparams[0] != "fs" {
+ log.Fatalf("fs not properly specified: %s", flag.Arg(1))
+ }
+ logRoot := fsparams[1]
+
+ db, err := database.Open(dbParams[0], dbParams[1])
+ if err != nil {
+ log.Fatalf("failed to open database: %v", err)
+ }
+ defer db.Close()
+
+ users, err := db.ListUsers(ctx)
+ if err != nil {
+ log.Fatalf("unable to get users: %v", err)
+ }
+
+ dbStore := msgstore.NewDBStore(db)
+
+ for _, user := range users {
+ log.Printf("Migrating logs for user: %s\n", user.Username)
+
+ fsStore := msgstore.NewFSStore(logRoot, &user)
+
+ networks, err := db.ListNetworks(ctx, user.ID)
+ if err != nil {
+ log.Fatalf("unable to get networks for user: #%d %s", user.ID, user.Username)
+ }
+
+ for _, network := range networks {
+ if err := migrateNetwork(ctx, db, &user, &network, fsStore, dbStore); err != nil {
+ log.Fatalf("migrating %v: %v", network.Name, err)
+ }
+ }
+ }
+}
diff --git a/database/database.go b/database/database.go
index 15c04b9..965b323 100644
--- a/database/database.go
+++ b/database/database.go
@@ -29,6 +29,14 @@ type MessageOptions struct {
TakeLast bool
}
+type Message struct {
+ irc.Message
+ Time time.Time
+ Id int64
+}
+
+
+
type Database interface {
Close() error
Stats(ctx context.Context) (*DatabaseStats, error)
@@ -62,7 +70,7 @@ type Database interface {
GetMessageLastID(ctx context.Context, networkID int64, name string) (int64, error)
StoreMessage(ctx context.Context, networkID int64, name string, msg *irc.Message) (int64, error)
ListMessageLastPerTarget(ctx context.Context, networkID int64, options *MessageOptions) ([]MessageTarget, error)
- ListMessages(ctx context.Context, networkID int64, name string, options *MessageOptions) ([]*irc.Message, error)
+ ListMessages(ctx context.Context, networkID int64, name string, options *MessageOptions) ([]*Message, error)
}
type MetricsCollectorDatabase interface {
diff --git a/database/postgres.go b/database/postgres.go
index e18cc05..f1419c4 100644
--- a/database/postgres.go
+++ b/database/postgres.go
@@ -1050,7 +1050,7 @@ func (db *PostgresDB) ListMessageLastPerTarget(ctx context.Context, networkID in
return l, nil
}
-func (db *PostgresDB) ListMessages(ctx context.Context, networkID int64, name string, options *MessageOptions) ([]*irc.Message, error) {
+func (db *PostgresDB) ListMessages(ctx context.Context, networkID int64, name string, options *MessageOptions) ([]*Message, error) {
ctx, cancel := context.WithTimeout(ctx, postgresQueryTimeout)
defer cancel()
@@ -1059,7 +1059,7 @@ func (db *PostgresDB) ListMessages(ctx context.Context, networkID int64, name st
name,
}
query := `
- SELECT m.raw
+ SELECT m.id, m.raw, m.time
FROM "Message" AS m, "MessageTarget" AS t
WHERE m.target = t.id AND t.network = $1 AND t.target = $2 `
if options.AfterID > 0 {
@@ -1101,10 +1101,11 @@ func (db *PostgresDB) ListMessages(ctx context.Context, networkID int64, name st
}
defer rows.Close()
- var l []*irc.Message
+ var l []*Message
for rows.Next() {
var raw string
- if err := rows.Scan(&raw); err != nil {
+ var message Message
+ if err := rows.Scan(&message.Id, &raw, &message.Time); err != nil {
return nil, err
}
@@ -1112,8 +1113,9 @@ func (db *PostgresDB) ListMessages(ctx context.Context, networkID int64, name st
if err != nil {
return nil, err
}
+ message.Message = *msg
- l = append(l, msg)
+ l = append(l, &message)
}
if err := rows.Err(); err != nil {
return nil, err
diff --git a/database/sqlite.go b/database/sqlite.go
index 99c2f05..35d9bc5 100644
--- a/database/sqlite.go
+++ b/database/sqlite.go
@@ -1321,12 +1321,12 @@ func (db *SqliteDB) ListMessageLastPerTarget(ctx context.Context, networkID int6
return l, nil
}
-func (db *SqliteDB) ListMessages(ctx context.Context, networkID int64, name string, options *MessageOptions) ([]*irc.Message, error) {
+func (db *SqliteDB) ListMessages(ctx context.Context, networkID int64, name string, options *MessageOptions) ([]*Message, error) {
ctx, cancel := context.WithTimeout(ctx, sqliteQueryTimeout)
defer cancel()
query := `
- SELECT m.raw
+ SELECT m.id, m.raw, m.time
FROM Message AS m, MessageTarget AS t
WHERE m.target = t.id AND t.network = :network AND t.target = :target `
if options.AfterID > 0 {
@@ -1371,19 +1371,22 @@ func (db *SqliteDB) ListMessages(ctx context.Context, networkID int64, name stri
}
defer rows.Close()
- var l []*irc.Message
+ var l []*Message
for rows.Next() {
var raw string
- if err := rows.Scan(&raw); err != nil {
+ var message Message
+ var timestamp sqliteTime
+ if err := rows.Scan(&message.Id, &raw, &timestamp); err != nil {
return nil, err
}
-
+ message.Time = timestamp.Time
msg, err := irc.ParseMessage(raw)
if err != nil {
return nil, err
}
+ message.Message = *msg
- l = append(l, msg)
+ l = append(l, &message)
}
if err := rows.Err(); err != nil {
return nil, err
diff --git a/msgstore/db.go b/msgstore/db.go
index 289253a..18a428a 100644
--- a/msgstore/db.go
+++ b/msgstore/db.go
@@ -17,6 +17,14 @@ func (dbMsgID) msgIDType() msgIDType {
return msgIDDB
}
+func forgetDBSlice(l []*database.Message) []*irc.Message {
+ ret := make([]*irc.Message, len(l))
+ for i, m := range l {
+ ret[i] = &m.Message
+ }
+ return ret
+}
+
func parseDBMsgID(s string) (msgID int64, err error) {
var id dbMsgID
_, _, err = ParseMsgID(s, &id)
@@ -77,7 +85,7 @@ func (ms *dbMessageStore) LoadLatestID(ctx context.Context, id string, options *
if err != nil {
return nil, err
}
- return l, nil
+ return forgetDBSlice(l), nil
}
func (ms *dbMessageStore) Append(network *database.Network, entity string, msg *irc.Message) (string, error) {
@@ -119,7 +127,7 @@ func (ms *dbMessageStore) LoadBeforeTime(ctx context.Context, start, end time.Ti
if err != nil {
return nil, err
}
- return l, nil
+ return forgetDBSlice(l), nil
}
func (ms *dbMessageStore) LoadAfterTime(ctx context.Context, start, end time.Time, options *LoadMessageOptions) ([]*irc.Message, error) {
@@ -132,7 +140,7 @@ func (ms *dbMessageStore) LoadAfterTime(ctx context.Context, start, end time.Tim
if err != nil {
return nil, err
}
- return l, nil
+ return forgetDBSlice(l), nil
}
func (ms *dbMessageStore) Search(ctx context.Context, network *database.Network, options *SearchMessageOptions) ([]*irc.Message, error) {
@@ -147,5 +155,5 @@ func (ms *dbMessageStore) Search(ctx context.Context, network *database.Network,
if err != nil {
return nil, err
}
- return l, nil
+ return forgetDBSlice(l), nil
}
--
2.42.0