fix: bound media enqueue backpressure (#121) (thanks @jyothepro)
This commit is contained in:
parent
894bc5d1ac
commit
12e8e1a934
@ -24,6 +24,7 @@
|
||||
- Sync: keep `sync --once` idle timing focused on message/history events so connection chatter cannot hang exit. (#119 — thanks @jyothepro)
|
||||
- Sync: start `sync --once` idle timing after the `Connected` event. (#171 — thanks @fuleinist)
|
||||
- Sync: include event type, stack trace, and recovery count when logging recovered event-handler panics. (#181 — thanks @shaun0927)
|
||||
- Sync: apply bounded backpressure to media download enqueueing instead of spawning unbounded overflow goroutines. (#121 — thanks @jyothepro)
|
||||
- Windows: split store locking by platform so the lock package compiles on Windows. (#188 — thanks @dinakars777)
|
||||
|
||||
### Docs
|
||||
|
||||
@ -26,6 +26,7 @@ type fakeWA struct {
|
||||
|
||||
connectEvents []interface{}
|
||||
connectDelay time.Duration
|
||||
downloadDelay time.Duration
|
||||
|
||||
contacts map[types.JID]types.ContactInfo
|
||||
groups map[types.JID]*types.GroupInfo
|
||||
@ -229,6 +230,13 @@ func (f *fakeWA) DecryptReaction(ctx context.Context, reaction *events.Message)
|
||||
}
|
||||
|
||||
func (f *fakeWA) DownloadMediaToFile(ctx context.Context, directPath string, encFileHash, fileHash, mediaKey []byte, fileLength uint64, mediaType, mmsType string, targetPath string) (int64, error) {
|
||||
if f.downloadDelay > 0 {
|
||||
select {
|
||||
case <-time.After(f.downloadDelay):
|
||||
case <-ctx.Done():
|
||||
return 0, ctx.Err()
|
||||
}
|
||||
}
|
||||
if err := os.MkdirAll(filepath.Dir(targetPath), 0o700); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
@ -66,11 +66,6 @@ func (a *App) Sync(ctx context.Context, opts SyncOptions) (SyncResult, error) {
|
||||
handlerID := a.addSyncEventHandler(ctx, opts, &messagesStored, &lastEvent, disconnected, enqueueMedia)
|
||||
defer a.wa.RemoveEventHandler(handlerID)
|
||||
|
||||
if err := a.Connect(ctx, opts.AllowQR, opts.OnQRCode); err != nil {
|
||||
return SyncResult{}, err
|
||||
}
|
||||
lastEvent.Store(time.Now().UTC().UnixNano())
|
||||
|
||||
if opts.DownloadMedia {
|
||||
var err error
|
||||
stopMedia, err = a.runMediaWorkers(ctx, mediaJobs, 4)
|
||||
@ -80,6 +75,11 @@ func (a *App) Sync(ctx context.Context, opts SyncOptions) (SyncResult, error) {
|
||||
defer stopMedia()
|
||||
}
|
||||
|
||||
if err := a.Connect(ctx, opts.AllowQR, opts.OnQRCode); err != nil {
|
||||
return SyncResult{}, err
|
||||
}
|
||||
lastEvent.Store(time.Now().UTC().UnixNano())
|
||||
|
||||
// Optional: bootstrap imports (helps contacts/groups management without waiting for events).
|
||||
if opts.RefreshContacts {
|
||||
_ = a.refreshContacts(ctx)
|
||||
|
||||
@ -20,14 +20,7 @@ func newMediaEnqueuer(ctx context.Context, jobs chan<- mediaJob) func(chatJID, m
|
||||
}
|
||||
select {
|
||||
case jobs <- mediaJob{chatJID: chatJID, msgID: msgID}:
|
||||
default:
|
||||
// Avoid blocking the event handler.
|
||||
go func() {
|
||||
select {
|
||||
case jobs <- mediaJob{chatJID: chatJID, msgID: msgID}:
|
||||
case <-ctx.Done():
|
||||
}
|
||||
}()
|
||||
case <-ctx.Done():
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -2,6 +2,8 @@ package app
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"runtime"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@ -235,6 +237,71 @@ func TestSyncStoresDisplayText(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestSyncMediaEnqueueUsesBoundedBackpressure(t *testing.T) {
|
||||
a := newTestApp(t)
|
||||
f := newFakeWA()
|
||||
a.wa = f
|
||||
f.downloadDelay = 5 * time.Millisecond
|
||||
|
||||
chat := types.JID{User: "123", Server: types.DefaultUserServer}
|
||||
f.contacts[chat.ToNonAD()] = types.ContactInfo{
|
||||
Found: true,
|
||||
FullName: "Alice",
|
||||
PushName: "Alice",
|
||||
}
|
||||
|
||||
base := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
|
||||
for i := 0; i < 600; i++ {
|
||||
f.connectEvents = append(f.connectEvents, &events.Message{
|
||||
Info: types.MessageInfo{
|
||||
MessageSource: types.MessageSource{
|
||||
Chat: chat,
|
||||
Sender: chat,
|
||||
IsFromMe: false,
|
||||
},
|
||||
ID: fmt.Sprintf("media-%03d", i),
|
||||
Timestamp: base.Add(time.Duration(i) * time.Second),
|
||||
PushName: "Alice",
|
||||
},
|
||||
Message: &waProto.Message{
|
||||
ImageMessage: &waProto.ImageMessage{
|
||||
Mimetype: proto.String("image/jpeg"),
|
||||
DirectPath: proto.String("/direct"),
|
||||
MediaKey: []byte{1},
|
||||
FileSHA256: []byte{2},
|
||||
FileEncSHA256: []byte{3},
|
||||
FileLength: proto.Uint64(10),
|
||||
},
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
before := runtime.NumGoroutine()
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
var during int
|
||||
res, err := a.Sync(ctx, SyncOptions{
|
||||
Mode: SyncModeFollow,
|
||||
AllowQR: false,
|
||||
DownloadMedia: true,
|
||||
AfterConnect: func(context.Context) error {
|
||||
during = runtime.NumGoroutine()
|
||||
cancel()
|
||||
return nil
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Sync: %v", err)
|
||||
}
|
||||
if res.MessagesStored != 600 {
|
||||
t.Fatalf("expected 600 messages stored, got %d", res.MessagesStored)
|
||||
}
|
||||
if leaked := during - before; leaked > 20 {
|
||||
t.Fatalf("expected bounded media enqueue goroutines, saw +%d (before=%d during=%d)", leaked, before, during)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSyncOnceIdleExit(t *testing.T) {
|
||||
a := newTestApp(t)
|
||||
f := newFakeWA()
|
||||
|
||||
Loading…
Reference in New Issue
Block a user