notcrawl/internal/notionapi/api.go
2026-04-29 04:17:02 -07:00

666 lines
16 KiB
Go

package notionapi
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"strings"
"time"
"github.com/vincentkoc/notcrawl/internal/notiontext"
"github.com/vincentkoc/notcrawl/internal/store"
)
const SourceName = "api"
const maxAPIAttempts = 4
type Client struct {
BaseURL string
Version string
Token string
HTTP *http.Client
}
type Summary struct {
Users int
Pages int
Blocks int
Comments int
Databases int
DatabaseRows int
}
func (c Client) Sync(ctx context.Context, st *store.Store) (Summary, error) {
if strings.TrimSpace(c.Token) == "" {
return Summary{}, fmt.Errorf("missing Notion API token")
}
if c.BaseURL == "" {
c.BaseURL = "https://api.notion.com/v1"
}
if c.Version == "" {
c.Version = "2026-03-11"
}
if c.HTTP == nil {
c.HTTP = http.DefaultClient
}
var s Summary
if err := st.DeferPageFTS(ctx, func() error {
users, err := c.listUsers(ctx)
if err != nil {
return err
}
for _, u := range users {
raw := notiontext.MarshalRaw(u)
if err := st.UpsertUser(ctx, store.User{
ID: u.string("id"), Name: userName(u), Email: userEmail(u), RawJSON: raw, Source: SourceName, SyncedAt: store.NowMS(),
}); err != nil {
return err
}
s.Users++
}
pages, err := c.searchPages(ctx)
if err != nil {
return err
}
for _, page := range pages {
count, comments, err := c.ingestPage(ctx, st, page, ingestPageOptions{FetchBlocks: true, FetchComments: true})
if err != nil {
return err
}
s.Pages++
s.Blocks += count
s.Comments += comments
}
collections, err := c.searchCollections(ctx)
if err != nil {
return err
}
for _, collection := range collections {
rows, err := c.ingestCollection(ctx, st, collection)
if err != nil {
return err
}
s.Databases++
s.DatabaseRows += rows
}
if err := st.SetSyncState(ctx, SourceName, "workspace", "default", time.Now().Format(time.RFC3339)); err != nil {
return err
}
return nil
}); err != nil {
return s, err
}
return s, nil
}
type obj map[string]any
func (o obj) string(key string) string {
if v, ok := o[key].(string); ok {
return v
}
return ""
}
func (o obj) bool(key string) bool {
if v, ok := o[key].(bool); ok {
return v
}
return false
}
func (o obj) mapObj(key string) obj {
if v, ok := o[key].(map[string]any); ok {
return obj(v)
}
return nil
}
func (c Client) listUsers(ctx context.Context) ([]obj, error) {
var out []obj
cursor := ""
for {
path := "/users?page_size=100"
if cursor != "" {
path += "&start_cursor=" + url.QueryEscape(cursor)
}
var resp obj
if err := c.do(ctx, http.MethodGet, path, nil, &resp); err != nil {
return nil, err
}
for _, item := range asSlice(resp["results"]) {
if m, ok := item.(map[string]any); ok {
out = append(out, obj(m))
}
}
if !truthy(resp["has_more"]) {
return out, nil
}
cursor, _ = resp["next_cursor"].(string)
if cursor == "" {
return out, nil
}
}
}
func (c Client) searchPages(ctx context.Context) ([]obj, error) {
return c.searchObjects(ctx, "page")
}
func (c Client) searchCollections(ctx context.Context) ([]obj, error) {
return c.searchObjects(ctx, c.collectionSearchType())
}
func (c Client) searchObjects(ctx context.Context, objectType string) ([]obj, error) {
var out []obj
cursor := ""
for {
body := obj{"page_size": 100, "filter": obj{"property": "object", "value": objectType}}
if cursor != "" {
body["start_cursor"] = cursor
}
var resp obj
if err := c.do(ctx, http.MethodPost, "/search", body, &resp); err != nil {
return nil, err
}
for _, item := range asSlice(resp["results"]) {
if m, ok := item.(map[string]any); ok {
out = append(out, obj(m))
}
}
if !truthy(resp["has_more"]) {
return out, nil
}
cursor, _ = resp["next_cursor"].(string)
if cursor == "" {
return out, nil
}
}
}
type ingestPageOptions struct {
CollectionID string
FetchBlocks bool
FetchComments bool
}
func (c Client) ingestPage(ctx context.Context, st *store.Store, page obj, opts ingestPageOptions) (blockCount int, commentCount int, err error) {
raw := notiontext.MarshalRaw(page)
props := marshalAny(page["properties"])
parent := page.mapObj("parent")
parentID := parent.string("page_id")
if parentID == "" {
parentID = parent.string("database_id")
}
if parentID == "" {
parentID = parent.string("data_source_id")
}
collectionID := opts.CollectionID
if collectionID == "" && (parent.string("type") == "database_id" || parent.string("type") == "data_source_id") {
collectionID = parentID
}
spaceID := parent.string("workspace")
p := store.Page{
ID: page.string("id"),
SpaceID: spaceID,
ParentID: parentID,
ParentTable: parent.string("type"),
CollectionID: collectionID,
Title: titleFromAPIPage(page),
URL: page.string("url"),
PropertiesJSON: props,
CreatedTime: parseTimeMS(page.string("created_time")),
LastEditedTime: parseTimeMS(page.string("last_edited_time")),
Alive: !page.bool("archived") && !page.bool("in_trash"),
Source: SourceName,
RawJSON: raw,
SyncedAt: store.NowMS(),
}
if p.Title == "" {
p.Title = "Untitled"
}
if err := st.UpsertPage(ctx, p); err != nil {
return 0, 0, err
}
var blocks, comments int
if opts.FetchBlocks {
blocks, err = c.walkBlocks(ctx, st, p.ID, p.ID, p.SpaceID)
if err != nil {
return 0, 0, err
}
}
if opts.FetchComments {
comments, err = c.ingestComments(ctx, st, p.ID, p.SpaceID)
if err != nil {
return 0, 0, err
}
}
return blocks, comments, nil
}
func (c Client) ingestCollection(ctx context.Context, st *store.Store, collection obj) (int, error) {
id := collection.string("id")
raw := notiontext.MarshalRaw(collection)
parent := collection.mapObj("parent")
if len(parent) == 0 {
parent = collection.mapObj("database_parent")
}
parentID := firstNonEmpty(parent.string("database_id"), parent.string("page_id"), parent.string("block_id"), parent.string("workspace"))
name := notiontext.Plain(collection["title"])
if name == "" {
name = id
}
if err := st.UpsertCollection(ctx, store.Collection{
ID: id,
SpaceID: parent.string("workspace"),
ParentID: parentID,
ParentTable: parent.string("type"),
Name: name,
SchemaJSON: marshalAny(collection["properties"]),
FormatJSON: marshalAny(collection),
RawJSON: raw,
Source: SourceName,
SyncedAt: store.NowMS(),
}); err != nil {
return 0, err
}
if err := st.UpsertRawRecord(ctx, store.RawRecord{
Source: SourceName, RecordTable: c.collectionSearchType(), RecordID: id, ParentID: parentID,
SpaceID: parent.string("workspace"), RawJSON: raw, SyncedAt: store.NowMS(),
}); err != nil {
return 0, err
}
return c.queryCollection(ctx, st, id)
}
func (c Client) queryCollection(ctx context.Context, st *store.Store, collectionID string) (int, error) {
var count int
cursor := ""
for {
body := obj{"page_size": 100}
if cursor != "" {
body["start_cursor"] = cursor
}
var resp obj
path := fmt.Sprintf("%s/%s/query", c.collectionQueryBasePath(), url.PathEscape(collectionID))
if err := c.do(ctx, http.MethodPost, path, body, &resp); err != nil {
return count, err
}
for _, item := range asSlice(resp["results"]) {
m, ok := item.(map[string]any)
if !ok {
continue
}
if itemType := obj(m).string("object"); itemType != "" && itemType != "page" {
if itemType == c.collectionSearchType() {
if _, err := c.ingestCollection(ctx, st, obj(m)); err != nil {
return count, err
}
}
continue
}
if _, _, err := c.ingestPage(ctx, st, obj(m), ingestPageOptions{CollectionID: collectionID}); err != nil {
return count, err
}
count++
}
if !truthy(resp["has_more"]) {
return count, nil
}
cursor, _ = resp["next_cursor"].(string)
if cursor == "" {
return count, nil
}
}
}
func (c Client) collectionSearchType() string {
if c.usesDataSourceAPI() {
return "data_source"
}
return "database"
}
func (c Client) collectionQueryBasePath() string {
if c.usesDataSourceAPI() {
return "/data_sources"
}
return "/databases"
}
func (c Client) usesDataSourceAPI() bool {
return c.Version >= "2025-09-03"
}
func (c Client) walkBlocks(ctx context.Context, st *store.Store, pageID, parentID, spaceID string) (int, error) {
var count int
cursor := ""
var displayOrder int64
for {
path := fmt.Sprintf("/blocks/%s/children?page_size=100", url.PathEscape(parentID))
if cursor != "" {
path += "&start_cursor=" + url.QueryEscape(cursor)
}
var resp obj
if err := c.do(ctx, http.MethodGet, path, nil, &resp); err != nil {
return count, err
}
for _, item := range asSlice(resp["results"]) {
m, ok := item.(map[string]any)
if !ok {
continue
}
block := obj(m)
typ := block.string("type")
typeBody := block[typ]
text := notiontext.Plain(typeBody)
raw := notiontext.MarshalRaw(block)
displayOrder++
if err := st.UpsertBlock(ctx, store.Block{
ID: block.string("id"),
PageID: pageID,
SpaceID: spaceID,
ParentID: parentID,
ParentTable: "block",
Type: typ,
Text: text,
PropertiesJSON: marshalAny(typeBody),
DisplayOrder: displayOrder,
CreatedTime: parseTimeMS(block.string("created_time")),
LastEditedTime: parseTimeMS(block.string("last_edited_time")),
Alive: !block.bool("archived") && !block.bool("in_trash"),
Source: SourceName,
RawJSON: raw,
SyncedAt: store.NowMS(),
}); err != nil {
return count, err
}
count++
if truthy(block["has_children"]) {
n, err := c.walkBlocks(ctx, st, pageID, block.string("id"), spaceID)
if err != nil {
return count, err
}
count += n
}
}
if !truthy(resp["has_more"]) {
return count, nil
}
cursor, _ = resp["next_cursor"].(string)
if cursor == "" {
return count, nil
}
}
}
func (c Client) ingestComments(ctx context.Context, st *store.Store, pageID, spaceID string) (int, error) {
var count int
cursor := ""
for {
path := "/comments?block_id=" + url.QueryEscape(pageID) + "&page_size=100"
if cursor != "" {
path += "&start_cursor=" + url.QueryEscape(cursor)
}
var resp obj
if err := c.do(ctx, http.MethodGet, path, nil, &resp); err != nil {
if isIgnoredCommentError(err) {
return count, nil
}
return count, err
}
for _, item := range asSlice(resp["results"]) {
m, ok := item.(map[string]any)
if !ok {
continue
}
comment := obj(m)
createdBy := comment.mapObj("created_by")
if err := st.UpsertComment(ctx, store.Comment{
ID: comment.string("id"),
PageID: pageID,
SpaceID: spaceID,
ParentID: pageID,
Text: notiontext.Plain(comment["rich_text"]),
CreatedByID: createdBy.string("id"),
CreatedTime: parseTimeMS(comment.string("created_time")),
LastEditedTime: parseTimeMS(comment.string("last_edited_time")),
Alive: true,
RawJSON: notiontext.MarshalRaw(comment),
Source: SourceName,
SyncedAt: store.NowMS(),
}); err != nil {
return count, err
}
count++
}
if !truthy(resp["has_more"]) {
return count, nil
}
cursor, _ = resp["next_cursor"].(string)
if cursor == "" {
return count, nil
}
}
}
func (c Client) do(ctx context.Context, method, path string, body any, out any) error {
var bodyBytes []byte
if body != nil {
b, err := json.Marshal(body)
if err != nil {
return err
}
bodyBytes = b
}
for attempt := 1; attempt <= maxAPIAttempts; attempt++ {
var reader io.Reader
if bodyBytes != nil {
reader = bytes.NewReader(bodyBytes)
}
req, err := http.NewRequestWithContext(ctx, method, strings.TrimRight(c.BaseURL, "/")+path, reader)
if err != nil {
return err
}
req.Header.Set("Authorization", "Bearer "+c.Token)
req.Header.Set("Notion-Version", c.Version)
req.Header.Set("Accept", "application/json")
if body != nil {
req.Header.Set("Content-Type", "application/json")
}
resp, err := c.HTTP.Do(req)
if err != nil {
return err
}
if resp.StatusCode >= 200 && resp.StatusCode < 300 {
defer resp.Body.Close()
return json.NewDecoder(resp.Body).Decode(out)
}
b, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))
resp.Body.Close()
apiErr := apiErrorFromResponse(method, path, resp, b)
if attempt < maxAPIAttempts && shouldRetry(apiErr) {
if err := waitBeforeRetry(ctx, apiErr.RetryAfter); err != nil {
return err
}
continue
}
return apiErr
}
return nil
}
type notionAPIError struct {
Method string
Path string
Status string
StatusCode int
Code string
Message string
Body string
RetryAfter time.Duration
Retryable bool
}
func (e notionAPIError) Error() string {
if e.Code != "" || e.Message != "" {
return fmt.Sprintf("notion api %s %s: %s: %s: %s", e.Method, e.Path, e.Status, e.Code, e.Message)
}
return fmt.Sprintf("notion api %s %s: %s: %s", e.Method, e.Path, e.Status, e.Body)
}
func apiErrorFromResponse(method, path string, resp *http.Response, body []byte) notionAPIError {
bodyText := strings.TrimSpace(string(body))
apiErr := notionAPIError{
Method: method,
Path: path,
Status: resp.Status,
StatusCode: resp.StatusCode,
Body: bodyText,
RetryAfter: retryAfter(resp.Header.Get("Retry-After"), body),
}
var payload struct {
Code string `json:"code"`
Message string `json:"message"`
Retryable bool `json:"retryable"`
RetryAfter float64 `json:"retry_after"`
}
if err := json.Unmarshal(body, &payload); err == nil {
apiErr.Code = payload.Code
apiErr.Message = payload.Message
apiErr.Retryable = payload.Retryable
if payload.RetryAfter > 0 && apiErr.RetryAfter == 0 {
apiErr.RetryAfter = time.Duration(payload.RetryAfter * float64(time.Second))
}
}
return apiErr
}
func shouldRetry(err notionAPIError) bool {
if err.StatusCode == http.StatusTooManyRequests || err.Retryable {
return true
}
return err.StatusCode == http.StatusBadGateway ||
err.StatusCode == http.StatusServiceUnavailable ||
err.StatusCode == http.StatusGatewayTimeout
}
func retryAfter(header string, body []byte) time.Duration {
if header != "" {
if seconds, err := time.ParseDuration(header + "s"); err == nil && seconds > 0 {
return seconds
}
if when, err := http.ParseTime(header); err == nil {
if wait := time.Until(when); wait > 0 {
return wait
}
}
}
var payload struct {
RetryAfter float64 `json:"retry_after"`
}
if err := json.Unmarshal(body, &payload); err == nil && payload.RetryAfter > 0 {
return time.Duration(payload.RetryAfter * float64(time.Second))
}
return 0
}
func waitBeforeRetry(ctx context.Context, wait time.Duration) error {
if wait <= 0 {
return nil
}
timer := time.NewTimer(wait)
defer timer.Stop()
select {
case <-ctx.Done():
return ctx.Err()
case <-timer.C:
return nil
}
}
func isIgnoredCommentError(err error) bool {
apiErr, ok := err.(notionAPIError)
if !ok {
return false
}
if apiErr.StatusCode == http.StatusNotFound || apiErr.Code == "not_found" {
return true
}
return apiErr.StatusCode == http.StatusForbidden && apiErr.Code == "restricted_resource"
}
func userName(u obj) string {
if name := u.string("name"); name != "" {
return name
}
person := u.mapObj("person")
return person.string("email")
}
func userEmail(u obj) string {
person := u.mapObj("person")
return person.string("email")
}
func titleFromAPIPage(page obj) string {
props, ok := page["properties"].(map[string]any)
if !ok {
return ""
}
for _, prop := range props {
m, ok := prop.(map[string]any)
if !ok || m["type"] != "title" {
continue
}
return notiontext.Plain(m["title"])
}
return ""
}
func marshalAny(v any) string {
b, err := json.Marshal(v)
if err != nil {
return "{}"
}
return string(b)
}
func parseTimeMS(s string) int64 {
if s == "" {
return 0
}
t, err := time.Parse(time.RFC3339Nano, s)
if err != nil {
return 0
}
return t.UnixMilli()
}
func truthy(v any) bool {
b, _ := v.(bool)
return b
}
func asSlice(v any) []any {
if s, ok := v.([]any); ok {
return s
}
return nil
}
func firstNonEmpty(values ...string) string {
for _, value := range values {
if value != "" {
return value
}
}
return ""
}