feat(cli): add NDJSON lifecycle events

Adds a global --events flag for machine-readable lifecycle telemetry on auth, sync, and history backfill. Keeps stderr parseable as NDJSON, including progress, idle, reconnect, warning, and interrupt-signal paths.

Co-authored-by: Dinakar Sarbada <dinakars777@users.noreply.github.com>
Co-authored-by: acxtrilla <cazz9584@gmail.com>
This commit is contained in:
Dinakar Sarbada 2026-05-04 23:50:14 -07:00 committed by GitHub
parent 03e53644f9
commit 108da989f7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
22 changed files with 465 additions and 58 deletions

View File

@ -6,6 +6,7 @@
- CLI: add `--read-only`/`WACLI_READONLY` to reject commands that write WhatsApp or the local store.
- CLI: add `--lock-wait` to wait for transient store locks before failing write commands.
- CLI: add `--events` to emit machine-readable NDJSON lifecycle events for `auth`, `sync`, and `history backfill`. (#204 — thanks @dinakars777 and @0xatrilla)
- CLI: add `--full` to disable table truncation; piped output now keeps full message IDs. (#13 — thanks @rickhallett)
- CLI: add `presence typing` and `presence paused` commands for WhatsApp composing indicators. (#76 — thanks @redemerco)
- Diagnostics: show linked JID and local store counts in `auth status` and `doctor`. (#149 — thanks @draix)

View File

@ -205,6 +205,7 @@ Global flags:
- `--store DIR`: store directory.
- `--json`: JSON output.
- `--events`: emit machine-readable NDJSON lifecycle events on stderr for long-running commands, including interrupt signals.
- `--full`: disable table truncation.
- `--timeout DURATION`: timeout for non-sync commands.
- `--lock-wait DURATION`: wait for the store lock before failing write commands.

View File

@ -45,7 +45,7 @@ func newAuthCmd(flags *rootFlags) *cobra.Command {
if err != nil {
return err
}
ctx, stop := signalContext()
ctx, stop := signalContextWithEvents(out.NewEventWriter(os.Stderr, flags.events))
defer stop()
a, lk, err := newApp(ctx, flags, true, true)
@ -59,7 +59,11 @@ func newAuthCmd(flags *rootFlags) *cobra.Command {
mode = appPkg.SyncModeFollow
}
fmt.Fprintln(os.Stderr, "Starting authentication…")
if a.Events().Enabled() {
_ = a.Events().Emit("auth_starting", nil)
} else {
fmt.Fprintln(os.Stderr, "Starting authentication…")
}
res, err := a.Sync(ctx, appPkg.SyncOptions{
Mode: mode,
AllowQR: true,
@ -67,9 +71,9 @@ func newAuthCmd(flags *rootFlags) *cobra.Command {
RefreshContacts: true,
RefreshGroups: true,
IdleExit: idleExit,
OnQRCode: authQRWriter(qrFormat, os.Stdout, os.Stderr),
OnQRCode: authQRWriter(qrFormat, os.Stdout, os.Stderr, a.Events()),
PairPhoneNumber: pairPhone,
OnPairCode: authPairCodeWriter(pairPhone, os.Stderr),
OnPairCode: authPairCodeWriter(pairPhone, os.Stderr, a.Events()),
MaxMessages: maxMessages,
MaxDBSizeBytes: maxDBSize,
WarnNoLimits: true,
@ -130,24 +134,35 @@ func normalizeAuthQRFormat(format string) (string, error) {
}
}
func authQRWriter(format string, stdout, stderr io.Writer) func(string) {
func authQRWriter(format string, stdout, stderr io.Writer, events *out.EventWriter) func(string) {
if format == "text" {
return func(code string) {
if events.Enabled() {
_ = events.Emit("qr_code", map[string]any{"code": code})
}
fmt.Fprintln(stdout, code)
}
}
return func(code string) {
if events.Enabled() {
_ = events.Emit("qr_code", map[string]any{"code": code})
return
}
fmt.Fprintln(stderr, "\nScan this QR code with WhatsApp (Linked Devices):")
qrterminal.GenerateHalfBlock(code, qrterminal.M, stderr)
fmt.Fprintln(stderr)
}
}
func authPairCodeWriter(phone string, stderr io.Writer) func(string) {
func authPairCodeWriter(phone string, stderr io.Writer, events *out.EventWriter) func(string) {
if phone == "" {
return nil
}
return func(code string) {
if events.Enabled() {
_ = events.Emit("pair_code", map[string]any{"phone": phone, "code": code})
return
}
fmt.Fprintf(stderr, "\nPairing code for +%s: %s\n", phone, code)
fmt.Fprintln(stderr, "On your phone: WhatsApp > Linked Devices > Link a Device > Link with phone number.")
fmt.Fprintln(stderr, "Enter the code above and keep this command running until authentication completes.")

View File

@ -81,7 +81,7 @@ func TestNormalizeAuthQRFormat(t *testing.T) {
func TestAuthQRWriterText(t *testing.T) {
var stdout, stderr bytes.Buffer
authQRWriter("text", &stdout, &stderr)("2@test-code")
authQRWriter("text", &stdout, &stderr, nil)("2@test-code")
if got := strings.TrimSpace(stdout.String()); got != "2@test-code" {
t.Fatalf("stdout = %q", got)
}
@ -121,7 +121,7 @@ func TestNormalizePairPhone(t *testing.T) {
func TestAuthPairCodeWriter(t *testing.T) {
var stderr bytes.Buffer
writer := authPairCodeWriter("15551234567", &stderr)
writer := authPairCodeWriter("15551234567", &stderr, nil)
if writer == nil {
t.Fatal("expected writer")
}
@ -130,7 +130,7 @@ func TestAuthPairCodeWriter(t *testing.T) {
if !strings.Contains(got, "Pairing code for +15551234567: ABCD-1234") {
t.Fatalf("stderr = %q", got)
}
if authPairCodeWriter("", &stderr) != nil {
if authPairCodeWriter("", &stderr, nil) != nil {
t.Fatal("expected nil writer without phone")
}
}

View File

@ -37,7 +37,7 @@ func newHistoryBackfillCmd(flags *rootFlags) *cobra.Command {
return err
}
ctx, stop := signalContext()
ctx, stop := signalContextWithEvents(out.NewEventWriter(os.Stderr, flags.events))
defer stop()
a, lk, err := newApp(ctx, flags, true, false)

View File

@ -22,6 +22,7 @@ type rootFlags struct {
storeDir string
asJSON bool
fullOutput bool
events bool
timeout time.Duration
readOnly bool
lockWait time.Duration
@ -41,6 +42,7 @@ func execute(args []string) error {
rootCmd.PersistentFlags().StringVar(&flags.storeDir, "store", "", "store directory (default: $WACLI_STORE_DIR, XDG state dir on Linux, or ~/.wacli)")
rootCmd.PersistentFlags().BoolVar(&flags.asJSON, "json", false, "output JSON instead of human-readable text")
rootCmd.PersistentFlags().BoolVar(&flags.fullOutput, "full", false, "disable truncation in table output")
rootCmd.PersistentFlags().BoolVar(&flags.events, "events", false, "emit machine-readable NDJSON lifecycle events on stderr")
rootCmd.PersistentFlags().DurationVar(&flags.timeout, "timeout", 5*time.Minute, "command timeout (non-sync commands)")
rootCmd.PersistentFlags().DurationVar(&flags.lockWait, "lock-wait", 0, "wait for the store lock before failing (write commands)")
rootCmd.PersistentFlags().BoolVar(&flags.readOnly, "read-only", false, "reject commands that intentionally write WhatsApp or the local store (or set WACLI_READONLY=1)")
@ -83,6 +85,7 @@ func newApp(ctx context.Context, flags *rootFlags, needLock bool, allowUnauthed
StoreDir: storeDir,
Version: version,
JSON: flags.asJSON,
Events: out.NewEventWriter(os.Stderr, flags.events),
AllowUnauthed: allowUnauthed,
})
if err != nil {

View File

@ -6,29 +6,47 @@ import (
"os"
"os/signal"
"syscall"
"github.com/steipete/wacli/internal/out"
)
// signalContext returns a context that is cancelled on the first SIGINT/SIGTERM.
// A second signal force-kills the process so that a stuck cleanup never leaves
// the user unable to get their terminal back.
func signalContext() (context.Context, context.CancelFunc) {
ctx, cancel := context.WithCancel(context.Background())
return signalContextWithEvents(nil)
}
func signalContextWithEvents(events *out.EventWriter) (context.Context, context.CancelFunc) {
sigCh := make(chan os.Signal, 2)
signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
return signalContextForChannel(events, sigCh, func() { signal.Stop(sigCh) }, os.Exit)
}
func signalContextForChannel(events *out.EventWriter, sigCh <-chan os.Signal, stopNotify func(), forceExit func(int)) (context.Context, context.CancelFunc) {
ctx, ctxCancel := context.WithCancel(context.Background())
go func() {
<-sigCh
fmt.Fprintln(os.Stderr, "\nShutting down (interrupt again to force quit)...")
cancel()
sig := <-sigCh
if events.Enabled() {
_ = events.Emit("signal", map[string]any{"signal": sig.String(), "action": "shutdown"})
} else {
fmt.Fprintln(os.Stderr, "\nShutting down (interrupt again to force quit)...")
}
ctxCancel()
<-sigCh
fmt.Fprintln(os.Stderr, "Force quit.")
os.Exit(1)
sig = <-sigCh
if events.Enabled() {
_ = events.Emit("signal", map[string]any{"signal": sig.String(), "action": "force_quit"})
} else {
fmt.Fprintln(os.Stderr, "Force quit.")
}
forceExit(1)
}()
return ctx, func() {
signal.Stop(sigCh)
cancel()
if stopNotify != nil {
stopNotify()
}
ctxCancel()
}
}

73
cmd/wacli/signal_test.go Normal file
View File

@ -0,0 +1,73 @@
package main
import (
"bytes"
"context"
"encoding/json"
"os"
"strings"
"syscall"
"testing"
"time"
"github.com/steipete/wacli/internal/out"
)
func TestSignalContextWithEventsKeepsStderrNDJSON(t *testing.T) {
var stderr bytes.Buffer
exits := make(chan int, 1)
sigCh := make(chan os.Signal, 2)
ctx, stop := signalContextForChannel(out.NewEventWriter(&stderr, true), sigCh, nil, func(code int) {
exits <- code
})
defer stop()
sigCh <- os.Interrupt
select {
case <-ctx.Done():
case <-time.After(time.Second):
t.Fatal("context was not canceled after first signal")
}
sigCh <- syscall.SIGTERM
select {
case code := <-exits:
if code != 1 {
t.Fatalf("exit code = %d, want 1", code)
}
case <-time.After(time.Second):
t.Fatal("force-exit callback was not called after second signal")
}
raw := stderr.String()
if strings.Contains(raw, "Shutting down") || strings.Contains(raw, "Force quit") {
t.Fatalf("human signal text leaked into --events stderr:\n%s", raw)
}
var sawShutdown, sawForceQuit bool
for _, line := range strings.Split(strings.TrimSpace(raw), "\n") {
var evt struct {
Event string `json:"event"`
Data map[string]any `json:"data"`
}
if err := json.Unmarshal([]byte(line), &evt); err != nil {
t.Fatalf("signal line is not JSON %q: %v", line, err)
}
if evt.Event != "signal" {
t.Fatalf("event = %q, want signal", evt.Event)
}
switch evt.Data["action"] {
case "shutdown":
sawShutdown = true
case "force_quit":
sawForceQuit = true
}
}
if !sawShutdown || !sawForceQuit {
t.Fatalf("missing signal events shutdown=%v force_quit=%v in:\n%s", sawShutdown, sawForceQuit, raw)
}
if err := ctx.Err(); err != context.Canceled {
t.Fatalf("ctx.Err() = %v, want context.Canceled", err)
}
}

View File

@ -32,7 +32,7 @@ func newSyncCmd(flags *rootFlags) *cobra.Command {
if err != nil {
return err
}
ctx, stop := signalContext()
ctx, stop := signalContextWithEvents(out.NewEventWriter(os.Stderr, flags.events))
defer stop()
a, lk, err := newApp(ctx, flags, true, false)

View File

@ -7,7 +7,7 @@ Read when: pairing a store, checking auth state, logging out, or choosing QR vs
## Commands
```bash
wacli auth [--follow] [--idle-exit 30s] [--download-media] [--qr-format terminal|text] [--phone PHONE]
wacli auth [--follow] [--idle-exit 30s] [--download-media] [--qr-format terminal|text] [--phone PHONE] [--events]
wacli auth status
wacli auth logout
```
@ -20,6 +20,7 @@ wacli auth logout
- Transient websocket drops before pairing completes are retried with a fresh QR/code.
- After pairing, auth runs bootstrap sync until idle unless `--follow` is set.
- Bootstrap sync honors `WACLI_SYNC_MAX_MESSAGES` and `WACLI_SYNC_MAX_DB_SIZE` to cap local history growth.
- `--events` emits NDJSON lifecycle events on stderr, including raw QR and phone-pairing codes for external renderers.
- `auth status` reports whether the local store is authenticated.
- `auth logout` invalidates the linked-device session and requires writable mode.

View File

@ -7,7 +7,7 @@ Read when: trying to fetch older messages for a known chat.
## Command
```bash
wacli history backfill --chat JID [--count 50] [--requests N] [--wait 1m] [--idle-exit 5s]
wacli history backfill --chat JID [--count 50] [--requests N] [--wait 1m] [--idle-exit 5s] [--events]
```
## Limits
@ -17,6 +17,7 @@ wacli history backfill --chat JID [--count 50] [--requests N] [--wait 1m] [--idl
- Requests are per chat.
- The anchor is the oldest locally stored message in that chat.
- Automatic initial history-sync blob downloads are disabled during backfill; only on-demand responses are processed.
- `--events` emits NDJSON request/response/stop lifecycle events on stderr.
## Examples

View File

@ -7,7 +7,7 @@ Read when: running continuous capture, one-shot sync, contact/group refresh, or
## Command
```bash
wacli sync [--once] [--follow] [--idle-exit 30s] [--max-reconnect 5m] [--max-messages N] [--max-db-size SIZE] [--download-media] [--refresh-contacts] [--refresh-groups]
wacli sync [--once] [--follow] [--idle-exit 30s] [--max-reconnect 5m] [--max-messages N] [--max-db-size SIZE] [--download-media] [--refresh-contacts] [--refresh-groups] [--events]
```
## Modes
@ -25,6 +25,7 @@ wacli sync [--once] [--follow] [--idle-exit 30s] [--max-reconnect 5m] [--max-mes
- `WACLI_SYNC_MAX_MESSAGES` and `WACLI_SYNC_MAX_DB_SIZE` apply the same caps to `auth` bootstrap sync and `sync`.
- While `sync --follow` is running, `send text`, `send file`, `send voice`, and `send react` commands for the same store are delegated to the running sync process so they do not fail on the store lock.
- If whatsmeow reports an app-state LTHash mismatch, sync asks the primary device for the official recovery snapshot once for that app-state collection. If recovery also fails, the warning is printed and sync keeps handling normal message/history events.
- `--events` emits one NDJSON lifecycle event per stderr line for machine consumers. Routine human progress/status lines and interrupt prompts are suppressed while events are enabled.
## Examples
@ -34,4 +35,5 @@ wacli sync --follow --max-reconnect 10m
wacli sync --follow --max-messages 250000 --max-db-size 2GB
wacli sync --once --refresh-contacts --refresh-groups
wacli sync --follow --download-media
wacli sync --once --events 2>events.ndjson
```

View File

@ -8,6 +8,7 @@ import (
"time"
"github.com/steipete/wacli/internal/fsutil"
"github.com/steipete/wacli/internal/out"
"github.com/steipete/wacli/internal/store"
"github.com/steipete/wacli/internal/wa"
"go.mau.fi/whatsmeow"
@ -66,6 +67,7 @@ type Options struct {
StoreDir string
Version string
JSON bool
Events *out.EventWriter
AllowUnauthed bool
}
@ -139,7 +141,10 @@ func (a *App) WA() WAClient {
defer a.waMu.Unlock()
return a.wa
}
func (a *App) DB() *store.DB { return a.db }
func (a *App) DB() *store.DB { return a.db }
func (a *App) Events() *out.EventWriter {
return a.opts.Events
}
func (a *App) StoreDir() string { return a.opts.StoreDir }
func (a *App) Version() string { return a.opts.Version }
func (a *App) AllowUnauthed() bool { return a.opts.AllowUnauthed }

View File

@ -4,7 +4,6 @@ import (
"context"
"database/sql"
"fmt"
"os"
"strings"
"sync"
"sync/atomic"
@ -114,7 +113,11 @@ func (a *App) BackfillHistory(ctx context.Context, opts BackfillOptions) (Backfi
}
data, err := a.wa.DownloadHistorySync(ctx, notif)
if err != nil {
fmt.Fprintf(os.Stderr, "\rwarning: failed to download on-demand history sync: %v\n", err)
a.emitWarning(
"on_demand_history_download_failed",
fmt.Sprintf("warning: failed to download on-demand history sync: %v", err),
map[string]any{"error": err.Error()},
)
return
}
if data.GetSyncType() != waHistorySync.HistorySync_ON_DEMAND {
@ -159,7 +162,11 @@ func (a *App) BackfillHistory(ctx context.Context, opts BackfillOptions) (Backfi
mu.Unlock()
requestsSent++
fmt.Fprintf(os.Stderr, "Requesting %d older messages for %s...\n", opts.Count, chatStr)
a.emitOrPrint("backfill_requesting", map[string]any{
"chat_jid": chatStr,
"count": opts.Count,
"request": requestsSent,
}, "Requesting %d older messages for %s...\n", opts.Count, chatStr)
if _, err := a.wa.RequestHistorySyncOnDemand(ctx, reqInfo, opts.Count); err != nil {
return err
}
@ -180,19 +187,33 @@ func (a *App) BackfillHistory(ctx context.Context, opts BackfillOptions) (Backfi
}
mu.Unlock()
fmt.Fprintf(os.Stderr, "On-demand history sync: %d conversations, %d messages.\n", resp.conversations, resp.messages)
a.emitOrPrint("backfill_response", map[string]any{
"chat_jid": chatStr,
"conversations": resp.conversations,
"messages": resp.messages,
"responses_seen": responsesSeen,
}, "On-demand history sync: %d conversations, %d messages.\n", resp.conversations, resp.messages)
newOldest, err := a.db.GetOldestMessageInfo(chatStr)
if err == nil && newOldest.MsgID == oldest.MsgID {
fmt.Fprintln(os.Stderr, "No older messages were added (stopping).")
a.emitOrPrint("backfill_stopped", map[string]any{
"chat_jid": chatStr,
"reason": "no_older_messages_added",
}, "No older messages were added (stopping).\n")
return nil
}
if resp.messages <= 0 {
fmt.Fprintln(os.Stderr, "No messages returned (stopping).")
a.emitOrPrint("backfill_stopped", map[string]any{
"chat_jid": chatStr,
"reason": "no_messages_returned",
}, "No messages returned (stopping).\n")
return nil
}
if resp.endType == waHistorySync.Conversation_COMPLETE_AND_NO_MORE_MESSAGE_REMAIN_ON_PRIMARY {
fmt.Fprintln(os.Stderr, "Reached start of chat history (stopping).")
a.emitOrPrint("backfill_stopped", map[string]any{
"chat_jid": chatStr,
"reason": "start_of_history_reached",
}, "Reached start of chat history (stopping).\n")
return nil
}
}

38
internal/app/events.go Normal file
View File

@ -0,0 +1,38 @@
package app
import (
"fmt"
"os"
)
func (a *App) eventsEnabled() bool {
return a != nil && a.Events().Enabled()
}
func (a *App) emitEvent(event string, data map[string]any) {
if a == nil {
return
}
_ = a.Events().Emit(event, data)
}
func (a *App) emitOrPrint(event string, data map[string]any, format string, args ...any) {
if a.eventsEnabled() {
a.emitEvent(event, data)
return
}
fmt.Fprintf(os.Stderr, format, args...)
}
func (a *App) emitWarning(code, message string, data map[string]any) {
if a.eventsEnabled() {
if data == nil {
data = map[string]any{}
}
data["code"] = code
data["message"] = message
a.emitEvent("warning", data)
return
}
fmt.Fprintln(os.Stderr, message)
}

View File

@ -104,12 +104,25 @@ func (a *App) runMediaWorkers(ctx context.Context, jobs <-chan mediaJob, workers
func() {
defer func() {
if r := recover(); r != nil {
fmt.Fprintf(os.Stderr, "media worker panic (recovered) for %s/%s: %v\n%s\n",
job.chatJID, job.msgID, r, debug.Stack())
if a.eventsEnabled() {
a.emitEvent("media_worker_panic", map[string]any{
"chat_jid": job.chatJID,
"msg_id": job.msgID,
"panic": fmt.Sprint(r),
"stack": string(debug.Stack()),
})
} else {
fmt.Fprintf(os.Stderr, "media worker panic (recovered) for %s/%s: %v\n%s\n",
job.chatJID, job.msgID, r, debug.Stack())
}
}
}()
if err := a.downloadMediaJob(ctx, job); err != nil {
fmt.Fprintf(os.Stderr, "media download failed for %s/%s: %v\n", job.chatJID, job.msgID, err)
a.emitWarning(
"media_download_failed",
fmt.Sprintf("media download failed for %s/%s: %v", job.chatJID, job.msgID, err),
map[string]any{"chat_jid": job.chatJID, "msg_id": job.msgID, "error": err.Error()},
)
}
}()
}

View File

@ -55,7 +55,11 @@ func (a *App) Sync(ctx context.Context, opts SyncOptions) (SyncResult, error) {
opts.IdleExit = 30 * time.Second
}
if opts.WarnNoLimits && opts.MaxMessages <= 0 && opts.MaxDBSizeBytes <= 0 {
fmt.Fprintln(os.Stderr, "warning: sync storage is uncapped; use --max-messages or --max-db-size to bound local history growth")
a.emitWarning(
"sync_storage_uncapped",
"warning: sync storage is uncapped; use --max-messages or --max-db-size to bound local history growth",
nil,
)
}
if err := a.checkSyncStorageLimits(opts); err != nil {
return SyncResult{}, err
@ -151,7 +155,11 @@ func (a *App) connectForSync(ctx context.Context, opts SyncOptions) error {
if attempt == attempts || ctx.Err() != nil || !isRetryableAuthConnectError(err) {
return err
}
fmt.Fprintf(os.Stderr, "warning: auth connection dropped before pairing completed; retrying (%d/%d)\n", attempt+1, attempts)
a.emitWarning(
"auth_connect_retry",
fmt.Sprintf("warning: auth connection dropped before pairing completed; retrying (%d/%d)", attempt+1, attempts),
map[string]any{"attempt": attempt + 1, "attempts": attempts},
)
select {
case <-time.After(authConnectRetryDelay(attempt)):
case <-ctx.Done():

View File

@ -38,8 +38,17 @@ func (a *App) addSyncEventHandler(ctx context.Context, opts SyncOptions, message
defer func() {
if r := recover(); r != nil {
n := panicCount.Add(1)
fmt.Fprintf(os.Stderr, "\nevent handler panic (recovered, total=%d) event=%T: %v\n%s\n",
n, evt, r, debug.Stack())
if a.eventsEnabled() {
a.emitEvent("event_handler_panic", map[string]any{
"total": n,
"event": fmt.Sprintf("%T", evt),
"panic": fmt.Sprint(r),
"stack": string(debug.Stack()),
})
} else {
fmt.Fprintf(os.Stderr, "\nevent handler panic (recovered, total=%d) event=%T: %v\n%s\n",
n, evt, r, debug.Stack())
}
}
}()
switch v := evt.(type) {
@ -50,9 +59,9 @@ func (a *App) addSyncEventHandler(ctx context.Context, opts SyncOptions, message
lastEvent.Store(nowUTC().UnixNano())
a.handleHistorySync(ctx, opts, v, messagesStored, lastEvent, enqueueMedia, limits)
case *events.Connected:
fmt.Fprintln(os.Stderr, "\nConnected.")
a.emitOrPrint("connected", nil, "\nConnected.\n")
case *events.Disconnected:
fmt.Fprintln(os.Stderr, "\nDisconnected.")
a.emitOrPrint("disconnected", nil, "\nDisconnected.\n")
select {
case disconnected <- struct{}{}:
default:
@ -78,16 +87,28 @@ func (a *App) handleAppStateSyncError(ctx context.Context, evt *events.AppStateS
return
}
fmt.Fprintf(os.Stderr, "\rwarning: app state %s hit an LTHash mismatch; requesting recovery snapshot\n", name)
a.emitWarning(
"app_state_lthash_mismatch",
fmt.Sprintf("warning: app state %s hit an LTHash mismatch; requesting recovery snapshot", name),
map[string]any{"name": name},
)
go func() {
reqCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
reqID, err := a.wa.RequestAppStateRecovery(reqCtx, name)
if err != nil {
fmt.Fprintf(os.Stderr, "\rwarning: app state %s recovery request failed: %v\n", name, err)
a.emitWarning(
"app_state_recovery_failed",
fmt.Sprintf("warning: app state %s recovery request failed: %v", name, err),
map[string]any{"name": name, "error": err.Error()},
)
return
}
fmt.Fprintf(os.Stderr, "\rRequested app state %s recovery (id %s)\n", name, reqID)
if a.eventsEnabled() {
a.emitEvent("app_state_recovery_requested", map[string]any{"name": name, "id": string(reqID)})
} else {
fmt.Fprintf(os.Stderr, "\rRequested app state %s recovery (id %s)\n", name, reqID)
}
}()
}
@ -100,14 +121,11 @@ func (a *App) handleLiveSyncMessage(ctx context.Context, opts SyncOptions, v *ev
a.decryptEncryptedReaction(ctx, &pm, v)
}
if err := a.storeParsedMessageForSync(ctx, pm, limits...); err == nil {
messagesStored.Add(1)
a.emitSyncProgress(messagesStored.Add(1))
}
if opts.DownloadMedia && pm.Media != nil && pm.ID != "" {
enqueueMedia(pm.Chat.String(), pm.ID)
}
if messagesStored.Load()%25 == 0 {
fmt.Fprintf(os.Stderr, "\rSynced %d messages...", messagesStored.Load())
}
}
func historySyncNotificationFromMessage(v *events.Message) *waE2E.HistorySyncNotification {
@ -118,7 +136,7 @@ func historySyncNotificationFromMessage(v *events.Message) *waE2E.HistorySyncNot
}
func (a *App) handleHistorySync(ctx context.Context, opts SyncOptions, v *events.HistorySync, messagesStored, lastEvent *atomic.Int64, enqueueMedia func(string, string), limits ...*syncStorageLimits) {
fmt.Fprintf(os.Stderr, "\nProcessing history sync (%d conversations)...\n", len(v.Data.Conversations))
a.emitOrPrint("history_sync", map[string]any{"conversations": len(v.Data.Conversations)}, "\nProcessing history sync (%d conversations)...\n", len(v.Data.Conversations))
for _, conv := range v.Data.Conversations {
lastEvent.Store(nowUTC().UnixNano())
chatID := strings.TrimSpace(conv.GetID())
@ -137,13 +155,17 @@ func (a *App) handleHistorySync(ctx context.Context, opts SyncOptions, v *events
if pm.ReactionToID != "" && pm.ReactionEmoji == "" && m.Message.GetMessage().GetEncReactionMessage() != nil {
evt, err := a.wa.ParseWebMessage(pm.Chat, m.Message)
if err != nil {
fmt.Fprintf(os.Stderr, "\rwarning: failed to parse encrypted reaction message %s: %v\n", pm.ID, err)
a.emitWarning(
"encrypted_reaction_parse_failed",
fmt.Sprintf("warning: failed to parse encrypted reaction message %s: %v", pm.ID, err),
map[string]any{"message_id": pm.ID, "error": err.Error()},
)
} else {
a.decryptEncryptedReaction(ctx, &pm, evt)
}
}
if err := a.storeParsedMessageForSync(ctx, pm, limits...); err == nil {
messagesStored.Add(1)
a.emitSyncProgress(messagesStored.Add(1))
} else if ctx.Err() != nil {
return
}
@ -152,7 +174,16 @@ func (a *App) handleHistorySync(ctx context.Context, opts SyncOptions, v *events
}
}
}
fmt.Fprintf(os.Stderr, "\rSynced %d messages...", messagesStored.Load())
if !a.eventsEnabled() {
fmt.Fprintf(os.Stderr, "\rSynced %d messages...", messagesStored.Load())
}
}
func (a *App) emitSyncProgress(total int64) {
if total <= 0 || total%25 != 0 {
return
}
a.emitOrPrint("progress", map[string]any{"messages_synced": total}, "\rSynced %d messages...", total)
}
func (a *App) storeParsedMessageForSync(ctx context.Context, pm wa.ParsedMessage, limits ...*syncStorageLimits) error {
@ -165,7 +196,11 @@ func (a *App) storeParsedMessageForSync(ctx context.Context, pm wa.ParsedMessage
func (a *App) decryptEncryptedReaction(ctx context.Context, pm *wa.ParsedMessage, msg *events.Message) {
reaction, err := a.wa.DecryptReaction(ctx, msg)
if err != nil {
fmt.Fprintf(os.Stderr, "\rwarning: failed to decrypt reaction message %s: %v\n", pm.ID, err)
a.emitWarning(
"encrypted_reaction_decrypt_failed",
fmt.Sprintf("warning: failed to decrypt reaction message %s: %v", pm.ID, err),
map[string]any{"message_id": pm.ID, "error": err.Error()},
)
return
}
if reaction == nil {

View File

@ -0,0 +1,67 @@
package app
import (
"context"
"encoding/json"
"os"
"strings"
"testing"
"time"
"github.com/steipete/wacli/internal/out"
"go.mau.fi/whatsmeow/types"
)
func TestSyncEventsOutputStaysNDJSONDuringProgress(t *testing.T) {
a := newTestApp(t)
f := newFakeWA()
a.wa = f
chat := types.JID{User: "123", Server: types.DefaultUserServer}
base := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
ids := make([]string, 25)
for i := range ids {
ids[i] = "m" + string(rune('a'+i))
}
f.connectEvents = []interface{}{historySyncWithTextMessages(chat, base, ids...)}
raw := captureStderr(t, func() {
a.opts.Events = out.NewEventWriter(os.Stderr, true)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
_, err := a.Sync(ctx, SyncOptions{
Mode: SyncModeOnce,
AllowQR: false,
IdleExit: 50 * time.Millisecond,
WarnNoLimits: false,
})
if err != nil {
t.Fatalf("Sync: %v", err)
}
})
if strings.Contains(raw, "Synced ") || strings.Contains(raw, "Processing history sync") {
t.Fatalf("human progress leaked into --events stderr:\n%s", raw)
}
var sawProgress bool
for _, line := range strings.Split(strings.TrimSpace(raw), "\n") {
if strings.TrimSpace(line) == "" {
continue
}
var evt map[string]any
if err := json.Unmarshal([]byte(line), &evt); err != nil {
t.Fatalf("stderr line is not valid JSON %q: %v\nfull stderr:\n%s", line, err, raw)
}
if evt["event"] == "progress" {
data, ok := evt["data"].(map[string]any)
if !ok || data["messages_synced"] != float64(25) {
t.Fatalf("unexpected progress event: %#v", evt)
}
sawProgress = true
}
}
if !sawProgress {
t.Fatalf("expected progress event in:\n%s", raw)
}
}

View File

@ -3,7 +3,6 @@ package app
import (
"context"
"fmt"
"os"
"sync/atomic"
"time"
)
@ -12,10 +11,10 @@ func (a *App) runSyncFollow(ctx context.Context, maxReconnect time.Duration, mes
for {
select {
case <-ctx.Done():
fmt.Fprintln(os.Stderr, "\nStopping sync.")
a.emitOrPrint("stopping", map[string]any{"messages_synced": messagesStored.Load()}, "\nStopping sync.\n")
return SyncResult{MessagesStored: messagesStored.Load()}, nil
case <-disconnected:
fmt.Fprintln(os.Stderr, "Reconnecting...")
a.emitOrPrint("reconnecting", nil, "Reconnecting...\n")
if err := a.reconnect(ctx, maxReconnect); err != nil {
return SyncResult{MessagesStored: messagesStored.Load()}, err
}
@ -33,17 +32,20 @@ func (a *App) runSyncUntilIdle(ctx context.Context, idleExit, maxReconnect time.
for {
select {
case <-ctx.Done():
fmt.Fprintln(os.Stderr, "\nStopping sync.")
a.emitOrPrint("stopping", map[string]any{"messages_synced": messagesStored.Load()}, "\nStopping sync.\n")
return SyncResult{MessagesStored: messagesStored.Load()}, nil
case <-disconnected:
fmt.Fprintln(os.Stderr, "Reconnecting...")
a.emitOrPrint("reconnecting", nil, "Reconnecting...\n")
if err := a.reconnect(ctx, maxReconnect); err != nil {
return SyncResult{MessagesStored: messagesStored.Load()}, err
}
case <-ticker.C:
last := time.Unix(0, lastEvent.Load())
if time.Since(last) >= idleExit {
fmt.Fprintf(os.Stderr, "\nIdle for %s, exiting.\n", idleExit)
a.emitOrPrint("idle_exit", map[string]any{
"idle_duration": idleExit.String(),
"messages_synced": messagesStored.Load(),
}, "\nIdle for %s, exiting.\n", idleExit)
return SyncResult{MessagesStored: messagesStored.Load()}, nil
}
}

54
internal/out/events.go Normal file
View File

@ -0,0 +1,54 @@
package out
import (
"encoding/json"
"io"
"sync"
"time"
)
type EventWriter struct {
mu sync.Mutex
w io.Writer
enabled bool
clockNow func() time.Time
}
type eventEnvelope struct {
Event string `json:"event"`
Data map[string]any `json:"data,omitempty"`
TS int64 `json:"ts"`
}
func NewEventWriter(w io.Writer, enabled bool) *EventWriter {
return &EventWriter{
w: w,
enabled: enabled,
clockNow: time.Now,
}
}
func (e *EventWriter) Enabled() bool {
return e != nil && e.enabled && e.w != nil
}
func (e *EventWriter) Emit(event string, data map[string]any) error {
if !e.Enabled() {
return nil
}
payload := eventEnvelope{
Event: event,
Data: data,
TS: e.clockNow().UTC().UnixMilli(),
}
b, err := json.Marshal(payload)
if err != nil {
return err
}
e.mu.Lock()
defer e.mu.Unlock()
_, err = e.w.Write(append(b, '\n'))
return err
}

View File

@ -0,0 +1,49 @@
package out
import (
"bytes"
"encoding/json"
"strings"
"testing"
"time"
)
func TestEventWriterDisabledIsNoOp(t *testing.T) {
var b bytes.Buffer
w := NewEventWriter(&b, false)
if err := w.Emit("connected", nil); err != nil {
t.Fatalf("Emit: %v", err)
}
if b.Len() != 0 {
t.Fatalf("expected no output when disabled, got %q", b.String())
}
}
func TestEventWriterEmitsNDJSON(t *testing.T) {
var b bytes.Buffer
w := NewEventWriter(&b, true)
w.clockNow = func() time.Time { return time.UnixMilli(1234).UTC() }
if err := w.Emit("progress", map[string]any{"messages_synced": 25}); err != nil {
t.Fatalf("Emit: %v", err)
}
line := strings.TrimSpace(b.String())
var got map[string]any
if err := json.Unmarshal([]byte(line), &got); err != nil {
t.Fatalf("unmarshal: %v", err)
}
if got["event"] != "progress" {
t.Fatalf("unexpected event: %v", got["event"])
}
if got["ts"] != float64(1234) {
t.Fatalf("unexpected ts: %v", got["ts"])
}
data, ok := got["data"].(map[string]any)
if !ok {
t.Fatalf("expected data object, got %T", got["data"])
}
if data["messages_synced"] != float64(25) {
t.Fatalf("unexpected data payload: %#v", data)
}
}