feat: add resilient message reactions
This commit is contained in:
parent
95f2136887
commit
91d240a658
@ -26,6 +26,7 @@ func newSendCmd(flags *rootFlags) *cobra.Command {
|
||||
}
|
||||
cmd.AddCommand(newSendTextCmd(flags))
|
||||
cmd.AddCommand(newSendFileCmd(flags))
|
||||
cmd.AddCommand(newSendReactCmd(flags))
|
||||
return cmd
|
||||
}
|
||||
|
||||
@ -67,7 +68,9 @@ func newSendTextCmd(flags *rootFlags) *cobra.Command {
|
||||
return err
|
||||
}
|
||||
|
||||
msgID, err := sendTextMessage(ctx, a, toJID, message, replyTo, replyToSender)
|
||||
msgID, err := runSendOperation(ctx, reconnectForSend(a), func(ctx context.Context) (types.MessageID, error) {
|
||||
return sendTextMessage(ctx, a, toJID, message, replyTo, replyToSender)
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -49,10 +49,21 @@ func newSendFileCmd(flags *rootFlags) *cobra.Command {
|
||||
return err
|
||||
}
|
||||
|
||||
msgID, meta, err := sendFile(ctx, a, toJID, filePath, filename, caption, mimeOverride)
|
||||
type sendFileResult struct {
|
||||
id string
|
||||
meta map[string]string
|
||||
}
|
||||
res, err := runSendOperation(ctx, reconnectForSend(a), func(ctx context.Context) (sendFileResult, error) {
|
||||
msgID, meta, err := sendFile(ctx, a, toJID, filePath, filename, caption, mimeOverride)
|
||||
if err != nil {
|
||||
return sendFileResult{}, err
|
||||
}
|
||||
return sendFileResult{id: msgID, meta: meta}, nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
msgID, meta := res.id, res.meta
|
||||
|
||||
if flags.asJSON {
|
||||
return out.WriteJSON(os.Stdout, map[string]any{
|
||||
|
||||
90
cmd/wacli/send_helpers.go
Normal file
90
cmd/wacli/send_helpers.go
Normal file
@ -0,0 +1,90 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/steipete/wacli/internal/app"
|
||||
"go.mau.fi/whatsmeow"
|
||||
)
|
||||
|
||||
const sendAttemptTimeout = 45 * time.Second
|
||||
|
||||
func runSendOperation[T any](
|
||||
ctx context.Context,
|
||||
reconnect func(context.Context) error,
|
||||
op func(context.Context) (T, error),
|
||||
) (T, error) {
|
||||
result, err := runSendAttempt(ctx, sendAttemptTimeout, op)
|
||||
if err == nil {
|
||||
return result, nil
|
||||
}
|
||||
|
||||
var zero T
|
||||
if !isRetryableSendError(err) || ctx.Err() != nil {
|
||||
return zero, err
|
||||
}
|
||||
if reconnectErr := reconnect(ctx); reconnectErr != nil {
|
||||
return zero, fmt.Errorf("%w; reconnect failed: %v", err, reconnectErr)
|
||||
}
|
||||
return runSendAttempt(ctx, sendAttemptTimeout, op)
|
||||
}
|
||||
|
||||
func runSendAttempt[T any](ctx context.Context, timeout time.Duration, op func(context.Context) (T, error)) (T, error) {
|
||||
attemptCtx, cancel := context.WithTimeout(ctx, timeout)
|
||||
defer cancel()
|
||||
|
||||
type result struct {
|
||||
value T
|
||||
err error
|
||||
}
|
||||
ch := make(chan result, 1)
|
||||
go func() {
|
||||
value, err := op(attemptCtx)
|
||||
ch <- result{value: value, err: err}
|
||||
}()
|
||||
|
||||
select {
|
||||
case res := <-ch:
|
||||
if errors.Is(res.err, context.DeadlineExceeded) && errors.Is(attemptCtx.Err(), context.DeadlineExceeded) {
|
||||
var zero T
|
||||
return zero, fmt.Errorf("send timed out after %s", timeout)
|
||||
}
|
||||
return res.value, res.err
|
||||
case <-attemptCtx.Done():
|
||||
var zero T
|
||||
if errors.Is(attemptCtx.Err(), context.DeadlineExceeded) {
|
||||
return zero, fmt.Errorf("send timed out after %s", timeout)
|
||||
}
|
||||
return zero, attemptCtx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
func isRetryableSendError(err error) bool {
|
||||
if err == nil {
|
||||
return false
|
||||
}
|
||||
if errors.Is(err, whatsmeow.ErrIQTimedOut) {
|
||||
return true
|
||||
}
|
||||
|
||||
msg := strings.ToLower(err.Error())
|
||||
return strings.Contains(msg, "failed to send usync query") ||
|
||||
strings.Contains(msg, "failed to get user info") ||
|
||||
strings.Contains(msg, "failed to get device list") ||
|
||||
strings.Contains(msg, "info query timed out") ||
|
||||
strings.Contains(msg, "not connected")
|
||||
}
|
||||
|
||||
func reconnectForSend(a interface {
|
||||
WA() app.WAClient
|
||||
Connect(context.Context, bool, func(string)) error
|
||||
}) func(context.Context) error {
|
||||
return func(ctx context.Context) error {
|
||||
a.WA().Close()
|
||||
return a.Connect(ctx, false, nil)
|
||||
}
|
||||
}
|
||||
78
cmd/wacli/send_helpers_test.go
Normal file
78
cmd/wacli/send_helpers_test.go
Normal file
@ -0,0 +1,78 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"go.mau.fi/whatsmeow"
|
||||
)
|
||||
|
||||
func TestRunSendOperationRetriesRetryableError(t *testing.T) {
|
||||
var reconnects int
|
||||
attempts := 0
|
||||
|
||||
got, err := runSendOperation(context.Background(), func(ctx context.Context) error {
|
||||
reconnects++
|
||||
return nil
|
||||
}, func(ctx context.Context) (string, error) {
|
||||
attempts++
|
||||
if attempts == 1 {
|
||||
return "", fmt.Errorf("failed to get device list: failed to send usync query: %w", whatsmeow.ErrIQTimedOut)
|
||||
}
|
||||
return "ok", nil
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("runSendOperation: %v", err)
|
||||
}
|
||||
if got != "ok" {
|
||||
t.Fatalf("expected ok, got %q", got)
|
||||
}
|
||||
if reconnects != 1 {
|
||||
t.Fatalf("expected 1 reconnect, got %d", reconnects)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRunSendOperationDoesNotRetryValidationError(t *testing.T) {
|
||||
var reconnects int
|
||||
|
||||
_, err := runSendOperation(context.Background(), func(ctx context.Context) error {
|
||||
reconnects++
|
||||
return nil
|
||||
}, func(ctx context.Context) (string, error) {
|
||||
return "", errors.New("permission denied")
|
||||
})
|
||||
if err == nil {
|
||||
t.Fatalf("expected error")
|
||||
}
|
||||
if reconnects != 0 {
|
||||
t.Fatalf("expected no reconnect, got %d", reconnects)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRunSendAttemptTimesOut(t *testing.T) {
|
||||
_, err := runSendAttempt(context.Background(), 20*time.Millisecond, func(ctx context.Context) (string, error) {
|
||||
<-ctx.Done()
|
||||
return "", ctx.Err()
|
||||
})
|
||||
if err == nil {
|
||||
t.Fatalf("expected timeout error")
|
||||
}
|
||||
if err.Error() != "send timed out after 20ms" {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestIsRetryableSendError(t *testing.T) {
|
||||
if !isRetryableSendError(fmt.Errorf("wrapped: %w", whatsmeow.ErrIQTimedOut)) {
|
||||
t.Fatalf("expected ErrIQTimedOut to be retryable")
|
||||
}
|
||||
if !isRetryableSendError(errors.New("failed to get user info for 123@s.whatsapp.net to fill LID cache: failed to send usync query: info query timed out")) {
|
||||
t.Fatalf("expected wrapped usync timeout to be retryable")
|
||||
}
|
||||
if isRetryableSendError(errors.New("permission denied")) {
|
||||
t.Fatalf("did not expect arbitrary error to be retryable")
|
||||
}
|
||||
}
|
||||
100
cmd/wacli/send_react_cmd.go
Normal file
100
cmd/wacli/send_react_cmd.go
Normal file
@ -0,0 +1,100 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/steipete/wacli/internal/out"
|
||||
"github.com/steipete/wacli/internal/wa"
|
||||
"go.mau.fi/whatsmeow/types"
|
||||
)
|
||||
|
||||
func newSendReactCmd(flags *rootFlags) *cobra.Command {
|
||||
var to string
|
||||
var msgID string
|
||||
var emoji string
|
||||
var sender string
|
||||
|
||||
cmd := &cobra.Command{
|
||||
Use: "react",
|
||||
Short: "React to a message",
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
if strings.TrimSpace(to) == "" || strings.TrimSpace(msgID) == "" {
|
||||
return fmt.Errorf("--to and --id are required")
|
||||
}
|
||||
if err := flags.requireWritable(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ctx, cancel := withTimeout(context.Background(), flags)
|
||||
defer cancel()
|
||||
|
||||
a, lk, err := newApp(ctx, flags, true, false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer closeApp(a, lk)
|
||||
|
||||
if err := a.EnsureAuthed(); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := a.Connect(ctx, false, nil); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
chat, senderJID, err := reactionTarget(to, sender)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
sentID, err := runSendOperation(ctx, reconnectForSend(a), func(ctx context.Context) (types.MessageID, error) {
|
||||
return a.WA().SendReaction(ctx, chat, senderJID, types.MessageID(msgID), emoji)
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if flags.asJSON {
|
||||
return out.WriteJSON(os.Stdout, map[string]any{
|
||||
"sent": true,
|
||||
"to": chat.String(),
|
||||
"id": sentID,
|
||||
"target": msgID,
|
||||
"reaction": emoji,
|
||||
})
|
||||
}
|
||||
if emoji == "" {
|
||||
fmt.Fprintf(os.Stdout, "Removed reaction from %s (id %s)\n", msgID, sentID)
|
||||
return nil
|
||||
}
|
||||
fmt.Fprintf(os.Stdout, "Reacted %s to %s (id %s)\n", emoji, msgID, sentID)
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
cmd.Flags().StringVar(&to, "to", "", "recipient phone number or JID")
|
||||
cmd.Flags().StringVar(&msgID, "id", "", "target message ID")
|
||||
cmd.Flags().StringVar(&emoji, "reaction", "\U0001f44d", "reaction emoji (pass an empty string to remove)")
|
||||
cmd.Flags().StringVar(&sender, "sender", "", "message sender JID (required for group messages)")
|
||||
return cmd
|
||||
}
|
||||
|
||||
func reactionTarget(to, sender string) (types.JID, types.JID, error) {
|
||||
chat, err := wa.ParseUserOrJID(to)
|
||||
if err != nil {
|
||||
return types.JID{}, types.JID{}, fmt.Errorf("invalid --to: %w", err)
|
||||
}
|
||||
var senderJID types.JID
|
||||
if strings.TrimSpace(sender) != "" {
|
||||
senderJID, err = wa.ParseUserOrJID(sender)
|
||||
if err != nil {
|
||||
return types.JID{}, types.JID{}, fmt.Errorf("invalid --sender: %w", err)
|
||||
}
|
||||
}
|
||||
if chat.Server == types.GroupServer && senderJID.IsEmpty() {
|
||||
return types.JID{}, types.JID{}, fmt.Errorf("--sender is required for group reactions")
|
||||
}
|
||||
return chat, senderJID, nil
|
||||
}
|
||||
41
cmd/wacli/send_react_cmd_test.go
Normal file
41
cmd/wacli/send_react_cmd_test.go
Normal file
@ -0,0 +1,41 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"go.mau.fi/whatsmeow/types"
|
||||
)
|
||||
|
||||
func TestReactionTargetDirect(t *testing.T) {
|
||||
chat, sender, err := reactionTarget("+15551234567", "")
|
||||
if err != nil {
|
||||
t.Fatalf("reactionTarget: %v", err)
|
||||
}
|
||||
if chat.String() != "15551234567@s.whatsapp.net" {
|
||||
t.Fatalf("chat = %q", chat.String())
|
||||
}
|
||||
if !sender.IsEmpty() {
|
||||
t.Fatalf("sender = %q, want empty", sender.String())
|
||||
}
|
||||
}
|
||||
|
||||
func TestReactionTargetGroupRequiresSender(t *testing.T) {
|
||||
_, _, err := reactionTarget("12345@g.us", "")
|
||||
if err == nil || !strings.Contains(err.Error(), "--sender is required") {
|
||||
t.Fatalf("expected sender error, got %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestReactionTargetGroupSender(t *testing.T) {
|
||||
chat, sender, err := reactionTarget("12345@g.us", "+15551234567")
|
||||
if err != nil {
|
||||
t.Fatalf("reactionTarget: %v", err)
|
||||
}
|
||||
if chat.Server != types.GroupServer {
|
||||
t.Fatalf("chat = %q, want group", chat.String())
|
||||
}
|
||||
if sender.String() != "15551234567@s.whatsapp.net" {
|
||||
t.Fatalf("sender = %q", sender.String())
|
||||
}
|
||||
}
|
||||
@ -39,6 +39,7 @@ type WAClient interface {
|
||||
|
||||
SendText(ctx context.Context, to types.JID, text string) (types.MessageID, error)
|
||||
SendProtoMessage(ctx context.Context, to types.JID, msg *waProto.Message) (types.MessageID, error)
|
||||
SendReaction(ctx context.Context, chat, sender types.JID, targetID types.MessageID, reaction string) (types.MessageID, error)
|
||||
Upload(ctx context.Context, data []byte, mediaType whatsmeow.MediaType) (whatsmeow.UploadResponse, error)
|
||||
DownloadMediaToFile(ctx context.Context, directPath string, encFileHash, fileHash, mediaKey []byte, fileLength uint64, mediaType, mmsType string, targetPath string) (int64, error)
|
||||
|
||||
|
||||
@ -221,6 +221,10 @@ func (f *fakeWA) SendProtoMessage(ctx context.Context, to types.JID, msg *waProt
|
||||
return types.MessageID("msgid"), nil
|
||||
}
|
||||
|
||||
func (f *fakeWA) SendReaction(ctx context.Context, chat, sender types.JID, targetID types.MessageID, reaction string) (types.MessageID, error) {
|
||||
return types.MessageID("reactionid"), nil
|
||||
}
|
||||
|
||||
func (f *fakeWA) Upload(ctx context.Context, data []byte, mediaType whatsmeow.MediaType) (whatsmeow.UploadResponse, error) {
|
||||
return whatsmeow.UploadResponse{}, nil
|
||||
}
|
||||
|
||||
@ -182,6 +182,20 @@ func (c *Client) SendProtoMessage(ctx context.Context, to types.JID, msg *waProt
|
||||
return resp.ID, nil
|
||||
}
|
||||
|
||||
func (c *Client) SendReaction(ctx context.Context, chat, sender types.JID, targetID types.MessageID, reaction string) (types.MessageID, error) {
|
||||
c.mu.Lock()
|
||||
cli := c.client
|
||||
c.mu.Unlock()
|
||||
if cli == nil || !cli.IsConnected() {
|
||||
return "", fmt.Errorf("not connected")
|
||||
}
|
||||
resp, err := cli.SendMessage(ctx, chat, cli.BuildReaction(chat, sender, targetID, reaction))
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return resp.ID, nil
|
||||
}
|
||||
|
||||
func (c *Client) Upload(ctx context.Context, data []byte, mediaType whatsmeow.MediaType) (whatsmeow.UploadResponse, error) {
|
||||
c.mu.Lock()
|
||||
cli := c.client
|
||||
|
||||
Loading…
Reference in New Issue
Block a user