feat: add notion ingestion adapters

This commit is contained in:
Vincent Koc 2026-04-22 15:53:46 -07:00
parent dda7f700ac
commit e814a23509
No known key found for this signature in database
4 changed files with 899 additions and 0 deletions

407
internal/notionapi/api.go Normal file
View File

@ -0,0 +1,407 @@
package notionapi
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"strings"
"time"
"github.com/vincentkoc/notioncrawl/internal/notiontext"
"github.com/vincentkoc/notioncrawl/internal/store"
)
const SourceName = "api"
type Client struct {
BaseURL string
Version string
Token string
HTTP *http.Client
}
type Summary struct {
Users int
Pages int
Blocks int
Comments int
}
func (c Client) Sync(ctx context.Context, st *store.Store) (Summary, error) {
if strings.TrimSpace(c.Token) == "" {
return Summary{}, fmt.Errorf("missing Notion API token")
}
if c.BaseURL == "" {
c.BaseURL = "https://api.notion.com/v1"
}
if c.Version == "" {
c.Version = "2022-06-28"
}
if c.HTTP == nil {
c.HTTP = http.DefaultClient
}
var s Summary
users, err := c.listUsers(ctx)
if err != nil {
return s, err
}
for _, u := range users {
raw := notiontext.MarshalRaw(u)
if err := st.UpsertUser(ctx, store.User{
ID: u.string("id"), Name: userName(u), Email: userEmail(u), RawJSON: raw, Source: SourceName, SyncedAt: store.NowMS(),
}); err != nil {
return s, err
}
s.Users++
}
pages, err := c.searchPages(ctx)
if err != nil {
return s, err
}
for _, page := range pages {
count, comments, err := c.ingestPage(ctx, st, page)
if err != nil {
return s, err
}
s.Pages++
s.Blocks += count
s.Comments += comments
}
if err := st.SetSyncState(ctx, SourceName, "workspace", "default", time.Now().Format(time.RFC3339)); err != nil {
return s, err
}
return s, nil
}
type obj map[string]any
func (o obj) string(key string) string {
if v, ok := o[key].(string); ok {
return v
}
return ""
}
func (o obj) bool(key string) bool {
if v, ok := o[key].(bool); ok {
return v
}
return false
}
func (o obj) mapObj(key string) obj {
if v, ok := o[key].(map[string]any); ok {
return obj(v)
}
return nil
}
func (c Client) listUsers(ctx context.Context) ([]obj, error) {
var out []obj
cursor := ""
for {
path := "/users?page_size=100"
if cursor != "" {
path += "&start_cursor=" + url.QueryEscape(cursor)
}
var resp obj
if err := c.do(ctx, http.MethodGet, path, nil, &resp); err != nil {
return nil, err
}
for _, item := range asSlice(resp["results"]) {
if m, ok := item.(map[string]any); ok {
out = append(out, obj(m))
}
}
if !truthy(resp["has_more"]) {
return out, nil
}
cursor, _ = resp["next_cursor"].(string)
if cursor == "" {
return out, nil
}
}
}
func (c Client) searchPages(ctx context.Context) ([]obj, error) {
var out []obj
cursor := ""
for {
body := obj{"page_size": 100, "filter": obj{"property": "object", "value": "page"}}
if cursor != "" {
body["start_cursor"] = cursor
}
var resp obj
if err := c.do(ctx, http.MethodPost, "/search", body, &resp); err != nil {
return nil, err
}
for _, item := range asSlice(resp["results"]) {
if m, ok := item.(map[string]any); ok {
out = append(out, obj(m))
}
}
if !truthy(resp["has_more"]) {
return out, nil
}
cursor, _ = resp["next_cursor"].(string)
if cursor == "" {
return out, nil
}
}
}
func (c Client) ingestPage(ctx context.Context, st *store.Store, page obj) (blockCount int, commentCount int, err error) {
raw := notiontext.MarshalRaw(page)
props := marshalAny(page["properties"])
parent := page.mapObj("parent")
parentID := parent.string("page_id")
if parentID == "" {
parentID = parent.string("database_id")
}
spaceID := parent.string("workspace")
p := store.Page{
ID: page.string("id"),
SpaceID: spaceID,
ParentID: parentID,
ParentTable: parent.string("type"),
Title: titleFromAPIPage(page),
URL: page.string("url"),
PropertiesJSON: props,
CreatedTime: parseTimeMS(page.string("created_time")),
LastEditedTime: parseTimeMS(page.string("last_edited_time")),
Alive: !page.bool("archived") && !page.bool("in_trash"),
Source: SourceName,
RawJSON: raw,
SyncedAt: store.NowMS(),
}
if p.Title == "" {
p.Title = "Untitled"
}
if err := st.UpsertPage(ctx, p); err != nil {
return 0, 0, err
}
blocks, err := c.walkBlocks(ctx, st, p.ID, p.ID, p.SpaceID)
if err != nil {
return 0, 0, err
}
comments, err := c.ingestComments(ctx, st, p.ID, p.SpaceID)
if err != nil {
return 0, 0, err
}
return blocks, comments, nil
}
func (c Client) walkBlocks(ctx context.Context, st *store.Store, pageID, parentID, spaceID string) (int, error) {
var count int
cursor := ""
for {
path := fmt.Sprintf("/blocks/%s/children?page_size=100", url.PathEscape(parentID))
if cursor != "" {
path += "&start_cursor=" + url.QueryEscape(cursor)
}
var resp obj
if err := c.do(ctx, http.MethodGet, path, nil, &resp); err != nil {
return count, err
}
for _, item := range asSlice(resp["results"]) {
m, ok := item.(map[string]any)
if !ok {
continue
}
block := obj(m)
typ := block.string("type")
typeBody := block[typ]
text := notiontext.Plain(typeBody)
raw := notiontext.MarshalRaw(block)
if err := st.UpsertBlock(ctx, store.Block{
ID: block.string("id"),
PageID: pageID,
SpaceID: spaceID,
ParentID: parentID,
ParentTable: "block",
Type: typ,
Text: text,
PropertiesJSON: marshalAny(typeBody),
CreatedTime: parseTimeMS(block.string("created_time")),
LastEditedTime: parseTimeMS(block.string("last_edited_time")),
Alive: !block.bool("archived") && !block.bool("in_trash"),
Source: SourceName,
RawJSON: raw,
SyncedAt: store.NowMS(),
}); err != nil {
return count, err
}
count++
if truthy(block["has_children"]) {
n, err := c.walkBlocks(ctx, st, pageID, block.string("id"), spaceID)
if err != nil {
return count, err
}
count += n
}
}
if !truthy(resp["has_more"]) {
return count, nil
}
cursor, _ = resp["next_cursor"].(string)
if cursor == "" {
return count, nil
}
}
}
func (c Client) ingestComments(ctx context.Context, st *store.Store, pageID, spaceID string) (int, error) {
var count int
cursor := ""
for {
path := "/comments?block_id=" + url.QueryEscape(pageID) + "&page_size=100"
if cursor != "" {
path += "&start_cursor=" + url.QueryEscape(cursor)
}
var resp obj
if err := c.do(ctx, http.MethodGet, path, nil, &resp); err != nil {
if strings.Contains(err.Error(), "404") || strings.Contains(err.Error(), "not_found") {
return count, nil
}
return count, err
}
for _, item := range asSlice(resp["results"]) {
m, ok := item.(map[string]any)
if !ok {
continue
}
comment := obj(m)
createdBy := comment.mapObj("created_by")
if err := st.UpsertComment(ctx, store.Comment{
ID: comment.string("id"),
PageID: pageID,
SpaceID: spaceID,
ParentID: pageID,
Text: notiontext.Plain(comment["rich_text"]),
CreatedByID: createdBy.string("id"),
CreatedTime: parseTimeMS(comment.string("created_time")),
LastEditedTime: parseTimeMS(comment.string("last_edited_time")),
Alive: true,
RawJSON: notiontext.MarshalRaw(comment),
Source: SourceName,
SyncedAt: store.NowMS(),
}); err != nil {
return count, err
}
count++
}
if !truthy(resp["has_more"]) {
return count, nil
}
cursor, _ = resp["next_cursor"].(string)
if cursor == "" {
return count, nil
}
}
}
func (c Client) do(ctx context.Context, method, path string, body any, out any) error {
var reader io.Reader
if body != nil {
b, err := json.Marshal(body)
if err != nil {
return err
}
reader = bytes.NewReader(b)
}
req, err := http.NewRequestWithContext(ctx, method, strings.TrimRight(c.BaseURL, "/")+path, reader)
if err != nil {
return err
}
req.Header.Set("Authorization", "Bearer "+c.Token)
req.Header.Set("Notion-Version", c.Version)
req.Header.Set("Accept", "application/json")
if body != nil {
req.Header.Set("Content-Type", "application/json")
}
resp, err := c.HTTP.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode == http.StatusTooManyRequests {
if wait, err := time.ParseDuration(resp.Header.Get("Retry-After") + "s"); err == nil && wait > 0 {
timer := time.NewTimer(wait)
select {
case <-ctx.Done():
timer.Stop()
return ctx.Err()
case <-timer.C:
}
return c.do(ctx, method, path, body, out)
}
}
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
b, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))
return fmt.Errorf("notion api %s %s: %s: %s", method, path, resp.Status, strings.TrimSpace(string(b)))
}
return json.NewDecoder(resp.Body).Decode(out)
}
func userName(u obj) string {
if name := u.string("name"); name != "" {
return name
}
person := u.mapObj("person")
return person.string("email")
}
func userEmail(u obj) string {
person := u.mapObj("person")
return person.string("email")
}
func titleFromAPIPage(page obj) string {
props, ok := page["properties"].(map[string]any)
if !ok {
return ""
}
for _, prop := range props {
m, ok := prop.(map[string]any)
if !ok || m["type"] != "title" {
continue
}
return notiontext.Plain(m["title"])
}
return ""
}
func marshalAny(v any) string {
b, err := json.Marshal(v)
if err != nil {
return "{}"
}
return string(b)
}
func parseTimeMS(s string) int64 {
if s == "" {
return 0
}
t, err := time.Parse(time.RFC3339Nano, s)
if err != nil {
return 0
}
return t.UnixMilli()
}
func truthy(v any) bool {
b, _ := v.(bool)
return b
}
func asSlice(v any) []any {
if s, ok := v.([]any); ok {
return s
}
return nil
}

