Compare commits

...

3 Commits

Author SHA1 Message Date
Vincent Koc
6e9f4356b0
fix(cluster): emit ghcrawl-shaped durable clusters 2026-04-28 12:40:40 -07:00
Vincent Koc
832cd09dc0
fix(cluster): align gitcrawl sync and embedding parity 2026-04-28 12:13:17 -07:00
Vincent Koc
85ce5ced12
fix(store): support canonical portable snapshots 2026-04-28 11:21:37 -07:00
21 changed files with 839 additions and 135 deletions

View File

@ -16,7 +16,7 @@ gitcrawl doctor
gitcrawl sync owner/repo
gitcrawl sync owner/repo --state open
gitcrawl refresh owner/repo
gitcrawl cluster owner/repo --threshold 0.82
gitcrawl cluster owner/repo --threshold 0.80
gitcrawl clusters owner/repo
gitcrawl durable-clusters owner/repo
gitcrawl cluster-detail owner/repo --id 123
@ -35,9 +35,9 @@ gitcrawl tui owner/repo
```
`gitcrawl clusters`, `gitcrawl durable-clusters`, and `gitcrawl tui` show active primary cluster memberships by default. Pass `--include-closed` to inspect closed rows and historical secondary memberships.
`gitcrawl cluster` and `gitcrawl refresh` build bounded nearest-neighbor clusters by default (`--max-cluster-size 40`, `--k 16`, `--cross-kind-threshold 0.93`) and add deterministic GitHub reference evidence for direct issue/PR links such as `#123`, `issues/123`, and `pull/123`. Weak embedding edges also need concrete title-token overlap unless their similarity is already high, which keeps generic low-confidence bridges from forming unrelated clusters.
`gitcrawl cluster` and `gitcrawl refresh` build ghcrawl-shaped durable clusters by default (`--threshold 0.80`, `--min-size 1`, `--max-cluster-size 40`, `--k 16`, `--cross-kind-threshold 0.93`): every active vector-backed thread is represented, singleton rows use `singleton_orphan`, multi-member rows use `duplicate_candidate`, and stable IDs are derived from the representative thread. They also add deterministic GitHub reference evidence for direct issue/PR links such as `#123`, `issues/123`, and `pull/123`. Weak embedding edges need concrete title-token overlap unless their similarity is already high, which keeps generic low-confidence bridges from forming unrelated clusters.
`gitcrawl tui` infers the most recently updated local repository when `owner/repo` is omitted. `serve` is intentionally not part of `gitcrawl`.
`gitcrawl sync` fetches issues and pull requests in every GitHub state by default. Pass `--state open` or `--state closed` to limit a sync to one state.
`gitcrawl sync` fetches open issues and pull requests by default. Pass `--state all` or `--state closed` for explicit backfill workflows; incremental open syncs with `--since` also sweep recently closed items so local open state does not rot.
The TUI starts at `--min-size 5` so maintainer-significant active clusters are visible first; pass `--min-size 1` to include singletons. Mouse support is built in: click rows, wheel panes, and right-click for copy, sort, filter, jump, link, neighbor, local close/reopen, and member triage actions. Press `a` to open the same action menu from the keyboard, `#` to jump directly to an issue or PR number, `p` to switch between repositories already present in the local store, or `n` to load neighbors for the selected issue or PR. Enter from the members pane also loads neighbors before opening detail. The TUI quietly refreshes from the local store every 15 seconds.
## Local Defaults

View File

