fix(store): support canonical portable snapshots
This commit is contained in:
parent
67861b5d32
commit
85ce5ced12
@ -464,10 +464,7 @@ func (s *Store) DurableClusterDetail(ctx context.Context, options ClusterDetailO
|
||||
args = append(args, limit)
|
||||
rows, err := s.db.QueryContext(ctx, `
|
||||
select cm.role, cm.state, cm.score_to_representative,
|
||||
t.id, t.repo_id, t.github_id, t.number, t.kind, t.state, t.title, t.body, t.author_login, t.author_type,
|
||||
t.html_url, t.labels_json, t.assignees_json, t.raw_json, t.content_hash, t.is_draft,
|
||||
t.created_at_gh, t.updated_at_gh, t.closed_at_gh, t.merged_at_gh,
|
||||
t.first_pulled_at, t.last_pulled_at, t.updated_at, t.closed_at_local, t.close_reason_local
|
||||
`+s.threadSelectColumns(ctx, "t")+`
|
||||
from cluster_memberships cm
|
||||
join cluster_groups cg on cg.id = cm.cluster_id
|
||||
join threads t on t.id = cm.thread_id
|
||||
@ -526,10 +523,7 @@ func (s *Store) RunClusterDetail(ctx context.Context, options ClusterDetailOptio
|
||||
select case when t.id = c.representative_thread_id then 'representative' else 'member' end as role,
|
||||
'active' as state,
|
||||
cm.score_to_representative,
|
||||
t.id, t.repo_id, t.github_id, t.number, t.kind, t.state, t.title, t.body, t.author_login, t.author_type,
|
||||
t.html_url, t.labels_json, t.assignees_json, t.raw_json, t.content_hash, t.is_draft,
|
||||
t.created_at_gh, t.updated_at_gh, t.closed_at_gh, t.merged_at_gh,
|
||||
t.first_pulled_at, t.last_pulled_at, t.updated_at, t.closed_at_local, t.close_reason_local
|
||||
`+s.threadSelectColumns(ctx, "t")+`
|
||||
from cluster_members cm
|
||||
join clusters c on c.id = cm.cluster_id and c.cluster_run_id = ?
|
||||
join threads t on t.id = cm.thread_id
|
||||
@ -1167,6 +1161,9 @@ func (s *Store) clusterSummaryByID(ctx context.Context, repoID, clusterID int64,
|
||||
}
|
||||
|
||||
func (s *Store) latestRawClusterRunID(ctx context.Context, repoID int64) (int64, bool, error) {
|
||||
if !s.hasTable(ctx, "cluster_runs") || !s.hasTable(ctx, "clusters") {
|
||||
return 0, false, nil
|
||||
}
|
||||
row := s.db.QueryRowContext(ctx, `
|
||||
select cr.id
|
||||
from cluster_runs cr
|
||||
@ -1292,23 +1289,46 @@ func (s *Store) summariesByThreadIDs(ctx context.Context, threadIDs []int64) (ma
|
||||
placeholders = append(placeholders, "?")
|
||||
args = append(args, threadID)
|
||||
}
|
||||
rows, err := s.db.QueryContext(ctx, `
|
||||
select thread_id, summary_kind, summary_text
|
||||
from document_summaries
|
||||
where thread_id in (`+strings.Join(placeholders, ",")+`)
|
||||
order by thread_id, summary_kind, updated_at desc
|
||||
`, args...)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("select document summaries: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
out := make(map[int64]map[string]string)
|
||||
if s.hasTable(ctx, "document_summaries") {
|
||||
rows, err := s.db.QueryContext(ctx, `
|
||||
select thread_id, summary_kind, summary_text
|
||||
from document_summaries
|
||||
where thread_id in (`+strings.Join(placeholders, ",")+`)
|
||||
order by thread_id, summary_kind, updated_at desc
|
||||
`, args...)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("select document summaries: %w", err)
|
||||
}
|
||||
if err := scanSummaryRows(rows, out, "document summary"); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if s.hasTable(ctx, "thread_key_summaries") && s.hasTable(ctx, "thread_revisions") {
|
||||
rows, err := s.db.QueryContext(ctx, `
|
||||
select tr.thread_id, tks.summary_kind, tks.key_text
|
||||
from thread_key_summaries tks
|
||||
join thread_revisions tr on tr.id = tks.thread_revision_id
|
||||
where tr.thread_id in (`+strings.Join(placeholders, ",")+`)
|
||||
order by tr.thread_id, tks.summary_kind, tks.created_at desc
|
||||
`, args...)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("select thread key summaries: %w", err)
|
||||
}
|
||||
if err := scanSummaryRows(rows, out, "thread key summary"); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func scanSummaryRows(rows *sql.Rows, out map[int64]map[string]string, source string) error {
|
||||
defer rows.Close()
|
||||
for rows.Next() {
|
||||
var threadID int64
|
||||
var kind, text string
|
||||
if err := rows.Scan(&threadID, &kind, &text); err != nil {
|
||||
return nil, fmt.Errorf("scan document summary: %w", err)
|
||||
return fmt.Errorf("scan %s: %w", source, err)
|
||||
}
|
||||
if out[threadID] == nil {
|
||||
out[threadID] = map[string]string{}
|
||||
@ -1318,9 +1338,9 @@ func (s *Store) summariesByThreadIDs(ctx context.Context, threadIDs []int64) (ma
|
||||
}
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, fmt.Errorf("iterate document summaries: %w", err)
|
||||
return fmt.Errorf("iterate %s rows: %w", source, err)
|
||||
}
|
||||
return out, nil
|
||||
return nil
|
||||
}
|
||||
|
||||
func snippetRunes(value string, limit int) string {
|
||||
|
||||
@ -5,6 +5,8 @@ import (
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
type PortablePruneOptions struct {
|
||||
@ -13,16 +15,18 @@ type PortablePruneOptions struct {
|
||||
}
|
||||
|
||||
type PortablePruneStats struct {
|
||||
DBPath string `json:"db_path"`
|
||||
BodyChars int `json:"body_chars"`
|
||||
BytesBefore int64 `json:"bytes_before"`
|
||||
BytesAfter int64 `json:"bytes_after"`
|
||||
ThreadsPruned int64 `json:"threads_pruned"`
|
||||
RepositoriesPruned int64 `json:"repositories_pruned"`
|
||||
FingerprintsPruned int64 `json:"fingerprints_pruned"`
|
||||
DocumentsDeleted int64 `json:"documents_deleted"`
|
||||
DocumentsFTSRebuilt bool `json:"documents_fts_rebuilt"`
|
||||
Vacuumed bool `json:"vacuumed"`
|
||||
DBPath string `json:"db_path"`
|
||||
BodyChars int `json:"body_chars"`
|
||||
BytesBefore int64 `json:"bytes_before"`
|
||||
BytesAfter int64 `json:"bytes_after"`
|
||||
ThreadsPruned int64 `json:"threads_pruned"`
|
||||
RepositoriesPruned int64 `json:"repositories_pruned"`
|
||||
FingerprintsPruned int64 `json:"fingerprints_pruned"`
|
||||
DocumentsDeleted int64 `json:"documents_deleted"`
|
||||
DocumentsFTSRebuilt bool `json:"documents_fts_rebuilt"`
|
||||
DroppedTables []string `json:"dropped_tables,omitempty"`
|
||||
DroppedColumns []string `json:"dropped_columns,omitempty"`
|
||||
Vacuumed bool `json:"vacuumed"`
|
||||
}
|
||||
|
||||
func (s *Store) PrunePortablePayloads(ctx context.Context, options PortablePruneOptions) (PortablePruneStats, error) {
|
||||
@ -107,6 +111,9 @@ func (s *Store) PrunePortablePayloads(ctx context.Context, options PortablePrune
|
||||
}
|
||||
stats.DocumentsFTSRebuilt = true
|
||||
}
|
||||
if err := s.canonicalizePortableSchema(ctx, options.BodyChars, &stats); err != nil {
|
||||
return stats, err
|
||||
}
|
||||
if options.Vacuum {
|
||||
if _, err := s.db.ExecContext(ctx, `pragma wal_checkpoint(TRUNCATE)`); err != nil {
|
||||
return stats, fmt.Errorf("checkpoint wal: %w", err)
|
||||
@ -121,6 +128,110 @@ func (s *Store) PrunePortablePayloads(ctx context.Context, options PortablePrune
|
||||
return stats, nil
|
||||
}
|
||||
|
||||
func (s *Store) canonicalizePortableSchema(ctx context.Context, bodyChars int, stats *PortablePruneStats) error {
|
||||
if s.hasColumn(ctx, "threads", "body") && !s.hasColumn(ctx, "threads", "body_excerpt") {
|
||||
if _, err := s.db.ExecContext(ctx, `alter table threads add column body_excerpt text`); err != nil {
|
||||
return fmt.Errorf("add portable threads.body_excerpt: %w", err)
|
||||
}
|
||||
if _, err := s.db.ExecContext(ctx, `
|
||||
update threads
|
||||
set body_excerpt = case when length(body) > ? then substr(body, 1, ?) else body end
|
||||
where body is not null
|
||||
`, bodyChars, bodyChars); err != nil {
|
||||
return fmt.Errorf("backfill portable body excerpts: %w", err)
|
||||
}
|
||||
}
|
||||
if !s.hasColumn(ctx, "threads", "body_length") {
|
||||
if _, err := s.db.ExecContext(ctx, `alter table threads add column body_length integer not null default 0`); err != nil {
|
||||
return fmt.Errorf("add portable threads.body_length: %w", err)
|
||||
}
|
||||
}
|
||||
for _, column := range []struct {
|
||||
table string
|
||||
name string
|
||||
}{
|
||||
{table: "repositories", name: "raw_json"},
|
||||
{table: "threads", name: "raw_json"},
|
||||
{table: "threads", name: "body"},
|
||||
} {
|
||||
if !s.hasColumn(ctx, column.table, column.name) {
|
||||
continue
|
||||
}
|
||||
if _, err := s.db.ExecContext(ctx, `alter table `+sqliteIdentifier(column.table)+` drop column `+sqliteIdentifier(column.name)); err != nil {
|
||||
return fmt.Errorf("drop portable column %s.%s: %w", column.table, column.name, err)
|
||||
}
|
||||
stats.DroppedColumns = append(stats.DroppedColumns, column.table+"."+column.name)
|
||||
}
|
||||
for _, table := range canonicalPortableDroppedTables() {
|
||||
if !s.tableExists(ctx, table) {
|
||||
continue
|
||||
}
|
||||
if _, err := s.db.ExecContext(ctx, `drop table if exists `+sqliteIdentifier(table)); err != nil {
|
||||
return fmt.Errorf("drop portable table %s: %w", table, err)
|
||||
}
|
||||
stats.DroppedTables = append(stats.DroppedTables, table)
|
||||
}
|
||||
if _, err := s.db.ExecContext(ctx, `
|
||||
create table if not exists portable_metadata (
|
||||
key text primary key,
|
||||
value text not null
|
||||
)
|
||||
`); err != nil {
|
||||
return fmt.Errorf("ensure portable metadata: %w", err)
|
||||
}
|
||||
metadata := map[string]string{
|
||||
"schema": "ghcrawl-portable-sync-v1",
|
||||
"body_chars": fmt.Sprintf("%d", bodyChars),
|
||||
"excluded": "raw_json,comments,documents,fts,vectors,code_snapshots,cluster_events,run_history,similarity_edges,blobs",
|
||||
"exported_at": time.Now().UTC().Format(timeLayout),
|
||||
"source_path": s.path,
|
||||
}
|
||||
for key, value := range metadata {
|
||||
if _, err := s.db.ExecContext(ctx, `
|
||||
insert into portable_metadata(key, value)
|
||||
values(?, ?)
|
||||
on conflict(key) do update set value = excluded.value
|
||||
`, key, value); err != nil {
|
||||
return fmt.Errorf("write portable metadata %s: %w", key, err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func canonicalPortableDroppedTables() []string {
|
||||
return []string{
|
||||
"documents_fts",
|
||||
"documents_fts_config",
|
||||
"documents_fts_data",
|
||||
"documents_fts_docsize",
|
||||
"documents_fts_idx",
|
||||
"comments",
|
||||
"documents",
|
||||
"document_embeddings",
|
||||
"document_summaries",
|
||||
"thread_vectors",
|
||||
"thread_code_snapshots",
|
||||
"thread_changed_files",
|
||||
"thread_hunk_signatures",
|
||||
"cluster_events",
|
||||
"cluster_members",
|
||||
"clusters",
|
||||
"sync_runs",
|
||||
"summary_runs",
|
||||
"embedding_runs",
|
||||
"cluster_runs",
|
||||
"similarity_edges",
|
||||
"blobs",
|
||||
}
|
||||
}
|
||||
|
||||
func sqliteIdentifier(value string) string {
|
||||
if value == "" || strings.ContainsAny(value, "\"\x00") {
|
||||
panic(fmt.Sprintf("unsafe SQLite identifier: %q", value))
|
||||
}
|
||||
return `"` + value + `"`
|
||||
}
|
||||
|
||||
func (s *Store) tableExists(ctx context.Context, table string) bool {
|
||||
var name string
|
||||
err := s.db.QueryRowContext(ctx, `select name from sqlite_master where type in ('table', 'virtual table') and name = ?`, table).Scan(&name)
|
||||
|
||||
@ -43,7 +43,7 @@ func (s *Store) RepositoryByFullName(ctx context.Context, fullName string) (Repo
|
||||
var githubRepoID sql.NullString
|
||||
var rawJSON sql.NullString
|
||||
err := s.q().QueryRowContext(ctx, `
|
||||
select id, owner, name, full_name, github_repo_id, raw_json, updated_at
|
||||
select id, owner, name, full_name, github_repo_id, `+s.repositoryRawJSONExpr(ctx)+`, updated_at
|
||||
from repositories
|
||||
where full_name = ?
|
||||
`, fullName).Scan(&repo.ID, &repo.Owner, &repo.Name, &repo.FullName, &githubRepoID, &rawJSON, &repo.UpdatedAt)
|
||||
@ -57,7 +57,7 @@ func (s *Store) RepositoryByFullName(ctx context.Context, fullName string) (Repo
|
||||
|
||||
func (s *Store) ListRepositories(ctx context.Context) ([]Repository, error) {
|
||||
rows, err := s.db.QueryContext(ctx, `
|
||||
select id, owner, name, full_name, github_repo_id, raw_json, updated_at
|
||||
select id, owner, name, full_name, github_repo_id, `+s.repositoryRawJSONExpr(ctx)+`, updated_at
|
||||
from repositories
|
||||
order by coalesce(updated_at, '') desc, id desc
|
||||
`)
|
||||
@ -84,6 +84,13 @@ func (s *Store) ListRepositories(ctx context.Context) ([]Repository, error) {
|
||||
return repos, nil
|
||||
}
|
||||
|
||||
func (s *Store) repositoryRawJSONExpr(ctx context.Context) string {
|
||||
if s.hasColumn(ctx, "repositories", "raw_json") {
|
||||
return "raw_json"
|
||||
}
|
||||
return "''"
|
||||
}
|
||||
|
||||
func nullString(value string) sql.NullString {
|
||||
return sql.NullString{String: value, Valid: value != ""}
|
||||
}
|
||||
|
||||
@ -71,14 +71,15 @@ func (s *Store) searchThreads(ctx context.Context, repoID int64, query string, l
|
||||
return nil, nil
|
||||
}
|
||||
pattern := "%" + escapeLike(needle) + "%"
|
||||
bodyExpr := s.threadBodyExpr(ctx, "")
|
||||
rows, err := s.db.QueryContext(ctx, `
|
||||
select id, number, kind, state, title, html_url, author_login,
|
||||
coalesce(nullif(body, ''), title)
|
||||
coalesce(nullif(`+bodyExpr+`, ''), title)
|
||||
from threads
|
||||
where repo_id = ?
|
||||
and (
|
||||
lower(title) like ? escape '\'
|
||||
or lower(coalesce(body, '')) like ? escape '\'
|
||||
or lower(coalesce(`+bodyExpr+`, '')) like ? escape '\'
|
||||
)
|
||||
order by coalesce(updated_at_gh, updated_at) desc, number desc
|
||||
limit ?
|
||||
|
||||
@ -155,8 +155,24 @@ func (s *Store) Status(ctx context.Context) (Status, error) {
|
||||
return Status{}, fmt.Errorf("count clusters: %w", err)
|
||||
}
|
||||
var lastSync string
|
||||
if err := s.db.QueryRowContext(ctx, `select coalesce(max(finished_at), '') from sync_runs where status in ('success', 'completed')`).Scan(&lastSync); err != nil {
|
||||
return Status{}, fmt.Errorf("read last sync: %w", err)
|
||||
if s.hasTable(ctx, "sync_runs") {
|
||||
if err := s.db.QueryRowContext(ctx, `select coalesce(max(finished_at), '') from sync_runs where status in ('success', 'completed')`).Scan(&lastSync); err != nil {
|
||||
return Status{}, fmt.Errorf("read last sync: %w", err)
|
||||
}
|
||||
} else if s.hasTable(ctx, "repo_sync_state") {
|
||||
if err := s.db.QueryRowContext(ctx, `
|
||||
select coalesce(
|
||||
max(last_open_close_reconciled_at),
|
||||
max(last_overlapping_open_scan_completed_at),
|
||||
max(last_non_overlapping_scan_completed_at),
|
||||
max(last_full_open_scan_started_at),
|
||||
max(updated_at),
|
||||
''
|
||||
)
|
||||
from repo_sync_state
|
||||
`).Scan(&lastSync); err != nil {
|
||||
return Status{}, fmt.Errorf("read portable sync state: %w", err)
|
||||
}
|
||||
}
|
||||
if lastSync != "" {
|
||||
parsed, err := time.Parse(timeLayout, lastSync)
|
||||
@ -206,6 +222,12 @@ func (s *Store) ensureLegacyPortableColumns(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Store) hasTable(ctx context.Context, table string) bool {
|
||||
var name string
|
||||
err := s.db.QueryRowContext(ctx, `select name from sqlite_schema where type in ('table', 'virtual table') and name = ?`, table).Scan(&name)
|
||||
return err == nil
|
||||
}
|
||||
|
||||
func (s *Store) ensureColumn(ctx context.Context, table, column, definition string) error {
|
||||
if s.hasColumn(ctx, table, column) {
|
||||
return nil
|
||||
|
||||
@ -90,6 +90,132 @@ func TestOpenReadOnlyDoesNotMutateStore(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestOpenReadOnlySupportsCanonicalPortableStore(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
dbPath := filepath.Join(t.TempDir(), "portable.db")
|
||||
db, err := sql.Open("sqlite", dbPath)
|
||||
if err != nil {
|
||||
t.Fatalf("open seed db: %v", err)
|
||||
}
|
||||
_, err = db.ExecContext(ctx, `
|
||||
create table repositories (
|
||||
id integer primary key,
|
||||
owner text not null,
|
||||
name text not null,
|
||||
full_name text not null,
|
||||
github_repo_id text,
|
||||
updated_at text not null
|
||||
);
|
||||
create table threads (
|
||||
id integer primary key,
|
||||
repo_id integer not null,
|
||||
github_id text not null,
|
||||
number integer not null,
|
||||
kind text not null,
|
||||
state text not null,
|
||||
title text not null,
|
||||
body_excerpt text,
|
||||
body_length integer not null default 0,
|
||||
author_login text,
|
||||
author_type text,
|
||||
html_url text not null,
|
||||
labels_json text not null,
|
||||
assignees_json text not null,
|
||||
content_hash text not null,
|
||||
is_draft integer not null default 0,
|
||||
created_at_gh text,
|
||||
updated_at_gh text,
|
||||
closed_at_gh text,
|
||||
merged_at_gh text,
|
||||
first_pulled_at text,
|
||||
last_pulled_at text,
|
||||
updated_at text not null,
|
||||
closed_at_local text,
|
||||
close_reason_local text
|
||||
);
|
||||
create table repo_sync_state (
|
||||
repo_id integer primary key,
|
||||
last_full_open_scan_started_at text,
|
||||
last_overlapping_open_scan_completed_at text,
|
||||
last_non_overlapping_scan_completed_at text,
|
||||
last_open_close_reconciled_at text,
|
||||
updated_at text not null
|
||||
);
|
||||
create table cluster_groups (
|
||||
id integer primary key,
|
||||
repo_id integer not null,
|
||||
stable_key text not null,
|
||||
stable_slug text not null,
|
||||
status text not null,
|
||||
cluster_type text not null,
|
||||
representative_thread_id integer,
|
||||
title text,
|
||||
created_at text not null,
|
||||
updated_at text not null,
|
||||
closed_at text
|
||||
);
|
||||
insert into repositories(id, owner, name, full_name, updated_at)
|
||||
values(1, 'openclaw', 'openclaw', 'openclaw/openclaw', '2026-04-28T00:00:00Z');
|
||||
insert into threads(id, repo_id, github_id, number, kind, state, title, body_excerpt, html_url, labels_json, assignees_json, content_hash, updated_at)
|
||||
values(1, 1, '1', 42, 'issue', 'open', 'portable issue', 'portable body', 'https://github.com/openclaw/openclaw/issues/42', '[]', '[]', 'hash', '2026-04-28T00:00:00Z');
|
||||
insert into repo_sync_state(repo_id, last_open_close_reconciled_at, updated_at)
|
||||
values(1, '2026-04-28T01:02:03Z', '2026-04-28T01:02:03Z');
|
||||
insert into cluster_groups(id, repo_id, stable_key, stable_slug, status, cluster_type, representative_thread_id, title, created_at, updated_at)
|
||||
values(1, 1, 'stable', 'stable', 'active', 'similarity', 1, 'portable cluster', '2026-04-28T00:00:00Z', '2026-04-28T00:00:00Z');
|
||||
`)
|
||||
if err != nil {
|
||||
t.Fatalf("seed portable db: %v", err)
|
||||
}
|
||||
if err := db.Close(); err != nil {
|
||||
t.Fatalf("close seed db: %v", err)
|
||||
}
|
||||
before, err := os.ReadFile(dbPath)
|
||||
if err != nil {
|
||||
t.Fatalf("read db before: %v", err)
|
||||
}
|
||||
|
||||
st, err := OpenReadOnly(ctx, dbPath)
|
||||
if err != nil {
|
||||
t.Fatalf("open readonly portable: %v", err)
|
||||
}
|
||||
defer st.Close()
|
||||
|
||||
status, err := st.Status(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("portable status: %v", err)
|
||||
}
|
||||
if status.RepositoryCount != 1 || status.ThreadCount != 1 || status.OpenThreadCount != 1 || status.ClusterCount != 1 {
|
||||
t.Fatalf("unexpected portable status: %#v", status)
|
||||
}
|
||||
if status.LastSyncAt.IsZero() {
|
||||
t.Fatalf("portable last sync was not read from repo_sync_state: %#v", status)
|
||||
}
|
||||
repo, err := st.RepositoryByFullName(ctx, "openclaw/openclaw")
|
||||
if err != nil {
|
||||
t.Fatalf("portable repository: %v", err)
|
||||
}
|
||||
if repo.RawJSON != "" {
|
||||
t.Fatalf("portable raw json: got %q want empty", repo.RawJSON)
|
||||
}
|
||||
threads, err := st.ListThreadsFiltered(ctx, ThreadListOptions{RepoID: repo.ID, Numbers: []int{42}})
|
||||
if err != nil {
|
||||
t.Fatalf("portable threads: %v", err)
|
||||
}
|
||||
if len(threads) != 1 || threads[0].Body != "portable body" || threads[0].RawJSON != "" {
|
||||
t.Fatalf("unexpected portable thread: %#v", threads)
|
||||
}
|
||||
if err := st.Close(); err != nil {
|
||||
t.Fatalf("close portable readonly: %v", err)
|
||||
}
|
||||
after, err := os.ReadFile(dbPath)
|
||||
if err != nil {
|
||||
t.Fatalf("read db after: %v", err)
|
||||
}
|
||||
if !bytes.Equal(after, before) {
|
||||
t.Fatal("readonly portable open mutated database bytes")
|
||||
}
|
||||
}
|
||||
|
||||
func TestOpenMigratesPortableStoreColumns(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
dbPath := filepath.Join(t.TempDir(), "portable.db")
|
||||
@ -252,21 +378,26 @@ func TestPrunePortablePayloads(t *testing.T) {
|
||||
t.Fatalf("unexpected stats: %#v", stats)
|
||||
}
|
||||
|
||||
var repoRaw, body, threadRaw, titleTokens, linkedRefs, buckets, features string
|
||||
var documentCount int
|
||||
if err := st.DB().QueryRowContext(ctx, `select raw_json from repositories where id = 1`).Scan(&repoRaw); err != nil {
|
||||
t.Fatalf("repo raw: %v", err)
|
||||
var bodyExcerpt, titleTokens, linkedRefs, buckets, features string
|
||||
if st.hasColumn(ctx, "repositories", "raw_json") {
|
||||
t.Fatal("repositories.raw_json was not dropped")
|
||||
}
|
||||
if err := st.DB().QueryRowContext(ctx, `select body, raw_json from threads where id = 1`).Scan(&body, &threadRaw); err != nil {
|
||||
t.Fatalf("thread payload: %v", err)
|
||||
if st.hasColumn(ctx, "threads", "raw_json") {
|
||||
t.Fatal("threads.raw_json was not dropped")
|
||||
}
|
||||
if st.hasColumn(ctx, "threads", "body") {
|
||||
t.Fatal("threads.body was not dropped")
|
||||
}
|
||||
if err := st.DB().QueryRowContext(ctx, `select body_excerpt from threads where id = 1`).Scan(&bodyExcerpt); err != nil {
|
||||
t.Fatalf("thread body excerpt: %v", err)
|
||||
}
|
||||
if err := st.DB().QueryRowContext(ctx, `select title_tokens_json, linked_refs_json, module_buckets_json, feature_json from thread_fingerprints where id = 1`).Scan(&titleTokens, &linkedRefs, &buckets, &features); err != nil {
|
||||
t.Fatalf("fingerprint payload: %v", err)
|
||||
}
|
||||
if err := st.DB().QueryRowContext(ctx, `select count(*) from documents`).Scan(&documentCount); err != nil {
|
||||
t.Fatalf("document count: %v", err)
|
||||
if st.tableExists(ctx, "documents") {
|
||||
t.Fatal("documents table was not dropped")
|
||||
}
|
||||
if repoRaw != "" || body != "abcdefgh" || threadRaw != "" || titleTokens != "[]" || linkedRefs != "[]" || buckets != "[]" || features != "{}" || documentCount != 0 {
|
||||
t.Fatalf("payloads not pruned: repoRaw=%q body=%q threadRaw=%q titleTokens=%q linkedRefs=%q buckets=%q features=%q documents=%d", repoRaw, body, threadRaw, titleTokens, linkedRefs, buckets, features, documentCount)
|
||||
if bodyExcerpt != "abcdefgh" || titleTokens != "[]" || linkedRefs != "[]" || buckets != "[]" || features != "{}" {
|
||||
t.Fatalf("payloads not pruned: bodyExcerpt=%q titleTokens=%q linkedRefs=%q buckets=%q features=%q", bodyExcerpt, titleTokens, linkedRefs, buckets, features)
|
||||
}
|
||||
}
|
||||
|
||||
@ -108,10 +108,7 @@ func (s *Store) ListThreadsFiltered(ctx context.Context, options ThreadListOptio
|
||||
args = append(args, options.Limit)
|
||||
}
|
||||
rows, err := s.q().QueryContext(ctx, `
|
||||
select id, repo_id, github_id, number, kind, state, title, body, author_login, author_type,
|
||||
html_url, labels_json, assignees_json, raw_json, content_hash, is_draft,
|
||||
created_at_gh, updated_at_gh, closed_at_gh, merged_at_gh,
|
||||
first_pulled_at, last_pulled_at, updated_at, closed_at_local, close_reason_local
|
||||
select `+s.threadSelectColumns(ctx, "")+`
|
||||
from threads
|
||||
where `+where+`
|
||||
order by number`+limitSQL, args...)
|
||||
@ -210,6 +207,66 @@ func scanThread(rows interface {
|
||||
return thread, nil
|
||||
}
|
||||
|
||||
func (s *Store) threadSelectColumns(ctx context.Context, alias string) string {
|
||||
column := func(name string) string {
|
||||
if alias == "" {
|
||||
return name
|
||||
}
|
||||
return alias + "." + name
|
||||
}
|
||||
return strings.Join([]string{
|
||||
column("id"),
|
||||
column("repo_id"),
|
||||
column("github_id"),
|
||||
column("number"),
|
||||
column("kind"),
|
||||
column("state"),
|
||||
column("title"),
|
||||
s.threadBodyExpr(ctx, alias),
|
||||
column("author_login"),
|
||||
column("author_type"),
|
||||
column("html_url"),
|
||||
column("labels_json"),
|
||||
column("assignees_json"),
|
||||
s.threadRawJSONExpr(ctx, alias),
|
||||
column("content_hash"),
|
||||
column("is_draft"),
|
||||
column("created_at_gh"),
|
||||
column("updated_at_gh"),
|
||||
column("closed_at_gh"),
|
||||
column("merged_at_gh"),
|
||||
column("first_pulled_at"),
|
||||
column("last_pulled_at"),
|
||||
column("updated_at"),
|
||||
column("closed_at_local"),
|
||||
column("close_reason_local"),
|
||||
}, ", ")
|
||||
}
|
||||
|
||||
func (s *Store) threadBodyExpr(ctx context.Context, alias string) string {
|
||||
if s.hasColumn(ctx, "threads", "body") {
|
||||
return qualifiedColumn(alias, "body")
|
||||
}
|
||||
if s.hasColumn(ctx, "threads", "body_excerpt") {
|
||||
return qualifiedColumn(alias, "body_excerpt")
|
||||
}
|
||||
return "''"
|
||||
}
|
||||
|
||||
func (s *Store) threadRawJSONExpr(ctx context.Context, alias string) string {
|
||||
if s.hasColumn(ctx, "threads", "raw_json") {
|
||||
return qualifiedColumn(alias, "raw_json")
|
||||
}
|
||||
return "''"
|
||||
}
|
||||
|
||||
func qualifiedColumn(alias, name string) string {
|
||||
if alias == "" {
|
||||
return name
|
||||
}
|
||||
return alias + "." + name
|
||||
}
|
||||
|
||||
func boolInt(value bool) int {
|
||||
if value {
|
||||
return 1
|
||||
|
||||
@ -63,6 +63,9 @@ func (s *Store) ListThreadVectors(ctx context.Context, repoID int64) ([]ThreadVe
|
||||
}
|
||||
|
||||
func (s *Store) ListThreadVectorsFiltered(ctx context.Context, query ThreadVectorQuery) ([]ThreadVector, error) {
|
||||
if !s.hasTable(ctx, "thread_vectors") {
|
||||
return []ThreadVector{}, nil
|
||||
}
|
||||
where, args := threadVectorWhere(query)
|
||||
rows, err := s.db.QueryContext(ctx, `
|
||||
select tv.thread_id, tv.basis, tv.model, tv.dimensions, tv.content_hash, tv.vector_json, tv.vector_backend, tv.created_at, tv.updated_at
|
||||
@ -96,13 +99,13 @@ func (s *Store) ListThreadVectorsFiltered(ctx context.Context, query ThreadVecto
|
||||
}
|
||||
|
||||
func (s *Store) ThreadVectorByNumber(ctx context.Context, query ThreadVectorQuery, number int) (Thread, ThreadVector, error) {
|
||||
if !s.hasTable(ctx, "thread_vectors") {
|
||||
return Thread{}, ThreadVector{}, fmt.Errorf("thread #%d was not found with an embedding", number)
|
||||
}
|
||||
where, args := threadVectorWhere(query)
|
||||
args = append(args, number)
|
||||
row := s.db.QueryRowContext(ctx, `
|
||||
select t.id, t.repo_id, t.github_id, t.number, t.kind, t.state, t.title, t.body, t.author_login, t.author_type,
|
||||
t.html_url, t.labels_json, t.assignees_json, t.raw_json, t.content_hash, t.is_draft,
|
||||
t.created_at_gh, t.updated_at_gh, t.closed_at_gh, t.merged_at_gh,
|
||||
t.first_pulled_at, t.last_pulled_at, t.updated_at, t.closed_at_local, t.close_reason_local,
|
||||
select `+s.threadSelectColumns(ctx, "t")+`,
|
||||
tv.thread_id, tv.basis, tv.model, tv.dimensions, tv.content_hash, tv.vector_json, tv.vector_backend, tv.created_at, tv.updated_at
|
||||
from threads t
|
||||
join thread_vectors tv on tv.thread_id = t.id
|
||||
@ -131,10 +134,7 @@ func (s *Store) ThreadsByIDs(ctx context.Context, repoID int64, ids []int64) (ma
|
||||
args = append(args, id)
|
||||
}
|
||||
rows, err := s.db.QueryContext(ctx, `
|
||||
select id, repo_id, github_id, number, kind, state, title, body, author_login, author_type,
|
||||
html_url, labels_json, assignees_json, raw_json, content_hash, is_draft,
|
||||
created_at_gh, updated_at_gh, closed_at_gh, merged_at_gh,
|
||||
first_pulled_at, last_pulled_at, updated_at, closed_at_local, close_reason_local
|
||||
select `+s.threadSelectColumns(ctx, "")+`
|
||||
from threads
|
||||
where repo_id = ? and id in (`+strings.Join(placeholders, ",")+`)
|
||||
`, args...)
|
||||
|
||||
Loading…
Reference in New Issue
Block a user