Compare commits
3 Commits
main
...
fix/canoni
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6e9f4356b0 | ||
|
|
832cd09dc0 | ||
|
|
85ce5ced12 |
@ -16,7 +16,7 @@ gitcrawl doctor
|
||||
gitcrawl sync owner/repo
|
||||
gitcrawl sync owner/repo --state open
|
||||
gitcrawl refresh owner/repo
|
||||
gitcrawl cluster owner/repo --threshold 0.82
|
||||
gitcrawl cluster owner/repo --threshold 0.80
|
||||
gitcrawl clusters owner/repo
|
||||
gitcrawl durable-clusters owner/repo
|
||||
gitcrawl cluster-detail owner/repo --id 123
|
||||
@ -35,9 +35,9 @@ gitcrawl tui owner/repo
|
||||
```
|
||||
|
||||
`gitcrawl clusters`, `gitcrawl durable-clusters`, and `gitcrawl tui` show active primary cluster memberships by default. Pass `--include-closed` to inspect closed rows and historical secondary memberships.
|
||||
`gitcrawl cluster` and `gitcrawl refresh` build bounded nearest-neighbor clusters by default (`--max-cluster-size 40`, `--k 16`, `--cross-kind-threshold 0.93`) and add deterministic GitHub reference evidence for direct issue/PR links such as `#123`, `issues/123`, and `pull/123`. Weak embedding edges also need concrete title-token overlap unless their similarity is already high, which keeps generic low-confidence bridges from forming unrelated clusters.
|
||||
`gitcrawl cluster` and `gitcrawl refresh` build ghcrawl-shaped durable clusters by default (`--threshold 0.80`, `--min-size 1`, `--max-cluster-size 40`, `--k 16`, `--cross-kind-threshold 0.93`): every active vector-backed thread is represented, singleton rows use `singleton_orphan`, multi-member rows use `duplicate_candidate`, and stable IDs are derived from the representative thread. They also add deterministic GitHub reference evidence for direct issue/PR links such as `#123`, `issues/123`, and `pull/123`. Weak embedding edges need concrete title-token overlap unless their similarity is already high, which keeps generic low-confidence bridges from forming unrelated clusters.
|
||||
`gitcrawl tui` infers the most recently updated local repository when `owner/repo` is omitted. `serve` is intentionally not part of `gitcrawl`.
|
||||
`gitcrawl sync` fetches issues and pull requests in every GitHub state by default. Pass `--state open` or `--state closed` to limit a sync to one state.
|
||||
`gitcrawl sync` fetches open issues and pull requests by default. Pass `--state all` or `--state closed` for explicit backfill workflows; incremental open syncs with `--since` also sweep recently closed items so local open state does not rot.
|
||||
The TUI starts at `--min-size 5` so maintainer-significant active clusters are visible first; pass `--min-size 1` to include singletons. Mouse support is built in: click rows, wheel panes, and right-click for copy, sort, filter, jump, link, neighbor, local close/reopen, and member triage actions. Press `a` to open the same action menu from the keyboard, `#` to jump directly to an issue or PR number, `p` to switch between repositories already present in the local store, or `n` to load neighbors for the selected issue or PR. Enter from the members pane also loads neighbors before opening detail. The TUI quietly refreshes from the local store every 15 seconds.
|
||||
|
||||
## Local Defaults
|
||||
|
||||
@ -2,8 +2,6 @@ package cli
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"flag"
|
||||
@ -32,6 +30,7 @@ const (
|
||||
defaultTUIWorkingSetLimit = 500
|
||||
defaultClusterMaxSize = 40
|
||||
defaultClusterFanout = 16
|
||||
defaultClusterThreshold = 0.80
|
||||
defaultCrossKindMinScore = 0.93
|
||||
highConfidenceEdgeScore = 0.90
|
||||
weakEdgeMinTitleOverlap = 0.18
|
||||
@ -248,10 +247,10 @@ func (a *App) runRefresh(ctx context.Context, args []string) error {
|
||||
includeComments := fs.Bool("include-comments", false, "hydrate comments during sync")
|
||||
fs.Bool("include-code", false, "accepted for compatibility; code hydration is not implemented yet")
|
||||
since := fs.String("since", "", "GitHub since timestamp")
|
||||
state := fs.String("state", "", "GitHub issue state: open|closed|all; default all")
|
||||
state := fs.String("state", "", "GitHub issue state: open|closed|all; default open")
|
||||
limitRaw := fs.String("limit", "", "maximum sync or embedding rows")
|
||||
thresholdRaw := fs.String("threshold", "0.82", "minimum cluster cosine score")
|
||||
minSizeRaw := fs.String("min-size", "2", "minimum cluster member count")
|
||||
thresholdRaw := fs.String("threshold", fmt.Sprintf("%.2f", defaultClusterThreshold), "minimum cluster cosine score")
|
||||
minSizeRaw := fs.String("min-size", "1", "minimum cluster member count")
|
||||
maxClusterSizeRaw := fs.String("max-cluster-size", strconv.Itoa(defaultClusterMaxSize), "maximum members per generated cluster")
|
||||
fanoutRaw := fs.String("k", strconv.Itoa(defaultClusterFanout), "nearest-neighbor fanout per thread")
|
||||
crossKindThresholdRaw := fs.String("cross-kind-threshold", fmt.Sprintf("%.2f", defaultCrossKindMinScore), "minimum score for issue/pull request edges")
|
||||
@ -545,8 +544,8 @@ func (a *App) runNeighbors(ctx context.Context, args []string) error {
|
||||
func (a *App) runCluster(ctx context.Context, args []string) error {
|
||||
fs := flag.NewFlagSet("cluster", flag.ContinueOnError)
|
||||
fs.SetOutput(io.Discard)
|
||||
thresholdRaw := fs.String("threshold", "0.82", "minimum cosine score")
|
||||
minSizeRaw := fs.String("min-size", "2", "minimum cluster member count")
|
||||
thresholdRaw := fs.String("threshold", fmt.Sprintf("%.2f", defaultClusterThreshold), "minimum cosine score")
|
||||
minSizeRaw := fs.String("min-size", "1", "minimum cluster member count")
|
||||
maxClusterSizeRaw := fs.String("max-cluster-size", strconv.Itoa(defaultClusterMaxSize), "maximum members per generated cluster")
|
||||
fanoutRaw := fs.String("k", strconv.Itoa(defaultClusterFanout), "nearest-neighbor fanout per thread")
|
||||
crossKindThresholdRaw := fs.String("cross-kind-threshold", fmt.Sprintf("%.2f", defaultCrossKindMinScore), "minimum score for issue/pull request edges")
|
||||
@ -726,7 +725,7 @@ func (a *App) embedRepository(ctx context.Context, owner, repoName string, optio
|
||||
if batchSize <= 0 {
|
||||
batchSize = 64
|
||||
}
|
||||
client := openai.New(openai.Options{APIKey: token.Value, BaseURL: openAIBaseURL()})
|
||||
client := openai.New(openai.Options{APIKey: token.Value, BaseURL: openAIBaseURL(), Dimensions: rt.Config.OpenAI.EmbedDimensions})
|
||||
for start := 0; start < len(tasks); start += batchSize {
|
||||
end := start + batchSize
|
||||
if end > len(tasks) {
|
||||
@ -1484,7 +1483,7 @@ func (a *App) runSync(ctx context.Context, args []string) error {
|
||||
fs := flag.NewFlagSet("sync", flag.ContinueOnError)
|
||||
fs.SetOutput(io.Discard)
|
||||
since := fs.String("since", "", "GitHub since timestamp")
|
||||
state := fs.String("state", "", "GitHub issue state: open|closed|all; default all")
|
||||
state := fs.String("state", "", "GitHub issue state: open|closed|all; default open")
|
||||
limitRaw := fs.String("limit", "", "maximum issue/PR rows")
|
||||
jsonOut := fs.Bool("json", false, "write JSON output")
|
||||
includeComments := fs.Bool("include-comments", false, "hydrate issue comments, PR reviews, and PR review comments")
|
||||
@ -1936,7 +1935,7 @@ func parseClusterShapeOptions(command, maxClusterSizeRaw, fanoutRaw, crossKindTh
|
||||
|
||||
func buildDurableClusterInputs(ctx context.Context, st *store.Store, repoID int64, storedVectors []store.ThreadVector, options clusterBuildOptions) ([]store.DurableClusterInput, int, error) {
|
||||
if options.MinSize <= 0 {
|
||||
options.MinSize = 2
|
||||
options.MinSize = 1
|
||||
}
|
||||
if options.MaxClusterSize <= 0 {
|
||||
options.MaxClusterSize = defaultClusterMaxSize
|
||||
@ -2004,19 +2003,26 @@ func buildDurableClusterInputs(ctx context.Context, st *store.Store, repoID int6
|
||||
right := threads[builtCluster.Members[j]]
|
||||
return left.Number < right.Number
|
||||
})
|
||||
rep := threads[builtCluster.RepresentativeThreadID]
|
||||
identity := store.HumanKeyForValue(fmt.Sprintf("repo:%d:cluster-representative:%d", repoID, builtCluster.RepresentativeThreadID))
|
||||
clusterType := "duplicate_candidate"
|
||||
if len(builtCluster.Members) == 1 {
|
||||
clusterType = "singleton_orphan"
|
||||
}
|
||||
input := store.DurableClusterInput{
|
||||
StableKey: durableClusterStableKey(builtCluster.Members, threads),
|
||||
StableSlug: durableClusterSlug(builtCluster.Members, threads),
|
||||
StableKey: identity.Hash,
|
||||
StableSlug: store.HumanKeyStableSlug(identity),
|
||||
ClusterType: clusterType,
|
||||
RepresentativeThreadID: builtCluster.RepresentativeThreadID,
|
||||
Title: rep.Title,
|
||||
Title: "Cluster " + identity.Slug,
|
||||
Members: make([]store.DurableClusterMemberInput, 0, len(builtCluster.Members)),
|
||||
}
|
||||
for _, threadID := range builtCluster.Members {
|
||||
role := "member"
|
||||
role := "related"
|
||||
var scorePtr *float64
|
||||
if threadID == builtCluster.RepresentativeThreadID {
|
||||
role = "representative"
|
||||
role = "canonical"
|
||||
scoreCopy := 1.0
|
||||
scorePtr = &scoreCopy
|
||||
} else if score, ok := pairScores[threadIDPairKey(threadID, builtCluster.RepresentativeThreadID)]; ok {
|
||||
scoreCopy := score
|
||||
scorePtr = &scoreCopy
|
||||
@ -2193,23 +2199,6 @@ func clusterRepository(ctx context.Context, st *store.Store, repoID int64, store
|
||||
}, nil
|
||||
}
|
||||
|
||||
func durableClusterStableKey(threadIDs []int64, threads map[int64]store.Thread) string {
|
||||
parts := make([]string, 0, len(threadIDs))
|
||||
for _, id := range threadIDs {
|
||||
if thread, ok := threads[id]; ok && thread.Number > 0 {
|
||||
parts = append(parts, strconv.Itoa(thread.Number))
|
||||
continue
|
||||
}
|
||||
parts = append(parts, strconv.FormatInt(id, 10))
|
||||
}
|
||||
return "numbers:" + strings.Join(parts, ",")
|
||||
}
|
||||
|
||||
func durableClusterSlug(threadIDs []int64, threads map[int64]store.Thread) string {
|
||||
sum := sha256.Sum256([]byte(durableClusterStableKey(threadIDs, threads)))
|
||||
return "cluster-" + hex.EncodeToString(sum[:])[:12]
|
||||
}
|
||||
|
||||
func threadIDPairKey(left, right int64) string {
|
||||
if left > right {
|
||||
left, right = right, left
|
||||
|
||||
@ -9,6 +9,7 @@ import (
|
||||
"net/http/httptest"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"testing"
|
||||
@ -851,7 +852,7 @@ func TestClusterCommandPersistsDurableClusters(t *testing.T) {
|
||||
if err := run.Run(ctx, []string{"--config", configPath, "cluster", "openclaw/openclaw", "--threshold", "0.90", "--json"}); err != nil {
|
||||
t.Fatalf("cluster: %v", err)
|
||||
}
|
||||
if !strings.Contains(stdout.String(), `"cluster_count": 1`) {
|
||||
if !strings.Contains(stdout.String(), `"cluster_count": 2`) {
|
||||
t.Fatalf("cluster output = %q", stdout.String())
|
||||
}
|
||||
st, err = store.Open(ctx, dbPath)
|
||||
@ -863,8 +864,13 @@ func TestClusterCommandPersistsDurableClusters(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatalf("list clusters: %v", err)
|
||||
}
|
||||
if len(clusters) != 1 || clusters[0].MemberCount != 2 {
|
||||
t.Fatalf("expected one durable cluster, got %#v", clusters)
|
||||
memberCounts := []int{}
|
||||
for _, cluster := range clusters {
|
||||
memberCounts = append(memberCounts, cluster.MemberCount)
|
||||
}
|
||||
sort.Ints(memberCounts)
|
||||
if len(memberCounts) != 2 || memberCounts[0] != 1 || memberCounts[1] != 2 {
|
||||
t.Fatalf("expected duplicate cluster plus singleton, got %#v", clusters)
|
||||
}
|
||||
}
|
||||
|
||||
@ -1321,7 +1327,7 @@ func TestRefreshEmbedsAndClustersWithoutSync(t *testing.T) {
|
||||
if !strings.Contains(out, `"embedded": 3`) {
|
||||
t.Fatalf("refresh did not embed rows: %q", out)
|
||||
}
|
||||
if !strings.Contains(out, `"cluster_count": 1`) {
|
||||
if !strings.Contains(out, `"cluster_count": 2`) {
|
||||
t.Fatalf("refresh did not persist cluster: %q", out)
|
||||
}
|
||||
|
||||
@ -1334,8 +1340,13 @@ func TestRefreshEmbedsAndClustersWithoutSync(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatalf("list clusters: %v", err)
|
||||
}
|
||||
if len(clusters) != 1 || clusters[0].MemberCount != 2 {
|
||||
t.Fatalf("expected one durable cluster, got %#v", clusters)
|
||||
memberCounts := []int{}
|
||||
for _, cluster := range clusters {
|
||||
memberCounts = append(memberCounts, cluster.MemberCount)
|
||||
}
|
||||
sort.Ints(memberCounts)
|
||||
if len(memberCounts) != 2 || memberCounts[0] != 1 || memberCounts[1] != 2 {
|
||||
t.Fatalf("expected duplicate cluster plus singleton, got %#v", clusters)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -32,11 +32,12 @@ type GitHubConfig struct {
|
||||
}
|
||||
|
||||
type OpenAIConfig struct {
|
||||
APIKeyEnv string `toml:"api_key_env"`
|
||||
SummaryModel string `toml:"summary_model"`
|
||||
EmbedModel string `toml:"embed_model"`
|
||||
BatchSize int `toml:"batch_size"`
|
||||
Concurrency int `toml:"concurrency"`
|
||||
APIKeyEnv string `toml:"api_key_env"`
|
||||
SummaryModel string `toml:"summary_model"`
|
||||
EmbedModel string `toml:"embed_model"`
|
||||
EmbedDimensions int `toml:"embed_dimensions"`
|
||||
BatchSize int `toml:"batch_size"`
|
||||
Concurrency int `toml:"concurrency"`
|
||||
}
|
||||
|
||||
type TUIConfig struct {
|
||||
@ -62,11 +63,12 @@ func Default() Config {
|
||||
TokenEnv: DefaultTokenEnv,
|
||||
},
|
||||
OpenAI: OpenAIConfig{
|
||||
APIKeyEnv: DefaultOpenAIEnv,
|
||||
SummaryModel: "gpt-5.4",
|
||||
EmbedModel: "text-embedding-3-small",
|
||||
BatchSize: 64,
|
||||
Concurrency: 2,
|
||||
APIKeyEnv: DefaultOpenAIEnv,
|
||||
SummaryModel: "gpt-5.4",
|
||||
EmbedModel: "text-embedding-3-small",
|
||||
EmbedDimensions: 1024,
|
||||
BatchSize: 64,
|
||||
Concurrency: 2,
|
||||
},
|
||||
TUI: TUIConfig{
|
||||
DefaultSort: "recent",
|
||||
@ -154,6 +156,9 @@ func (c *Config) Normalize() error {
|
||||
if c.OpenAI.EmbedModel == "" {
|
||||
c.OpenAI.EmbedModel = envOrDefault("GITCRAWL_EMBED_MODEL", def.OpenAI.EmbedModel)
|
||||
}
|
||||
if c.OpenAI.EmbedDimensions <= 0 {
|
||||
c.OpenAI.EmbedDimensions = def.OpenAI.EmbedDimensions
|
||||
}
|
||||
if c.OpenAI.BatchSize <= 0 {
|
||||
c.OpenAI.BatchSize = def.OpenAI.BatchSize
|
||||
}
|
||||
|
||||
@ -20,17 +20,20 @@ type Client struct {
|
||||
apiKey string
|
||||
baseURL string
|
||||
httpClient *http.Client
|
||||
dimensions int
|
||||
}
|
||||
|
||||
type Options struct {
|
||||
APIKey string
|
||||
BaseURL string
|
||||
Dimensions int
|
||||
HTTPClient *http.Client
|
||||
}
|
||||
|
||||
type embeddingRequest struct {
|
||||
Model string `json:"model"`
|
||||
Input []string `json:"input"`
|
||||
Model string `json:"model"`
|
||||
Input []string `json:"input"`
|
||||
Dimensions int `json:"dimensions,omitempty"`
|
||||
}
|
||||
|
||||
type embeddingResponse struct {
|
||||
@ -57,6 +60,7 @@ func New(options Options) *Client {
|
||||
apiKey: strings.TrimSpace(options.APIKey),
|
||||
baseURL: baseURL,
|
||||
httpClient: httpClient,
|
||||
dimensions: options.Dimensions,
|
||||
}
|
||||
}
|
||||
|
||||
@ -71,7 +75,7 @@ func (c *Client) Embed(ctx context.Context, model string, texts []string) ([][]f
|
||||
if c.apiKey == "" {
|
||||
return nil, fmt.Errorf("OpenAI API key is required")
|
||||
}
|
||||
payload, err := json.Marshal(embeddingRequest{Model: model, Input: texts})
|
||||
payload, err := json.Marshal(embeddingRequest{Model: model, Input: texts, Dimensions: c.dimensions})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("marshal embeddings request: %w", err)
|
||||
}
|
||||
|
||||
@ -14,6 +14,9 @@ func TestEmbedAcceptsLargeBatchResponse(t *testing.T) {
|
||||
if err := json.NewDecoder(r.Body).Decode(&request); err != nil {
|
||||
t.Fatalf("decode request: %v", err)
|
||||
}
|
||||
if request.Dimensions != 1024 {
|
||||
t.Fatalf("dimensions = %d, want 1024", request.Dimensions)
|
||||
}
|
||||
response := embeddingResponse{}
|
||||
for index := range request.Input {
|
||||
vector := make([]float64, 1536)
|
||||
@ -36,7 +39,7 @@ func TestEmbedAcceptsLargeBatchResponse(t *testing.T) {
|
||||
for index := range inputs {
|
||||
inputs[index] = "thread text"
|
||||
}
|
||||
vectors, err := New(Options{APIKey: "test", BaseURL: server.URL}).Embed(context.Background(), "text-embedding-3-small", inputs)
|
||||
vectors, err := New(Options{APIKey: "test", BaseURL: server.URL, Dimensions: 1024}).Embed(context.Background(), "text-embedding-3-small", inputs)
|
||||
if err != nil {
|
||||
t.Fatalf("embed: %v", err)
|
||||
}
|
||||
|
||||
@ -72,6 +72,7 @@ type ClusterMemberOverride struct {
|
||||
type DurableClusterInput struct {
|
||||
StableKey string
|
||||
StableSlug string
|
||||
ClusterType string
|
||||
RepresentativeThreadID int64
|
||||
Title string
|
||||
Members []DurableClusterMemberInput
|
||||
@ -464,10 +465,7 @@ func (s *Store) DurableClusterDetail(ctx context.Context, options ClusterDetailO
|
||||
args = append(args, limit)
|
||||
rows, err := s.db.QueryContext(ctx, `
|
||||
select cm.role, cm.state, cm.score_to_representative,
|
||||
t.id, t.repo_id, t.github_id, t.number, t.kind, t.state, t.title, t.body, t.author_login, t.author_type,
|
||||
t.html_url, t.labels_json, t.assignees_json, t.raw_json, t.content_hash, t.is_draft,
|
||||
t.created_at_gh, t.updated_at_gh, t.closed_at_gh, t.merged_at_gh,
|
||||
t.first_pulled_at, t.last_pulled_at, t.updated_at, t.closed_at_local, t.close_reason_local
|
||||
`+s.threadSelectColumns(ctx, "t")+`
|
||||
from cluster_memberships cm
|
||||
join cluster_groups cg on cg.id = cm.cluster_id
|
||||
join threads t on t.id = cm.thread_id
|
||||
@ -526,10 +524,7 @@ func (s *Store) RunClusterDetail(ctx context.Context, options ClusterDetailOptio
|
||||
select case when t.id = c.representative_thread_id then 'representative' else 'member' end as role,
|
||||
'active' as state,
|
||||
cm.score_to_representative,
|
||||
t.id, t.repo_id, t.github_id, t.number, t.kind, t.state, t.title, t.body, t.author_login, t.author_type,
|
||||
t.html_url, t.labels_json, t.assignees_json, t.raw_json, t.content_hash, t.is_draft,
|
||||
t.created_at_gh, t.updated_at_gh, t.closed_at_gh, t.merged_at_gh,
|
||||
t.first_pulled_at, t.last_pulled_at, t.updated_at, t.closed_at_local, t.close_reason_local
|
||||
`+s.threadSelectColumns(ctx, "t")+`
|
||||
from cluster_members cm
|
||||
join clusters c on c.id = cm.cluster_id and c.cluster_run_id = ?
|
||||
join threads t on t.id = cm.thread_id
|
||||
@ -731,6 +726,15 @@ func (s *Store) SaveDurableClusters(ctx context.Context, repoID int64, inputs []
|
||||
return err
|
||||
}
|
||||
}
|
||||
if len(inputs) > 0 {
|
||||
if _, err := tx.q().ExecContext(ctx, `
|
||||
delete from cluster_groups
|
||||
where repo_id = ?
|
||||
and cluster_type = 'similarity'
|
||||
`, repoID); err != nil {
|
||||
return fmt.Errorf("delete legacy similarity clusters: %w", err)
|
||||
}
|
||||
}
|
||||
if _, err := tx.q().ExecContext(ctx, `
|
||||
update cluster_runs
|
||||
set finished_at = ?, stats_json = ?
|
||||
@ -951,12 +955,16 @@ func (s *Store) upsertDurableCluster(ctx context.Context, repoID, runID int64, i
|
||||
if stableSlug == "" {
|
||||
stableSlug = stableKey
|
||||
}
|
||||
clusterType := strings.TrimSpace(input.ClusterType)
|
||||
if clusterType == "" {
|
||||
clusterType = "duplicate_candidate"
|
||||
}
|
||||
var clusterID int64
|
||||
if err := s.q().QueryRowContext(ctx, `
|
||||
insert into cluster_groups(
|
||||
repo_id, stable_key, stable_slug, status, cluster_type, representative_thread_id, title, created_at, updated_at
|
||||
)
|
||||
values(?, ?, ?, 'active', 'similarity', ?, ?, ?, ?)
|
||||
values(?, ?, ?, 'active', ?, ?, ?, ?, ?)
|
||||
on conflict(repo_id, stable_key) do update set
|
||||
stable_slug = excluded.stable_slug,
|
||||
cluster_type = excluded.cluster_type,
|
||||
@ -967,7 +975,7 @@ func (s *Store) upsertDurableCluster(ctx context.Context, repoID, runID int64, i
|
||||
title = excluded.title,
|
||||
updated_at = excluded.updated_at
|
||||
returning id
|
||||
`, repoID, stableKey, stableSlug, nullInt(input.RepresentativeThreadID), nullString(input.Title), now, now).Scan(&clusterID); err != nil {
|
||||
`, repoID, stableKey, stableSlug, clusterType, nullInt(input.RepresentativeThreadID), nullString(input.Title), now, now).Scan(&clusterID); err != nil {
|
||||
return 0, fmt.Errorf("upsert durable cluster: %w", err)
|
||||
}
|
||||
if _, err := s.q().ExecContext(ctx, `
|
||||
@ -1167,6 +1175,9 @@ func (s *Store) clusterSummaryByID(ctx context.Context, repoID, clusterID int64,
|
||||
}
|
||||
|
||||
func (s *Store) latestRawClusterRunID(ctx context.Context, repoID int64) (int64, bool, error) {
|
||||
if !s.hasTable(ctx, "cluster_runs") || !s.hasTable(ctx, "clusters") {
|
||||
return 0, false, nil
|
||||
}
|
||||
row := s.db.QueryRowContext(ctx, `
|
||||
select cr.id
|
||||
from cluster_runs cr
|
||||
@ -1292,23 +1303,46 @@ func (s *Store) summariesByThreadIDs(ctx context.Context, threadIDs []int64) (ma
|
||||
placeholders = append(placeholders, "?")
|
||||
args = append(args, threadID)
|
||||
}
|
||||
rows, err := s.db.QueryContext(ctx, `
|
||||
select thread_id, summary_kind, summary_text
|
||||
from document_summaries
|
||||
where thread_id in (`+strings.Join(placeholders, ",")+`)
|
||||
order by thread_id, summary_kind, updated_at desc
|
||||
`, args...)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("select document summaries: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
out := make(map[int64]map[string]string)
|
||||
if s.hasTable(ctx, "document_summaries") {
|
||||
rows, err := s.db.QueryContext(ctx, `
|
||||
select thread_id, summary_kind, summary_text
|
||||
from document_summaries
|
||||
where thread_id in (`+strings.Join(placeholders, ",")+`)
|
||||
order by thread_id, summary_kind, updated_at desc
|
||||
`, args...)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("select document summaries: %w", err)
|
||||
}
|
||||
if err := scanSummaryRows(rows, out, "document summary"); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if s.hasTable(ctx, "thread_key_summaries") && s.hasTable(ctx, "thread_revisions") {
|
||||
rows, err := s.db.QueryContext(ctx, `
|
||||
select tr.thread_id, tks.summary_kind, tks.key_text
|
||||
from thread_key_summaries tks
|
||||
join thread_revisions tr on tr.id = tks.thread_revision_id
|
||||
where tr.thread_id in (`+strings.Join(placeholders, ",")+`)
|
||||
order by tr.thread_id, tks.summary_kind, tks.created_at desc
|
||||
`, args...)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("select thread key summaries: %w", err)
|
||||
}
|
||||
if err := scanSummaryRows(rows, out, "thread key summary"); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func scanSummaryRows(rows *sql.Rows, out map[int64]map[string]string, source string) error {
|
||||
defer rows.Close()
|
||||
for rows.Next() {
|
||||
var threadID int64
|
||||
var kind, text string
|
||||
if err := rows.Scan(&threadID, &kind, &text); err != nil {
|
||||
return nil, fmt.Errorf("scan document summary: %w", err)
|
||||
return fmt.Errorf("scan %s: %w", source, err)
|
||||
}
|
||||
if out[threadID] == nil {
|
||||
out[threadID] = map[string]string{}
|
||||
@ -1318,9 +1352,9 @@ func (s *Store) summariesByThreadIDs(ctx context.Context, threadIDs []int64) (ma
|
||||
}
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, fmt.Errorf("iterate document summaries: %w", err)
|
||||
return fmt.Errorf("iterate %s rows: %w", source, err)
|
||||
}
|
||||
return out, nil
|
||||
return nil
|
||||
}
|
||||
|
||||
func snippetRunes(value string, limit int) string {
|
||||
|
||||
@ -48,10 +48,19 @@ func (s *Store) ListEmbeddingTasks(ctx context.Context, options EmbeddingTaskOpt
|
||||
args = append(args, options.Limit)
|
||||
}
|
||||
rows, err := s.q().QueryContext(ctx, `
|
||||
select t.id, t.number, t.kind, t.title, coalesce(d.body, ''), coalesce(d.raw_text, ''), coalesce(d.dedupe_text, ''),
|
||||
select t.id, t.number, t.kind, t.title, coalesce(d.body, t.body, ''), coalesce(d.raw_text, t.body, ''), coalesce(d.dedupe_text, t.title || ' ' || coalesce(t.body, '')),
|
||||
coalesce((
|
||||
select tks.key_text
|
||||
from thread_key_summaries tks
|
||||
join thread_revisions tr on tr.id = tks.thread_revision_id
|
||||
where tr.thread_id = t.id
|
||||
and tks.summary_kind in ('llm_key_summary', 'llm_key_3line')
|
||||
order by tks.created_at desc, tr.created_at desc, tks.id desc
|
||||
limit 1
|
||||
), ''),
|
||||
coalesce(tv.content_hash, '')
|
||||
from threads t
|
||||
join documents d on d.thread_id = t.id
|
||||
left join documents d on d.thread_id = t.id
|
||||
left join thread_vectors tv on tv.thread_id = t.id and tv.basis = ? and tv.model = ?
|
||||
where `+strings.Join(where, " and ")+`
|
||||
order by coalesce(t.updated_at_gh, t.updated_at) desc, t.number desc`+limitSQL,
|
||||
@ -64,14 +73,17 @@ func (s *Store) ListEmbeddingTasks(ctx context.Context, options EmbeddingTaskOpt
|
||||
out := make([]EmbeddingTask, 0)
|
||||
for rows.Next() {
|
||||
var task EmbeddingTask
|
||||
var body, rawText, dedupeText, existingHash string
|
||||
if err := rows.Scan(&task.ThreadID, &task.Number, &task.Kind, &task.Title, &body, &rawText, &dedupeText, &existingHash); err != nil {
|
||||
var body, rawText, dedupeText, keySummary, existingHash string
|
||||
if err := rows.Scan(&task.ThreadID, &task.Number, &task.Kind, &task.Title, &body, &rawText, &dedupeText, &keySummary, &existingHash); err != nil {
|
||||
return nil, fmt.Errorf("scan embedding task: %w", err)
|
||||
}
|
||||
text, err := embeddingTextForBasis(basis, task.Title, body, rawText, dedupeText)
|
||||
text, err := embeddingTextForBasis(basis, task.Title, body, rawText, dedupeText, keySummary)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if strings.TrimSpace(text) == "" {
|
||||
continue
|
||||
}
|
||||
task.Text = text
|
||||
task.ContentHash = embeddingContentHash(basis, model, text)
|
||||
if !options.Force && existingHash == task.ContentHash {
|
||||
@ -85,7 +97,7 @@ func (s *Store) ListEmbeddingTasks(ctx context.Context, options EmbeddingTaskOpt
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func embeddingTextForBasis(basis, title, body, rawText, dedupeText string) (string, error) {
|
||||
func embeddingTextForBasis(basis, title, body, rawText, dedupeText, keySummary string) (string, error) {
|
||||
switch basis {
|
||||
case "", "title_original":
|
||||
parts := []string{strings.TrimSpace(title)}
|
||||
@ -97,6 +109,12 @@ func embeddingTextForBasis(basis, title, body, rawText, dedupeText string) (stri
|
||||
return strings.TrimSpace(strings.Join(parts, "\n\n")), nil
|
||||
case "dedupe_text":
|
||||
return strings.TrimSpace(dedupeText), nil
|
||||
case "llm_key_summary":
|
||||
keySummary = strings.TrimSpace(keySummary)
|
||||
if keySummary == "" {
|
||||
return "", nil
|
||||
}
|
||||
return strings.TrimSpace("title: " + strings.TrimSpace(title) + "\n\nkey_summary:\n" + keySummary), nil
|
||||
default:
|
||||
return "", fmt.Errorf("embedding basis %q is not supported yet", basis)
|
||||
}
|
||||
|
||||
69
internal/store/embedding_tasks_test.go
Normal file
69
internal/store/embedding_tasks_test.go
Normal file
@ -0,0 +1,69 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"context"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestListEmbeddingTasksUsesLatestLLMKeySummary(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
st, err := Open(ctx, filepath.Join(t.TempDir(), "gitcrawl.db"))
|
||||
if err != nil {
|
||||
t.Fatalf("open store: %v", err)
|
||||
}
|
||||
defer st.Close()
|
||||
|
||||
repoID, err := st.UpsertRepository(ctx, Repository{
|
||||
Owner: "openclaw",
|
||||
Name: "gitcrawl",
|
||||
FullName: "openclaw/gitcrawl",
|
||||
RawJSON: "{}",
|
||||
UpdatedAt: "2026-04-26T00:00:00Z",
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("repo: %v", err)
|
||||
}
|
||||
threadID, err := st.UpsertThread(ctx, Thread{
|
||||
RepoID: repoID,
|
||||
GitHubID: "1",
|
||||
Number: 7,
|
||||
Kind: "issue",
|
||||
State: "open",
|
||||
Title: "Download stalls",
|
||||
Body: "Large download stalls near completion.",
|
||||
HTMLURL: "https://github.com/openclaw/gitcrawl/issues/7",
|
||||
LabelsJSON: "[]",
|
||||
AssigneesJSON: "[]",
|
||||
RawJSON: "{}",
|
||||
ContentHash: "hash",
|
||||
UpdatedAt: "2026-04-26T00:00:00Z",
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("thread: %v", err)
|
||||
}
|
||||
if _, err := st.DB().ExecContext(ctx, `
|
||||
insert into thread_revisions(id, thread_id, content_hash, title_hash, body_hash, labels_hash, created_at)
|
||||
values(1, ?, 'hash', 'title', 'body', 'labels', '2026-04-26T00:00:00Z');
|
||||
insert into thread_key_summaries(thread_revision_id, summary_kind, prompt_version, provider, model, input_hash, output_hash, key_text, created_at)
|
||||
values(1, 'llm_key_3line', 'v1', 'openai', 'gpt-5-mini', 'input', 'output', 'intent: fix downloads\nsurface: downloader\nmechanism: retry stalled stream', '2026-04-26T00:01:00Z');
|
||||
`, threadID); err != nil {
|
||||
t.Fatalf("seed summary: %v", err)
|
||||
}
|
||||
|
||||
tasks, err := st.ListEmbeddingTasks(ctx, EmbeddingTaskOptions{
|
||||
RepoID: repoID,
|
||||
Basis: "llm_key_summary",
|
||||
Model: "text-embedding-3-large",
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("tasks: %v", err)
|
||||
}
|
||||
if len(tasks) != 1 {
|
||||
t.Fatalf("tasks = %d, want 1", len(tasks))
|
||||
}
|
||||
if !strings.Contains(tasks[0].Text, "title: Download stalls") || !strings.Contains(tasks[0].Text, "key_summary:") {
|
||||
t.Fatalf("unexpected embedding text: %q", tasks[0].Text)
|
||||
}
|
||||
}
|
||||
@ -2,7 +2,10 @@ package store
|
||||
|
||||
import (
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
)
|
||||
|
||||
var humanKeyWords = []string{
|
||||
@ -44,10 +47,51 @@ func clusterHumanName(repoID, representativeThreadID, clusterID int64) string {
|
||||
if representativeThreadID != 0 {
|
||||
key = fmt.Sprintf("repo:%d:cluster-representative:%d", repoID, representativeThreadID)
|
||||
}
|
||||
hash := sha256.Sum256([]byte(key))
|
||||
return fmt.Sprintf("%s-%s-%s",
|
||||
humanKeyWords[int(hash[0])%len(humanKeyWords)],
|
||||
humanKeyWords[int(hash[1])%len(humanKeyWords)],
|
||||
humanKeyWords[int(hash[2])%len(humanKeyWords)],
|
||||
)
|
||||
return HumanKeyForValue(key).Slug
|
||||
}
|
||||
|
||||
type HumanKey struct {
|
||||
Hash string
|
||||
Slug string
|
||||
Checksum string
|
||||
}
|
||||
|
||||
func StableHash(value string) string {
|
||||
sum := sha256.Sum256([]byte(value))
|
||||
return hex.EncodeToString(sum[:])
|
||||
}
|
||||
|
||||
func HumanKeyForValue(value string) HumanKey {
|
||||
return HumanKeyFromHash(StableHash(value))
|
||||
}
|
||||
|
||||
func HumanKeyFromHash(hash string) HumanKey {
|
||||
normalized := strings.ToLower(hash)
|
||||
index := func(offset int) int {
|
||||
value, err := strconv.ParseInt(normalized[offset:offset+2], 16, 64)
|
||||
if err != nil {
|
||||
return 0
|
||||
}
|
||||
return int(value) % len(humanKeyWords)
|
||||
}
|
||||
checksumValue, err := strconv.ParseInt(normalized[6:12], 16, 64)
|
||||
checksum := "0000"
|
||||
if err == nil {
|
||||
checksum = strconv.FormatInt(checksumValue, 36)
|
||||
if len(checksum) < 4 {
|
||||
checksum = strings.Repeat("0", 4-len(checksum)) + checksum
|
||||
}
|
||||
if len(checksum) > 4 {
|
||||
checksum = checksum[len(checksum)-4:]
|
||||
}
|
||||
}
|
||||
return HumanKey{
|
||||
Hash: normalized,
|
||||
Slug: fmt.Sprintf("%s-%s-%s", humanKeyWords[index(0)], humanKeyWords[index(2)], humanKeyWords[index(4)]),
|
||||
Checksum: checksum,
|
||||
}
|
||||
}
|
||||
|
||||
func HumanKeyStableSlug(key HumanKey) string {
|
||||
return key.Slug + "-" + key.Checksum
|
||||
}
|
||||
|
||||
13
internal/store/human_key_test.go
Normal file
13
internal/store/human_key_test.go
Normal file
@ -0,0 +1,13 @@
|
||||
package store
|
||||
|
||||
import "testing"
|
||||
|
||||
func TestHumanKeyForValueMatchesGhcrawlRepresentativeIdentity(t *testing.T) {
|
||||
key := HumanKeyForValue("repo:1:cluster-representative:546")
|
||||
if key.Hash != "e77f18999d72cc6d27c5d3d0aa2c02cdc8cad3c1be077feb70062bc55eae98fd" {
|
||||
t.Fatalf("hash = %q", key.Hash)
|
||||
}
|
||||
if HumanKeyStableSlug(key) != "usage-matrix-binary-zrzm" {
|
||||
t.Fatalf("stable slug = %q", HumanKeyStableSlug(key))
|
||||
}
|
||||
}
|
||||
@ -5,6 +5,8 @@ import (
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
type PortablePruneOptions struct {
|
||||
@ -13,16 +15,18 @@ type PortablePruneOptions struct {
|
||||
}
|
||||
|
||||
type PortablePruneStats struct {
|
||||
DBPath string `json:"db_path"`
|
||||
BodyChars int `json:"body_chars"`
|
||||
BytesBefore int64 `json:"bytes_before"`
|
||||
BytesAfter int64 `json:"bytes_after"`
|
||||
ThreadsPruned int64 `json:"threads_pruned"`
|
||||
RepositoriesPruned int64 `json:"repositories_pruned"`
|
||||
FingerprintsPruned int64 `json:"fingerprints_pruned"`
|
||||
DocumentsDeleted int64 `json:"documents_deleted"`
|
||||
DocumentsFTSRebuilt bool `json:"documents_fts_rebuilt"`
|
||||
Vacuumed bool `json:"vacuumed"`
|
||||
DBPath string `json:"db_path"`
|
||||
BodyChars int `json:"body_chars"`
|
||||
BytesBefore int64 `json:"bytes_before"`
|
||||
BytesAfter int64 `json:"bytes_after"`
|
||||
ThreadsPruned int64 `json:"threads_pruned"`
|
||||
RepositoriesPruned int64 `json:"repositories_pruned"`
|
||||
FingerprintsPruned int64 `json:"fingerprints_pruned"`
|
||||
DocumentsDeleted int64 `json:"documents_deleted"`
|
||||
DocumentsFTSRebuilt bool `json:"documents_fts_rebuilt"`
|
||||
DroppedTables []string `json:"dropped_tables,omitempty"`
|
||||
DroppedColumns []string `json:"dropped_columns,omitempty"`
|
||||
Vacuumed bool `json:"vacuumed"`
|
||||
}
|
||||
|
||||
func (s *Store) PrunePortablePayloads(ctx context.Context, options PortablePruneOptions) (PortablePruneStats, error) {
|
||||
@ -107,6 +111,9 @@ func (s *Store) PrunePortablePayloads(ctx context.Context, options PortablePrune
|
||||
}
|
||||
stats.DocumentsFTSRebuilt = true
|
||||
}
|
||||
if err := s.canonicalizePortableSchema(ctx, options.BodyChars, &stats); err != nil {
|
||||
return stats, err
|
||||
}
|
||||
if options.Vacuum {
|
||||
if _, err := s.db.ExecContext(ctx, `pragma wal_checkpoint(TRUNCATE)`); err != nil {
|
||||
return stats, fmt.Errorf("checkpoint wal: %w", err)
|
||||
@ -121,6 +128,110 @@ func (s *Store) PrunePortablePayloads(ctx context.Context, options PortablePrune
|
||||
return stats, nil
|
||||
}
|
||||
|
||||
func (s *Store) canonicalizePortableSchema(ctx context.Context, bodyChars int, stats *PortablePruneStats) error {
|
||||
if s.hasColumn(ctx, "threads", "body") && !s.hasColumn(ctx, "threads", "body_excerpt") {
|
||||
if _, err := s.db.ExecContext(ctx, `alter table threads add column body_excerpt text`); err != nil {
|
||||
return fmt.Errorf("add portable threads.body_excerpt: %w", err)
|
||||
}
|
||||
if _, err := s.db.ExecContext(ctx, `
|
||||
update threads
|
||||
set body_excerpt = case when length(body) > ? then substr(body, 1, ?) else body end
|
||||
where body is not null
|
||||
`, bodyChars, bodyChars); err != nil {
|
||||
return fmt.Errorf("backfill portable body excerpts: %w", err)
|
||||
}
|
||||
}
|
||||
if !s.hasColumn(ctx, "threads", "body_length") {
|
||||
if _, err := s.db.ExecContext(ctx, `alter table threads add column body_length integer not null default 0`); err != nil {
|
||||
return fmt.Errorf("add portable threads.body_length: %w", err)
|
||||
}
|
||||
}
|
||||
for _, column := range []struct {
|
||||
table string
|
||||
name string
|
||||
}{
|
||||
{table: "repositories", name: "raw_json"},
|
||||
{table: "threads", name: "raw_json"},
|
||||
{table: "threads", name: "body"},
|
||||
} {
|
||||
if !s.hasColumn(ctx, column.table, column.name) {
|
||||
continue
|
||||
}
|
||||
if _, err := s.db.ExecContext(ctx, `alter table `+sqliteIdentifier(column.table)+` drop column `+sqliteIdentifier(column.name)); err != nil {
|
||||
return fmt.Errorf("drop portable column %s.%s: %w", column.table, column.name, err)
|
||||
}
|
||||
stats.DroppedColumns = append(stats.DroppedColumns, column.table+"."+column.name)
|
||||
}
|
||||
for _, table := range canonicalPortableDroppedTables() {
|
||||
if !s.tableExists(ctx, table) {
|
||||
continue
|
||||
}
|
||||
if _, err := s.db.ExecContext(ctx, `drop table if exists `+sqliteIdentifier(table)); err != nil {
|
||||
return fmt.Errorf("drop portable table %s: %w", table, err)
|
||||
}
|
||||
stats.DroppedTables = append(stats.DroppedTables, table)
|
||||
}
|
||||
if _, err := s.db.ExecContext(ctx, `
|
||||
create table if not exists portable_metadata (
|
||||
key text primary key,
|
||||
value text not null
|
||||
)
|
||||
`); err != nil {
|
||||
return fmt.Errorf("ensure portable metadata: %w", err)
|
||||
}
|
||||
metadata := map[string]string{
|
||||
"schema": "ghcrawl-portable-sync-v1",
|
||||
"body_chars": fmt.Sprintf("%d", bodyChars),
|
||||
"excluded": "raw_json,comments,documents,fts,vectors,code_snapshots,cluster_events,run_history,similarity_edges,blobs",
|
||||
"exported_at": time.Now().UTC().Format(timeLayout),
|
||||
"source_path": s.path,
|
||||
}
|
||||
for key, value := range metadata {
|
||||
if _, err := s.db.ExecContext(ctx, `
|
||||
insert into portable_metadata(key, value)
|
||||
values(?, ?)
|
||||
on conflict(key) do update set value = excluded.value
|
||||
`, key, value); err != nil {
|
||||
return fmt.Errorf("write portable metadata %s: %w", key, err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func canonicalPortableDroppedTables() []string {
|
||||
return []string{
|
||||
"documents_fts",
|
||||
"documents_fts_config",
|
||||
"documents_fts_data",
|
||||
"documents_fts_docsize",
|
||||
"documents_fts_idx",
|
||||
"comments",
|
||||
"documents",
|
||||
"document_embeddings",
|
||||
"document_summaries",
|
||||
"thread_vectors",
|
||||
"thread_code_snapshots",
|
||||
"thread_changed_files",
|
||||
"thread_hunk_signatures",
|
||||
"cluster_events",
|
||||
"cluster_members",
|
||||
"clusters",
|
||||
"sync_runs",
|
||||
"summary_runs",
|
||||
"embedding_runs",
|
||||
"cluster_runs",
|
||||
"similarity_edges",
|
||||
"blobs",
|
||||
}
|
||||
}
|
||||
|
||||
func sqliteIdentifier(value string) string {
|
||||
if value == "" || strings.ContainsAny(value, "\"\x00") {
|
||||
panic(fmt.Sprintf("unsafe SQLite identifier: %q", value))
|
||||
}
|
||||
return `"` + value + `"`
|
||||
}
|
||||
|
||||
func (s *Store) tableExists(ctx context.Context, table string) bool {
|
||||
var name string
|
||||
err := s.db.QueryRowContext(ctx, `select name from sqlite_master where type in ('table', 'virtual table') and name = ?`, table).Scan(&name)
|
||||
|
||||
@ -43,7 +43,7 @@ func (s *Store) RepositoryByFullName(ctx context.Context, fullName string) (Repo
|
||||
var githubRepoID sql.NullString
|
||||
var rawJSON sql.NullString
|
||||
err := s.q().QueryRowContext(ctx, `
|
||||
select id, owner, name, full_name, github_repo_id, raw_json, updated_at
|
||||
select id, owner, name, full_name, github_repo_id, `+s.repositoryRawJSONExpr(ctx)+`, updated_at
|
||||
from repositories
|
||||
where full_name = ?
|
||||
`, fullName).Scan(&repo.ID, &repo.Owner, &repo.Name, &repo.FullName, &githubRepoID, &rawJSON, &repo.UpdatedAt)
|
||||
@ -57,7 +57,7 @@ func (s *Store) RepositoryByFullName(ctx context.Context, fullName string) (Repo
|
||||
|
||||
func (s *Store) ListRepositories(ctx context.Context) ([]Repository, error) {
|
||||
rows, err := s.db.QueryContext(ctx, `
|
||||
select id, owner, name, full_name, github_repo_id, raw_json, updated_at
|
||||
select id, owner, name, full_name, github_repo_id, `+s.repositoryRawJSONExpr(ctx)+`, updated_at
|
||||
from repositories
|
||||
order by coalesce(updated_at, '') desc, id desc
|
||||
`)
|
||||
@ -84,6 +84,13 @@ func (s *Store) ListRepositories(ctx context.Context) ([]Repository, error) {
|
||||
return repos, nil
|
||||
}
|
||||
|
||||
func (s *Store) repositoryRawJSONExpr(ctx context.Context) string {
|
||||
if s.hasColumn(ctx, "repositories", "raw_json") {
|
||||
return "raw_json"
|
||||
}
|
||||
return "''"
|
||||
}
|
||||
|
||||
func nullString(value string) sql.NullString {
|
||||
return sql.NullString{String: value, Valid: value != ""}
|
||||
}
|
||||
|
||||
@ -214,6 +214,20 @@ create table if not exists thread_fingerprints (
|
||||
unique(thread_revision_id, algorithm_version)
|
||||
);
|
||||
|
||||
create table if not exists thread_key_summaries (
|
||||
id integer primary key,
|
||||
thread_revision_id integer not null references thread_revisions(id) on delete cascade,
|
||||
summary_kind text not null,
|
||||
prompt_version text not null,
|
||||
provider text not null,
|
||||
model text not null,
|
||||
input_hash text not null,
|
||||
output_hash text not null,
|
||||
key_text text not null,
|
||||
created_at text not null,
|
||||
unique(thread_revision_id, summary_kind, prompt_version, provider, model)
|
||||
);
|
||||
|
||||
create table if not exists sync_runs (
|
||||
id integer primary key,
|
||||
repo_id integer references repositories(id) on delete cascade,
|
||||
|
||||
@ -71,14 +71,15 @@ func (s *Store) searchThreads(ctx context.Context, repoID int64, query string, l
|
||||
return nil, nil
|
||||
}
|
||||
pattern := "%" + escapeLike(needle) + "%"
|
||||
bodyExpr := s.threadBodyExpr(ctx, "")
|
||||
rows, err := s.db.QueryContext(ctx, `
|
||||
select id, number, kind, state, title, html_url, author_login,
|
||||
coalesce(nullif(body, ''), title)
|
||||
coalesce(nullif(`+bodyExpr+`, ''), title)
|
||||
from threads
|
||||
where repo_id = ?
|
||||
and (
|
||||
lower(title) like ? escape '\'
|
||||
or lower(coalesce(body, '')) like ? escape '\'
|
||||
or lower(coalesce(`+bodyExpr+`, '')) like ? escape '\'
|
||||
)
|
||||
order by coalesce(updated_at_gh, updated_at) desc, number desc
|
||||
limit ?
|
||||
|
||||
@ -155,8 +155,24 @@ func (s *Store) Status(ctx context.Context) (Status, error) {
|
||||
return Status{}, fmt.Errorf("count clusters: %w", err)
|
||||
}
|
||||
var lastSync string
|
||||
if err := s.db.QueryRowContext(ctx, `select coalesce(max(finished_at), '') from sync_runs where status in ('success', 'completed')`).Scan(&lastSync); err != nil {
|
||||
return Status{}, fmt.Errorf("read last sync: %w", err)
|
||||
if s.hasTable(ctx, "sync_runs") {
|
||||
if err := s.db.QueryRowContext(ctx, `select coalesce(max(finished_at), '') from sync_runs where status in ('success', 'completed')`).Scan(&lastSync); err != nil {
|
||||
return Status{}, fmt.Errorf("read last sync: %w", err)
|
||||
}
|
||||
} else if s.hasTable(ctx, "repo_sync_state") {
|
||||
if err := s.db.QueryRowContext(ctx, `
|
||||
select coalesce(
|
||||
max(last_open_close_reconciled_at),
|
||||
max(last_overlapping_open_scan_completed_at),
|
||||
max(last_non_overlapping_scan_completed_at),
|
||||
max(last_full_open_scan_started_at),
|
||||
max(updated_at),
|
||||
''
|
||||
)
|
||||
from repo_sync_state
|
||||
`).Scan(&lastSync); err != nil {
|
||||
return Status{}, fmt.Errorf("read portable sync state: %w", err)
|
||||
}
|
||||
}
|
||||
if lastSync != "" {
|
||||
parsed, err := time.Parse(timeLayout, lastSync)
|
||||
@ -206,6 +222,12 @@ func (s *Store) ensureLegacyPortableColumns(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Store) hasTable(ctx context.Context, table string) bool {
|
||||
var name string
|
||||
err := s.db.QueryRowContext(ctx, `select name from sqlite_schema where type in ('table', 'virtual table') and name = ?`, table).Scan(&name)
|
||||
return err == nil
|
||||
}
|
||||
|
||||
func (s *Store) ensureColumn(ctx context.Context, table, column, definition string) error {
|
||||
if s.hasColumn(ctx, table, column) {
|
||||
return nil
|
||||
|
||||
@ -90,6 +90,132 @@ func TestOpenReadOnlyDoesNotMutateStore(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestOpenReadOnlySupportsCanonicalPortableStore(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
dbPath := filepath.Join(t.TempDir(), "portable.db")
|
||||
db, err := sql.Open("sqlite", dbPath)
|
||||
if err != nil {
|
||||
t.Fatalf("open seed db: %v", err)
|
||||
}
|
||||
_, err = db.ExecContext(ctx, `
|
||||
create table repositories (
|
||||
id integer primary key,
|
||||
owner text not null,
|
||||
name text not null,
|
||||
full_name text not null,
|
||||
github_repo_id text,
|
||||
updated_at text not null
|
||||
);
|
||||
create table threads (
|
||||
id integer primary key,
|
||||
repo_id integer not null,
|
||||
github_id text not null,
|
||||
number integer not null,
|
||||
kind text not null,
|
||||
state text not null,
|
||||
title text not null,
|
||||
body_excerpt text,
|
||||
body_length integer not null default 0,
|
||||
author_login text,
|
||||
author_type text,
|
||||
html_url text not null,
|
||||
labels_json text not null,
|
||||
assignees_json text not null,
|
||||
content_hash text not null,
|
||||
is_draft integer not null default 0,
|
||||
created_at_gh text,
|
||||
updated_at_gh text,
|
||||
closed_at_gh text,
|
||||
merged_at_gh text,
|
||||
first_pulled_at text,
|
||||
last_pulled_at text,
|
||||
updated_at text not null,
|
||||
closed_at_local text,
|
||||
close_reason_local text
|
||||
);
|
||||
create table repo_sync_state (
|
||||
repo_id integer primary key,
|
||||
last_full_open_scan_started_at text,
|
||||
last_overlapping_open_scan_completed_at text,
|
||||
last_non_overlapping_scan_completed_at text,
|
||||
last_open_close_reconciled_at text,
|
||||
updated_at text not null
|
||||
);
|
||||
create table cluster_groups (
|
||||
id integer primary key,
|
||||
repo_id integer not null,
|
||||
stable_key text not null,
|
||||
stable_slug text not null,
|
||||
status text not null,
|
||||
cluster_type text not null,
|
||||
representative_thread_id integer,
|
||||
title text,
|
||||
created_at text not null,
|
||||
updated_at text not null,
|
||||
closed_at text
|
||||
);
|
||||
insert into repositories(id, owner, name, full_name, updated_at)
|
||||
values(1, 'openclaw', 'openclaw', 'openclaw/openclaw', '2026-04-28T00:00:00Z');
|
||||
insert into threads(id, repo_id, github_id, number, kind, state, title, body_excerpt, html_url, labels_json, assignees_json, content_hash, updated_at)
|
||||
values(1, 1, '1', 42, 'issue', 'open', 'portable issue', 'portable body', 'https://github.com/openclaw/openclaw/issues/42', '[]', '[]', 'hash', '2026-04-28T00:00:00Z');
|
||||
insert into repo_sync_state(repo_id, last_open_close_reconciled_at, updated_at)
|
||||
values(1, '2026-04-28T01:02:03Z', '2026-04-28T01:02:03Z');
|
||||
insert into cluster_groups(id, repo_id, stable_key, stable_slug, status, cluster_type, representative_thread_id, title, created_at, updated_at)
|
||||
values(1, 1, 'stable', 'stable', 'active', 'similarity', 1, 'portable cluster', '2026-04-28T00:00:00Z', '2026-04-28T00:00:00Z');
|
||||
`)
|
||||
if err != nil {
|
||||
t.Fatalf("seed portable db: %v", err)
|
||||
}
|
||||
if err := db.Close(); err != nil {
|
||||
t.Fatalf("close seed db: %v", err)
|
||||
}
|
||||
before, err := os.ReadFile(dbPath)
|
||||
if err != nil {
|
||||
t.Fatalf("read db before: %v", err)
|
||||
}
|
||||
|
||||
st, err := OpenReadOnly(ctx, dbPath)
|
||||
if err != nil {
|
||||
t.Fatalf("open readonly portable: %v", err)
|
||||
}
|
||||
defer st.Close()
|
||||
|
||||
status, err := st.Status(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("portable status: %v", err)
|
||||
}
|
||||
if status.RepositoryCount != 1 || status.ThreadCount != 1 || status.OpenThreadCount != 1 || status.ClusterCount != 1 {
|
||||
t.Fatalf("unexpected portable status: %#v", status)
|
||||
}
|
||||
if status.LastSyncAt.IsZero() {
|
||||
t.Fatalf("portable last sync was not read from repo_sync_state: %#v", status)
|
||||
}
|
||||
repo, err := st.RepositoryByFullName(ctx, "openclaw/openclaw")
|
||||
if err != nil {
|
||||
t.Fatalf("portable repository: %v", err)
|
||||
}
|
||||
if repo.RawJSON != "" {
|
||||
t.Fatalf("portable raw json: got %q want empty", repo.RawJSON)
|
||||
}
|
||||
threads, err := st.ListThreadsFiltered(ctx, ThreadListOptions{RepoID: repo.ID, Numbers: []int{42}})
|
||||
if err != nil {
|
||||
t.Fatalf("portable threads: %v", err)
|
||||
}
|
||||
if len(threads) != 1 || threads[0].Body != "portable body" || threads[0].RawJSON != "" {
|
||||
t.Fatalf("unexpected portable thread: %#v", threads)
|
||||
}
|
||||
if err := st.Close(); err != nil {
|
||||
t.Fatalf("close portable readonly: %v", err)
|
||||
}
|
||||
after, err := os.ReadFile(dbPath)
|
||||
if err != nil {
|
||||
t.Fatalf("read db after: %v", err)
|
||||
}
|
||||
if !bytes.Equal(after, before) {
|
||||
t.Fatal("readonly portable open mutated database bytes")
|
||||
}
|
||||
}
|
||||
|
||||
func TestOpenMigratesPortableStoreColumns(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
dbPath := filepath.Join(t.TempDir(), "portable.db")
|
||||
@ -252,21 +378,26 @@ func TestPrunePortablePayloads(t *testing.T) {
|
||||
t.Fatalf("unexpected stats: %#v", stats)
|
||||
}
|
||||
|
||||
var repoRaw, body, threadRaw, titleTokens, linkedRefs, buckets, features string
|
||||
var documentCount int
|
||||
if err := st.DB().QueryRowContext(ctx, `select raw_json from repositories where id = 1`).Scan(&repoRaw); err != nil {
|
||||
t.Fatalf("repo raw: %v", err)
|
||||
var bodyExcerpt, titleTokens, linkedRefs, buckets, features string
|
||||
if st.hasColumn(ctx, "repositories", "raw_json") {
|
||||
t.Fatal("repositories.raw_json was not dropped")
|
||||
}
|
||||
if err := st.DB().QueryRowContext(ctx, `select body, raw_json from threads where id = 1`).Scan(&body, &threadRaw); err != nil {
|
||||
t.Fatalf("thread payload: %v", err)
|
||||
if st.hasColumn(ctx, "threads", "raw_json") {
|
||||
t.Fatal("threads.raw_json was not dropped")
|
||||
}
|
||||
if st.hasColumn(ctx, "threads", "body") {
|
||||
t.Fatal("threads.body was not dropped")
|
||||
}
|
||||
if err := st.DB().QueryRowContext(ctx, `select body_excerpt from threads where id = 1`).Scan(&bodyExcerpt); err != nil {
|
||||
t.Fatalf("thread body excerpt: %v", err)
|
||||
}
|
||||
if err := st.DB().QueryRowContext(ctx, `select title_tokens_json, linked_refs_json, module_buckets_json, feature_json from thread_fingerprints where id = 1`).Scan(&titleTokens, &linkedRefs, &buckets, &features); err != nil {
|
||||
t.Fatalf("fingerprint payload: %v", err)
|
||||
}
|
||||
if err := st.DB().QueryRowContext(ctx, `select count(*) from documents`).Scan(&documentCount); err != nil {
|
||||
t.Fatalf("document count: %v", err)
|
||||
if st.tableExists(ctx, "documents") {
|
||||
t.Fatal("documents table was not dropped")
|
||||
}
|
||||
if repoRaw != "" || body != "abcdefgh" || threadRaw != "" || titleTokens != "[]" || linkedRefs != "[]" || buckets != "[]" || features != "{}" || documentCount != 0 {
|
||||
t.Fatalf("payloads not pruned: repoRaw=%q body=%q threadRaw=%q titleTokens=%q linkedRefs=%q buckets=%q features=%q documents=%d", repoRaw, body, threadRaw, titleTokens, linkedRefs, buckets, features, documentCount)
|
||||
if bodyExcerpt != "abcdefgh" || titleTokens != "[]" || linkedRefs != "[]" || buckets != "[]" || features != "{}" {
|
||||
t.Fatalf("payloads not pruned: bodyExcerpt=%q titleTokens=%q linkedRefs=%q buckets=%q features=%q", bodyExcerpt, titleTokens, linkedRefs, buckets, features)
|
||||
}
|
||||
}
|
||||
|
||||
@ -77,6 +77,59 @@ func (s *Store) UpsertThread(ctx context.Context, thread Thread) (int64, error)
|
||||
return id, nil
|
||||
}
|
||||
|
||||
func (s *Store) MarkOpenThreadClosedFromGitHub(ctx context.Context, thread Thread) (bool, error) {
|
||||
if thread.RepoID <= 0 {
|
||||
return false, fmt.Errorf("repo id must be positive")
|
||||
}
|
||||
if thread.Number <= 0 {
|
||||
return false, fmt.Errorf("thread number must be positive")
|
||||
}
|
||||
if thread.Kind == "" {
|
||||
return false, fmt.Errorf("thread kind is required")
|
||||
}
|
||||
if thread.State == "" {
|
||||
thread.State = "closed"
|
||||
}
|
||||
result, err := s.q().ExecContext(ctx, `
|
||||
update threads
|
||||
set github_id = ?,
|
||||
state = ?,
|
||||
title = ?,
|
||||
body = ?,
|
||||
author_login = ?,
|
||||
author_type = ?,
|
||||
html_url = ?,
|
||||
labels_json = ?,
|
||||
assignees_json = ?,
|
||||
raw_json = ?,
|
||||
content_hash = ?,
|
||||
is_draft = ?,
|
||||
created_at_gh = ?,
|
||||
updated_at_gh = ?,
|
||||
closed_at_gh = ?,
|
||||
merged_at_gh = ?,
|
||||
last_pulled_at = ?,
|
||||
updated_at = ?
|
||||
where repo_id = ?
|
||||
and kind = ?
|
||||
and number = ?
|
||||
and state = 'open'
|
||||
and closed_at_local is null
|
||||
`, thread.GitHubID, thread.State, thread.Title, nullString(thread.Body), nullString(thread.AuthorLogin),
|
||||
nullString(thread.AuthorType), thread.HTMLURL, thread.LabelsJSON, thread.AssigneesJSON, thread.RawJSON,
|
||||
thread.ContentHash, boolInt(thread.IsDraft), nullString(thread.CreatedAtGitHub), nullString(thread.UpdatedAtGitHub),
|
||||
nullString(thread.ClosedAtGitHub), nullString(thread.MergedAtGitHub), nullString(thread.LastPulledAt), thread.UpdatedAt,
|
||||
thread.RepoID, thread.Kind, thread.Number)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("mark open thread closed from github: %w", err)
|
||||
}
|
||||
affected, err := result.RowsAffected()
|
||||
if err != nil {
|
||||
return false, nil
|
||||
}
|
||||
return affected > 0, nil
|
||||
}
|
||||
|
||||
func (s *Store) ListThreads(ctx context.Context, repoID int64, includeClosed bool) ([]Thread, error) {
|
||||
return s.ListThreadsFiltered(ctx, ThreadListOptions{RepoID: repoID, IncludeClosed: includeClosed})
|
||||
}
|
||||
@ -108,10 +161,7 @@ func (s *Store) ListThreadsFiltered(ctx context.Context, options ThreadListOptio
|
||||
args = append(args, options.Limit)
|
||||
}
|
||||
rows, err := s.q().QueryContext(ctx, `
|
||||
select id, repo_id, github_id, number, kind, state, title, body, author_login, author_type,
|
||||
html_url, labels_json, assignees_json, raw_json, content_hash, is_draft,
|
||||
created_at_gh, updated_at_gh, closed_at_gh, merged_at_gh,
|
||||
first_pulled_at, last_pulled_at, updated_at, closed_at_local, close_reason_local
|
||||
select `+s.threadSelectColumns(ctx, "")+`
|
||||
from threads
|
||||
where `+where+`
|
||||
order by number`+limitSQL, args...)
|
||||
@ -210,6 +260,66 @@ func scanThread(rows interface {
|
||||
return thread, nil
|
||||
}
|
||||
|
||||
func (s *Store) threadSelectColumns(ctx context.Context, alias string) string {
|
||||
column := func(name string) string {
|
||||
if alias == "" {
|
||||
return name
|
||||
}
|
||||
return alias + "." + name
|
||||
}
|
||||
return strings.Join([]string{
|
||||
column("id"),
|
||||
column("repo_id"),
|
||||
column("github_id"),
|
||||
column("number"),
|
||||
column("kind"),
|
||||
column("state"),
|
||||
column("title"),
|
||||
s.threadBodyExpr(ctx, alias),
|
||||
column("author_login"),
|
||||
column("author_type"),
|
||||
column("html_url"),
|
||||
column("labels_json"),
|
||||
column("assignees_json"),
|
||||
s.threadRawJSONExpr(ctx, alias),
|
||||
column("content_hash"),
|
||||
column("is_draft"),
|
||||
column("created_at_gh"),
|
||||
column("updated_at_gh"),
|
||||
column("closed_at_gh"),
|
||||
column("merged_at_gh"),
|
||||
column("first_pulled_at"),
|
||||
column("last_pulled_at"),
|
||||
column("updated_at"),
|
||||
column("closed_at_local"),
|
||||
column("close_reason_local"),
|
||||
}, ", ")
|
||||
}
|
||||
|
||||
func (s *Store) threadBodyExpr(ctx context.Context, alias string) string {
|
||||
if s.hasColumn(ctx, "threads", "body") {
|
||||
return qualifiedColumn(alias, "body")
|
||||
}
|
||||
if s.hasColumn(ctx, "threads", "body_excerpt") {
|
||||
return qualifiedColumn(alias, "body_excerpt")
|
||||
}
|
||||
return "''"
|
||||
}
|
||||
|
||||
func (s *Store) threadRawJSONExpr(ctx context.Context, alias string) string {
|
||||
if s.hasColumn(ctx, "threads", "raw_json") {
|
||||
return qualifiedColumn(alias, "raw_json")
|
||||
}
|
||||
return "''"
|
||||
}
|
||||
|
||||
func qualifiedColumn(alias, name string) string {
|
||||
if alias == "" {
|
||||
return name
|
||||
}
|
||||
return alias + "." + name
|
||||
}
|
||||
|
||||
func boolInt(value bool) int {
|
||||
if value {
|
||||
return 1
|
||||
|
||||
@ -63,6 +63,9 @@ func (s *Store) ListThreadVectors(ctx context.Context, repoID int64) ([]ThreadVe
|
||||
}
|
||||
|
||||
func (s *Store) ListThreadVectorsFiltered(ctx context.Context, query ThreadVectorQuery) ([]ThreadVector, error) {
|
||||
if !s.hasTable(ctx, "thread_vectors") {
|
||||
return []ThreadVector{}, nil
|
||||
}
|
||||
where, args := threadVectorWhere(query)
|
||||
rows, err := s.db.QueryContext(ctx, `
|
||||
select tv.thread_id, tv.basis, tv.model, tv.dimensions, tv.content_hash, tv.vector_json, tv.vector_backend, tv.created_at, tv.updated_at
|
||||
@ -96,13 +99,13 @@ func (s *Store) ListThreadVectorsFiltered(ctx context.Context, query ThreadVecto
|
||||
}
|
||||
|
||||
func (s *Store) ThreadVectorByNumber(ctx context.Context, query ThreadVectorQuery, number int) (Thread, ThreadVector, error) {
|
||||
if !s.hasTable(ctx, "thread_vectors") {
|
||||
return Thread{}, ThreadVector{}, fmt.Errorf("thread #%d was not found with an embedding", number)
|
||||
}
|
||||
where, args := threadVectorWhere(query)
|
||||
args = append(args, number)
|
||||
row := s.db.QueryRowContext(ctx, `
|
||||
select t.id, t.repo_id, t.github_id, t.number, t.kind, t.state, t.title, t.body, t.author_login, t.author_type,
|
||||
t.html_url, t.labels_json, t.assignees_json, t.raw_json, t.content_hash, t.is_draft,
|
||||
t.created_at_gh, t.updated_at_gh, t.closed_at_gh, t.merged_at_gh,
|
||||
t.first_pulled_at, t.last_pulled_at, t.updated_at, t.closed_at_local, t.close_reason_local,
|
||||
select `+s.threadSelectColumns(ctx, "t")+`,
|
||||
tv.thread_id, tv.basis, tv.model, tv.dimensions, tv.content_hash, tv.vector_json, tv.vector_backend, tv.created_at, tv.updated_at
|
||||
from threads t
|
||||
join thread_vectors tv on tv.thread_id = t.id
|
||||
@ -131,10 +134,7 @@ func (s *Store) ThreadsByIDs(ctx context.Context, repoID int64, ids []int64) (ma
|
||||
args = append(args, id)
|
||||
}
|
||||
rows, err := s.db.QueryContext(ctx, `
|
||||
select id, repo_id, github_id, number, kind, state, title, body, author_login, author_type,
|
||||
html_url, labels_json, assignees_json, raw_json, content_hash, is_draft,
|
||||
created_at_gh, updated_at_gh, closed_at_gh, merged_at_gh,
|
||||
first_pulled_at, last_pulled_at, updated_at, closed_at_local, close_reason_local
|
||||
select `+s.threadSelectColumns(ctx, "")+`
|
||||
from threads
|
||||
where repo_id = ? and id in (`+strings.Join(placeholders, ",")+`)
|
||||
`, args...)
|
||||
|
||||
@ -45,6 +45,7 @@ type Stats struct {
|
||||
IssuesSynced int `json:"issues_synced"`
|
||||
PullRequestsSynced int `json:"pull_requests_synced"`
|
||||
CommentsSynced int `json:"comments_synced"`
|
||||
ThreadsClosed int `json:"threads_closed"`
|
||||
RequestedSince string `json:"requested_since,omitempty"`
|
||||
Limit int `json:"limit,omitempty"`
|
||||
MetadataOnly bool `json:"metadata_only"`
|
||||
@ -129,11 +130,18 @@ func (s *Syncer) Sync(ctx context.Context, options Options) (Stats, error) {
|
||||
stats.IssuesSynced++
|
||||
}
|
||||
}
|
||||
if state == "open" && since != "" && options.Limit <= 0 {
|
||||
closed, err := s.applyClosedOverlapSweep(ctx, st, repoID, options, since)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
stats.ThreadsClosed = closed
|
||||
}
|
||||
stats.FinishedAt = s.now().Format(time.RFC3339Nano)
|
||||
if _, err := st.RecordRun(ctx, store.RunRecord{
|
||||
RepoID: repoID,
|
||||
Kind: "sync",
|
||||
Scope: "open",
|
||||
Scope: state,
|
||||
Status: "success",
|
||||
StartedAt: stats.StartedAt,
|
||||
FinishedAt: stats.FinishedAt,
|
||||
@ -158,7 +166,7 @@ func (s *Syncer) Sync(ctx context.Context, options Options) (Stats, error) {
|
||||
func normalizeState(value string) (string, error) {
|
||||
value = strings.TrimSpace(strings.ToLower(value))
|
||||
if value == "" {
|
||||
return "all", nil
|
||||
return "open", nil
|
||||
}
|
||||
switch value {
|
||||
case "open", "closed", "all":
|
||||
@ -168,6 +176,31 @@ func normalizeState(value string) (string, error) {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Syncer) applyClosedOverlapSweep(ctx context.Context, st *store.Store, repoID int64, options Options, since string) (int, error) {
|
||||
rows, err := s.client.ListRepositoryIssues(ctx, options.Owner, options.Repo, gh.ListIssuesOptions{
|
||||
State: "closed",
|
||||
Since: since,
|
||||
}, options.Reporter)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
closed := 0
|
||||
for _, row := range rows {
|
||||
thread := mapIssueToThread(repoID, row, s.now().Format(time.RFC3339Nano))
|
||||
updated, err := st.MarkOpenThreadClosedFromGitHub(ctx, thread)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if updated {
|
||||
closed++
|
||||
}
|
||||
}
|
||||
if closed > 0 {
|
||||
options.Reporter.Printf("[sync] closed overlap sweep matched %d stale open thread(s)", closed)
|
||||
}
|
||||
return closed, nil
|
||||
}
|
||||
|
||||
func normalizeSince(value string, now time.Time) (string, error) {
|
||||
value = strings.TrimSpace(value)
|
||||
if value == "" {
|
||||
|
||||
@ -17,6 +17,9 @@ func (fakeGitHub) GetRepo(ctx context.Context, owner, repo string, reporter gh.R
|
||||
}
|
||||
|
||||
func (fakeGitHub) ListRepositoryIssues(ctx context.Context, owner, repo string, options gh.ListIssuesOptions, reporter gh.Reporter) ([]map[string]any, error) {
|
||||
if options.State == "closed" {
|
||||
return nil, nil
|
||||
}
|
||||
return []map[string]any{
|
||||
{
|
||||
"id": 1,
|
||||
@ -89,6 +92,30 @@ func (f *stateCaptureGitHub) ListRepositoryIssues(ctx context.Context, owner, re
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
type closedSweepGitHub struct {
|
||||
fakeGitHub
|
||||
}
|
||||
|
||||
func (closedSweepGitHub) ListRepositoryIssues(ctx context.Context, owner, repo string, options gh.ListIssuesOptions, reporter gh.Reporter) ([]map[string]any, error) {
|
||||
if options.State == "closed" {
|
||||
return []map[string]any{{
|
||||
"id": 1,
|
||||
"number": 7,
|
||||
"state": "closed",
|
||||
"title": "download stalls",
|
||||
"body": "large file download stalls",
|
||||
"html_url": "https://github.com/openclaw/gitcrawl/issues/7",
|
||||
"created_at": "2026-04-26T00:00:00Z",
|
||||
"updated_at": "2026-04-27T00:00:00Z",
|
||||
"closed_at": "2026-04-27T00:00:00Z",
|
||||
"labels": []map[string]any{{"name": "bug"}},
|
||||
"assignees": []map[string]any{},
|
||||
"user": map[string]any{"login": "vincentkoc", "type": "User"},
|
||||
}}, nil
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func TestSyncPersistsIssuesAndPullRequests(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
st, err := store.Open(ctx, filepath.Join(t.TempDir(), "gitcrawl.db"))
|
||||
@ -189,7 +216,7 @@ func TestSyncPassesRequestedState(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestSyncDefaultsToAllStates(t *testing.T) {
|
||||
func TestSyncDefaultsToOpenState(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
st, err := store.Open(ctx, filepath.Join(t.TempDir(), "gitcrawl.db"))
|
||||
if err != nil {
|
||||
@ -202,8 +229,8 @@ func TestSyncDefaultsToAllStates(t *testing.T) {
|
||||
if _, err := s.Sync(ctx, Options{Owner: "openclaw", Repo: "gitcrawl"}); err != nil {
|
||||
t.Fatalf("sync: %v", err)
|
||||
}
|
||||
if client.state != "all" {
|
||||
t.Fatalf("default state = %q, want all", client.state)
|
||||
if client.state != "open" {
|
||||
t.Fatalf("default state = %q, want open", client.state)
|
||||
}
|
||||
}
|
||||
|
||||
@ -220,3 +247,61 @@ func TestSyncRejectsInvalidState(t *testing.T) {
|
||||
t.Fatal("expected invalid state to fail")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSyncOpenSinceAppliesClosedOverlapSweep(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
st, err := store.Open(ctx, filepath.Join(t.TempDir(), "gitcrawl.db"))
|
||||
if err != nil {
|
||||
t.Fatalf("open store: %v", err)
|
||||
}
|
||||
defer st.Close()
|
||||
|
||||
repoID, err := st.UpsertRepository(ctx, store.Repository{
|
||||
Owner: "openclaw",
|
||||
Name: "gitcrawl",
|
||||
FullName: "openclaw/gitcrawl",
|
||||
RawJSON: "{}",
|
||||
UpdatedAt: "2026-04-26T00:00:00Z",
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("seed repo: %v", err)
|
||||
}
|
||||
if _, err := st.UpsertThread(ctx, store.Thread{
|
||||
RepoID: repoID,
|
||||
GitHubID: "1",
|
||||
Number: 7,
|
||||
Kind: "issue",
|
||||
State: "open",
|
||||
Title: "download stalls",
|
||||
Body: "large file download stalls",
|
||||
HTMLURL: "https://github.com/openclaw/gitcrawl/issues/7",
|
||||
LabelsJSON: "[]",
|
||||
AssigneesJSON: "[]",
|
||||
RawJSON: "{}",
|
||||
ContentHash: "old",
|
||||
CreatedAtGitHub: "2026-04-26T00:00:00Z",
|
||||
UpdatedAtGitHub: "2026-04-26T00:00:00Z",
|
||||
FirstPulledAt: "2026-04-26T00:00:00Z",
|
||||
LastPulledAt: "2026-04-26T00:00:00Z",
|
||||
UpdatedAt: "2026-04-26T00:00:00Z",
|
||||
}); err != nil {
|
||||
t.Fatalf("seed thread: %v", err)
|
||||
}
|
||||
|
||||
s := New(closedSweepGitHub{}, st)
|
||||
s.now = func() time.Time { return time.Date(2026, 4, 27, 1, 0, 0, 0, time.UTC) }
|
||||
stats, err := s.Sync(ctx, Options{Owner: "openclaw", Repo: "gitcrawl", Since: "1h"})
|
||||
if err != nil {
|
||||
t.Fatalf("sync: %v", err)
|
||||
}
|
||||
if stats.ThreadsClosed != 1 {
|
||||
t.Fatalf("threads closed = %d, want 1", stats.ThreadsClosed)
|
||||
}
|
||||
threads, err := st.ListThreads(ctx, repoID, true)
|
||||
if err != nil {
|
||||
t.Fatalf("threads: %v", err)
|
||||
}
|
||||
if len(threads) != 1 || threads[0].State != "closed" || threads[0].ClosedAtGitHub == "" {
|
||||
t.Fatalf("thread not closed from overlap sweep: %#v", threads)
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user