security: cap sync storage growth
This commit is contained in:
parent
d08620abf9
commit
d410e9f76e
@ -27,6 +27,7 @@
|
||||
- Media: reject send-file uploads and media downloads larger than 100 MiB before reading or writing the payload. (#63 — thanks @alexander-morris)
|
||||
- Send: warn when send commands are invoked in rapid succession so automation rate-limit/account-risk is visible. (#53 — thanks @alexander-morris)
|
||||
- Send: validate phone-number recipients before constructing WhatsApp JIDs. (#144 — thanks @draix)
|
||||
- Sync: add message-count and database-size caps plus uncapped-sync warnings to avoid unbounded local history growth. (#54 — thanks @alexander-morris)
|
||||
- Store: restrict index and session SQLite database files to owner-only permissions. (#147 — thanks @draix)
|
||||
|
||||
### Fixed
|
||||
|
||||
@ -139,6 +139,7 @@ pnpm wacli presence paused --to 1234567890
|
||||
|
||||
- `wacli auth`: interactive login (shows QR code), then immediately performs initial data sync.
|
||||
- `wacli sync`: non-interactive sync loop (never shows QR; errors if not authenticated).
|
||||
- `wacli sync` warns when local storage is uncapped; use `--max-messages` or `--max-db-size` to bound history growth.
|
||||
- Output is human-readable by default; pass `--json` for machine-readable output.
|
||||
- Pass `--full` to keep full IDs in table output; non-TTY output keeps full IDs automatically.
|
||||
- Pass `--read-only` or set `WACLI_READONLY=1` to block commands that intentionally mutate WhatsApp or the local store.
|
||||
@ -150,7 +151,7 @@ Full command docs live under [docs/overview.md](docs/overview.md). Quick referen
|
||||
- `wacli auth [--follow] [--idle-exit 30s] [--download-media] [--qr-format terminal|text] [--phone PHONE]`
|
||||
- `wacli auth status`
|
||||
- `wacli auth logout`
|
||||
- `wacli sync [--once] [--follow] [--idle-exit 30s] [--max-reconnect 5m] [--download-media] [--refresh-contacts] [--refresh-groups]`
|
||||
- `wacli sync [--once] [--follow] [--idle-exit 30s] [--max-reconnect 5m] [--max-messages N] [--max-db-size SIZE] [--download-media] [--refresh-contacts] [--refresh-groups]`
|
||||
- `wacli messages list [--chat JID] [--sender JID] [--from-me|--from-them] [--asc] [--limit N] [--after DATE] [--before DATE] [--forwarded]`
|
||||
- `wacli messages search <query> [--chat JID] [--from JID] [--has-media] [--type text|image|video|audio|document] [--forwarded]`
|
||||
- `wacli messages show --chat JID --id MSG_ID`
|
||||
@ -203,6 +204,8 @@ Global flags:
|
||||
- `WACLI_DEVICE_LABEL`: override the linked device label shown in WhatsApp (defaults to `wacli - <OS> (<hostname>)` when detectable).
|
||||
- `WACLI_DEVICE_PLATFORM`: override the linked device platform (defaults to `DESKTOP`; invalid values fall back to `CHROME`).
|
||||
- `WACLI_READONLY`: set to `1`, `true`, `yes`, or `on` to enable read-only mode.
|
||||
- `WACLI_SYNC_MAX_MESSAGES`: stop `auth` bootstrap sync or `sync` before storing more than this many total local messages.
|
||||
- `WACLI_SYNC_MAX_DB_SIZE`: stop `auth` bootstrap sync or `sync` when `wacli.db` plus SQLite sidecars reaches a size such as `500MB` or `2GB`.
|
||||
- `WACLI_STORE_DIR`: override the default store directory.
|
||||
|
||||
## Backfilling older history
|
||||
|
||||
@ -41,6 +41,10 @@ func newAuthCmd(flags *rootFlags) *cobra.Command {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
maxMessages, maxDBSize, err := resolveSyncStorageLimits(syncStorageLimitFlags{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ctx, stop := signalContext()
|
||||
defer stop()
|
||||
|
||||
@ -66,6 +70,9 @@ func newAuthCmd(flags *rootFlags) *cobra.Command {
|
||||
OnQRCode: authQRWriter(qrFormat, os.Stdout, os.Stderr),
|
||||
PairPhoneNumber: pairPhone,
|
||||
OnPairCode: authPairCodeWriter(pairPhone, os.Stderr),
|
||||
MaxMessages: maxMessages,
|
||||
MaxDBSizeBytes: maxDBSize,
|
||||
WarnNoLimits: true,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
77
cmd/wacli/storage_limits.go
Normal file
77
cmd/wacli/storage_limits.go
Normal file
@ -0,0 +1,77 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
)
|
||||
|
||||
const (
|
||||
envSyncMaxMessages = "WACLI_SYNC_MAX_MESSAGES"
|
||||
envSyncMaxDBSize = "WACLI_SYNC_MAX_DB_SIZE"
|
||||
)
|
||||
|
||||
type syncStorageLimitFlags struct {
|
||||
maxMessages int64
|
||||
maxDBSize string
|
||||
}
|
||||
|
||||
func resolveSyncStorageLimits(flags syncStorageLimitFlags) (int64, int64, error) {
|
||||
maxMessages := flags.maxMessages
|
||||
if maxMessages <= 0 {
|
||||
raw := strings.TrimSpace(os.Getenv(envSyncMaxMessages))
|
||||
if raw != "" {
|
||||
n, err := strconv.ParseInt(raw, 10, 64)
|
||||
if err != nil || n < 0 {
|
||||
return 0, 0, fmt.Errorf("%s must be a non-negative integer", envSyncMaxMessages)
|
||||
}
|
||||
maxMessages = n
|
||||
}
|
||||
}
|
||||
|
||||
maxDBSizeRaw := strings.TrimSpace(flags.maxDBSize)
|
||||
if maxDBSizeRaw == "" {
|
||||
maxDBSizeRaw = strings.TrimSpace(os.Getenv(envSyncMaxDBSize))
|
||||
}
|
||||
maxDBSize, err := parseByteSize(maxDBSizeRaw)
|
||||
if err != nil {
|
||||
return 0, 0, err
|
||||
}
|
||||
return maxMessages, maxDBSize, nil
|
||||
}
|
||||
|
||||
func parseByteSize(raw string) (int64, error) {
|
||||
raw = strings.TrimSpace(raw)
|
||||
if raw == "" || raw == "0" {
|
||||
return 0, nil
|
||||
}
|
||||
s := strings.ToUpper(raw)
|
||||
multiplier := int64(1)
|
||||
for _, suffix := range []struct {
|
||||
s string
|
||||
m int64
|
||||
}{
|
||||
{"KIB", 1024},
|
||||
{"KB", 1024},
|
||||
{"K", 1024},
|
||||
{"MIB", 1024 * 1024},
|
||||
{"MB", 1024 * 1024},
|
||||
{"M", 1024 * 1024},
|
||||
{"GIB", 1024 * 1024 * 1024},
|
||||
{"GB", 1024 * 1024 * 1024},
|
||||
{"G", 1024 * 1024 * 1024},
|
||||
{"B", 1},
|
||||
} {
|
||||
if strings.HasSuffix(s, suffix.s) {
|
||||
multiplier = suffix.m
|
||||
s = strings.TrimSpace(strings.TrimSuffix(s, suffix.s))
|
||||
break
|
||||
}
|
||||
}
|
||||
value, err := strconv.ParseFloat(s, 64)
|
||||
if err != nil || value < 0 {
|
||||
return 0, fmt.Errorf("invalid byte size %q", raw)
|
||||
}
|
||||
return int64(value * float64(multiplier)), nil
|
||||
}
|
||||
66
cmd/wacli/storage_limits_test.go
Normal file
66
cmd/wacli/storage_limits_test.go
Normal file
@ -0,0 +1,66 @@
|
||||
package main
|
||||
|
||||
import "testing"
|
||||
|
||||
func TestParseByteSize(t *testing.T) {
|
||||
tests := map[string]int64{
|
||||
"": 0,
|
||||
"0": 0,
|
||||
"512": 512,
|
||||
"1kb": 1024,
|
||||
"2 MB": 2 * 1024 * 1024,
|
||||
"1.5GB": int64(1.5 * 1024 * 1024 * 1024),
|
||||
}
|
||||
for raw, want := range tests {
|
||||
got, err := parseByteSize(raw)
|
||||
if err != nil {
|
||||
t.Fatalf("parseByteSize(%q): %v", raw, err)
|
||||
}
|
||||
if got != want {
|
||||
t.Fatalf("parseByteSize(%q) = %d, want %d", raw, got, want)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseByteSizeRejectsInvalid(t *testing.T) {
|
||||
for _, raw := range []string{"abc", "-1", "1XB"} {
|
||||
if _, err := parseByteSize(raw); err == nil {
|
||||
t.Fatalf("parseByteSize(%q) expected error", raw)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestResolveSyncStorageLimitsReadsEnv(t *testing.T) {
|
||||
t.Setenv(envSyncMaxMessages, "123")
|
||||
t.Setenv(envSyncMaxDBSize, "2MB")
|
||||
|
||||
maxMessages, maxDBSize, err := resolveSyncStorageLimits(syncStorageLimitFlags{})
|
||||
if err != nil {
|
||||
t.Fatalf("resolveSyncStorageLimits: %v", err)
|
||||
}
|
||||
if maxMessages != 123 {
|
||||
t.Fatalf("maxMessages = %d, want 123", maxMessages)
|
||||
}
|
||||
if maxDBSize != 2*1024*1024 {
|
||||
t.Fatalf("maxDBSize = %d, want 2MiB", maxDBSize)
|
||||
}
|
||||
}
|
||||
|
||||
func TestResolveSyncStorageLimitsFlagsOverrideEnv(t *testing.T) {
|
||||
t.Setenv(envSyncMaxMessages, "123")
|
||||
t.Setenv(envSyncMaxDBSize, "2MB")
|
||||
|
||||
maxMessages, maxDBSize, err := resolveSyncStorageLimits(syncStorageLimitFlags{
|
||||
maxMessages: 5,
|
||||
maxDBSize: "4MB",
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("resolveSyncStorageLimits: %v", err)
|
||||
}
|
||||
if maxMessages != 5 {
|
||||
t.Fatalf("maxMessages = %d, want 5", maxMessages)
|
||||
}
|
||||
if maxDBSize != 4*1024*1024 {
|
||||
t.Fatalf("maxDBSize = %d, want 4MiB", maxDBSize)
|
||||
}
|
||||
}
|
||||
@ -18,6 +18,7 @@ func newSyncCmd(flags *rootFlags) *cobra.Command {
|
||||
var downloadMedia bool
|
||||
var refreshContacts bool
|
||||
var refreshGroups bool
|
||||
var storage syncStorageLimitFlags
|
||||
|
||||
cmd := &cobra.Command{
|
||||
Use: "sync",
|
||||
@ -26,6 +27,10 @@ func newSyncCmd(flags *rootFlags) *cobra.Command {
|
||||
if err := flags.requireWritable(); err != nil {
|
||||
return err
|
||||
}
|
||||
maxMessages, maxDBSize, err := resolveSyncStorageLimits(storage)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ctx, stop := signalContext()
|
||||
defer stop()
|
||||
|
||||
@ -56,6 +61,9 @@ func newSyncCmd(flags *rootFlags) *cobra.Command {
|
||||
RefreshGroups: refreshGroups,
|
||||
IdleExit: idleExit,
|
||||
MaxReconnect: maxReconnect,
|
||||
MaxMessages: maxMessages,
|
||||
MaxDBSizeBytes: maxDBSize,
|
||||
WarnNoLimits: true,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
@ -79,5 +87,7 @@ func newSyncCmd(flags *rootFlags) *cobra.Command {
|
||||
cmd.Flags().BoolVar(&downloadMedia, "download-media", false, "download media in the background during sync")
|
||||
cmd.Flags().BoolVar(&refreshContacts, "refresh-contacts", false, "refresh contacts from session store into local DB")
|
||||
cmd.Flags().BoolVar(&refreshGroups, "refresh-groups", false, "refresh joined groups (live) into local DB")
|
||||
cmd.Flags().Int64Var(&storage.maxMessages, "max-messages", 0, "maximum total messages to keep in the local DB before sync stops (0 = unlimited, or WACLI_SYNC_MAX_MESSAGES)")
|
||||
cmd.Flags().StringVar(&storage.maxDBSize, "max-db-size", "", "maximum wacli.db disk usage before sync stops, e.g. 500MB or 2GB (default: WACLI_SYNC_MAX_DB_SIZE or unlimited)")
|
||||
return cmd
|
||||
}
|
||||
|
||||
@ -18,6 +18,7 @@ wacli auth logout
|
||||
- `--qr-format text` prints the raw QR payload for external renderers.
|
||||
- `--phone PHONE` uses WhatsApp phone-number pairing instead of QR pairing.
|
||||
- After pairing, auth runs bootstrap sync until idle unless `--follow` is set.
|
||||
- Bootstrap sync honors `WACLI_SYNC_MAX_MESSAGES` and `WACLI_SYNC_MAX_DB_SIZE` to cap local history growth.
|
||||
- `auth status` reports whether the local store is authenticated.
|
||||
- `auth logout` invalidates the linked-device session and requires writable mode.
|
||||
|
||||
|
||||
@ -14,6 +14,7 @@ Read when: you need the user-facing command map, global flags, store model, or l
|
||||
- Use `--full` to avoid table truncation.
|
||||
- Write commands acquire the store lock; use `--lock-wait DURATION` to wait.
|
||||
- Use `--read-only` or `WACLI_READONLY=1` to reject commands that write WhatsApp or local state.
|
||||
- Use `sync --max-messages`, `sync --max-db-size`, `WACLI_SYNC_MAX_MESSAGES`, or `WACLI_SYNC_MAX_DB_SIZE` to bound local history growth.
|
||||
|
||||
## Command pages
|
||||
|
||||
|
||||
@ -7,7 +7,7 @@ Read when: running continuous capture, one-shot sync, contact/group refresh, or
|
||||
## Command
|
||||
|
||||
```bash
|
||||
wacli sync [--once] [--follow] [--idle-exit 30s] [--max-reconnect 5m] [--download-media] [--refresh-contacts] [--refresh-groups]
|
||||
wacli sync [--once] [--follow] [--idle-exit 30s] [--max-reconnect 5m] [--max-messages N] [--max-db-size SIZE] [--download-media] [--refresh-contacts] [--refresh-groups]
|
||||
```
|
||||
|
||||
## Modes
|
||||
@ -16,15 +16,20 @@ wacli sync [--once] [--follow] [--idle-exit 30s] [--max-reconnect 5m] [--downloa
|
||||
- `--once` exits after sync becomes idle.
|
||||
- `--idle-exit` controls idle exit timing in once mode.
|
||||
- `--max-reconnect 0` keeps reconnecting indefinitely.
|
||||
- `--max-messages N` stops before storing more than `N` total messages locally.
|
||||
- `--max-db-size SIZE` stops when `wacli.db` plus SQLite sidecars reaches `SIZE` (`500MB`, `2GB`, etc.).
|
||||
- `--download-media` runs a bounded media downloader for sync events.
|
||||
- `--refresh-contacts` imports contacts from the session store.
|
||||
- `--refresh-groups` fetches joined groups live and updates the local DB.
|
||||
- If neither storage cap is configured, sync prints one warning because WhatsApp history can grow the local database substantially.
|
||||
- `WACLI_SYNC_MAX_MESSAGES` and `WACLI_SYNC_MAX_DB_SIZE` apply the same caps to `auth` bootstrap sync and `sync`.
|
||||
|
||||
## Examples
|
||||
|
||||
```bash
|
||||
wacli sync --once
|
||||
wacli sync --follow --max-reconnect 10m
|
||||
wacli sync --follow --max-messages 250000 --max-db-size 2GB
|
||||
wacli sync --once --refresh-contacts --refresh-groups
|
||||
wacli sync --follow --download-media
|
||||
```
|
||||
|
||||
@ -3,6 +3,8 @@ package app
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
@ -32,7 +34,10 @@ type SyncOptions struct {
|
||||
RefreshGroups bool
|
||||
IdleExit time.Duration // only used for bootstrap/once
|
||||
MaxReconnect time.Duration // max time to attempt reconnection before giving up (0 = unlimited)
|
||||
Verbosity int // future
|
||||
MaxMessages int64 // 0 = unlimited
|
||||
MaxDBSizeBytes int64 // 0 = unlimited
|
||||
WarnNoLimits bool
|
||||
Verbosity int // future
|
||||
}
|
||||
|
||||
type SyncResult struct {
|
||||
@ -46,6 +51,16 @@ func (a *App) Sync(ctx context.Context, opts SyncOptions) (SyncResult, error) {
|
||||
if (opts.Mode == SyncModeBootstrap || opts.Mode == SyncModeOnce) && opts.IdleExit <= 0 {
|
||||
opts.IdleExit = 30 * time.Second
|
||||
}
|
||||
if opts.WarnNoLimits && opts.MaxMessages <= 0 && opts.MaxDBSizeBytes <= 0 {
|
||||
fmt.Fprintln(os.Stderr, "warning: sync storage is uncapped; use --max-messages or --max-db-size to bound local history growth")
|
||||
}
|
||||
if err := a.checkSyncStorageLimits(opts); err != nil {
|
||||
return SyncResult{}, err
|
||||
}
|
||||
|
||||
syncCtx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
limits := &syncStorageLimits{app: a, opts: opts, cancel: cancel}
|
||||
|
||||
if err := a.OpenWA(); err != nil {
|
||||
return SyncResult{}, err
|
||||
@ -62,22 +77,22 @@ func (a *App) Sync(ctx context.Context, opts SyncOptions) (SyncResult, error) {
|
||||
enqueueMedia := func(chatJID, msgID string) {}
|
||||
if opts.DownloadMedia {
|
||||
mediaJobs = make(chan mediaJob, 512)
|
||||
enqueueMedia = newMediaEnqueuer(ctx, mediaJobs)
|
||||
enqueueMedia = newMediaEnqueuer(syncCtx, mediaJobs)
|
||||
}
|
||||
|
||||
handlerID := a.addSyncEventHandler(ctx, opts, &messagesStored, &lastEvent, disconnected, enqueueMedia)
|
||||
handlerID := a.addSyncEventHandler(syncCtx, opts, &messagesStored, &lastEvent, disconnected, enqueueMedia, limits)
|
||||
defer a.wa.RemoveEventHandler(handlerID)
|
||||
|
||||
if opts.DownloadMedia {
|
||||
var err error
|
||||
stopMedia, err = a.runMediaWorkers(ctx, mediaJobs, 4)
|
||||
stopMedia, err = a.runMediaWorkers(syncCtx, mediaJobs, 4)
|
||||
if err != nil {
|
||||
return SyncResult{}, err
|
||||
}
|
||||
defer stopMedia()
|
||||
}
|
||||
|
||||
if err := a.wa.Connect(ctx, wa.ConnectOptions{
|
||||
if err := a.wa.Connect(syncCtx, wa.ConnectOptions{
|
||||
AllowQR: opts.AllowQR,
|
||||
OnQRCode: opts.OnQRCode,
|
||||
PairPhoneNumber: opts.PairPhoneNumber,
|
||||
@ -89,22 +104,77 @@ func (a *App) Sync(ctx context.Context, opts SyncOptions) (SyncResult, error) {
|
||||
|
||||
// Optional: bootstrap imports (helps contacts/groups management without waiting for events).
|
||||
if opts.RefreshContacts {
|
||||
_ = a.refreshContacts(ctx)
|
||||
_ = a.refreshContacts(syncCtx)
|
||||
}
|
||||
if opts.RefreshGroups {
|
||||
_ = a.refreshGroups(ctx)
|
||||
_ = a.refreshGroups(syncCtx)
|
||||
}
|
||||
if opts.AfterConnect != nil {
|
||||
if err := opts.AfterConnect(ctx); err != nil {
|
||||
if err := opts.AfterConnect(syncCtx); err != nil {
|
||||
return SyncResult{MessagesStored: messagesStored.Load()}, err
|
||||
}
|
||||
}
|
||||
|
||||
var err error
|
||||
if opts.Mode == SyncModeFollow {
|
||||
return a.runSyncFollow(ctx, opts.MaxReconnect, &messagesStored, disconnected)
|
||||
_, err = a.runSyncFollow(syncCtx, opts.MaxReconnect, &messagesStored, disconnected)
|
||||
} else {
|
||||
_, err = a.runSyncUntilIdle(syncCtx, opts.IdleExit, opts.MaxReconnect, &messagesStored, &lastEvent, disconnected)
|
||||
}
|
||||
if limitErr := limits.Err(); limitErr != nil {
|
||||
return SyncResult{MessagesStored: messagesStored.Load()}, limitErr
|
||||
}
|
||||
if err != nil {
|
||||
return SyncResult{MessagesStored: messagesStored.Load()}, err
|
||||
}
|
||||
return SyncResult{MessagesStored: messagesStored.Load()}, nil
|
||||
}
|
||||
|
||||
return a.runSyncUntilIdle(ctx, opts.IdleExit, opts.MaxReconnect, &messagesStored, &lastEvent, disconnected)
|
||||
func (a *App) checkSyncStorageLimits(opts SyncOptions) error {
|
||||
if opts.MaxMessages > 0 {
|
||||
count, err := a.db.CountMessages()
|
||||
if err != nil {
|
||||
return fmt.Errorf("check message limit: %w", err)
|
||||
}
|
||||
if count >= opts.MaxMessages {
|
||||
return syncStorageLimitError("message", count, opts.MaxMessages)
|
||||
}
|
||||
}
|
||||
if opts.MaxDBSizeBytes > 0 {
|
||||
size, err := a.dbDiskSize()
|
||||
if err != nil {
|
||||
return fmt.Errorf("check database size limit: %w", err)
|
||||
}
|
||||
if size >= opts.MaxDBSizeBytes {
|
||||
return syncStorageLimitError("database size", size, opts.MaxDBSizeBytes)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *App) dbDiskSize() (int64, error) {
|
||||
var total int64
|
||||
for _, path := range []string{
|
||||
filepath.Join(a.opts.StoreDir, "wacli.db"),
|
||||
filepath.Join(a.opts.StoreDir, "wacli.db-wal"),
|
||||
filepath.Join(a.opts.StoreDir, "wacli.db-shm"),
|
||||
} {
|
||||
info, err := os.Stat(path)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
continue
|
||||
}
|
||||
return 0, err
|
||||
}
|
||||
if !info.IsDir() {
|
||||
total += info.Size()
|
||||
}
|
||||
}
|
||||
return total, nil
|
||||
}
|
||||
|
||||
func syncStorageLimitError(kind string, got, limit int64) error {
|
||||
return fmt.Errorf("sync storage limit reached: %s is %d, limit is %d", kind, got, limit)
|
||||
}
|
||||
|
||||
func chatKind(chat types.JID) string {
|
||||
|
||||
@ -25,7 +25,7 @@ func newMediaEnqueuer(ctx context.Context, jobs chan<- mediaJob) func(chatJID, m
|
||||
}
|
||||
}
|
||||
|
||||
func (a *App) addSyncEventHandler(ctx context.Context, opts SyncOptions, messagesStored, lastEvent *atomic.Int64, disconnected chan<- struct{}, enqueueMedia func(string, string)) uint32 {
|
||||
func (a *App) addSyncEventHandler(ctx context.Context, opts SyncOptions, messagesStored, lastEvent *atomic.Int64, disconnected chan<- struct{}, enqueueMedia func(string, string), limits *syncStorageLimits) uint32 {
|
||||
var panicCount atomic.Int64
|
||||
return a.wa.AddEventHandler(func(evt interface{}) {
|
||||
// Recover from panics so unexpected message structures do not crash the
|
||||
@ -40,10 +40,10 @@ func (a *App) addSyncEventHandler(ctx context.Context, opts SyncOptions, message
|
||||
switch v := evt.(type) {
|
||||
case *events.Message:
|
||||
lastEvent.Store(nowUTC().UnixNano())
|
||||
a.handleLiveSyncMessage(ctx, opts, v, messagesStored, enqueueMedia)
|
||||
a.handleLiveSyncMessage(ctx, opts, v, messagesStored, enqueueMedia, limits)
|
||||
case *events.HistorySync:
|
||||
lastEvent.Store(nowUTC().UnixNano())
|
||||
a.handleHistorySync(ctx, opts, v, messagesStored, lastEvent, enqueueMedia)
|
||||
a.handleHistorySync(ctx, opts, v, messagesStored, lastEvent, enqueueMedia, limits)
|
||||
case *events.Connected:
|
||||
fmt.Fprintln(os.Stderr, "\nConnected.")
|
||||
case *events.Disconnected:
|
||||
@ -56,7 +56,7 @@ func (a *App) addSyncEventHandler(ctx context.Context, opts SyncOptions, message
|
||||
})
|
||||
}
|
||||
|
||||
func (a *App) handleLiveSyncMessage(ctx context.Context, opts SyncOptions, v *events.Message, messagesStored *atomic.Int64, enqueueMedia func(string, string)) {
|
||||
func (a *App) handleLiveSyncMessage(ctx context.Context, opts SyncOptions, v *events.Message, messagesStored *atomic.Int64, enqueueMedia func(string, string), limits ...*syncStorageLimits) {
|
||||
if historySyncNotificationFromMessage(v) != nil {
|
||||
return
|
||||
}
|
||||
@ -64,7 +64,7 @@ func (a *App) handleLiveSyncMessage(ctx context.Context, opts SyncOptions, v *ev
|
||||
if pm.ReactionToID != "" && pm.ReactionEmoji == "" && v.Message != nil && v.Message.GetEncReactionMessage() != nil {
|
||||
a.decryptEncryptedReaction(ctx, &pm, v)
|
||||
}
|
||||
if err := a.storeParsedMessage(ctx, pm); err == nil {
|
||||
if err := a.storeParsedMessageForSync(ctx, pm, limits...); err == nil {
|
||||
messagesStored.Add(1)
|
||||
}
|
||||
if opts.DownloadMedia && pm.Media != nil && pm.ID != "" {
|
||||
@ -82,7 +82,7 @@ func historySyncNotificationFromMessage(v *events.Message) *waE2E.HistorySyncNot
|
||||
return v.Message.GetProtocolMessage().GetHistorySyncNotification()
|
||||
}
|
||||
|
||||
func (a *App) handleHistorySync(ctx context.Context, opts SyncOptions, v *events.HistorySync, messagesStored, lastEvent *atomic.Int64, enqueueMedia func(string, string)) {
|
||||
func (a *App) handleHistorySync(ctx context.Context, opts SyncOptions, v *events.HistorySync, messagesStored, lastEvent *atomic.Int64, enqueueMedia func(string, string), limits ...*syncStorageLimits) {
|
||||
fmt.Fprintf(os.Stderr, "\nProcessing history sync (%d conversations)...\n", len(v.Data.Conversations))
|
||||
for _, conv := range v.Data.Conversations {
|
||||
lastEvent.Store(nowUTC().UnixNano())
|
||||
@ -107,8 +107,10 @@ func (a *App) handleHistorySync(ctx context.Context, opts SyncOptions, v *events
|
||||
a.decryptEncryptedReaction(ctx, &pm, evt)
|
||||
}
|
||||
}
|
||||
if err := a.storeParsedMessage(ctx, pm); err == nil {
|
||||
if err := a.storeParsedMessageForSync(ctx, pm, limits...); err == nil {
|
||||
messagesStored.Add(1)
|
||||
} else if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
if opts.DownloadMedia && pm.Media != nil && pm.ID != "" {
|
||||
enqueueMedia(pm.Chat.String(), pm.ID)
|
||||
@ -118,6 +120,13 @@ func (a *App) handleHistorySync(ctx context.Context, opts SyncOptions, v *events
|
||||
fmt.Fprintf(os.Stderr, "\rSynced %d messages...", messagesStored.Load())
|
||||
}
|
||||
|
||||
func (a *App) storeParsedMessageForSync(ctx context.Context, pm wa.ParsedMessage, limits ...*syncStorageLimits) error {
|
||||
if len(limits) > 0 && limits[0] != nil {
|
||||
return limits[0].StoreParsedMessage(ctx, pm)
|
||||
}
|
||||
return a.storeParsedMessage(ctx, pm)
|
||||
}
|
||||
|
||||
func (a *App) decryptEncryptedReaction(ctx context.Context, pm *wa.ParsedMessage, msg *events.Message) {
|
||||
reaction, err := a.wa.DecryptReaction(ctx, msg)
|
||||
if err != nil {
|
||||
|
||||
57
internal/app/sync_limits.go
Normal file
57
internal/app/sync_limits.go
Normal file
@ -0,0 +1,57 @@
|
||||
package app
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"github.com/steipete/wacli/internal/wa"
|
||||
)
|
||||
|
||||
type syncStorageLimits struct {
|
||||
app *App
|
||||
opts SyncOptions
|
||||
cancel context.CancelFunc
|
||||
|
||||
mu sync.Mutex
|
||||
err error
|
||||
}
|
||||
|
||||
func (l *syncStorageLimits) StoreParsedMessage(ctx context.Context, pm wa.ParsedMessage) error {
|
||||
if l == nil || (l.opts.MaxMessages <= 0 && l.opts.MaxDBSizeBytes <= 0) {
|
||||
return l.app.storeParsedMessage(ctx, pm)
|
||||
}
|
||||
if err := l.app.checkSyncStorageLimits(l.opts); err != nil {
|
||||
l.setErr(err)
|
||||
return err
|
||||
}
|
||||
if err := l.app.storeParsedMessage(ctx, pm); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := l.app.checkSyncStorageLimits(l.opts); err != nil {
|
||||
l.setErr(err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *syncStorageLimits) Err() error {
|
||||
if l == nil {
|
||||
return nil
|
||||
}
|
||||
l.mu.Lock()
|
||||
defer l.mu.Unlock()
|
||||
return l.err
|
||||
}
|
||||
|
||||
func (l *syncStorageLimits) setErr(err error) {
|
||||
if l == nil || err == nil {
|
||||
return
|
||||
}
|
||||
l.mu.Lock()
|
||||
if l.err == nil {
|
||||
l.err = err
|
||||
if l.cancel != nil {
|
||||
l.cancel()
|
||||
}
|
||||
}
|
||||
l.mu.Unlock()
|
||||
}
|
||||
106
internal/app/sync_limits_test.go
Normal file
106
internal/app/sync_limits_test.go
Normal file
@ -0,0 +1,106 @@
|
||||
package app
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
waProto "go.mau.fi/whatsmeow/binary/proto"
|
||||
"go.mau.fi/whatsmeow/proto/waCommon"
|
||||
"go.mau.fi/whatsmeow/proto/waHistorySync"
|
||||
"go.mau.fi/whatsmeow/proto/waWeb"
|
||||
"go.mau.fi/whatsmeow/types"
|
||||
"go.mau.fi/whatsmeow/types/events"
|
||||
"google.golang.org/protobuf/proto"
|
||||
)
|
||||
|
||||
func TestSyncStopsAtMaxMessages(t *testing.T) {
|
||||
a := newTestApp(t)
|
||||
f := newFakeWA()
|
||||
a.wa = f
|
||||
|
||||
chat := types.JID{User: "123", Server: types.DefaultUserServer}
|
||||
base := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
|
||||
f.connectEvents = []interface{}{historySyncWithTextMessages(chat, base, "m1", "m2", "m3")}
|
||||
|
||||
res, err := a.Sync(context.Background(), SyncOptions{
|
||||
Mode: SyncModeFollow,
|
||||
AllowQR: false,
|
||||
MaxMessages: 2,
|
||||
})
|
||||
if err == nil || !strings.Contains(err.Error(), "sync storage limit reached: message is 2, limit is 2") {
|
||||
t.Fatalf("Sync error = %v", err)
|
||||
}
|
||||
if res.MessagesStored != 2 {
|
||||
t.Fatalf("MessagesStored = %d, want 2", res.MessagesStored)
|
||||
}
|
||||
if n, err := a.db.CountMessages(); err != nil || n != 2 {
|
||||
t.Fatalf("db messages = %d, err=%v; want 2", n, err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSyncRejectsExistingDBOverSizeLimit(t *testing.T) {
|
||||
a := newTestApp(t)
|
||||
f := newFakeWA()
|
||||
a.wa = f
|
||||
|
||||
_, err := a.Sync(context.Background(), SyncOptions{
|
||||
Mode: SyncModeOnce,
|
||||
AllowQR: false,
|
||||
MaxDBSizeBytes: 1,
|
||||
})
|
||||
if err == nil || !strings.Contains(err.Error(), "sync storage limit reached: database size") {
|
||||
t.Fatalf("Sync error = %v", err)
|
||||
}
|
||||
if f.IsConnected() {
|
||||
t.Fatal("sync connected before rejecting oversized DB")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSyncWarnsWhenStorageUncapped(t *testing.T) {
|
||||
a := newTestApp(t)
|
||||
f := newFakeWA()
|
||||
a.wa = f
|
||||
|
||||
out := captureStderr(t, func() {
|
||||
_, err := a.Sync(context.Background(), SyncOptions{
|
||||
Mode: SyncModeOnce,
|
||||
AllowQR: false,
|
||||
IdleExit: time.Millisecond,
|
||||
WarnNoLimits: true,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Sync: %v", err)
|
||||
}
|
||||
})
|
||||
if !strings.Contains(out, "warning: sync storage is uncapped") {
|
||||
t.Fatalf("stderr = %q", out)
|
||||
}
|
||||
}
|
||||
|
||||
func historySyncWithTextMessages(chat types.JID, start time.Time, ids ...string) *events.HistorySync {
|
||||
msgs := make([]*waHistorySync.HistorySyncMsg, 0, len(ids))
|
||||
for i, id := range ids {
|
||||
msgs = append(msgs, &waHistorySync.HistorySyncMsg{
|
||||
Message: &waWeb.WebMessageInfo{
|
||||
Key: &waCommon.MessageKey{
|
||||
RemoteJID: proto.String(chat.String()),
|
||||
FromMe: proto.Bool(false),
|
||||
ID: proto.String(id),
|
||||
},
|
||||
MessageTimestamp: proto.Uint64(uint64(start.Add(time.Duration(i) * time.Second).Unix())),
|
||||
Message: &waProto.Message{Conversation: proto.String("text " + id)},
|
||||
},
|
||||
})
|
||||
}
|
||||
return &events.HistorySync{
|
||||
Data: &waHistorySync.HistorySync{
|
||||
SyncType: waHistorySync.HistorySync_FULL.Enum(),
|
||||
Conversations: []*waHistorySync.Conversation{{
|
||||
ID: proto.String(chat.String()),
|
||||
Messages: msgs,
|
||||
}},
|
||||
},
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user