View File

@ -0,0 +1,344 @@
package notiondesktop
import (
"context"
"database/sql"
"fmt"
"io"
"os"
"path/filepath"
"strings"
"time"
"github.com/vincentkoc/notioncrawl/internal/notiontext"
"github.com/vincentkoc/notioncrawl/internal/store"
_ "modernc.org/sqlite"
)
const SourceName = "desktop"
type Source struct {
Path string
Snapshot string
Available bool
SizeBytes int64
}
type Summary struct {
Source Source
Spaces int
Users int
Pages int
Blocks int
Collections int
Comments int
RawRecords int
}
func Inspect(path string) (Source, error) {
info, err := os.Stat(path)
if err != nil {
return Source{Path: path, Available: false}, nil
}
if info.IsDir() {
return Source{Path: path, Available: false}, fmt.Errorf("desktop path is a directory, expected notion.db: %s", path)
}
return Source{Path: path, Available: true, SizeBytes: info.Size()}, nil
}
func Ingest(ctx context.Context, st *store.Store, path, cacheDir string) (Summary, error) {
source, err := Inspect(path)
if err != nil || !source.Available {
return Summary{Source: source}, err
}
snapshot, err := snapshotDB(path, cacheDir)
if err != nil {
return Summary{Source: source}, err
}
source.Snapshot = snapshot
db, err := sql.Open("sqlite", snapshot)
if err != nil {
return Summary{Source: source}, err
}
defer db.Close()
s := Summary{Source: source}
if s.Spaces, err = ingestSpaces(ctx, st, db); err != nil {
return s, err
}
if s.Users, err = ingestUsers(ctx, st, db); err != nil {
return s, err
}
if s.Collections, err = ingestCollections(ctx, st, db); err != nil {
return s, err
}
if s.Pages, s.Blocks, s.RawRecords, err = ingestBlocks(ctx, st, db); err != nil {
return s, err
}
if s.Comments, err = ingestComments(ctx, st, db); err != nil {
return s, err
}
if err := st.SetSyncState(ctx, SourceName, "desktop", "notion.db", snapshot); err != nil {
return s, err
}
return s, nil
}
func snapshotDB(path, cacheDir string) (string, error) {
if err := os.MkdirAll(cacheDir, 0o755); err != nil {
return "", err
}
in, err := os.Open(path)
if err != nil {
return "", err
}
defer in.Close()
outPath := filepath.Join(cacheDir, fmt.Sprintf("notion-desktop-%d.db", time.Now().UnixMilli()))
out, err := os.OpenFile(outPath, os.O_CREATE|os.O_EXCL|os.O_WRONLY, 0o600)
if err != nil {
return "", err
}
defer out.Close()
if _, err := io.Copy(out, in); err != nil {
return "", err
}
return outPath, nil
}
func ingestSpaces(ctx context.Context, st *store.Store, db *sql.DB) (int, error) {
rows, err := db.QueryContext(ctx, `select id, coalesce(name, ''), coalesce(json_object(
'id', id, 'name', name, 'pages', pages, 'settings', settings, 'created_time', created_time, 'last_edited_time', last_edited_time
), '{}') from space`)
if err != nil {
return 0, ignoreMissingTable(err)
}
defer rows.Close()
n := 0
for rows.Next() {
var id, name, raw string
if err := rows.Scan(&id, &name, &raw); err != nil {
return n, err
}
if name == "" {
name = id
}
if err := st.UpsertSpace(ctx, store.Space{ID: id, Name: name, RawJSON: raw, Source: SourceName, SyncedAt: store.NowMS()}); err != nil {
return n, err
}
n++
}
return n, rows.Err()
}
func ingestUsers(ctx context.Context, st *store.Store, db *sql.DB) (int, error) {
rows, err := db.QueryContext(ctx, `select id, coalesce(name, ''), coalesce(email, ''), coalesce(json_object(
'id', id, 'name', name, 'email', email, 'given_name', given_name, 'family_name', family_name, 'profile_photo', profile_photo
), '{}') from notion_user`)
if err != nil {
return 0, ignoreMissingTable(err)
}
defer rows.Close()
n := 0
for rows.Next() {
var id, name, email, raw string
if err := rows.Scan(&id, &name, &email, &raw); err != nil {
return n, err
}
if err := st.UpsertUser(ctx, store.User{ID: id, Name: name, Email: email, RawJSON: raw, Source: SourceName, SyncedAt: store.NowMS()}); err != nil {
return n, err
}
n++
}
return n, rows.Err()
}
func ingestCollections(ctx context.Context, st *store.Store, db *sql.DB) (int, error) {
rows, err := db.QueryContext(ctx, `select id, space_id, parent_id, coalesce(name, ''), coalesce(schema, ''), coalesce(format, ''),
coalesce(json_object('id', id, 'space_id', space_id, 'parent_id', parent_id, 'name', name, 'schema', schema, 'format', format), '{}')
from collection where alive = 1`)
if err != nil {
return 0, ignoreMissingTable(err)
}
defer rows.Close()
n := 0
for rows.Next() {
var x store.Collection
if err := rows.Scan(&x.ID, &x.SpaceID, &x.ParentID, &x.Name, &x.SchemaJSON, &x.FormatJSON, &x.RawJSON); err != nil {
return n, err
}
x.Name = notiontext.TitleFromProperties(x.Name)
if x.Name == "" {
x.Name = x.ID
}
x.Source = SourceName
x.SyncedAt = store.NowMS()
if err := st.UpsertCollection(ctx, x); err != nil {
return n, err
}
n++
}
return n, rows.Err()
}
type localBlock struct {
ID string
SpaceID string
Type string
PropertiesJSON string
ContentJSON string
CollectionID string
CreatedTime int64
LastEditedTime int64
ParentID string
ParentTable string
Alive bool
FormatJSON string
RawJSON string
}
func ingestBlocks(ctx context.Context, st *store.Store, db *sql.DB) (pages int, blocks int, rawRecords int, err error) {
rows, err := db.QueryContext(ctx, `select id, space_id, type, coalesce(properties, ''), coalesce(content, ''),
coalesce(collection_id, ''), coalesce(cast(created_time as integer), 0), coalesce(cast(last_edited_time as integer), 0),
coalesce(parent_id, ''), coalesce(parent_table, ''), alive, coalesce(format, ''),
coalesce(json_object('id', id, 'space_id', space_id, 'type', type, 'properties', properties, 'content', content,
'collection_id', collection_id, 'created_time', created_time, 'last_edited_time', last_edited_time,
'parent_id', parent_id, 'parent_table', parent_table, 'alive', alive, 'format', format), '{}')
from block`)
if err != nil {
return 0, 0, 0, ignoreMissingTable(err)
}
defer rows.Close()
byID := map[string]localBlock{}
var all []localBlock
for rows.Next() {
var b localBlock
var alive int
if err := rows.Scan(&b.ID, &b.SpaceID, &b.Type, &b.PropertiesJSON, &b.ContentJSON, &b.CollectionID, &b.CreatedTime,
&b.LastEditedTime, &b.ParentID, &b.ParentTable, &alive, &b.FormatJSON, &b.RawJSON); err != nil {
return pages, blocks, rawRecords, err
}
b.Alive = alive != 0
byID[b.ID] = b
all = append(all, b)
}
if err := rows.Err(); err != nil {
return 0, 0, 0, err
}
pageFor := func(id string) string { return "" }
var resolve func(string, map[string]bool) string
resolve = func(id string, seen map[string]bool) string {
if id == "" || seen[id] {
return ""
}
seen[id] = true
b, ok := byID[id]
if !ok {
return ""
}
if isPageType(b.Type) {
return b.ID
}
if b.ParentTable == "block" {
return resolve(b.ParentID, seen)
}
return ""
}
pageFor = func(id string) string { return resolve(id, map[string]bool{}) }
for _, b := range all {
title := notiontext.TitleFromProperties(b.PropertiesJSON)
if title == "" && isPageType(b.Type) {
title = "Untitled"
}
if isPageType(b.Type) {
if err := st.UpsertPage(ctx, store.Page{
ID: b.ID,
SpaceID: b.SpaceID,
ParentID: b.ParentID,
ParentTable: b.ParentTable,
CollectionID: b.CollectionID,
Title: title,
PropertiesJSON: b.PropertiesJSON,
CreatedTime: b.CreatedTime,
LastEditedTime: b.LastEditedTime,
Alive: b.Alive,
Source: SourceName,
RawJSON: b.RawJSON,
SyncedAt: store.NowMS(),
}); err != nil {
return pages, blocks, rawRecords, err
}
pages++
}
pageID := pageFor(b.ID)
text := notiontext.PlainFromJSON(b.PropertiesJSON)
if err := st.UpsertBlock(ctx, store.Block{
ID: b.ID,
PageID: pageID,
SpaceID: b.SpaceID,
ParentID: b.ParentID,
ParentTable: b.ParentTable,
Type: b.Type,
Text: text,
PropertiesJSON: b.PropertiesJSON,
ContentJSON: b.ContentJSON,
FormatJSON: b.FormatJSON,
CreatedTime: b.CreatedTime,
LastEditedTime: b.LastEditedTime,
Alive: b.Alive,
Source: SourceName,
RawJSON: b.RawJSON,
SyncedAt: store.NowMS(),
}); err != nil {
return pages, blocks, rawRecords, err
}
blocks++
if err := st.UpsertRawRecord(ctx, store.RawRecord{
Source: SourceName, RecordTable: "block", RecordID: b.ID, ParentID: b.ParentID,
SpaceID: b.SpaceID, RawJSON: b.RawJSON, SyncedAt: store.NowMS(),
}); err != nil {
return pages, blocks, rawRecords, err
}
rawRecords++
}
return pages, blocks, rawRecords, nil
}
func ingestComments(ctx context.Context, st *store.Store, db *sql.DB) (int, error) {
rows, err := db.QueryContext(ctx, `select id, parent_id, space_id, coalesce(text, ''), coalesce(created_by_id, ''),
coalesce(cast(created_time as integer), 0), coalesce(cast(last_edited_time as integer), 0), alive,
coalesce(json_object('id', id, 'parent_id', parent_id, 'space_id', space_id, 'text', text, 'content', content,
'created_by_id', created_by_id, 'created_time', created_time, 'last_edited_time', last_edited_time, 'alive', alive), '{}')
from comment`)
if err != nil {
return 0, ignoreMissingTable(err)
}
defer rows.Close()
n := 0
for rows.Next() {
var c store.Comment
var alive int
if err := rows.Scan(&c.ID, &c.ParentID, &c.SpaceID, &c.Text, &c.CreatedByID, &c.CreatedTime, &c.LastEditedTime, &alive, &c.RawJSON); err != nil {
return n, err
}
c.PageID = c.ParentID
c.Text = notiontext.PlainFromJSON(c.Text)
c.Alive = alive != 0
c.Source = SourceName
c.SyncedAt = store.NowMS()
if err := st.UpsertComment(ctx, c); err != nil {
return n, err
}
n++
}
return n, rows.Err()
}
func isPageType(t string) bool {
return t == "page" || t == "collection_view_page" || t == "external_object_instance_page"
}
func ignoreMissingTable(err error) error {
if err != nil && strings.Contains(err.Error(), "no such table") {
return nil
}
return err
}

