fix(gmail-watch): delay history fetch in watch serve (#397)
* fix(gmail-watch): delay history fetch in watch serve * fix: land gmail watch fetch delay and changelog (#397) (thanks @salmonumbrella) --------- Co-authored-by: Peter Steinberger <steipete@gmail.com>
This commit is contained in:
parent
4b1d1d429d
commit
caf38a3d33
@ -11,6 +11,9 @@
|
||||
|
||||
### Fixed
|
||||
- Calendar: respond patches only attendees to avoid custom reminders validation errors. (#265) — thanks @sebasrodriguez.
|
||||
- Contacts: fix grouped parameter types in CRUD helpers to restore builds on newer Go toolchains. (#355) — thanks @laihenyi.
|
||||
- Timezone: embed the IANA timezone database so Windows builds can resolve calendar timezones correctly. (#388) — thanks @visionik.
|
||||
- Gmail: add a fetch delay in `watch serve` so History API reads don't race message indexing. (#397) — thanks @salmonumbrella.
|
||||
- Secrets: respect empty `GOG_KEYRING_PASSWORD` (treat set-to-empty as intentional; avoids headless prompts). (#269) — thanks @zerone0x.
|
||||
- Calendar: reject ambiguous calendar-name selectors for `calendar events` instead of guessing. (#131) — thanks @salmonumbrella.
|
||||
- Gmail: `drafts update --quote` now picks a non-draft, non-self message from thread fallback (or errors clearly), avoiding self-quote loops and wrong reply headers. (#394) — thanks @salmonumbrella.
|
||||
|
||||
@ -644,6 +644,7 @@ gog gmail delegates remove --email delegate@example.com
|
||||
gog gmail watch start --topic projects/<p>/topics/<t> --label INBOX
|
||||
gog gmail watch serve --bind 127.0.0.1 --token <shared> --hook-url http://127.0.0.1:18789/hooks/agent
|
||||
gog gmail watch serve --bind 0.0.0.0 --verify-oidc --oidc-email <svc@...> --hook-url <url>
|
||||
gog gmail watch serve --bind 127.0.0.1 --token <shared> --fetch-delay 5 --hook-url http://127.0.0.1:18789/hooks/agent
|
||||
gog gmail watch serve --bind 127.0.0.1 --token <shared> --exclude-labels SPAM,TRASH --hook-url http://127.0.0.1:18789/hooks/agent
|
||||
gog gmail history --since <historyId>
|
||||
```
|
||||
@ -651,6 +652,7 @@ gog gmail history --since <historyId>
|
||||
Gmail watch (Pub/Sub push):
|
||||
- Create Pub/Sub topic + push subscription (OIDC preferred; shared token ok for dev).
|
||||
- Full flow + payload details: `docs/watch.md`.
|
||||
- `watch serve --fetch-delay` defaults to `3s` and helps avoid Gmail History indexing races after push delivery.
|
||||
- `watch serve --exclude-labels` defaults to `SPAM,TRASH`; IDs are case-sensitive.
|
||||
|
||||
### Email Tracking
|
||||
|
||||
@ -48,6 +48,7 @@ gog gmail watch serve \
|
||||
[--verify-oidc] [--oidc-email <svc@...>] [--oidc-audience <aud>] \
|
||||
[--token <shared>] \
|
||||
[--hook-url <url>] [--hook-token <token>] \
|
||||
[--fetch-delay <sec|duration>] \
|
||||
[--include-body] [--max-bytes <n>] [--exclude-labels <id,id,...>] \
|
||||
[--history-types <type>...] [--save-hook]
|
||||
|
||||
@ -61,6 +62,7 @@ Notes:
|
||||
- `watch serve` uses stored hook if `--hook-url` not provided.
|
||||
- `watch serve --exclude-labels` defaults to `SPAM,TRASH`; set to an empty string to disable.
|
||||
- Exclude label IDs are matched exactly (case-sensitive opaque IDs).
|
||||
- `watch serve --fetch-delay` delays Gmail history fetch after each push (default `3s`) to avoid indexing races; accepts seconds (`5`) or Go durations (`5s`).
|
||||
- `watch serve --history-types` accepts `messageAdded`, `messageDeleted`, `labelAdded`, `labelRemoved` (repeatable or comma-separated). Default: `messageAdded` (for backward compatibility).
|
||||
- `watch serve --history-types` must include at least one non-empty type.
|
||||
|
||||
|
||||
@ -216,6 +216,7 @@ type GmailWatchServeCmd struct {
|
||||
Bind string `name:"bind" help:"Bind address" default:"127.0.0.1"`
|
||||
Port int `name:"port" help:"Listen port" default:"8788"`
|
||||
Path string `name:"path" help:"Push handler path" default:"/gmail-pubsub"`
|
||||
FetchDelay string `name:"fetch-delay" help:"Delay before fetching Gmail history (seconds or duration)" default:"3s"`
|
||||
Timezone string `name:"timezone" short:"z" help:"Output timezone (IANA name, e.g. America/New_York, UTC). Default: local"`
|
||||
Local bool `name:"local" help:"Use local timezone (default behavior, useful to override --timezone)"`
|
||||
VerifyOIDC bool `name:"verify-oidc" help:"Verify Pub/Sub OIDC tokens"`
|
||||
@ -262,6 +263,13 @@ func (c *GmailWatchServeCmd) Run(ctx context.Context, kctx *kong.Context, flags
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
fetchDelay, err := parseDurationSeconds(c.FetchDelay)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if fetchDelay < 0 {
|
||||
return usage("--fetch-delay must be >= 0")
|
||||
}
|
||||
|
||||
store, err := loadGmailWatchStore(account)
|
||||
if err != nil {
|
||||
@ -326,6 +334,7 @@ func (c *GmailWatchServeCmd) Run(ctx context.Context, kctx *kong.Context, flags
|
||||
HookTimeout: defaultHookRequestTimeoutSec * time.Second,
|
||||
HistoryMax: defaultHistoryMaxResults,
|
||||
ResyncMax: defaultHistoryResyncMax,
|
||||
FetchDelay: fetchDelay,
|
||||
HistoryTypes: historyTypes,
|
||||
AllowNoHook: hook == nil,
|
||||
IncludeBody: includeBody,
|
||||
|
||||
@ -112,11 +112,100 @@ func TestGmailWatchServeCmd_DefaultMaxBytes(t *testing.T) {
|
||||
if got.cfg.MaxBodyBytes != defaultHookMaxBytes {
|
||||
t.Fatalf("expected default max bytes, got %d", got.cfg.MaxBodyBytes)
|
||||
}
|
||||
if got.cfg.FetchDelay != defaultHistoryFetchDelay {
|
||||
t.Fatalf("expected default fetch delay %v, got %v", defaultHistoryFetchDelay, got.cfg.FetchDelay)
|
||||
}
|
||||
if len(got.cfg.ExcludeLabels) != 2 || got.cfg.ExcludeLabels[0] != "SPAM" || got.cfg.ExcludeLabels[1] != "TRASH" {
|
||||
t.Fatalf("unexpected exclude labels: %#v", got.cfg.ExcludeLabels)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGmailWatchServeCmd_FetchDelaySeconds(t *testing.T) {
|
||||
origListen := listenAndServe
|
||||
t.Cleanup(func() { listenAndServe = origListen })
|
||||
|
||||
home := t.TempDir()
|
||||
t.Setenv("HOME", home)
|
||||
|
||||
store, err := newGmailWatchStore("a@b.com")
|
||||
if err != nil {
|
||||
t.Fatalf("store: %v", err)
|
||||
}
|
||||
updateErr := store.Update(func(s *gmailWatchState) error {
|
||||
s.Account = "a@b.com"
|
||||
return nil
|
||||
})
|
||||
if updateErr != nil {
|
||||
t.Fatalf("seed: %v", updateErr)
|
||||
}
|
||||
|
||||
flags := &RootFlags{Account: "a@b.com"}
|
||||
var got *gmailWatchServer
|
||||
listenAndServe = func(srv *http.Server) error {
|
||||
if gs, ok := srv.Handler.(*gmailWatchServer); ok {
|
||||
got = gs
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
u, err := ui.New(ui.Options{Stdout: io.Discard, Stderr: io.Discard, Color: "never"})
|
||||
if err != nil {
|
||||
t.Fatalf("ui.New: %v", err)
|
||||
}
|
||||
if execErr := runKong(t, &GmailWatchServeCmd{}, []string{"--port", "9999", "--path", "/hook", "--fetch-delay", "5"}, ui.WithUI(context.Background(), u), flags); execErr != nil {
|
||||
t.Fatalf("execute: %v", execErr)
|
||||
}
|
||||
if got == nil {
|
||||
t.Fatalf("expected server")
|
||||
}
|
||||
if got.cfg.FetchDelay != 5*time.Second {
|
||||
t.Fatalf("expected fetch delay 5s, got %v", got.cfg.FetchDelay)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGmailWatchServeCmd_FetchDelayDuration(t *testing.T) {
|
||||
origListen := listenAndServe
|
||||
t.Cleanup(func() { listenAndServe = origListen })
|
||||
|
||||
home := t.TempDir()
|
||||
t.Setenv("HOME", home)
|
||||
|
||||
store, err := newGmailWatchStore("a@b.com")
|
||||
if err != nil {
|
||||
t.Fatalf("store: %v", err)
|
||||
}
|
||||
updateErr := store.Update(func(s *gmailWatchState) error {
|
||||
s.Account = "a@b.com"
|
||||
return nil
|
||||
})
|
||||
if updateErr != nil {
|
||||
t.Fatalf("seed: %v", updateErr)
|
||||
}
|
||||
|
||||
flags := &RootFlags{Account: "a@b.com"}
|
||||
var got *gmailWatchServer
|
||||
listenAndServe = func(srv *http.Server) error {
|
||||
if gs, ok := srv.Handler.(*gmailWatchServer); ok {
|
||||
got = gs
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
u, err := ui.New(ui.Options{Stdout: io.Discard, Stderr: io.Discard, Color: "never"})
|
||||
if err != nil {
|
||||
t.Fatalf("ui.New: %v", err)
|
||||
}
|
||||
if execErr := runKong(t, &GmailWatchServeCmd{}, []string{"--port", "9999", "--path", "/hook", "--fetch-delay", "750ms"}, ui.WithUI(context.Background(), u), flags); execErr != nil {
|
||||
t.Fatalf("execute: %v", execErr)
|
||||
}
|
||||
if got == nil {
|
||||
t.Fatalf("expected server")
|
||||
}
|
||||
if got.cfg.FetchDelay != 750*time.Millisecond {
|
||||
t.Fatalf("expected fetch delay 750ms, got %v", got.cfg.FetchDelay)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGmailWatchServeCmd_ExcludeLabels_Disable(t *testing.T) {
|
||||
origListen := listenAndServe
|
||||
t.Cleanup(func() { listenAndServe = origListen })
|
||||
|
||||
@ -37,4 +37,16 @@ func TestGmailWatchServeCmd_ValidationErrors(t *testing.T) {
|
||||
t.Fatalf("expected error")
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("fetch delay must be non-negative", func(t *testing.T) {
|
||||
if err := runKong(t, &GmailWatchServeCmd{}, []string{"--fetch-delay", "-1", "--port", "9999"}, context.Background(), flags); err == nil {
|
||||
t.Fatalf("expected error")
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("fetch delay must parse as duration", func(t *testing.T) {
|
||||
if err := runKong(t, &GmailWatchServeCmd{}, []string{"--fetch-delay", "not-a-duration", "--port", "9999"}, context.Background(), flags); err == nil {
|
||||
t.Fatalf("expected error")
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
@ -30,6 +30,7 @@ type gmailWatchServer struct {
|
||||
store *gmailWatchStore
|
||||
validator *idtoken.Validator
|
||||
newService func(context.Context, string) (*gmail.Service, error)
|
||||
sleep func(context.Context, time.Duration) error
|
||||
hookClient *http.Client
|
||||
excludeLabelIDs map[string]struct{}
|
||||
logf func(string, ...any)
|
||||
@ -172,6 +173,11 @@ func (s *gmailWatchServer) handlePush(ctx context.Context, payload gmailPushPayl
|
||||
return nil, errNoNewMessages
|
||||
}
|
||||
|
||||
err = s.sleepForFetch(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
svc, err := s.newService(ctx, s.cfg.Account)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -181,8 +187,7 @@ func (s *gmailWatchServer) handlePush(ctx context.Context, payload gmailPushPayl
|
||||
if len(s.cfg.HistoryTypes) > 0 {
|
||||
historyCall.HistoryTypes(s.cfg.HistoryTypes...)
|
||||
}
|
||||
|
||||
historyResp, err := historyCall.Do()
|
||||
historyResp, err := historyCall.Context(ctx).Do()
|
||||
if err != nil {
|
||||
if isStaleHistoryError(err) {
|
||||
return s.resyncHistory(ctx, svc, payload.HistoryID, payload.MessageID)
|
||||
@ -267,6 +272,23 @@ func (s *gmailWatchServer) resyncHistory(ctx context.Context, svc *gmail.Service
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *gmailWatchServer) sleepForFetch(ctx context.Context) error {
|
||||
if s.cfg.FetchDelay <= 0 {
|
||||
return nil
|
||||
}
|
||||
if s.sleep != nil {
|
||||
return s.sleep(ctx, s.cfg.FetchDelay)
|
||||
}
|
||||
timer := time.NewTimer(s.cfg.FetchDelay)
|
||||
defer timer.Stop()
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-timer.C:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func (s *gmailWatchServer) fetchMessages(ctx context.Context, svc *gmail.Service, ids []string) ([]gmailHookMessage, int, error) {
|
||||
messages := make([]gmailHookMessage, 0, len(ids))
|
||||
excluded := 0
|
||||
|
||||
@ -362,6 +362,122 @@ func TestGmailWatchHelpers(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestGmailWatchServer_HandlePush_AppliesFetchDelay(t *testing.T) {
|
||||
home := t.TempDir()
|
||||
t.Setenv("HOME", home)
|
||||
|
||||
store, err := newGmailWatchStore("a@b.com")
|
||||
if err != nil {
|
||||
t.Fatalf("store: %v", err)
|
||||
}
|
||||
if updateErr := store.Update(func(s *gmailWatchState) error {
|
||||
s.Account = "a@b.com"
|
||||
s.HistoryID = "100"
|
||||
return nil
|
||||
}); updateErr != nil {
|
||||
t.Fatalf("seed: %v", updateErr)
|
||||
}
|
||||
|
||||
var historyCalls int
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
switch {
|
||||
case strings.Contains(r.URL.Path, "/gmail/v1/users/me/history"):
|
||||
historyCalls++
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
_ = json.NewEncoder(w).Encode(map[string]any{
|
||||
"historyId": "200",
|
||||
"history": []map[string]any{
|
||||
{"messagesAdded": []map[string]any{
|
||||
{"message": map[string]any{"id": "m1"}},
|
||||
}},
|
||||
},
|
||||
})
|
||||
return
|
||||
case strings.Contains(r.URL.Path, "/gmail/v1/users/me/messages/m1"):
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
_ = json.NewEncoder(w).Encode(map[string]any{
|
||||
"id": "m1",
|
||||
"threadId": "t1",
|
||||
"snippet": "hi",
|
||||
"payload": map[string]any{"headers": []map[string]any{{"name": "Subject", "value": "S"}}},
|
||||
})
|
||||
return
|
||||
default:
|
||||
http.NotFound(w, r)
|
||||
}
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
gsvc, err := gmail.NewService(context.Background(),
|
||||
option.WithoutAuthentication(),
|
||||
option.WithHTTPClient(srv.Client()),
|
||||
option.WithEndpoint(srv.URL+"/"),
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("NewService: %v", err)
|
||||
}
|
||||
|
||||
var slept time.Duration
|
||||
var sleepCalls int
|
||||
server := &gmailWatchServer{
|
||||
cfg: gmailWatchServeConfig{
|
||||
Account: "a@b.com",
|
||||
HistoryMax: 10,
|
||||
FetchDelay: 5 * time.Second,
|
||||
},
|
||||
store: store,
|
||||
newService: func(context.Context, string) (*gmail.Service, error) { return gsvc, nil },
|
||||
sleep: func(_ context.Context, d time.Duration) error {
|
||||
sleepCalls++
|
||||
slept = d
|
||||
return nil
|
||||
},
|
||||
logf: func(string, ...any) {},
|
||||
warnf: func(string, ...any) {},
|
||||
}
|
||||
|
||||
got, err := server.handlePush(context.Background(), gmailPushPayload{EmailAddress: "a@b.com", HistoryID: "200"})
|
||||
if err != nil {
|
||||
t.Fatalf("handlePush: %v", err)
|
||||
}
|
||||
if got == nil || len(got.Messages) != 1 {
|
||||
t.Fatalf("unexpected payload: %#v", got)
|
||||
}
|
||||
if sleepCalls != 1 {
|
||||
t.Fatalf("expected one sleep call, got %d", sleepCalls)
|
||||
}
|
||||
if slept != 5*time.Second {
|
||||
t.Fatalf("expected 5s sleep, got %v", slept)
|
||||
}
|
||||
if historyCalls != 1 {
|
||||
t.Fatalf("expected one history call, got %d", historyCalls)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGmailWatchServer_HandlePush_FetchDelayCanceledContext(t *testing.T) {
|
||||
var serviceCalls int
|
||||
server := &gmailWatchServer{
|
||||
cfg: gmailWatchServeConfig{Account: "a@b.com", FetchDelay: time.Second},
|
||||
store: &gmailWatchStore{state: gmailWatchState{HistoryID: "100"}},
|
||||
newService: func(context.Context, string) (*gmail.Service, error) {
|
||||
serviceCalls++
|
||||
return nil, errors.New("unexpected newService call")
|
||||
},
|
||||
logf: func(string, ...any) {},
|
||||
warnf: func(string, ...any) {},
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
cancel()
|
||||
|
||||
if _, err := server.handlePush(ctx, gmailPushPayload{HistoryID: "200"}); !errors.Is(err, context.Canceled) {
|
||||
t.Fatalf("expected canceled context, got %v", err)
|
||||
}
|
||||
if serviceCalls != 0 {
|
||||
t.Fatalf("expected no service calls, got %d", serviceCalls)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGmailWatchServer_OIDCAudience(t *testing.T) {
|
||||
s := &gmailWatchServer{
|
||||
cfg: gmailWatchServeConfig{OIDCAudience: ""},
|
||||
|
||||
@ -12,6 +12,7 @@ const (
|
||||
defaultHookMaxBytes = 20000
|
||||
defaultHistoryMaxResults = 100
|
||||
defaultHistoryResyncMax = 10
|
||||
defaultHistoryFetchDelay = 3 * time.Second
|
||||
defaultPushBodyLimitBytes = 1024 * 1024
|
||||
defaultHookRequestTimeoutSec = 10
|
||||
)
|
||||
@ -55,6 +56,7 @@ type gmailWatchServeConfig struct {
|
||||
ExcludeLabels []string
|
||||
HistoryMax int64
|
||||
ResyncMax int64
|
||||
FetchDelay time.Duration
|
||||
HistoryTypes []string
|
||||
HookTimeout time.Duration
|
||||
DateLocation *time.Location
|
||||
|
||||
Loading…
Reference in New Issue
Block a user