440 lines
11 KiB
Go
440 lines
11 KiB
Go
package cmd
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"crypto/subtle"
|
|
"encoding/base64"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"strings"
|
|
"time"
|
|
|
|
"google.golang.org/api/gmail/v1"
|
|
"google.golang.org/api/googleapi"
|
|
"google.golang.org/api/idtoken"
|
|
)
|
|
|
|
type gmailWatchServer struct {
|
|
cfg gmailWatchServeConfig
|
|
store *gmailWatchStore
|
|
validator *idtoken.Validator
|
|
newService func(context.Context, string) (*gmail.Service, error)
|
|
hookClient *http.Client
|
|
logf func(string, ...any)
|
|
warnf func(string, ...any)
|
|
}
|
|
|
|
func (s *gmailWatchServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|
if !pathMatches(s.cfg.Path, r.URL.Path) {
|
|
w.WriteHeader(http.StatusNotFound)
|
|
return
|
|
}
|
|
if r.Method != http.MethodPost {
|
|
w.Header().Set("Allow", http.MethodPost)
|
|
w.WriteHeader(http.StatusMethodNotAllowed)
|
|
return
|
|
}
|
|
if ok := s.authorize(r); !ok {
|
|
w.WriteHeader(http.StatusUnauthorized)
|
|
return
|
|
}
|
|
|
|
push, err := parsePubSubPush(r)
|
|
if err != nil {
|
|
s.warnf("watch: invalid push payload: %v", err)
|
|
w.WriteHeader(http.StatusBadRequest)
|
|
return
|
|
}
|
|
payload, err := decodeGmailPushPayload(push)
|
|
if err != nil {
|
|
s.warnf("watch: invalid push data: %v", err)
|
|
w.WriteHeader(http.StatusBadRequest)
|
|
return
|
|
}
|
|
if payload.EmailAddress != "" && !strings.EqualFold(payload.EmailAddress, s.cfg.Account) {
|
|
s.warnf("watch: ignoring push for %s", payload.EmailAddress)
|
|
w.WriteHeader(http.StatusAccepted)
|
|
return
|
|
}
|
|
|
|
result, err := s.handlePush(r.Context(), payload)
|
|
if err != nil {
|
|
s.warnf("watch: handle push failed: %v", err)
|
|
w.WriteHeader(http.StatusInternalServerError)
|
|
return
|
|
}
|
|
if result == nil {
|
|
w.WriteHeader(http.StatusAccepted)
|
|
return
|
|
}
|
|
|
|
if s.cfg.HookURL == "" {
|
|
if s.cfg.AllowNoHook {
|
|
_ = json.NewEncoder(w).Encode(result)
|
|
return
|
|
}
|
|
w.WriteHeader(http.StatusAccepted)
|
|
return
|
|
}
|
|
|
|
if err := s.sendHook(r.Context(), result); err != nil {
|
|
s.warnf("watch: hook failed: %v", err)
|
|
w.WriteHeader(http.StatusOK)
|
|
return
|
|
}
|
|
w.WriteHeader(http.StatusOK)
|
|
}
|
|
|
|
func (s *gmailWatchServer) authorize(r *http.Request) bool {
|
|
if s.cfg.VerifyOIDC {
|
|
bearer := bearerToken(r)
|
|
if bearer != "" {
|
|
if ok, err := verifyOIDCToken(r.Context(), s.validator, bearer, s.oidcAudience(r), s.cfg.OIDCEmail); ok {
|
|
return true
|
|
} else if err != nil {
|
|
s.warnf("watch: oidc verify failed: %v", err)
|
|
}
|
|
}
|
|
if s.cfg.SharedToken != "" {
|
|
return sharedTokenMatches(r, s.cfg.SharedToken)
|
|
}
|
|
return false
|
|
}
|
|
if s.cfg.SharedToken == "" {
|
|
return true
|
|
}
|
|
return sharedTokenMatches(r, s.cfg.SharedToken)
|
|
}
|
|
|
|
func (s *gmailWatchServer) oidcAudience(r *http.Request) string {
|
|
if s.cfg.OIDCAudience != "" {
|
|
return s.cfg.OIDCAudience
|
|
}
|
|
scheme := "http"
|
|
if r.TLS != nil {
|
|
scheme = "https"
|
|
}
|
|
if xf := r.Header.Get("X-Forwarded-Proto"); xf != "" {
|
|
parts := strings.Split(xf, ",")
|
|
if len(parts) > 0 && strings.TrimSpace(parts[0]) != "" {
|
|
scheme = strings.TrimSpace(parts[0])
|
|
}
|
|
}
|
|
host := r.Host
|
|
if xf := r.Header.Get("X-Forwarded-Host"); xf != "" {
|
|
parts := strings.Split(xf, ",")
|
|
if len(parts) > 0 && strings.TrimSpace(parts[0]) != "" {
|
|
host = strings.TrimSpace(parts[0])
|
|
}
|
|
}
|
|
return fmt.Sprintf("%s://%s%s", scheme, host, r.URL.Path)
|
|
}
|
|
|
|
func (s *gmailWatchServer) handlePush(ctx context.Context, payload gmailPushPayload) (*gmailHookPayload, error) {
|
|
store := s.store
|
|
startID, err := store.StartHistoryID(payload.HistoryID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if startID == 0 {
|
|
return nil, nil
|
|
}
|
|
|
|
svc, err := s.newService(ctx, s.cfg.Account)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
historyCall := svc.Users.History.List("me").StartHistoryId(startID).MaxResults(s.cfg.HistoryMax)
|
|
historyCall.HistoryTypes("messageAdded")
|
|
|
|
historyResp, err := historyCall.Do()
|
|
if err != nil {
|
|
if isStaleHistoryError(err) {
|
|
return s.resyncHistory(ctx, svc, payload.HistoryID)
|
|
}
|
|
return nil, err
|
|
}
|
|
|
|
messageIDs := collectHistoryMessageIDs(historyResp)
|
|
msgs, err := s.fetchMessages(ctx, svc, messageIDs)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
nextHistoryID := payload.HistoryID
|
|
if historyResp != nil && historyResp.HistoryId != 0 {
|
|
nextHistoryID = formatHistoryID(historyResp.HistoryId)
|
|
}
|
|
if err := store.Update(func(state *gmailWatchState) error {
|
|
state.HistoryID = nextHistoryID
|
|
state.UpdatedAtMs = time.Now().UnixMilli()
|
|
return nil
|
|
}); err != nil {
|
|
s.warnf("watch: failed to update state: %v", err)
|
|
}
|
|
|
|
return &gmailHookPayload{
|
|
Source: "gmail",
|
|
Account: s.cfg.Account,
|
|
HistoryID: nextHistoryID,
|
|
Messages: msgs,
|
|
}, nil
|
|
}
|
|
|
|
func (s *gmailWatchServer) resyncHistory(ctx context.Context, svc *gmail.Service, historyID string) (*gmailHookPayload, error) {
|
|
list, err := svc.Users.Messages.List("me").MaxResults(s.cfg.ResyncMax).Do()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
ids := make([]string, 0, len(list.Messages))
|
|
for _, m := range list.Messages {
|
|
if m != nil && m.Id != "" {
|
|
ids = append(ids, m.Id)
|
|
}
|
|
}
|
|
msgs, err := s.fetchMessages(ctx, svc, ids)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err := s.store.Update(func(state *gmailWatchState) error {
|
|
state.HistoryID = historyID
|
|
state.UpdatedAtMs = time.Now().UnixMilli()
|
|
return nil
|
|
}); err != nil {
|
|
s.warnf("watch: failed to update state after resync: %v", err)
|
|
}
|
|
|
|
return &gmailHookPayload{
|
|
Source: "gmail",
|
|
Account: s.cfg.Account,
|
|
HistoryID: historyID,
|
|
Messages: msgs,
|
|
}, nil
|
|
}
|
|
|
|
func (s *gmailWatchServer) fetchMessages(ctx context.Context, svc *gmail.Service, ids []string) ([]gmailHookMessage, error) {
|
|
messages := make([]gmailHookMessage, 0, len(ids))
|
|
format := "metadata"
|
|
if s.cfg.IncludeBody {
|
|
format = "full"
|
|
}
|
|
for _, id := range ids {
|
|
if strings.TrimSpace(id) == "" {
|
|
continue
|
|
}
|
|
msg, err := svc.Users.Messages.Get("me", id).
|
|
Format(format).
|
|
MetadataHeaders("From", "To", "Subject", "Date").
|
|
Context(ctx).
|
|
Do()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if msg == nil {
|
|
continue
|
|
}
|
|
item := gmailHookMessage{
|
|
ID: msg.Id,
|
|
ThreadID: msg.ThreadId,
|
|
From: headerValue(msg.Payload, "From"),
|
|
To: headerValue(msg.Payload, "To"),
|
|
Subject: headerValue(msg.Payload, "Subject"),
|
|
Date: formatGmailDate(headerValue(msg.Payload, "Date")),
|
|
Snippet: msg.Snippet,
|
|
Labels: msg.LabelIds,
|
|
}
|
|
if s.cfg.IncludeBody {
|
|
body := bestBodyText(msg.Payload)
|
|
item.Body, item.BodyTruncated = truncateUTF8Bytes(body, s.cfg.MaxBodyBytes)
|
|
}
|
|
messages = append(messages, item)
|
|
}
|
|
return messages, nil
|
|
}
|
|
|
|
func (s *gmailWatchServer) sendHook(ctx context.Context, payload *gmailHookPayload) error {
|
|
data, err := json.Marshal(payload)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodPost, s.cfg.HookURL, bytes.NewReader(data))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
req.Header.Set("Content-Type", "application/json")
|
|
if s.cfg.HookToken != "" {
|
|
req.Header.Set("Authorization", "Bearer "+s.cfg.HookToken)
|
|
}
|
|
resp, err := s.hookClient.Do(req)
|
|
if err != nil {
|
|
_ = s.store.Update(func(state *gmailWatchState) error {
|
|
state.LastDeliveryStatus = "error"
|
|
state.LastDeliveryAtMs = time.Now().UnixMilli()
|
|
state.LastDeliveryStatusNote = err.Error()
|
|
return nil
|
|
})
|
|
return err
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
|
_ = s.store.Update(func(state *gmailWatchState) error {
|
|
state.LastDeliveryStatus = "http_error"
|
|
state.LastDeliveryAtMs = time.Now().UnixMilli()
|
|
state.LastDeliveryStatusNote = fmt.Sprintf("status %d", resp.StatusCode)
|
|
return nil
|
|
})
|
|
return fmt.Errorf("hook status %d", resp.StatusCode)
|
|
}
|
|
_ = s.store.Update(func(state *gmailWatchState) error {
|
|
state.LastDeliveryStatus = "ok"
|
|
state.LastDeliveryAtMs = time.Now().UnixMilli()
|
|
state.LastDeliveryStatusNote = ""
|
|
return nil
|
|
})
|
|
return nil
|
|
}
|
|
|
|
func parsePubSubPush(r *http.Request) (*pubsubPushEnvelope, error) {
|
|
defer r.Body.Close()
|
|
limit := int64(defaultPushBodyLimitBytes)
|
|
data, err := io.ReadAll(io.LimitReader(r.Body, limit+1))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if int64(len(data)) > limit {
|
|
return nil, errors.New("push body too large")
|
|
}
|
|
var envelope pubsubPushEnvelope
|
|
if err := json.Unmarshal(data, &envelope); err != nil {
|
|
return nil, err
|
|
}
|
|
if envelope.Message.Data == "" {
|
|
return nil, errors.New("missing message.data")
|
|
}
|
|
return &envelope, nil
|
|
}
|
|
|
|
func decodeGmailPushPayload(envelope *pubsubPushEnvelope) (gmailPushPayload, error) {
|
|
decoded, err := base64.StdEncoding.DecodeString(envelope.Message.Data)
|
|
if err != nil {
|
|
decoded, err = base64.RawStdEncoding.DecodeString(envelope.Message.Data)
|
|
if err != nil {
|
|
return gmailPushPayload{}, err
|
|
}
|
|
}
|
|
var payload gmailPushPayload
|
|
if err := json.Unmarshal(decoded, &payload); err != nil {
|
|
return gmailPushPayload{}, err
|
|
}
|
|
return payload, nil
|
|
}
|
|
|
|
func bearerToken(r *http.Request) string {
|
|
auth := r.Header.Get("Authorization")
|
|
if auth == "" {
|
|
return ""
|
|
}
|
|
parts := strings.SplitN(auth, " ", 2)
|
|
if len(parts) != 2 {
|
|
return ""
|
|
}
|
|
if !strings.EqualFold(parts[0], "bearer") {
|
|
return ""
|
|
}
|
|
return strings.TrimSpace(parts[1])
|
|
}
|
|
|
|
func sharedTokenMatches(r *http.Request, expected string) bool {
|
|
if expected == "" {
|
|
return false
|
|
}
|
|
token := r.Header.Get("x-gog-token")
|
|
if token == "" {
|
|
token = r.URL.Query().Get("token")
|
|
}
|
|
if token == "" {
|
|
return false
|
|
}
|
|
return subtle.ConstantTimeCompare([]byte(token), []byte(expected)) == 1
|
|
}
|
|
|
|
func verifyOIDCToken(ctx context.Context, validator *idtoken.Validator, token, audience, expectedEmail string) (bool, error) {
|
|
if validator == nil {
|
|
return false, errors.New("no OIDC validator")
|
|
}
|
|
payload, err := validator.Validate(ctx, token, audience)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
if expectedEmail == "" {
|
|
return true, nil
|
|
}
|
|
email, _ := payload.Claims["email"].(string)
|
|
if !strings.EqualFold(email, expectedEmail) {
|
|
return false, fmt.Errorf("oidc email mismatch: %s", email)
|
|
}
|
|
return true, nil
|
|
}
|
|
|
|
func pathMatches(expected, actual string) bool {
|
|
if expected == actual {
|
|
return true
|
|
}
|
|
if strings.HasSuffix(expected, "/") {
|
|
return strings.HasPrefix(actual, expected)
|
|
}
|
|
return strings.HasPrefix(actual, expected+"/")
|
|
}
|
|
|
|
func isStaleHistoryError(err error) bool {
|
|
var gerr *googleapi.Error
|
|
if errors.As(err, &gerr) {
|
|
if gerr.Code == http.StatusBadRequest || gerr.Code == http.StatusNotFound {
|
|
if strings.Contains(strings.ToLower(gerr.Message), "history") {
|
|
return true
|
|
}
|
|
}
|
|
}
|
|
return strings.Contains(strings.ToLower(err.Error()), "history")
|
|
}
|
|
|
|
func collectHistoryMessageIDs(resp *gmail.ListHistoryResponse) []string {
|
|
if resp == nil || len(resp.History) == 0 {
|
|
return nil
|
|
}
|
|
seen := make(map[string]struct{})
|
|
out := make([]string, 0)
|
|
for _, h := range resp.History {
|
|
if h == nil {
|
|
continue
|
|
}
|
|
for _, added := range h.MessagesAdded {
|
|
if added == nil || added.Message == nil || added.Message.Id == "" {
|
|
continue
|
|
}
|
|
if _, ok := seen[added.Message.Id]; ok {
|
|
continue
|
|
}
|
|
seen[added.Message.Id] = struct{}{}
|
|
out = append(out, added.Message.Id)
|
|
}
|
|
for _, msg := range h.Messages {
|
|
if msg == nil || msg.Id == "" {
|
|
continue
|
|
}
|
|
if _, ok := seen[msg.Id]; ok {
|
|
continue
|
|
}
|
|
seen[msg.Id] = struct{}{}
|
|
out = append(out, msg.Id)
|
|
}
|
|
}
|
|
return out
|
|
}
|