131
internal/notiontext/text.go Normal file
View File

@ -0,0 +1,131 @@
package notiontext
import (
"encoding/json"
"fmt"
"regexp"
"strings"
)
var spaceRE = regexp.MustCompile(`\s+`)
func Normalize(s string) string {
return strings.TrimSpace(spaceRE.ReplaceAllString(s, " "))
}
func PlainFromJSON(raw string) string {
if strings.TrimSpace(raw) == "" {
return ""
}
var v any
if err := json.Unmarshal([]byte(raw), &v); err != nil {
return Normalize(raw)
}
return Plain(v)
}
func Plain(v any) string {
var parts []string
walk(v, &parts)
return Normalize(strings.Join(parts, " "))
}
func TitleFromProperties(raw string) string {
if strings.TrimSpace(raw) == "" {
return ""
}
var v any
if err := json.Unmarshal([]byte(raw), &v); err != nil {
return ""
}
if m, ok := v.(map[string]any); ok {
for _, key := range []string{"title", "Name", "name"} {
if text := Plain(m[key]); text != "" {
return text
}
}
for _, value := range m {
if text := Plain(value); text != "" {
return text
}
}
}
return Plain(v)
}
func MarkdownEscape(s string) string {
s = strings.ReplaceAll(s, "\r\n", "\n")
return strings.TrimRight(s, " \n")
}
func ShortID(id string) string {
clean := strings.ReplaceAll(id, "-", "")
if len(clean) > 8 {
return clean[:8]
}
if clean == "" {
return "unknown"
}
return clean
}
func Slug(s string) string {
s = strings.ToLower(Normalize(s))
var b strings.Builder
lastDash := false
for _, r := range s {
switch {
case r >= 'a' && r <= 'z', r >= '0' && r <= '9':
b.WriteRune(r)
lastDash = false
case r == '-' || r == '_' || r == ' ' || r == '/' || r == '.':
if !lastDash {
b.WriteByte('-')
lastDash = true
}
}
}
out := strings.Trim(b.String(), "-")
if out == "" {
return "untitled"
}
return out
}
func MarshalRaw(m map[string]any) string {
b, err := json.Marshal(m)
if err != nil {
return fmt.Sprintf(`{"marshal_error":%q}`, err.Error())
}
return string(b)
}
func walk(v any, parts *[]string) {
switch x := v.(type) {
case nil:
return
case string:
if x != "" {
*parts = append(*parts, x)
}
case []any:
for _, item := range x {
walk(item, parts)
}
case map[string]any:
for _, key := range []string{"plain_text", "content", "text", "name", "title"} {
if value, ok := x[key]; ok {
walk(value, parts)
}
}
if rt, ok := x["rich_text"]; ok {
walk(rt, parts)
}
if title, ok := x["title"]; ok {
walk(title, parts)
}
if text, ok := x["text"].(map[string]any); ok {
walk(text["content"], parts)
}
}
}

View File

@ -0,0 +1,17 @@
package notiontext
import "testing"
func TestTitleFromProperties(t *testing.T) {
got := TitleFromProperties(`{"title":[["Launch Plan"]]}`)
if got != "Launch Plan" {
t.Fatalf("got %q", got)
}
}
func TestSlug(t *testing.T) {
got := Slug("Launch Plan / Q2")
if got != "launch-plan-q2" {
t.Fatalf("got %q", got)
}
}