@ -2,8 +2,6 @@ package cli
import (
"context"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"errors"
"flag"
@ -32,6 +30,7 @@ const (
defaultTUIWorkingSetLimit = 500
defaultClusterMaxSize = 40
defaultClusterFanout = 16
defaultClusterThreshold = 0.80
defaultCrossKindMinScore = 0.93
highConfidenceEdgeScore = 0.90
weakEdgeMinTitleOverlap = 0.18
@ -248,10 +247,10 @@ func (a *App) runRefresh(ctx context.Context, args []string) error {
includeComments := fs.Bool("include-comments", false, "hydrate comments during sync")
fs.Bool("include-code", false, "accepted for compatibility; code hydration is not implemented yet")
since := fs.String("since", "", "GitHub since timestamp")
state := fs.String("state", "", "GitHub issue state: open|closed|all; default all")
state := fs.String("state", "", "GitHub issue state: open|closed|all; default open")
limitRaw := fs.String("limit", "", "maximum sync or embedding rows")
thresholdRaw := fs.String("threshold", "0.82", "minimum cluster cosine score")
minSizeRaw := fs.String("min-size", "2", "minimum cluster member count")
thresholdRaw := fs.String("threshold", fmt.Sprintf("%.2f", defaultClusterThreshold), "minimum cluster cosine score")
minSizeRaw := fs.String("min-size", "1", "minimum cluster member count")
maxClusterSizeRaw := fs.String("max-cluster-size", strconv.Itoa(defaultClusterMaxSize), "maximum members per generated cluster")
fanoutRaw := fs.String("k", strconv.Itoa(defaultClusterFanout), "nearest-neighbor fanout per thread")
crossKindThresholdRaw := fs.String("cross-kind-threshold", fmt.Sprintf("%.2f", defaultCrossKindMinScore), "minimum score for issue/pull request edges")
@ -545,8 +544,8 @@ func (a *App) runNeighbors(ctx context.Context, args []string) error {
func (a *App) runCluster(ctx context.Context, args []string) error {
fs := flag.NewFlagSet("cluster", flag.ContinueOnError)
fs.SetOutput(io.Discard)
thresholdRaw := fs.String("threshold", "0.82", "minimum cosine score")
minSizeRaw := fs.String("min-size", "2", "minimum cluster member count")
thresholdRaw := fs.String("threshold", fmt.Sprintf("%.2f", defaultClusterThreshold), "minimum cosine score")
minSizeRaw := fs.String("min-size", "1", "minimum cluster member count")
maxClusterSizeRaw := fs.String("max-cluster-size", strconv.Itoa(defaultClusterMaxSize), "maximum members per generated cluster")
fanoutRaw := fs.String("k", strconv.Itoa(defaultClusterFanout), "nearest-neighbor fanout per thread")
crossKindThresholdRaw := fs.String("cross-kind-threshold", fmt.Sprintf("%.2f", defaultCrossKindMinScore), "minimum score for issue/pull request edges")
@ -726,7 +725,7 @@ func (a *App) embedRepository(ctx context.Context, owner, repoName string, optio
if batchSize <= 0 {
batchSize = 64
}
client := openai.New(openai.Options{APIKey: token.Value, BaseURL: openAIBaseURL()})
client := openai.New(openai.Options{APIKey: token.Value, BaseURL: openAIBaseURL(), Dimensions: rt.Config.OpenAI.EmbedDimensions})
for start := 0; start < len(tasks); start += batchSize {
end := start + batchSize
if end > len(tasks) {
@ -1484,7 +1483,7 @@ func (a *App) runSync(ctx context.Context, args []string) error {
fs := flag.NewFlagSet("sync", flag.ContinueOnError)
fs.SetOutput(io.Discard)
since := fs.String("since", "", "GitHub since timestamp")
state := fs.String("state", "", "GitHub issue state: open|closed|all; default all")
state := fs.String("state", "", "GitHub issue state: open|closed|all; default open")
limitRaw := fs.String("limit", "", "maximum issue/PR rows")
jsonOut := fs.Bool("json", false, "write JSON output")
includeComments := fs.Bool("include-comments", false, "hydrate issue comments, PR reviews, and PR review comments")
@ -1936,7 +1935,7 @@ func parseClusterShapeOptions(command, maxClusterSizeRaw, fanoutRaw, crossKindTh
func buildDurableClusterInputs(ctx context.Context, st *store.Store, repoID int64, storedVectors []store.ThreadVector, options clusterBuildOptions) ([]store.DurableClusterInput, int, error) {
if options.MinSize <= 0 {
options.MinSize = 2
options.MinSize = 1
}
if options.MaxClusterSize <= 0 {
options.MaxClusterSize = defaultClusterMaxSize
@ -2004,19 +2003,26 @@ func buildDurableClusterInputs(ctx context.Context, st *store.Store, repoID int6
right := threads[builtCluster.Members[j]]
return left.Number < right.Number
})
rep := threads[builtCluster.RepresentativeThreadID]
identity := store.HumanKeyForValue(fmt.Sprintf("repo:%d:cluster-representative:%d", repoID, builtCluster.RepresentativeThreadID))
clusterType := "duplicate_candidate"
if len(builtCluster.Members) == 1 {
clusterType = "singleton_orphan"
}
input := store.DurableClusterInput{
StableKey: durableClusterStableKey(builtCluster.Members, threads),
StableSlug: durableClusterSlug(builtCluster.Members, threads),
StableKey: identity.Hash,
StableSlug: store.HumanKeyStableSlug(identity),
ClusterType: clusterType,
RepresentativeThreadID: builtCluster.RepresentativeThreadID,
Title: rep.Title,
Title: "Cluster " + identity.Slug,
Members: make([]store.DurableClusterMemberInput, 0, len(builtCluster.Members)),
}
for _, threadID := range builtCluster.Members {
role := "member"
role := "related"
var scorePtr *float64
if threadID == builtCluster.RepresentativeThreadID {
role = "representative"
role = "canonical"
scoreCopy := 1.0
scorePtr = &scoreCopy
} else if score, ok := pairScores[threadIDPairKey(threadID, builtCluster.RepresentativeThreadID)]; ok {
scoreCopy := score
scorePtr = &scoreCopy
@ -2193,23 +2199,6 @@ func clusterRepository(ctx context.Context, st *store.Store, repoID int64, store
}, nil
}
func durableClusterStableKey(threadIDs []int64, threads map[int64]store.Thread) string {
parts := make([]string, 0, len(threadIDs))
for _, id := range threadIDs {
if thread, ok := threads[id]; ok && thread.Number > 0 {
parts = append(parts, strconv.Itoa(thread.Number))
continue
}
parts = append(parts, strconv.FormatInt(id, 10))
}
return "numbers:" + strings.Join(parts, ",")
}
func durableClusterSlug(threadIDs []int64, threads map[int64]store.Thread) string {
sum := sha256.Sum256([]byte(durableClusterStableKey(threadIDs, threads)))
return "cluster-" + hex.EncodeToString(sum[:])[:12]
}
func threadIDPairKey(left, right int64) string {
if left > right {
left, right = right, left

View File

@ -9,6 +9,7 @@ import (
"net/http/httptest"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"testing"
@ -851,7 +852,7 @@ func TestClusterCommandPersistsDurableClusters(t *testing.T) {
if err := run.Run(ctx, []string{"--config", configPath, "cluster", "openclaw/openclaw", "--threshold", "0.90", "--json"}); err != nil {
t.Fatalf("cluster: %v", err)
}
if !strings.Contains(stdout.String(), `"cluster_count": 1`) {
if !strings.Contains(stdout.String(), `"cluster_count": 2`) {
t.Fatalf("cluster output = %q", stdout.String())
}
st, err = store.Open(ctx, dbPath)
@ -863,8 +864,13 @@ func TestClusterCommandPersistsDurableClusters(t *testing.T) {
if err != nil {
t.Fatalf("list clusters: %v", err)
}
if len(clusters) != 1 || clusters[0].MemberCount != 2 {
t.Fatalf("expected one durable cluster, got %#v", clusters)
memberCounts := []int{}
for _, cluster := range clusters {
memberCounts = append(memberCounts, cluster.MemberCount)
}
sort.Ints(memberCounts)
if len(memberCounts) != 2 || memberCounts[0] != 1 || memberCounts[1] != 2 {
t.Fatalf("expected duplicate cluster plus singleton, got %#v", clusters)
}
}
@ -1321,7 +1327,7 @@ func TestRefreshEmbedsAndClustersWithoutSync(t *testing.T) {
if !strings.Contains(out, `"embedded": 3`) {
t.Fatalf("refresh did not embed rows: %q", out)
}
if !strings.Contains(out, `"cluster_count": 1`) {
if !strings.Contains(out, `"cluster_count": 2`) {
t.Fatalf("refresh did not persist cluster: %q", out)
}
@ -1334,8 +1340,13 @@ func TestRefreshEmbedsAndClustersWithoutSync(t *testing.T) {
if err != nil {
t.Fatalf("list clusters: %v", err)
}
if len(clusters) != 1 || clusters[0].MemberCount != 2 {
t.Fatalf("expected one durable cluster, got %#v", clusters)
memberCounts := []int{}
for _, cluster := range clusters {
memberCounts = append(memberCounts, cluster.MemberCount)
}
sort.Ints(memberCounts)
if len(memberCounts) != 2 || memberCounts[0] != 1 || memberCounts[1] != 2 {
t.Fatalf("expected duplicate cluster plus singleton, got %#v", clusters)
}
}

View File

@ -32,11 +32,12 @@ type GitHubConfig struct {
}
type OpenAIConfig struct {
APIKeyEnv string `toml:"api_key_env"`
SummaryModel string `toml:"summary_model"`
EmbedModel string `toml:"embed_model"`
BatchSize int `toml:"batch_size"`
Concurrency int `toml:"concurrency"`
APIKeyEnv string `toml:"api_key_env"`
SummaryModel string `toml:"summary_model"`
EmbedModel string `toml:"embed_model"`
EmbedDimensions int `toml:"embed_dimensions"`
BatchSize int `toml:"batch_size"`
Concurrency int `toml:"concurrency"`
}
type TUIConfig struct {
@ -62,11 +63,12 @@ func Default() Config {
TokenEnv: DefaultTokenEnv,
},
OpenAI: OpenAIConfig{
APIKeyEnv: DefaultOpenAIEnv,
SummaryModel: "gpt-5.4",
EmbedModel: "text-embedding-3-small",
BatchSize: 64,
Concurrency: 2,
APIKeyEnv: DefaultOpenAIEnv,
SummaryModel: "gpt-5.4",
EmbedModel: "text-embedding-3-small",
EmbedDimensions: 1024,
BatchSize: 64,
Concurrency: 2,
},
TUI: TUIConfig{
DefaultSort: "recent",
@ -154,6 +156,9 @@ func (c *Config) Normalize() error {
if c.OpenAI.EmbedModel == "" {
c.OpenAI.EmbedModel = envOrDefault("GITCRAWL_EMBED_MODEL", def.OpenAI.EmbedModel)
}
if c.OpenAI.EmbedDimensions <= 0 {
c.OpenAI.EmbedDimensions = def.OpenAI.EmbedDimensions
}
if c.OpenAI.BatchSize <= 0 {
c.OpenAI.BatchSize = def.OpenAI.BatchSize
}

View File

@ -20,17 +20,20 @@ type Client struct {
apiKey string
baseURL string
httpClient *http.Client
dimensions int
}
type Options struct {
APIKey string
BaseURL string
Dimensions int
HTTPClient *http.Client
}
type embeddingRequest struct {
Model string `json:"model"`
Input []string `json:"input"`
Model string `json:"model"`
Input []string `json:"input"`
Dimensions int `json:"dimensions,omitempty"`
}
type embeddingResponse struct {
@ -57,6 +60,7 @@ func New(options Options) *Client {
apiKey: strings.TrimSpace(options.APIKey),
baseURL: baseURL,
httpClient: httpClient,
dimensions: options.Dimensions,
}
}
@ -71,7 +75,7 @@ func (c *Client) Embed(ctx context.Context, model string, texts []string) ([][]f
if c.apiKey == "" {
return nil, fmt.Errorf("OpenAI API key is required")
}
payload, err := json.Marshal(embeddingRequest{Model: model, Input: texts})
payload, err := json.Marshal(embeddingRequest{Model: model, Input: texts, Dimensions: c.dimensions})
if err != nil {
return nil, fmt.Errorf("marshal embeddings request: %w", err)
}

View File

@ -14,6 +14,9 @@ func TestEmbedAcceptsLargeBatchResponse(t *testing.T) {
if err := json.NewDecoder(r.Body).Decode(&request); err != nil {
t.Fatalf("decode request: %v", err)
}
if request.Dimensions != 1024 {
t.Fatalf("dimensions = %d, want 1024", request.Dimensions)
}
response := embeddingResponse{}
for index := range request.Input {
vector := make([]float64, 1536)
@ -36,7 +39,7 @@ func TestEmbedAcceptsLargeBatchResponse(t *testing.T) {
for index := range inputs {
inputs[index] = "thread text"
}
vectors, err := New(Options{APIKey: "test", BaseURL: server.URL}).Embed(context.Background(), "text-embedding-3-small", inputs)
vectors, err := New(Options{APIKey: "test", BaseURL: server.URL, Dimensions: 1024}).Embed(context.Background(), "text-embedding-3-small", inputs)
if err != nil {
t.Fatalf("embed: %v", err)
}

View File

@ -72,6 +72,7 @@ type ClusterMemberOverride struct {
type DurableClusterInput struct {
StableKey string
StableSlug string
ClusterType string
RepresentativeThreadID int64
Title string
Members []DurableClusterMemberInput
@ -464,10 +465,7 @@ func (s *Store) DurableClusterDetail(ctx context.Context, options ClusterDetailO
args = append(args, limit)
rows, err := s.db.QueryContext(ctx, `
select cm.role, cm.state, cm.score_to_representative,
t.id, t.repo_id, t.github_id, t.number, t.kind, t.state, t.title, t.body, t.author_login, t.author_type,
t.html_url, t.labels_json, t.assignees_json, t.raw_json, t.content_hash, t.is_draft,
t.created_at_gh, t.updated_at_gh, t.closed_at_gh, t.merged_at_gh,
t.first_pulled_at, t.last_pulled_at, t.updated_at, t.closed_at_local, t.close_reason_local
`+s.threadSelectColumns(ctx, "t")+`
from cluster_memberships cm
join cluster_groups cg on cg.id = cm.cluster_id
join threads t on t.id = cm.thread_id
@ -526,10 +524,7 @@ func (s *Store) RunClusterDetail(ctx context.Context, options ClusterDetailOptio
select case when t.id = c.representative_thread_id then 'representative' else 'member' end as role,
'active' as state,
cm.score_to_representative,
t.id, t.repo_id, t.github_id, t.number, t.kind, t.state, t.title, t.body, t.author_login, t.author_type,
t.html_url, t.labels_json, t.assignees_json, t.raw_json, t.content_hash, t.is_draft,
t.created_at_gh, t.updated_at_gh, t.closed_at_gh, t.merged_at_gh,
t.first_pulled_at, t.last_pulled_at, t.updated_at, t.closed_at_local, t.close_reason_local
`+s.threadSelectColumns(ctx, "t")+`
from cluster_members cm
join clusters c on c.id = cm.cluster_id and c.cluster_run_id = ?
join threads t on t.id = cm.thread_id
@ -731,6 +726,15 @@ func (s *Store) SaveDurableClusters(ctx context.Context, repoID int64, inputs []
return err
}
}
if len(inputs) > 0 {
if _, err := tx.q().ExecContext(ctx, `
delete from cluster_groups
where repo_id = ?
and cluster_type = 'similarity'
`, repoID); err != nil {
return fmt.Errorf("delete legacy similarity clusters: %w", err)
}
}
if _, err := tx.q().ExecContext(ctx, `
update cluster_runs
set finished_at = ?, stats_json = ?
@ -951,12 +955,16 @@ func (s *Store) upsertDurableCluster(ctx context.Context, repoID, runID int64, i
if stableSlug == "" {
stableSlug = stableKey
}
clusterType := strings.TrimSpace(input.ClusterType)
if clusterType == "" {
clusterType = "duplicate_candidate"
}
var clusterID int64
if err := s.q().QueryRowContext(ctx, `
insert into cluster_groups(
repo_id, stable_key, stable_slug, status, cluster_type, representative_thread_id, title, created_at, updated_at
)
values(?, ?, ?, 'active', 'similarity', ?, ?, ?, ?)
values(?, ?, ?, 'active', ?, ?, ?, ?, ?)
on conflict(repo_id, stable_key) do update set
stable_slug = excluded.stable_slug,
cluster_type = excluded.cluster_type,
@ -967,7 +975,7 @@ func (s *Store) upsertDurableCluster(ctx context.Context, repoID, runID int64, i
title = excluded.title,
updated_at = excluded.updated_at
returning id
`, repoID, stableKey, stableSlug, nullInt(input.RepresentativeThreadID), nullString(input.Title), now, now).Scan(&clusterID); err != nil {
`, repoID, stableKey, stableSlug, clusterType, nullInt(input.RepresentativeThreadID), nullString(input.Title), now, now).Scan(&clusterID); err != nil {
return 0, fmt.Errorf("upsert durable cluster: %w", err)
}
if _, err := s.q().ExecContext(ctx, `
@ -1167,6 +1175,9 @@ func (s *Store) clusterSummaryByID(ctx context.Context, repoID, clusterID int64,
}
func (s *Store) latestRawClusterRunID(ctx context.Context, repoID int64) (int64, bool, error) {
if !s.hasTable(ctx, "cluster_runs") || !s.hasTable(ctx, "clusters") {
return 0, false, nil
}
row := s.db.QueryRowContext(ctx, `
select cr.id
from cluster_runs cr
@ -1292,23 +1303,46 @@ func (s *Store) summariesByThreadIDs(ctx context.Context, threadIDs []int64) (ma
placeholders = append(placeholders, "?")
args = append(args, threadID)
}
rows, err := s.db.QueryContext(ctx, `
select thread_id, summary_kind, summary_text
from document_summaries
where thread_id in (`+strings.Join(placeholders, ",")+`)
order by thread_id, summary_kind, updated_at desc
`, args...)
if err != nil {
return nil, fmt.Errorf("select document summaries: %w", err)
}
defer rows.Close()
out := make(map[int64]map[string]string)
if s.hasTable(ctx, "document_summaries") {
rows, err := s.db.QueryContext(ctx, `
select thread_id, summary_kind, summary_text
from document_summaries
where thread_id in (`+strings.Join(placeholders, ",")+`)
order by thread_id, summary_kind, updated_at desc
`, args...)
if err != nil {
return nil, fmt.Errorf("select document summaries: %w", err)
}
if err := scanSummaryRows(rows, out, "document summary"); err != nil {
return nil, err
}
}
if s.hasTable(ctx, "thread_key_summaries") && s.hasTable(ctx, "thread_revisions") {
rows, err := s.db.QueryContext(ctx, `
select tr.thread_id, tks.summary_kind, tks.key_text
from thread_key_summaries tks
join thread_revisions tr on tr.id = tks.thread_revision_id
where tr.thread_id in (`+strings.Join(placeholders, ",")+`)
order by tr.thread_id, tks.summary_kind, tks.created_at desc
`, args...)
if err != nil {
return nil, fmt.Errorf("select thread key summaries: %w", err)
}
if err := scanSummaryRows(rows, out, "thread key summary"); err != nil {
return nil, err
}
}
return out, nil
}
func scanSummaryRows(rows *sql.Rows, out map[int64]map[string]string, source string) error {
defer rows.Close()
for rows.Next() {
var threadID int64
var kind, text string
if err := rows.Scan(&threadID, &kind, &text); err != nil {
return nil, fmt.Errorf("scan document summary: %w", err)
return fmt.Errorf("scan %s: %w", source, err)
}
if out[threadID] == nil {
out[threadID] = map[string]string{}
@ -1318,9 +1352,9 @@ func (s *Store) summariesByThreadIDs(ctx context.Context, threadIDs []int64) (ma
}
}
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("iterate document summaries: %w", err)
return fmt.Errorf("iterate %s rows: %w", source, err)
}
return out, nil
return nil
}
func snippetRunes(value string, limit int) string {

View File

@ -48,10 +48,19 @@ func (s *Store) ListEmbeddingTasks(ctx context.Context, options EmbeddingTaskOpt
args = append(args, options.Limit)
}
rows, err := s.q().QueryContext(ctx, `
select t.id, t.number, t.kind, t.title, coalesce(d.body, ''), coalesce(d.raw_text, ''), coalesce(d.dedupe_text, ''),
select t.id, t.number, t.kind, t.title, coalesce(d.body, t.body, ''), coalesce(d.raw_text, t.body, ''), coalesce(d.dedupe_text, t.title || ' ' || coalesce(t.body, '')),
coalesce((
select tks.key_text
from thread_key_summaries tks
join thread_revisions tr on tr.id = tks.thread_revision_id
where tr.thread_id = t.id
and tks.summary_kind in ('llm_key_summary', 'llm_key_3line')
order by tks.created_at desc, tr.created_at desc, tks.id desc
limit 1
), ''),
coalesce(tv.content_hash, '')
from threads t
join documents d on d.thread_id = t.id
left join documents d on d.thread_id = t.id
left join thread_vectors tv on tv.thread_id = t.id and tv.basis = ? and tv.model = ?
where `+strings.Join(where, " and ")+`
order by coalesce(t.updated_at_gh, t.updated_at) desc, t.number desc`+limitSQL,
@ -64,14 +73,17 @@ func (s *Store) ListEmbeddingTasks(ctx context.Context, options EmbeddingTaskOpt
out := make([]EmbeddingTask, 0)
for rows.Next() {
var task EmbeddingTask
var body, rawText, dedupeText, existingHash string
if err := rows.Scan(&task.ThreadID, &task.Number, &task.Kind, &task.Title, &body, &rawText, &dedupeText, &existingHash); err != nil {
var body, rawText, dedupeText, keySummary, existingHash string
if err := rows.Scan(&task.ThreadID, &task.Number, &task.Kind, &task.Title, &body, &rawText, &dedupeText, &keySummary, &existingHash); err != nil {
return nil, fmt.Errorf("scan embedding task: %w", err)
}
text, err := embeddingTextForBasis(basis, task.Title, body, rawText, dedupeText)
text, err := embeddingTextForBasis(basis, task.Title, body, rawText, dedupeText, keySummary)
if err != nil {
return nil, err
}
if strings.TrimSpace(text) == "" {
continue
}
task.Text = text
task.ContentHash = embeddingContentHash(basis, model, text)
if !options.Force && existingHash == task.ContentHash {
@ -85,7 +97,7 @@ func (s *Store) ListEmbeddingTasks(ctx context.Context, options EmbeddingTaskOpt
return out, nil
}
func embeddingTextForBasis(basis, title, body, rawText, dedupeText string) (string, error) {
func embeddingTextForBasis(basis, title, body, rawText, dedupeText, keySummary string) (string, error) {
switch basis {
case "", "title_original":
parts := []string{strings.TrimSpace(title)}
@ -97,6 +109,12 @@ func embeddingTextForBasis(basis, title, body, rawText, dedupeText string) (stri
return strings.TrimSpace(strings.Join(parts, "\n\n")), nil
case "dedupe_text":
return strings.TrimSpace(dedupeText), nil
case "llm_key_summary":
keySummary = strings.TrimSpace(keySummary)
if keySummary == "" {
return "", nil
}
return strings.TrimSpace("title: " + strings.TrimSpace(title) + "\n\nkey_summary:\n" + keySummary), nil
default:
return "", fmt.Errorf("embedding basis %q is not supported yet", basis)
}

View File

@ -0,0 +1,69 @@
package store
import (
"context"
"path/filepath"
"strings"
"testing"
)
func TestListEmbeddingTasksUsesLatestLLMKeySummary(t *testing.T) {
ctx := context.Background()
st, err := Open(ctx, filepath.Join(t.TempDir(), "gitcrawl.db"))
if err != nil {
t.Fatalf("open store: %v", err)
}
defer st.Close()
repoID, err := st.UpsertRepository(ctx, Repository{
Owner: "openclaw",
Name: "gitcrawl",
FullName: "openclaw/gitcrawl",
RawJSON: "{}",
UpdatedAt: "2026-04-26T00:00:00Z",
})
if err != nil {
t.Fatalf("repo: %v", err)
}
threadID, err := st.UpsertThread(ctx, Thread{
RepoID: repoID,
GitHubID: "1",
Number: 7,
Kind: "issue",
State: "open",
Title: "Download stalls",
Body: "Large download stalls near completion.",
HTMLURL: "https://github.com/openclaw/gitcrawl/issues/7",
LabelsJSON: "[]",
AssigneesJSON: "[]",
RawJSON: "{}",
ContentHash: "hash",
UpdatedAt: "2026-04-26T00:00:00Z",
})
if err != nil {
t.Fatalf("thread: %v", err)
}
if _, err := st.DB().ExecContext(ctx, `
insert into thread_revisions(id, thread_id, content_hash, title_hash, body_hash, labels_hash, created_at)
values(1, ?, 'hash', 'title', 'body', 'labels', '2026-04-26T00:00:00Z');
insert into thread_key_summaries(thread_revision_id, summary_kind, prompt_version, provider, model, input_hash, output_hash, key_text, created_at)
values(1, 'llm_key_3line', 'v1', 'openai', 'gpt-5-mini', 'input', 'output', 'intent: fix downloads\nsurface: downloader\nmechanism: retry stalled stream', '2026-04-26T00:01:00Z');
`, threadID); err != nil {
t.Fatalf("seed summary: %v", err)
}
tasks, err := st.ListEmbeddingTasks(ctx, EmbeddingTaskOptions{
RepoID: repoID,
Basis: "llm_key_summary",
Model: "text-embedding-3-large",
})
if err != nil {
t.Fatalf("tasks: %v", err)
}
if len(tasks) != 1 {
t.Fatalf("tasks = %d, want 1", len(tasks))
}
if !strings.Contains(tasks[0].Text, "title: Download stalls") || !strings.Contains(tasks[0].Text, "key_summary:") {
t.Fatalf("unexpected embedding text: %q", tasks[0].Text)
}
}

View File

@ -2,7 +2,10 @@ package store
import (
"crypto/sha256"
"encoding/hex"
"fmt"
"strconv"
"strings"
)
var humanKeyWords = []string{
@ -44,10 +47,51 @@ func clusterHumanName(repoID, representativeThreadID, clusterID int64) string {
if representativeThreadID != 0 {
key = fmt.Sprintf("repo:%d:cluster-representative:%d", repoID, representativeThreadID)
}
hash := sha256.Sum256([]byte(key))
return fmt.Sprintf("%s-%s-%s",
humanKeyWords[int(hash[0])%len(humanKeyWords)],
humanKeyWords[int(hash[1])%len(humanKeyWords)],
humanKeyWords[int(hash[2])%len(humanKeyWords)],
)
return HumanKeyForValue(key).Slug
}
type HumanKey struct {
Hash string
Slug string
Checksum string
}
func StableHash(value string) string {
sum := sha256.Sum256([]byte(value))
return hex.EncodeToString(sum[:])
}
func HumanKeyForValue(value string) HumanKey {
return HumanKeyFromHash(StableHash(value))
}
func HumanKeyFromHash(hash string) HumanKey {
normalized := strings.ToLower(hash)
index := func(offset int) int {
value, err := strconv.ParseInt(normalized[offset:offset+2], 16, 64)
if err != nil {
return 0
}
return int(value) % len(humanKeyWords)
}
checksumValue, err := strconv.ParseInt(normalized[6:12], 16, 64)
checksum := "0000"
if err == nil {
checksum = strconv.FormatInt(checksumValue, 36)
if len(checksum) < 4 {
checksum = strings.Repeat("0", 4-len(checksum)) + checksum
}
if len(checksum) > 4 {
checksum = checksum[len(checksum)-4:]
}
}
return HumanKey{
Hash: normalized,
Slug: fmt.Sprintf("%s-%s-%s", humanKeyWords[index(0)], humanKeyWords[index(2)], humanKeyWords[index(4)]),
Checksum: checksum,
}
}
func HumanKeyStableSlug(key HumanKey) string {
return key.Slug + "-" + key.Checksum
}

View File

@ -0,0 +1,13 @@
package store
import "testing"
func TestHumanKeyForValueMatchesGhcrawlRepresentativeIdentity(t *testing.T) {
key := HumanKeyForValue("repo:1:cluster-representative:546")
if key.Hash != "e77f18999d72cc6d27c5d3d0aa2c02cdc8cad3c1be077feb70062bc55eae98fd" {
t.Fatalf("hash = %q", key.Hash)
}
if HumanKeyStableSlug(key) != "usage-matrix-binary-zrzm" {
t.Fatalf("stable slug = %q", HumanKeyStableSlug(key))
}
}

View File

@ -5,6 +5,8 @@ import (
"database/sql"
"fmt"
"os"
"strings"
"time"
)
type PortablePruneOptions struct {
@ -13,16 +15,18 @@ type PortablePruneOptions struct {
}
type PortablePruneStats struct {
DBPath string `json:"db_path"`
BodyChars int `json:"body_chars"`
BytesBefore int64 `json:"bytes_before"`
BytesAfter int64 `json:"bytes_after"`
ThreadsPruned int64 `json:"threads_pruned"`
RepositoriesPruned int64 `json:"repositories_pruned"`
FingerprintsPruned int64 `json:"fingerprints_pruned"`
DocumentsDeleted int64 `json:"documents_deleted"`
DocumentsFTSRebuilt bool `json:"documents_fts_rebuilt"`
Vacuumed bool `json:"vacuumed"`
DBPath string `json:"db_path"`
BodyChars int `json:"body_chars"`
BytesBefore int64 `json:"bytes_before"`
BytesAfter int64 `json:"bytes_after"`
ThreadsPruned int64 `json:"threads_pruned"`
RepositoriesPruned int64 `json:"repositories_pruned"`
FingerprintsPruned int64 `json:"fingerprints_pruned"`
DocumentsDeleted int64 `json:"documents_deleted"`
DocumentsFTSRebuilt bool `json:"documents_fts_rebuilt"`
DroppedTables []string `json:"dropped_tables,omitempty"`
DroppedColumns []string `json:"dropped_columns,omitempty"`
Vacuumed bool `json:"vacuumed"`
}
func (s *Store) PrunePortablePayloads(ctx context.Context, options PortablePruneOptions) (PortablePruneStats, error) {
@ -107,6 +111,9 @@ func (s *Store) PrunePortablePayloads(ctx context.Context, options PortablePrune
}
stats.DocumentsFTSRebuilt = true
}
if err := s.canonicalizePortableSchema(ctx, options.BodyChars, &stats); err != nil {
return stats, err
}
if options.Vacuum {
if _, err := s.db.ExecContext(ctx, `pragma wal_checkpoint(TRUNCATE)`); err != nil {
return stats, fmt.Errorf("checkpoint wal: %w", err)
@ -121,6 +128,110 @@ func (s *Store) PrunePortablePayloads(ctx context.Context, options PortablePrune
return stats, nil
}
func (s *Store) canonicalizePortableSchema(ctx context.Context, bodyChars int, stats *PortablePruneStats) error {
if s.hasColumn(ctx, "threads", "body") && !s.hasColumn(ctx, "threads", "body_excerpt") {
if _, err := s.db.ExecContext(ctx, `alter table threads add column body_excerpt text`); err != nil {
return fmt.Errorf("add portable threads.body_excerpt: %w", err)
}
if _, err := s.db.ExecContext(ctx, `
update threads
set body_excerpt = case when length(body) > ? then substr(body, 1, ?) else body end
where body is not null
`, bodyChars, bodyChars); err != nil {
return fmt.Errorf("backfill portable body excerpts: %w", err)
}
}
if !s.hasColumn(ctx, "threads", "body_length") {
if _, err := s.db.ExecContext(ctx, `alter table threads add column body_length integer not null default 0`); err != nil {
return fmt.Errorf("add portable threads.body_length: %w", err)
}
}
for _, column := range []struct {
table string
name string
}{
{table: "repositories", name: "raw_json"},
{table: "threads", name: "raw_json"},
{table: "threads", name: "body"},
} {
if !s.hasColumn(ctx, column.table, column.name) {
continue
}
if _, err := s.db.ExecContext(ctx, `alter table `+sqliteIdentifier(column.table)+` drop column `+sqliteIdentifier(column.name)); err != nil {
return fmt.Errorf("drop portable column %s.%s: %w", column.table, column.name, err)
}
stats.DroppedColumns = append(stats.DroppedColumns, column.table+"."+column.name)
}
for _, table := range canonicalPortableDroppedTables() {
if !s.tableExists(ctx, table) {
continue
}
if _, err := s.db.ExecContext(ctx, `drop table if exists `+sqliteIdentifier(table)); err != nil {
return fmt.Errorf("drop portable table %s: %w", table, err)
}
stats.DroppedTables = append(stats.DroppedTables, table)
}
if _, err := s.db.ExecContext(ctx, `
create table if not exists portable_metadata (
key text primary key,
value text not null
)
`); err != nil {
return fmt.Errorf("ensure portable metadata: %w", err)
}
metadata := map[string]string{
"schema": "ghcrawl-portable-sync-v1",
"body_chars": fmt.Sprintf("%d", bodyChars),
"excluded": "raw_json,comments,documents,fts,vectors,code_snapshots,cluster_events,run_history,similarity_edges,blobs",
"exported_at": time.Now().UTC().Format(timeLayout),
"source_path": s.path,
}
for key, value := range metadata {
if _, err := s.db.ExecContext(ctx, `
insert into portable_metadata(key, value)
values(?, ?)
on conflict(key) do update set value = excluded.value
`, key, value); err != nil {
return fmt.Errorf("write portable metadata %s: %w", key, err)
}
}
return nil
}
func canonicalPortableDroppedTables() []string {
return []string{
"documents_fts",
"documents_fts_config",
"documents_fts_data",
"documents_fts_docsize",
"documents_fts_idx",
"comments",
"documents",
"document_embeddings",
"document_summaries",
"thread_vectors",
"thread_code_snapshots",
"thread_changed_files",
"thread_hunk_signatures",
"cluster_events",
"cluster_members",
"clusters",
"sync_runs",
"summary_runs",
"embedding_runs",
"cluster_runs",
"similarity_edges",
"blobs",
}
}
func sqliteIdentifier(value string) string {
if value == "" || strings.ContainsAny(value, "\"\x00") {
panic(fmt.Sprintf("unsafe SQLite identifier: %q", value))
}
return `"` + value + `"`
}
func (s *Store) tableExists(ctx context.Context, table string) bool {
var name string
err := s.db.QueryRowContext(ctx, `select name from sqlite_master where type in ('table', 'virtual table') and name = ?`, table).Scan(&name)

View File

@ -43,7 +43,7 @@ func (s *Store) RepositoryByFullName(ctx context.Context, fullName string) (Repo
var githubRepoID sql.NullString
var rawJSON sql.NullString
err := s.q().QueryRowContext(ctx, `
select id, owner, name, full_name, github_repo_id, raw_json, updated_at
select id, owner, name, full_name, github_repo_id, `+s.repositoryRawJSONExpr(ctx)+`, updated_at
from repositories
where full_name = ?
`, fullName).Scan(&repo.ID, &repo.Owner, &repo.Name, &repo.FullName, &githubRepoID, &rawJSON, &repo.UpdatedAt)
@ -57,7 +57,7 @@ func (s *Store) RepositoryByFullName(ctx context.Context, fullName string) (Repo
func (s *Store) ListRepositories(ctx context.Context) ([]Repository, error) {
rows, err := s.db.QueryContext(ctx, `
select id, owner, name, full_name, github_repo_id, raw_json, updated_at
select id, owner, name, full_name, github_repo_id, `+s.repositoryRawJSONExpr(ctx)+`, updated_at
from repositories
order by coalesce(updated_at, '') desc, id desc
`)
@ -84,6 +84,13 @@ func (s *Store) ListRepositories(ctx context.Context) ([]Repository, error) {
return repos, nil
}
func (s *Store) repositoryRawJSONExpr(ctx context.Context) string {
if s.hasColumn(ctx, "repositories", "raw_json") {
return "raw_json"
}
return "''"
}
func nullString(value string) sql.NullString {
return sql.NullString{String: value, Valid: value != ""}
}

View File

@ -214,6 +214,20 @@ create table if not exists thread_fingerprints (
unique(thread_revision_id, algorithm_version)
);
create table if not exists thread_key_summaries (
id integer primary key,
thread_revision_id integer not null references thread_revisions(id) on delete cascade,
summary_kind text not null,
prompt_version text not null,
provider text not null,
model text not null,
input_hash text not null,
output_hash text not null,
key_text text not null,
created_at text not null,
unique(thread_revision_id, summary_kind, prompt_version, provider, model)
);
create table if not exists sync_runs (
id integer primary key,
repo_id integer references repositories(id) on delete cascade,

View File

@ -71,14 +71,15 @@ func (s *Store) searchThreads(ctx context.Context, repoID int64, query string, l
return nil, nil
}
pattern := "%" + escapeLike(needle) + "%"
bodyExpr := s.threadBodyExpr(ctx, "")
rows, err := s.db.QueryContext(ctx, `
select id, number, kind, state, title, html_url, author_login,
coalesce(nullif(body, ''), title)
coalesce(nullif(`+bodyExpr+`, ''), title)
from threads
where repo_id = ?
and (
lower(title) like ? escape '\'
or lower(coalesce(body, '')) like ? escape '\'
or lower(coalesce(`+bodyExpr+`, '')) like ? escape '\'
)
order by coalesce(updated_at_gh, updated_at) desc, number desc
limit ?

View File

@ -155,8 +155,24 @@ func (s *Store) Status(ctx context.Context) (Status, error) {
return Status{}, fmt.Errorf("count clusters: %w", err)
}
var lastSync string
if err := s.db.QueryRowContext(ctx, `select coalesce(max(finished_at), '') from sync_runs where status in ('success', 'completed')`).Scan(&lastSync); err != nil {
return Status{}, fmt.Errorf("read last sync: %w", err)
if s.hasTable(ctx, "sync_runs") {
if err := s.db.QueryRowContext(ctx, `select coalesce(max(finished_at), '') from sync_runs where status in ('success', 'completed')`).Scan(&lastSync); err != nil {
return Status{}, fmt.Errorf("read last sync: %w", err)
}
} else if s.hasTable(ctx, "repo_sync_state") {
if err := s.db.QueryRowContext(ctx, `
select coalesce(
max(last_open_close_reconciled_at),
max(last_overlapping_open_scan_completed_at),
max(last_non_overlapping_scan_completed_at),
max(last_full_open_scan_started_at),
max(updated_at),
''
)
from repo_sync_state
`).Scan(&lastSync); err != nil {
return Status{}, fmt.Errorf("read portable sync state: %w", err)
}
}
if lastSync != "" {
parsed, err := time.Parse(timeLayout, lastSync)
@ -206,6 +222,12 @@ func (s *Store) ensureLegacyPortableColumns(ctx context.Context) error {
return nil
}
func (s *Store) hasTable(ctx context.Context, table string) bool {
var name string
err := s.db.QueryRowContext(ctx, `select name from sqlite_schema where type in ('table', 'virtual table') and name = ?`, table).Scan(&name)
return err == nil
}
func (s *Store) ensureColumn(ctx context.Context, table, column, definition string) error {
if s.hasColumn(ctx, table, column) {
return nil

View File

@ -90,6 +90,132 @@ func TestOpenReadOnlyDoesNotMutateStore(t *testing.T) {
}
}
func TestOpenReadOnlySupportsCanonicalPortableStore(t *testing.T) {
ctx := context.Background()
dbPath := filepath.Join(t.TempDir(), "portable.db")
db, err := sql.Open("sqlite", dbPath)
if err != nil {
t.Fatalf("open seed db: %v", err)
}
_, err = db.ExecContext(ctx, `
create table repositories (
id integer primary key,
owner text not null,
name text not null,
full_name text not null,
github_repo_id text,
updated_at text not null
);
create table threads (
id integer primary key,
repo_id integer not null,
github_id text not null,
number integer not null,
kind text not null,
state text not null,
title text not null,
body_excerpt text,
body_length integer not null default 0,
author_login text,
author_type text,
html_url text not null,
labels_json text not null,
assignees_json text not null,
content_hash text not null,
is_draft integer not null default 0,
created_at_gh text,
updated_at_gh text,
closed_at_gh text,
merged_at_gh text,
first_pulled_at text,
last_pulled_at text,
updated_at text not null,
closed_at_local text,
close_reason_local text
);
create table repo_sync_state (
repo_id integer primary key,
last_full_open_scan_started_at text,
last_overlapping_open_scan_completed_at text,
last_non_overlapping_scan_completed_at text,
last_open_close_reconciled_at text,
updated_at text not null
);
create table cluster_groups (
id integer primary key,
repo_id integer not null,
stable_key text not null,
stable_slug text not null,
status text not null,
cluster_type text not null,
representative_thread_id integer,
title text,
created_at text not null,
updated_at text not null,
closed_at text
);
insert into repositories(id, owner, name, full_name, updated_at)
values(1, 'openclaw', 'openclaw', 'openclaw/openclaw', '2026-04-28T00:00:00Z');
insert into threads(id, repo_id, github_id, number, kind, state, title, body_excerpt, html_url, labels_json, assignees_json, content_hash, updated_at)
values(1, 1, '1', 42, 'issue', 'open', 'portable issue', 'portable body', 'https://github.com/openclaw/openclaw/issues/42', '[]', '[]', 'hash', '2026-04-28T00:00:00Z');
insert into repo_sync_state(repo_id, last_open_close_reconciled_at, updated_at)
values(1, '2026-04-28T01:02:03Z', '2026-04-28T01:02:03Z');
insert into cluster_groups(id, repo_id, stable_key, stable_slug, status, cluster_type, representative_thread_id, title, created_at, updated_at)
values(1, 1, 'stable', 'stable', 'active', 'similarity', 1, 'portable cluster', '2026-04-28T00:00:00Z', '2026-04-28T00:00:00Z');
`)
if err != nil {
t.Fatalf("seed portable db: %v", err)
}
if err := db.Close(); err != nil {
t.Fatalf("close seed db: %v", err)
}
before, err := os.ReadFile(dbPath)
if err != nil {
t.Fatalf("read db before: %v", err)
}
st, err := OpenReadOnly(ctx, dbPath)
if err != nil {
t.Fatalf("open readonly portable: %v", err)
}
defer st.Close()
status, err := st.Status(ctx)
if err != nil {
t.Fatalf("portable status: %v", err)
}
if status.RepositoryCount != 1 || status.ThreadCount != 1 || status.OpenThreadCount != 1 || status.ClusterCount != 1 {
t.Fatalf("unexpected portable status: %#v", status)
}
if status.LastSyncAt.IsZero() {
t.Fatalf("portable last sync was not read from repo_sync_state: %#v", status)
}
repo, err := st.RepositoryByFullName(ctx, "openclaw/openclaw")
if err != nil {
t.Fatalf("portable repository: %v", err)
}
if repo.RawJSON != "" {
t.Fatalf("portable raw json: got %q want empty", repo.RawJSON)
}
threads, err := st.ListThreadsFiltered(ctx, ThreadListOptions{RepoID: repo.ID, Numbers: []int{42}})
if err != nil {
t.Fatalf("portable threads: %v", err)
}
if len(threads) != 1 || threads[0].Body != "portable body" || threads[0].RawJSON != "" {
t.Fatalf("unexpected portable thread: %#v", threads)
}
if err := st.Close(); err != nil {
t.Fatalf("close portable readonly: %v", err)
}
after, err := os.ReadFile(dbPath)
if err != nil {
t.Fatalf("read db after: %v", err)
}
if !bytes.Equal(after, before) {
t.Fatal("readonly portable open mutated database bytes")
}
}
func TestOpenMigratesPortableStoreColumns(t *testing.T) {
ctx := context.Background()
dbPath := filepath.Join(t.TempDir(), "portable.db")
@ -252,21 +378,26 @@ func TestPrunePortablePayloads(t *testing.T) {
t.Fatalf("unexpected stats: %#v", stats)
}
var repoRaw, body, threadRaw, titleTokens, linkedRefs, buckets, features string
var documentCount int
if err := st.DB().QueryRowContext(ctx, `select raw_json from repositories where id = 1`).Scan(&repoRaw); err != nil {
t.Fatalf("repo raw: %v", err)
var bodyExcerpt, titleTokens, linkedRefs, buckets, features string
if st.hasColumn(ctx, "repositories", "raw_json") {
t.Fatal("repositories.raw_json was not dropped")
}
if err := st.DB().QueryRowContext(ctx, `select body, raw_json from threads where id = 1`).Scan(&body, &threadRaw); err != nil {
t.Fatalf("thread payload: %v", err)
if st.hasColumn(ctx, "threads", "raw_json") {
t.Fatal("threads.raw_json was not dropped")
}
if st.hasColumn(ctx, "threads", "body") {
t.Fatal("threads.body was not dropped")
}
if err := st.DB().QueryRowContext(ctx, `select body_excerpt from threads where id = 1`).Scan(&bodyExcerpt); err != nil {
t.Fatalf("thread body excerpt: %v", err)
}
if err := st.DB().QueryRowContext(ctx, `select title_tokens_json, linked_refs_json, module_buckets_json, feature_json from thread_fingerprints where id = 1`).Scan(&titleTokens, &linkedRefs, &buckets, &features); err != nil {
t.Fatalf("fingerprint payload: %v", err)
}
if err := st.DB().QueryRowContext(ctx, `select count(*) from documents`).Scan(&documentCount); err != nil {
t.Fatalf("document count: %v", err)
if st.tableExists(ctx, "documents") {
t.Fatal("documents table was not dropped")
}
if repoRaw != "" || body != "abcdefgh" || threadRaw != "" || titleTokens != "[]" || linkedRefs != "[]" || buckets != "[]" || features != "{}" || documentCount != 0 {
t.Fatalf("payloads not pruned: repoRaw=%q body=%q threadRaw=%q titleTokens=%q linkedRefs=%q buckets=%q features=%q documents=%d", repoRaw, body, threadRaw, titleTokens, linkedRefs, buckets, features, documentCount)
if bodyExcerpt != "abcdefgh" || titleTokens != "[]" || linkedRefs != "[]" || buckets != "[]" || features != "{}" {
t.Fatalf("payloads not pruned: bodyExcerpt=%q titleTokens=%q linkedRefs=%q buckets=%q features=%q", bodyExcerpt, titleTokens, linkedRefs, buckets, features)
}
}

View File

@ -77,6 +77,59 @@ func (s *Store) UpsertThread(ctx context.Context, thread Thread) (int64, error)
return id, nil
}
func (s *Store) MarkOpenThreadClosedFromGitHub(ctx context.Context, thread Thread) (bool, error) {
if thread.RepoID <= 0 {
return false, fmt.Errorf("repo id must be positive")
}
if thread.Number <= 0 {
return false, fmt.Errorf("thread number must be positive")
}
if thread.Kind == "" {
return false, fmt.Errorf("thread kind is required")
}
if thread.State == "" {
thread.State = "closed"
}
result, err := s.q().ExecContext(ctx, `
update threads
set github_id = ?,
state = ?,
title = ?,
body = ?,
author_login = ?,
author_type = ?,
html_url = ?,
labels_json = ?,
assignees_json = ?,
raw_json = ?,
content_hash = ?,
is_draft = ?,
created_at_gh = ?,
updated_at_gh = ?,
closed_at_gh = ?,
merged_at_gh = ?,
last_pulled_at = ?,
updated_at = ?
where repo_id = ?
and kind = ?
and number = ?
and state = 'open'
and closed_at_local is null
`, thread.GitHubID, thread.State, thread.Title, nullString(thread.Body), nullString(thread.AuthorLogin),
nullString(thread.AuthorType), thread.HTMLURL, thread.LabelsJSON, thread.AssigneesJSON, thread.RawJSON,
thread.ContentHash, boolInt(thread.IsDraft), nullString(thread.CreatedAtGitHub), nullString(thread.UpdatedAtGitHub),
nullString(thread.ClosedAtGitHub), nullString(thread.MergedAtGitHub), nullString(thread.LastPulledAt), thread.UpdatedAt,
thread.RepoID, thread.Kind, thread.Number)
if err != nil {
return false, fmt.Errorf("mark open thread closed from github: %w", err)
}
affected, err := result.RowsAffected()
if err != nil {
return false, nil
}
return affected > 0, nil
}
func (s *Store) ListThreads(ctx context.Context, repoID int64, includeClosed bool) ([]Thread, error) {
return s.ListThreadsFiltered(ctx, ThreadListOptions{RepoID: repoID, IncludeClosed: includeClosed})
}
@ -108,10 +161,7 @@ func (s *Store) ListThreadsFiltered(ctx context.Context, options ThreadListOptio
args = append(args, options.Limit)
}
rows, err := s.q().QueryContext(ctx, `
select id, repo_id, github_id, number, kind, state, title, body, author_login, author_type,
html_url, labels_json, assignees_json, raw_json, content_hash, is_draft,
created_at_gh, updated_at_gh, closed_at_gh, merged_at_gh,
first_pulled_at, last_pulled_at, updated_at, closed_at_local, close_reason_local
select `+s.threadSelectColumns(ctx, "")+`
from threads
where `+where+`
order by number`+limitSQL, args...)
@ -210,6 +260,66 @@ func scanThread(rows interface {
return thread, nil
}
func (s *Store) threadSelectColumns(ctx context.Context, alias string) string {
column := func(name string) string {
if alias == "" {
return name
}
return alias + "." + name
}
return strings.Join([]string{
column("id"),
column("repo_id"),
column("github_id"),
column("number"),
column("kind"),
column("state"),
column("title"),
s.threadBodyExpr(ctx, alias),
column("author_login"),
column("author_type"),
column("html_url"),
column("labels_json"),
column("assignees_json"),
s.threadRawJSONExpr(ctx, alias),
column("content_hash"),
column("is_draft"),
column("created_at_gh"),
column("updated_at_gh"),
column("closed_at_gh"),
column("merged_at_gh"),
column("first_pulled_at"),
column("last_pulled_at"),
column("updated_at"),
column("closed_at_local"),
column("close_reason_local"),
}, ", ")
}
func (s *Store) threadBodyExpr(ctx context.Context, alias string) string {
if s.hasColumn(ctx, "threads", "body") {
return qualifiedColumn(alias, "body")
}
if s.hasColumn(ctx, "threads", "body_excerpt") {
return qualifiedColumn(alias, "body_excerpt")
}
return "''"
}
func (s *Store) threadRawJSONExpr(ctx context.Context, alias string) string {
if s.hasColumn(ctx, "threads", "raw_json") {
return qualifiedColumn(alias, "raw_json")
}
return "''"
}
func qualifiedColumn(alias, name string) string {
if alias == "" {
return name
}
return alias + "." + name
}
func boolInt(value bool) int {
if value {
return 1

View File

@ -63,6 +63,9 @@ func (s *Store) ListThreadVectors(ctx context.Context, repoID int64) ([]ThreadVe
}
func (s *Store) ListThreadVectorsFiltered(ctx context.Context, query ThreadVectorQuery) ([]ThreadVector, error) {
if !s.hasTable(ctx, "thread_vectors") {
return []ThreadVector{}, nil
}
where, args := threadVectorWhere(query)
rows, err := s.db.QueryContext(ctx, `
select tv.thread_id, tv.basis, tv.model, tv.dimensions, tv.content_hash, tv.vector_json, tv.vector_backend, tv.created_at, tv.updated_at
@ -96,13 +99,13 @@ func (s *Store) ListThreadVectorsFiltered(ctx context.Context, query ThreadVecto
}
func (s *Store) ThreadVectorByNumber(ctx context.Context, query ThreadVectorQuery, number int) (Thread, ThreadVector, error) {
if !s.hasTable(ctx, "thread_vectors") {
return Thread{}, ThreadVector{}, fmt.Errorf("thread #%d was not found with an embedding", number)
}
where, args := threadVectorWhere(query)
args = append(args, number)
row := s.db.QueryRowContext(ctx, `
select t.id, t.repo_id, t.github_id, t.number, t.kind, t.state, t.title, t.body, t.author_login, t.author_type,
t.html_url, t.labels_json, t.assignees_json, t.raw_json, t.content_hash, t.is_draft,
t.created_at_gh, t.updated_at_gh, t.closed_at_gh, t.merged_at_gh,
t.first_pulled_at, t.last_pulled_at, t.updated_at, t.closed_at_local, t.close_reason_local,
select `+s.threadSelectColumns(ctx, "t")+`,
tv.thread_id, tv.basis, tv.model, tv.dimensions, tv.content_hash, tv.vector_json, tv.vector_backend, tv.created_at, tv.updated_at
from threads t
join thread_vectors tv on tv.thread_id = t.id
@ -131,10 +134,7 @@ func (s *Store) ThreadsByIDs(ctx context.Context, repoID int64, ids []int64) (ma
args = append(args, id)
}
rows, err := s.db.QueryContext(ctx, `
select id, repo_id, github_id, number, kind, state, title, body, author_login, author_type,
html_url, labels_json, assignees_json, raw_json, content_hash, is_draft,
created_at_gh, updated_at_gh, closed_at_gh, merged_at_gh,
first_pulled_at, last_pulled_at, updated_at, closed_at_local, close_reason_local
select `+s.threadSelectColumns(ctx, "")+`
from threads
where repo_id = ? and id in (`+strings.Join(placeholders, ",")+`)
`, args...)

View File

@ -45,6 +45,7 @@ type Stats struct {
IssuesSynced int `json:"issues_synced"`
PullRequestsSynced int `json:"pull_requests_synced"`
CommentsSynced int `json:"comments_synced"`
ThreadsClosed int `json:"threads_closed"`
RequestedSince string `json:"requested_since,omitempty"`
Limit int `json:"limit,omitempty"`
MetadataOnly bool `json:"metadata_only"`
@ -129,11 +130,18 @@ func (s *Syncer) Sync(ctx context.Context, options Options) (Stats, error) {
stats.IssuesSynced++
}
}
if state == "open" && since != "" && options.Limit <= 0 {
closed, err := s.applyClosedOverlapSweep(ctx, st, repoID, options, since)
if err != nil {
return err
}
stats.ThreadsClosed = closed
}
stats.FinishedAt = s.now().Format(time.RFC3339Nano)
if _, err := st.RecordRun(ctx, store.RunRecord{
RepoID: repoID,
Kind: "sync",
Scope: "open",
Scope: state,
Status: "success",
StartedAt: stats.StartedAt,
FinishedAt: stats.FinishedAt,
@ -158,7 +166,7 @@ func (s *Syncer) Sync(ctx context.Context, options Options) (Stats, error) {
func normalizeState(value string) (string, error) {
value = strings.TrimSpace(strings.ToLower(value))
if value == "" {
return "all", nil
return "open", nil
}
switch value {
case "open", "closed", "all":
@ -168,6 +176,31 @@ func normalizeState(value string) (string, error) {
}
}
func (s *Syncer) applyClosedOverlapSweep(ctx context.Context, st *store.Store, repoID int64, options Options, since string) (int, error) {
rows, err := s.client.ListRepositoryIssues(ctx, options.Owner, options.Repo, gh.ListIssuesOptions{
State: "closed",
Since: since,
}, options.Reporter)
if err != nil {
return 0, err
}
closed := 0
for _, row := range rows {
thread := mapIssueToThread(repoID, row, s.now().Format(time.RFC3339Nano))
updated, err := st.MarkOpenThreadClosedFromGitHub(ctx, thread)
if err != nil {
return 0, err
}
if updated {
closed++
}
}
if closed > 0 {
options.Reporter.Printf("[sync] closed overlap sweep matched %d stale open thread(s)", closed)
}
return closed, nil
}
func normalizeSince(value string, now time.Time) (string, error) {
value = strings.TrimSpace(value)
if value == "" {

View File

@ -17,6 +17,9 @@ func (fakeGitHub) GetRepo(ctx context.Context, owner, repo string, reporter gh.R
}
func (fakeGitHub) ListRepositoryIssues(ctx context.Context, owner, repo string, options gh.ListIssuesOptions, reporter gh.Reporter) ([]map[string]any, error) {
if options.State == "closed" {
return nil, nil
}
return []map[string]any{
{
"id": 1,
@ -89,6 +92,30 @@ func (f *stateCaptureGitHub) ListRepositoryIssues(ctx context.Context, owner, re
return nil, nil
}
type closedSweepGitHub struct {
fakeGitHub
}
func (closedSweepGitHub) ListRepositoryIssues(ctx context.Context, owner, repo string, options gh.ListIssuesOptions, reporter gh.Reporter) ([]map[string]any, error) {
if options.State == "closed" {
return []map[string]any{{
"id": 1,
"number": 7,
"state": "closed",
"title": "download stalls",
"body": "large file download stalls",
"html_url": "https://github.com/openclaw/gitcrawl/issues/7",
"created_at": "2026-04-26T00:00:00Z",
"updated_at": "2026-04-27T00:00:00Z",
"closed_at": "2026-04-27T00:00:00Z",
"labels": []map[string]any{{"name": "bug"}},
"assignees": []map[string]any{},
"user": map[string]any{"login": "vincentkoc", "type": "User"},
}}, nil
}
return nil, nil
}
func TestSyncPersistsIssuesAndPullRequests(t *testing.T) {
ctx := context.Background()
st, err := store.Open(ctx, filepath.Join(t.TempDir(), "gitcrawl.db"))
@ -189,7 +216,7 @@ func TestSyncPassesRequestedState(t *testing.T) {
}
}
func TestSyncDefaultsToAllStates(t *testing.T) {
func TestSyncDefaultsToOpenState(t *testing.T) {
ctx := context.Background()
st, err := store.Open(ctx, filepath.Join(t.TempDir(), "gitcrawl.db"))
if err != nil {
@ -202,8 +229,8 @@ func TestSyncDefaultsToAllStates(t *testing.T) {
if _, err := s.Sync(ctx, Options{Owner: "openclaw", Repo: "gitcrawl"}); err != nil {
t.Fatalf("sync: %v", err)
}
if client.state != "all" {
t.Fatalf("default state = %q, want all", client.state)
if client.state != "open" {
t.Fatalf("default state = %q, want open", client.state)
}
}
@ -220,3 +247,61 @@ func TestSyncRejectsInvalidState(t *testing.T) {
t.Fatal("expected invalid state to fail")
}
}
func TestSyncOpenSinceAppliesClosedOverlapSweep(t *testing.T) {
ctx := context.Background()
st, err := store.Open(ctx, filepath.Join(t.TempDir(), "gitcrawl.db"))
if err != nil {
t.Fatalf("open store: %v", err)
}
defer st.Close()
repoID, err := st.UpsertRepository(ctx, store.Repository{
Owner: "openclaw",
Name: "gitcrawl",
FullName: "openclaw/gitcrawl",
RawJSON: "{}",
UpdatedAt: "2026-04-26T00:00:00Z",
})
if err != nil {
t.Fatalf("seed repo: %v", err)
}
if _, err := st.UpsertThread(ctx, store.Thread{
RepoID: repoID,
GitHubID: "1",
Number: 7,
Kind: "issue",
State: "open",
Title: "download stalls",
Body: "large file download stalls",
HTMLURL: "https://github.com/openclaw/gitcrawl/issues/7",
LabelsJSON: "[]",
AssigneesJSON: "[]",
RawJSON: "{}",
ContentHash: "old",
CreatedAtGitHub: "2026-04-26T00:00:00Z",
UpdatedAtGitHub: "2026-04-26T00:00:00Z",
FirstPulledAt: "2026-04-26T00:00:00Z",
LastPulledAt: "2026-04-26T00:00:00Z",
UpdatedAt: "2026-04-26T00:00:00Z",
}); err != nil {
t.Fatalf("seed thread: %v", err)
}
s := New(closedSweepGitHub{}, st)
s.now = func() time.Time { return time.Date(2026, 4, 27, 1, 0, 0, 0, time.UTC) }
stats, err := s.Sync(ctx, Options{Owner: "openclaw", Repo: "gitcrawl", Since: "1h"})
if err != nil {
t.Fatalf("sync: %v", err)
}
if stats.ThreadsClosed != 1 {
t.Fatalf("threads closed = %d, want 1", stats.ThreadsClosed)
}
threads, err := st.ListThreads(ctx, repoID, true)
if err != nil {
t.Fatalf("threads: %v", err)
}
if len(threads) != 1 || threads[0].State != "closed" || threads[0].ClosedAtGitHub == "" {
t.Fatalf("thread not closed from overlap sweep: %#v", threads)
}
}