diff --git a/internal/store/clusters.go b/internal/store/clusters.go index 4cfb38e..ab62f96 100644 --- a/internal/store/clusters.go +++ b/internal/store/clusters.go @@ -464,10 +464,7 @@ func (s *Store) DurableClusterDetail(ctx context.Context, options ClusterDetailO args = append(args, limit) rows, err := s.db.QueryContext(ctx, ` select cm.role, cm.state, cm.score_to_representative, - t.id, t.repo_id, t.github_id, t.number, t.kind, t.state, t.title, t.body, t.author_login, t.author_type, - t.html_url, t.labels_json, t.assignees_json, t.raw_json, t.content_hash, t.is_draft, - t.created_at_gh, t.updated_at_gh, t.closed_at_gh, t.merged_at_gh, - t.first_pulled_at, t.last_pulled_at, t.updated_at, t.closed_at_local, t.close_reason_local + `+s.threadSelectColumns(ctx, "t")+` from cluster_memberships cm join cluster_groups cg on cg.id = cm.cluster_id join threads t on t.id = cm.thread_id @@ -526,10 +523,7 @@ func (s *Store) RunClusterDetail(ctx context.Context, options ClusterDetailOptio select case when t.id = c.representative_thread_id then 'representative' else 'member' end as role, 'active' as state, cm.score_to_representative, - t.id, t.repo_id, t.github_id, t.number, t.kind, t.state, t.title, t.body, t.author_login, t.author_type, - t.html_url, t.labels_json, t.assignees_json, t.raw_json, t.content_hash, t.is_draft, - t.created_at_gh, t.updated_at_gh, t.closed_at_gh, t.merged_at_gh, - t.first_pulled_at, t.last_pulled_at, t.updated_at, t.closed_at_local, t.close_reason_local + `+s.threadSelectColumns(ctx, "t")+` from cluster_members cm join clusters c on c.id = cm.cluster_id and c.cluster_run_id = ? join threads t on t.id = cm.thread_id @@ -1167,6 +1161,9 @@ func (s *Store) clusterSummaryByID(ctx context.Context, repoID, clusterID int64, } func (s *Store) latestRawClusterRunID(ctx context.Context, repoID int64) (int64, bool, error) { + if !s.hasTable(ctx, "cluster_runs") || !s.hasTable(ctx, "clusters") { + return 0, false, nil + } row := s.db.QueryRowContext(ctx, ` select cr.id from cluster_runs cr @@ -1292,23 +1289,46 @@ func (s *Store) summariesByThreadIDs(ctx context.Context, threadIDs []int64) (ma placeholders = append(placeholders, "?") args = append(args, threadID) } - rows, err := s.db.QueryContext(ctx, ` - select thread_id, summary_kind, summary_text - from document_summaries - where thread_id in (`+strings.Join(placeholders, ",")+`) - order by thread_id, summary_kind, updated_at desc - `, args...) - if err != nil { - return nil, fmt.Errorf("select document summaries: %w", err) - } - defer rows.Close() - out := make(map[int64]map[string]string) + if s.hasTable(ctx, "document_summaries") { + rows, err := s.db.QueryContext(ctx, ` + select thread_id, summary_kind, summary_text + from document_summaries + where thread_id in (`+strings.Join(placeholders, ",")+`) + order by thread_id, summary_kind, updated_at desc + `, args...) + if err != nil { + return nil, fmt.Errorf("select document summaries: %w", err) + } + if err := scanSummaryRows(rows, out, "document summary"); err != nil { + return nil, err + } + } + if s.hasTable(ctx, "thread_key_summaries") && s.hasTable(ctx, "thread_revisions") { + rows, err := s.db.QueryContext(ctx, ` + select tr.thread_id, tks.summary_kind, tks.key_text + from thread_key_summaries tks + join thread_revisions tr on tr.id = tks.thread_revision_id + where tr.thread_id in (`+strings.Join(placeholders, ",")+`) + order by tr.thread_id, tks.summary_kind, tks.created_at desc + `, args...) + if err != nil { + return nil, fmt.Errorf("select thread key summaries: %w", err) + } + if err := scanSummaryRows(rows, out, "thread key summary"); err != nil { + return nil, err + } + } + return out, nil +} + +func scanSummaryRows(rows *sql.Rows, out map[int64]map[string]string, source string) error { + defer rows.Close() for rows.Next() { var threadID int64 var kind, text string if err := rows.Scan(&threadID, &kind, &text); err != nil { - return nil, fmt.Errorf("scan document summary: %w", err) + return fmt.Errorf("scan %s: %w", source, err) } if out[threadID] == nil { out[threadID] = map[string]string{} @@ -1318,9 +1338,9 @@ func (s *Store) summariesByThreadIDs(ctx context.Context, threadIDs []int64) (ma } } if err := rows.Err(); err != nil { - return nil, fmt.Errorf("iterate document summaries: %w", err) + return fmt.Errorf("iterate %s rows: %w", source, err) } - return out, nil + return nil } func snippetRunes(value string, limit int) string { diff --git a/internal/store/portable.go b/internal/store/portable.go index e615ad8..54b22bf 100644 --- a/internal/store/portable.go +++ b/internal/store/portable.go @@ -5,6 +5,8 @@ import ( "database/sql" "fmt" "os" + "strings" + "time" ) type PortablePruneOptions struct { @@ -13,16 +15,18 @@ type PortablePruneOptions struct { } type PortablePruneStats struct { - DBPath string `json:"db_path"` - BodyChars int `json:"body_chars"` - BytesBefore int64 `json:"bytes_before"` - BytesAfter int64 `json:"bytes_after"` - ThreadsPruned int64 `json:"threads_pruned"` - RepositoriesPruned int64 `json:"repositories_pruned"` - FingerprintsPruned int64 `json:"fingerprints_pruned"` - DocumentsDeleted int64 `json:"documents_deleted"` - DocumentsFTSRebuilt bool `json:"documents_fts_rebuilt"` - Vacuumed bool `json:"vacuumed"` + DBPath string `json:"db_path"` + BodyChars int `json:"body_chars"` + BytesBefore int64 `json:"bytes_before"` + BytesAfter int64 `json:"bytes_after"` + ThreadsPruned int64 `json:"threads_pruned"` + RepositoriesPruned int64 `json:"repositories_pruned"` + FingerprintsPruned int64 `json:"fingerprints_pruned"` + DocumentsDeleted int64 `json:"documents_deleted"` + DocumentsFTSRebuilt bool `json:"documents_fts_rebuilt"` + DroppedTables []string `json:"dropped_tables,omitempty"` + DroppedColumns []string `json:"dropped_columns,omitempty"` + Vacuumed bool `json:"vacuumed"` } func (s *Store) PrunePortablePayloads(ctx context.Context, options PortablePruneOptions) (PortablePruneStats, error) { @@ -107,6 +111,9 @@ func (s *Store) PrunePortablePayloads(ctx context.Context, options PortablePrune } stats.DocumentsFTSRebuilt = true } + if err := s.canonicalizePortableSchema(ctx, options.BodyChars, &stats); err != nil { + return stats, err + } if options.Vacuum { if _, err := s.db.ExecContext(ctx, `pragma wal_checkpoint(TRUNCATE)`); err != nil { return stats, fmt.Errorf("checkpoint wal: %w", err) @@ -121,6 +128,110 @@ func (s *Store) PrunePortablePayloads(ctx context.Context, options PortablePrune return stats, nil } +func (s *Store) canonicalizePortableSchema(ctx context.Context, bodyChars int, stats *PortablePruneStats) error { + if s.hasColumn(ctx, "threads", "body") && !s.hasColumn(ctx, "threads", "body_excerpt") { + if _, err := s.db.ExecContext(ctx, `alter table threads add column body_excerpt text`); err != nil { + return fmt.Errorf("add portable threads.body_excerpt: %w", err) + } + if _, err := s.db.ExecContext(ctx, ` + update threads + set body_excerpt = case when length(body) > ? then substr(body, 1, ?) else body end + where body is not null + `, bodyChars, bodyChars); err != nil { + return fmt.Errorf("backfill portable body excerpts: %w", err) + } + } + if !s.hasColumn(ctx, "threads", "body_length") { + if _, err := s.db.ExecContext(ctx, `alter table threads add column body_length integer not null default 0`); err != nil { + return fmt.Errorf("add portable threads.body_length: %w", err) + } + } + for _, column := range []struct { + table string + name string + }{ + {table: "repositories", name: "raw_json"}, + {table: "threads", name: "raw_json"}, + {table: "threads", name: "body"}, + } { + if !s.hasColumn(ctx, column.table, column.name) { + continue + } + if _, err := s.db.ExecContext(ctx, `alter table `+sqliteIdentifier(column.table)+` drop column `+sqliteIdentifier(column.name)); err != nil { + return fmt.Errorf("drop portable column %s.%s: %w", column.table, column.name, err) + } + stats.DroppedColumns = append(stats.DroppedColumns, column.table+"."+column.name) + } + for _, table := range canonicalPortableDroppedTables() { + if !s.tableExists(ctx, table) { + continue + } + if _, err := s.db.ExecContext(ctx, `drop table if exists `+sqliteIdentifier(table)); err != nil { + return fmt.Errorf("drop portable table %s: %w", table, err) + } + stats.DroppedTables = append(stats.DroppedTables, table) + } + if _, err := s.db.ExecContext(ctx, ` + create table if not exists portable_metadata ( + key text primary key, + value text not null + ) + `); err != nil { + return fmt.Errorf("ensure portable metadata: %w", err) + } + metadata := map[string]string{ + "schema": "ghcrawl-portable-sync-v1", + "body_chars": fmt.Sprintf("%d", bodyChars), + "excluded": "raw_json,comments,documents,fts,vectors,code_snapshots,cluster_events,run_history,similarity_edges,blobs", + "exported_at": time.Now().UTC().Format(timeLayout), + "source_path": s.path, + } + for key, value := range metadata { + if _, err := s.db.ExecContext(ctx, ` + insert into portable_metadata(key, value) + values(?, ?) + on conflict(key) do update set value = excluded.value + `, key, value); err != nil { + return fmt.Errorf("write portable metadata %s: %w", key, err) + } + } + return nil +} + +func canonicalPortableDroppedTables() []string { + return []string{ + "documents_fts", + "documents_fts_config", + "documents_fts_data", + "documents_fts_docsize", + "documents_fts_idx", + "comments", + "documents", + "document_embeddings", + "document_summaries", + "thread_vectors", + "thread_code_snapshots", + "thread_changed_files", + "thread_hunk_signatures", + "cluster_events", + "cluster_members", + "clusters", + "sync_runs", + "summary_runs", + "embedding_runs", + "cluster_runs", + "similarity_edges", + "blobs", + } +} + +func sqliteIdentifier(value string) string { + if value == "" || strings.ContainsAny(value, "\"\x00") { + panic(fmt.Sprintf("unsafe SQLite identifier: %q", value)) + } + return `"` + value + `"` +} + func (s *Store) tableExists(ctx context.Context, table string) bool { var name string err := s.db.QueryRowContext(ctx, `select name from sqlite_master where type in ('table', 'virtual table') and name = ?`, table).Scan(&name) diff --git a/internal/store/repositories.go b/internal/store/repositories.go index 499e403..afe1b42 100644 --- a/internal/store/repositories.go +++ b/internal/store/repositories.go @@ -43,7 +43,7 @@ func (s *Store) RepositoryByFullName(ctx context.Context, fullName string) (Repo var githubRepoID sql.NullString var rawJSON sql.NullString err := s.q().QueryRowContext(ctx, ` - select id, owner, name, full_name, github_repo_id, raw_json, updated_at + select id, owner, name, full_name, github_repo_id, `+s.repositoryRawJSONExpr(ctx)+`, updated_at from repositories where full_name = ? `, fullName).Scan(&repo.ID, &repo.Owner, &repo.Name, &repo.FullName, &githubRepoID, &rawJSON, &repo.UpdatedAt) @@ -57,7 +57,7 @@ func (s *Store) RepositoryByFullName(ctx context.Context, fullName string) (Repo func (s *Store) ListRepositories(ctx context.Context) ([]Repository, error) { rows, err := s.db.QueryContext(ctx, ` - select id, owner, name, full_name, github_repo_id, raw_json, updated_at + select id, owner, name, full_name, github_repo_id, `+s.repositoryRawJSONExpr(ctx)+`, updated_at from repositories order by coalesce(updated_at, '') desc, id desc `) @@ -84,6 +84,13 @@ func (s *Store) ListRepositories(ctx context.Context) ([]Repository, error) { return repos, nil } +func (s *Store) repositoryRawJSONExpr(ctx context.Context) string { + if s.hasColumn(ctx, "repositories", "raw_json") { + return "raw_json" + } + return "''" +} + func nullString(value string) sql.NullString { return sql.NullString{String: value, Valid: value != ""} } diff --git a/internal/store/search.go b/internal/store/search.go index d9d1e7b..1b5f232 100644 --- a/internal/store/search.go +++ b/internal/store/search.go @@ -71,14 +71,15 @@ func (s *Store) searchThreads(ctx context.Context, repoID int64, query string, l return nil, nil } pattern := "%" + escapeLike(needle) + "%" + bodyExpr := s.threadBodyExpr(ctx, "") rows, err := s.db.QueryContext(ctx, ` select id, number, kind, state, title, html_url, author_login, - coalesce(nullif(body, ''), title) + coalesce(nullif(`+bodyExpr+`, ''), title) from threads where repo_id = ? and ( lower(title) like ? escape '\' - or lower(coalesce(body, '')) like ? escape '\' + or lower(coalesce(`+bodyExpr+`, '')) like ? escape '\' ) order by coalesce(updated_at_gh, updated_at) desc, number desc limit ? diff --git a/internal/store/store.go b/internal/store/store.go index e7e8bdd..5267c16 100644 --- a/internal/store/store.go +++ b/internal/store/store.go @@ -155,8 +155,24 @@ func (s *Store) Status(ctx context.Context) (Status, error) { return Status{}, fmt.Errorf("count clusters: %w", err) } var lastSync string - if err := s.db.QueryRowContext(ctx, `select coalesce(max(finished_at), '') from sync_runs where status in ('success', 'completed')`).Scan(&lastSync); err != nil { - return Status{}, fmt.Errorf("read last sync: %w", err) + if s.hasTable(ctx, "sync_runs") { + if err := s.db.QueryRowContext(ctx, `select coalesce(max(finished_at), '') from sync_runs where status in ('success', 'completed')`).Scan(&lastSync); err != nil { + return Status{}, fmt.Errorf("read last sync: %w", err) + } + } else if s.hasTable(ctx, "repo_sync_state") { + if err := s.db.QueryRowContext(ctx, ` + select coalesce( + max(last_open_close_reconciled_at), + max(last_overlapping_open_scan_completed_at), + max(last_non_overlapping_scan_completed_at), + max(last_full_open_scan_started_at), + max(updated_at), + '' + ) + from repo_sync_state + `).Scan(&lastSync); err != nil { + return Status{}, fmt.Errorf("read portable sync state: %w", err) + } } if lastSync != "" { parsed, err := time.Parse(timeLayout, lastSync) @@ -206,6 +222,12 @@ func (s *Store) ensureLegacyPortableColumns(ctx context.Context) error { return nil } +func (s *Store) hasTable(ctx context.Context, table string) bool { + var name string + err := s.db.QueryRowContext(ctx, `select name from sqlite_schema where type in ('table', 'virtual table') and name = ?`, table).Scan(&name) + return err == nil +} + func (s *Store) ensureColumn(ctx context.Context, table, column, definition string) error { if s.hasColumn(ctx, table, column) { return nil diff --git a/internal/store/store_test.go b/internal/store/store_test.go index b1c223b..3d76520 100644 --- a/internal/store/store_test.go +++ b/internal/store/store_test.go @@ -90,6 +90,132 @@ func TestOpenReadOnlyDoesNotMutateStore(t *testing.T) { } } +func TestOpenReadOnlySupportsCanonicalPortableStore(t *testing.T) { + ctx := context.Background() + dbPath := filepath.Join(t.TempDir(), "portable.db") + db, err := sql.Open("sqlite", dbPath) + if err != nil { + t.Fatalf("open seed db: %v", err) + } + _, err = db.ExecContext(ctx, ` + create table repositories ( + id integer primary key, + owner text not null, + name text not null, + full_name text not null, + github_repo_id text, + updated_at text not null + ); + create table threads ( + id integer primary key, + repo_id integer not null, + github_id text not null, + number integer not null, + kind text not null, + state text not null, + title text not null, + body_excerpt text, + body_length integer not null default 0, + author_login text, + author_type text, + html_url text not null, + labels_json text not null, + assignees_json text not null, + content_hash text not null, + is_draft integer not null default 0, + created_at_gh text, + updated_at_gh text, + closed_at_gh text, + merged_at_gh text, + first_pulled_at text, + last_pulled_at text, + updated_at text not null, + closed_at_local text, + close_reason_local text + ); + create table repo_sync_state ( + repo_id integer primary key, + last_full_open_scan_started_at text, + last_overlapping_open_scan_completed_at text, + last_non_overlapping_scan_completed_at text, + last_open_close_reconciled_at text, + updated_at text not null + ); + create table cluster_groups ( + id integer primary key, + repo_id integer not null, + stable_key text not null, + stable_slug text not null, + status text not null, + cluster_type text not null, + representative_thread_id integer, + title text, + created_at text not null, + updated_at text not null, + closed_at text + ); + insert into repositories(id, owner, name, full_name, updated_at) + values(1, 'openclaw', 'openclaw', 'openclaw/openclaw', '2026-04-28T00:00:00Z'); + insert into threads(id, repo_id, github_id, number, kind, state, title, body_excerpt, html_url, labels_json, assignees_json, content_hash, updated_at) + values(1, 1, '1', 42, 'issue', 'open', 'portable issue', 'portable body', 'https://github.com/openclaw/openclaw/issues/42', '[]', '[]', 'hash', '2026-04-28T00:00:00Z'); + insert into repo_sync_state(repo_id, last_open_close_reconciled_at, updated_at) + values(1, '2026-04-28T01:02:03Z', '2026-04-28T01:02:03Z'); + insert into cluster_groups(id, repo_id, stable_key, stable_slug, status, cluster_type, representative_thread_id, title, created_at, updated_at) + values(1, 1, 'stable', 'stable', 'active', 'similarity', 1, 'portable cluster', '2026-04-28T00:00:00Z', '2026-04-28T00:00:00Z'); + `) + if err != nil { + t.Fatalf("seed portable db: %v", err) + } + if err := db.Close(); err != nil { + t.Fatalf("close seed db: %v", err) + } + before, err := os.ReadFile(dbPath) + if err != nil { + t.Fatalf("read db before: %v", err) + } + + st, err := OpenReadOnly(ctx, dbPath) + if err != nil { + t.Fatalf("open readonly portable: %v", err) + } + defer st.Close() + + status, err := st.Status(ctx) + if err != nil { + t.Fatalf("portable status: %v", err) + } + if status.RepositoryCount != 1 || status.ThreadCount != 1 || status.OpenThreadCount != 1 || status.ClusterCount != 1 { + t.Fatalf("unexpected portable status: %#v", status) + } + if status.LastSyncAt.IsZero() { + t.Fatalf("portable last sync was not read from repo_sync_state: %#v", status) + } + repo, err := st.RepositoryByFullName(ctx, "openclaw/openclaw") + if err != nil { + t.Fatalf("portable repository: %v", err) + } + if repo.RawJSON != "" { + t.Fatalf("portable raw json: got %q want empty", repo.RawJSON) + } + threads, err := st.ListThreadsFiltered(ctx, ThreadListOptions{RepoID: repo.ID, Numbers: []int{42}}) + if err != nil { + t.Fatalf("portable threads: %v", err) + } + if len(threads) != 1 || threads[0].Body != "portable body" || threads[0].RawJSON != "" { + t.Fatalf("unexpected portable thread: %#v", threads) + } + if err := st.Close(); err != nil { + t.Fatalf("close portable readonly: %v", err) + } + after, err := os.ReadFile(dbPath) + if err != nil { + t.Fatalf("read db after: %v", err) + } + if !bytes.Equal(after, before) { + t.Fatal("readonly portable open mutated database bytes") + } +} + func TestOpenMigratesPortableStoreColumns(t *testing.T) { ctx := context.Background() dbPath := filepath.Join(t.TempDir(), "portable.db") @@ -252,21 +378,26 @@ func TestPrunePortablePayloads(t *testing.T) { t.Fatalf("unexpected stats: %#v", stats) } - var repoRaw, body, threadRaw, titleTokens, linkedRefs, buckets, features string - var documentCount int - if err := st.DB().QueryRowContext(ctx, `select raw_json from repositories where id = 1`).Scan(&repoRaw); err != nil { - t.Fatalf("repo raw: %v", err) + var bodyExcerpt, titleTokens, linkedRefs, buckets, features string + if st.hasColumn(ctx, "repositories", "raw_json") { + t.Fatal("repositories.raw_json was not dropped") } - if err := st.DB().QueryRowContext(ctx, `select body, raw_json from threads where id = 1`).Scan(&body, &threadRaw); err != nil { - t.Fatalf("thread payload: %v", err) + if st.hasColumn(ctx, "threads", "raw_json") { + t.Fatal("threads.raw_json was not dropped") + } + if st.hasColumn(ctx, "threads", "body") { + t.Fatal("threads.body was not dropped") + } + if err := st.DB().QueryRowContext(ctx, `select body_excerpt from threads where id = 1`).Scan(&bodyExcerpt); err != nil { + t.Fatalf("thread body excerpt: %v", err) } if err := st.DB().QueryRowContext(ctx, `select title_tokens_json, linked_refs_json, module_buckets_json, feature_json from thread_fingerprints where id = 1`).Scan(&titleTokens, &linkedRefs, &buckets, &features); err != nil { t.Fatalf("fingerprint payload: %v", err) } - if err := st.DB().QueryRowContext(ctx, `select count(*) from documents`).Scan(&documentCount); err != nil { - t.Fatalf("document count: %v", err) + if st.tableExists(ctx, "documents") { + t.Fatal("documents table was not dropped") } - if repoRaw != "" || body != "abcdefgh" || threadRaw != "" || titleTokens != "[]" || linkedRefs != "[]" || buckets != "[]" || features != "{}" || documentCount != 0 { - t.Fatalf("payloads not pruned: repoRaw=%q body=%q threadRaw=%q titleTokens=%q linkedRefs=%q buckets=%q features=%q documents=%d", repoRaw, body, threadRaw, titleTokens, linkedRefs, buckets, features, documentCount) + if bodyExcerpt != "abcdefgh" || titleTokens != "[]" || linkedRefs != "[]" || buckets != "[]" || features != "{}" { + t.Fatalf("payloads not pruned: bodyExcerpt=%q titleTokens=%q linkedRefs=%q buckets=%q features=%q", bodyExcerpt, titleTokens, linkedRefs, buckets, features) } } diff --git a/internal/store/threads.go b/internal/store/threads.go index 87fbaef..d668d16 100644 --- a/internal/store/threads.go +++ b/internal/store/threads.go @@ -108,10 +108,7 @@ func (s *Store) ListThreadsFiltered(ctx context.Context, options ThreadListOptio args = append(args, options.Limit) } rows, err := s.q().QueryContext(ctx, ` - select id, repo_id, github_id, number, kind, state, title, body, author_login, author_type, - html_url, labels_json, assignees_json, raw_json, content_hash, is_draft, - created_at_gh, updated_at_gh, closed_at_gh, merged_at_gh, - first_pulled_at, last_pulled_at, updated_at, closed_at_local, close_reason_local + select `+s.threadSelectColumns(ctx, "")+` from threads where `+where+` order by number`+limitSQL, args...) @@ -210,6 +207,66 @@ func scanThread(rows interface { return thread, nil } +func (s *Store) threadSelectColumns(ctx context.Context, alias string) string { + column := func(name string) string { + if alias == "" { + return name + } + return alias + "." + name + } + return strings.Join([]string{ + column("id"), + column("repo_id"), + column("github_id"), + column("number"), + column("kind"), + column("state"), + column("title"), + s.threadBodyExpr(ctx, alias), + column("author_login"), + column("author_type"), + column("html_url"), + column("labels_json"), + column("assignees_json"), + s.threadRawJSONExpr(ctx, alias), + column("content_hash"), + column("is_draft"), + column("created_at_gh"), + column("updated_at_gh"), + column("closed_at_gh"), + column("merged_at_gh"), + column("first_pulled_at"), + column("last_pulled_at"), + column("updated_at"), + column("closed_at_local"), + column("close_reason_local"), + }, ", ") +} + +func (s *Store) threadBodyExpr(ctx context.Context, alias string) string { + if s.hasColumn(ctx, "threads", "body") { + return qualifiedColumn(alias, "body") + } + if s.hasColumn(ctx, "threads", "body_excerpt") { + return qualifiedColumn(alias, "body_excerpt") + } + return "''" +} + +func (s *Store) threadRawJSONExpr(ctx context.Context, alias string) string { + if s.hasColumn(ctx, "threads", "raw_json") { + return qualifiedColumn(alias, "raw_json") + } + return "''" +} + +func qualifiedColumn(alias, name string) string { + if alias == "" { + return name + } + return alias + "." + name +} + func boolInt(value bool) int { if value { return 1 diff --git a/internal/store/vectors.go b/internal/store/vectors.go index d13b3f6..5b9dd1c 100644 --- a/internal/store/vectors.go +++ b/internal/store/vectors.go @@ -63,6 +63,9 @@ func (s *Store) ListThreadVectors(ctx context.Context, repoID int64) ([]ThreadVe } func (s *Store) ListThreadVectorsFiltered(ctx context.Context, query ThreadVectorQuery) ([]ThreadVector, error) { + if !s.hasTable(ctx, "thread_vectors") { + return []ThreadVector{}, nil + } where, args := threadVectorWhere(query) rows, err := s.db.QueryContext(ctx, ` select tv.thread_id, tv.basis, tv.model, tv.dimensions, tv.content_hash, tv.vector_json, tv.vector_backend, tv.created_at, tv.updated_at @@ -96,13 +99,13 @@ func (s *Store) ListThreadVectorsFiltered(ctx context.Context, query ThreadVecto } func (s *Store) ThreadVectorByNumber(ctx context.Context, query ThreadVectorQuery, number int) (Thread, ThreadVector, error) { + if !s.hasTable(ctx, "thread_vectors") { + return Thread{}, ThreadVector{}, fmt.Errorf("thread #%d was not found with an embedding", number) + } where, args := threadVectorWhere(query) args = append(args, number) row := s.db.QueryRowContext(ctx, ` - select t.id, t.repo_id, t.github_id, t.number, t.kind, t.state, t.title, t.body, t.author_login, t.author_type, - t.html_url, t.labels_json, t.assignees_json, t.raw_json, t.content_hash, t.is_draft, - t.created_at_gh, t.updated_at_gh, t.closed_at_gh, t.merged_at_gh, - t.first_pulled_at, t.last_pulled_at, t.updated_at, t.closed_at_local, t.close_reason_local, + select `+s.threadSelectColumns(ctx, "t")+`, tv.thread_id, tv.basis, tv.model, tv.dimensions, tv.content_hash, tv.vector_json, tv.vector_backend, tv.created_at, tv.updated_at from threads t join thread_vectors tv on tv.thread_id = t.id @@ -131,10 +134,7 @@ func (s *Store) ThreadsByIDs(ctx context.Context, repoID int64, ids []int64) (ma args = append(args, id) } rows, err := s.db.QueryContext(ctx, ` - select id, repo_id, github_id, number, kind, state, title, body, author_login, author_type, - html_url, labels_json, assignees_json, raw_json, content_hash, is_draft, - created_at_gh, updated_at_gh, closed_at_gh, merged_at_gh, - first_pulled_at, last_pulled_at, updated_at, closed_at_local, close_reason_local + select `+s.threadSelectColumns(ctx, "")+` from threads where repo_id = ? and id in (`+strings.Join(placeholders, ",")+`) `, args...)