diff --git a/internal/cli/app.go b/internal/cli/app.go
index 64e619f..f8bc061 100644
--- a/internal/cli/app.go
+++ b/internal/cli/app.go
@@ -2,6 +2,8 @@ package cli
 
 import (
 	"context"
+	"crypto/sha256"
+	"encoding/hex"
 	"encoding/json"
 	"errors"
 	"flag"
@@ -10,10 +12,12 @@ import (
 	"os"
 	"os/exec"
 	"path/filepath"
+	"sort"
 	"strconv"
 	"strings"
 	"time"
 
+	clusterer "github.com/openclaw/gitcrawl/internal/cluster"
 	"github.com/openclaw/gitcrawl/internal/config"
 	gh "github.com/openclaw/gitcrawl/internal/github"
 	"github.com/openclaw/gitcrawl/internal/store"
@@ -140,11 +144,13 @@ func (a *App) Run(ctx context.Context, args []string) error {
 		return a.runClusterDetail(ctx, rest[1:])
 	case "neighbors":
 		return a.runNeighbors(ctx, rest[1:])
+	case "cluster":
+		return a.runCluster(ctx, rest[1:])
 	case "portable":
 		return a.runPortable(ctx, rest[1:])
 	case "tui":
 		return a.runTUI(ctx, rest[1:])
-	case "refresh", "summarize", "key-summaries", "embed", "cluster", "cluster-experiment", "durable-clusters", "cluster-explain", "merge-clusters", "split-cluster", "export-sync", "import-sync", "validate-sync", "portable-size", "sync-status", "optimize", "completion":
+	case "refresh", "summarize", "key-summaries", "embed", "cluster-experiment", "durable-clusters", "cluster-explain", "merge-clusters", "split-cluster", "export-sync", "import-sync", "validate-sync", "portable-size", "sync-status", "optimize", "completion":
 		_ = ctx
 		return notImplemented(rest[0])
 	default:
@@ -366,6 +372,99 @@ func (a *App) runNeighbors(ctx context.Context, args []string) error {
 	}, true)
 }
 
+// runCluster implements the "cluster" subcommand. It loads the stored thread
+// vectors for owner/repo, links every pair whose cosine score reaches
+// --threshold, persists the resulting groups through SaveDurableClusters, and
+// writes a summary of the run.
+func (a *App) runCluster(ctx context.Context, args []string) error {
+	fs := flag.NewFlagSet("cluster", flag.ContinueOnError)
+	fs.SetOutput(io.Discard)
+	thresholdRaw := fs.String("threshold", "0.82", "minimum cosine score")
+	minSizeRaw := fs.String("min-size", "2", "minimum cluster member count")
+	limitRaw := fs.String("limit", "", "maximum vector rows to cluster")
+	model := fs.String("model", "", "embedding model")
+	basis := fs.String("basis", "", "embedding basis")
+	jsonOut := fs.Bool("json", false, "write JSON output")
+	if err := fs.Parse(normalizeCommandArgs(args, map[string]bool{"threshold": true, "min-size": true, "limit": true, "model": true, "basis": true})); err != nil {
+		return usageErr(err)
+	}
+	a.applyCommandJSON(*jsonOut)
+	if fs.NArg() != 1 {
+		return usageErr(fmt.Errorf("cluster requires owner/repo"))
+	}
+	owner, repoName, err := parseOwnerRepo(fs.Arg(0))
+	if err != nil {
+		return usageErr(err)
+	}
+	threshold, err := parseOptionalFloat(*thresholdRaw)
+	if err != nil {
+		return usageErr(err)
+	}
+	// Written as a negated range check so NaN, which fails every ordered
+	// comparison, is rejected instead of slipping past "<= 0 || > 1".
+	if !(threshold > 0 && threshold <= 1) {
+		return usageErr(fmt.Errorf("cluster requires --threshold between 0 and 1"))
+	}
+	minSize, err := parseOptionalPositiveInt(*minSizeRaw)
+	if err != nil {
+		return usageErr(err)
+	}
+	if minSize <= 0 {
+		minSize = 2
+	}
+	limit, err := parseOptionalPositiveInt(*limitRaw)
+	if err != nil {
+		return usageErr(err)
+	}
+	rt, err := a.openLocalRuntime(ctx)
+	if err != nil {
+		return err
+	}
+	defer rt.Store.Close()
+	repo, err := rt.repository(ctx, owner, repoName)
+	if err != nil {
+		return err
+	}
+	query := store.ThreadVectorQuery{
+		RepoID: repo.ID,
+		Model:  firstNonEmpty(strings.TrimSpace(*model), rt.Config.OpenAI.EmbedModel),
+		Basis:  firstNonEmpty(strings.TrimSpace(*basis), rt.Config.EmbeddingBasis),
+	}
+	vectors, err := rt.Store.ListThreadVectorsFiltered(ctx, query)
+	if err != nil {
+		return err
+	}
+	// Nothing matched the configured model/basis and the caller did not pin
+	// either one, so retry without those filters.
+	if len(vectors) == 0 && strings.TrimSpace(*model) == "" && strings.TrimSpace(*basis) == "" {
+		vectors, err = rt.Store.ListThreadVectorsFiltered(ctx, store.ThreadVectorQuery{RepoID: repo.ID})
+		if err != nil {
+			return err
+		}
+	}
+	if limit > 0 && len(vectors) > limit {
+		vectors = vectors[:limit]
+	}
+	inputs, edgeCount, err := buildDurableClusterInputs(ctx, rt.Store, repo.ID, vectors, threshold, minSize)
+	if err != nil {
+		return err
+	}
+	saveResult, err := rt.Store.SaveDurableClusters(ctx, repo.ID, inputs)
+	if err != nil {
+		return err
+	}
+	return a.writeOutput("cluster", map[string]any{
+		"repository":    repo.FullName,
+		"threshold":     threshold,
+		"min_size":      minSize,
+		"vector_count":  len(vectors),
+		"edge_count":    edgeCount,
+		"cluster_count": saveResult.ClusterCount,
+		"member_count":  saveResult.MemberCount,
+		"run_id":        saveResult.RunID,
+	}, true)
+}
+
 func (a *App) runClusters(ctx context.Context, args []string) error {
 	fs := flag.NewFlagSet("clusters", flag.ContinueOnError)
 	fs.SetOutput(io.Discard)
@@ -1400,6 +1499,117 @@ func parseClusterMemberCommandIDs(command, clusterIDRaw, numberRaw string) (int,
 	return clusterID, number, nil
 }
 
+// buildDurableClusterInputs scores every pair of stored vectors whose thread
+// ThreadsByIDs returned, keeps pairs with cosine >= threshold as edges, and
+// converts each group from clusterer.Build that has at least minSize members
+// into a store.DurableClusterInput. It returns those inputs and the number of
+// edges kept. A non-positive minSize is treated as 2.
+func buildDurableClusterInputs(ctx context.Context, st *store.Store, repoID int64, storedVectors []store.ThreadVector, threshold float64, minSize int) ([]store.DurableClusterInput, int, error) {
+	if minSize <= 0 {
+		minSize = 2
+	}
+	threadIDs := make([]int64, 0, len(storedVectors))
+	vectorByThreadID := make(map[int64][]float64, len(storedVectors))
+	for _, stored := range storedVectors {
+		threadIDs = append(threadIDs, stored.ThreadID)
+		vectorByThreadID[stored.ThreadID] = stored.Vector
+	}
+	threads, err := st.ThreadsByIDs(ctx, repoID, threadIDs)
+	if err != nil {
+		return nil, 0, err
+	}
+	nodes := make([]clusterer.Node, 0, len(storedVectors))
+	for _, stored := range storedVectors {
+		thread, ok := threads[stored.ThreadID]
+		if !ok {
+			continue
+		}
+		nodes = append(nodes, clusterer.Node{ThreadID: stored.ThreadID, Number: thread.Number, Title: thread.Title})
+	}
+	edges := make([]clusterer.Edge, 0)
+	pairScores := map[string]float64{}
+	for left := 0; left < len(nodes); left++ {
+		// The pairwise pass is quadratic in the node count; stop promptly
+		// once the caller cancels.
+		if err := ctx.Err(); err != nil {
+			return nil, 0, err
+		}
+		for right := left + 1; right < len(nodes); right++ {
+			leftID := nodes[left].ThreadID
+			rightID := nodes[right].ThreadID
+			score := vector.Cosine(vectorByThreadID[leftID], vectorByThreadID[rightID])
+			if score < threshold {
+				continue
+			}
+			edges = append(edges, clusterer.Edge{LeftThreadID: leftID, RightThreadID: rightID, Score: score})
+			pairScores[threadIDPairKey(leftID, rightID)] = score
+		}
+	}
+	built := clusterer.Build(nodes, edges)
+	inputs := make([]store.DurableClusterInput, 0, len(built))
+	for _, builtCluster := range built {
+		if len(builtCluster.Members) < minSize {
+			continue
+		}
+		// Order members by thread number so the stable key below does not
+		// depend on the order clusterer.Build happened to return them in.
+		sort.Slice(builtCluster.Members, func(i, j int) bool {
+			left := threads[builtCluster.Members[i]]
+			right := threads[builtCluster.Members[j]]
+			return left.Number < right.Number
+		})
+		rep := threads[builtCluster.RepresentativeThreadID]
+		input := store.DurableClusterInput{
+			StableKey:              durableClusterStableKey(builtCluster.Members),
+			StableSlug:             durableClusterSlug(builtCluster.Members),
+			RepresentativeThreadID: builtCluster.RepresentativeThreadID,
+			Title:                  rep.Title,
+			Members:                make([]store.DurableClusterMemberInput, 0, len(builtCluster.Members)),
+		}
+		for _, threadID := range builtCluster.Members {
+			role := "member"
+			// Members without a direct edge to the representative keep a nil
+			// score; only pairs at or above the threshold were recorded.
+			var scorePtr *float64
+			if threadID == builtCluster.RepresentativeThreadID {
+				role = "representative"
+			} else if score, ok := pairScores[threadIDPairKey(threadID, builtCluster.RepresentativeThreadID)]; ok {
+				scoreCopy := score
+				scorePtr = &scoreCopy
+			}
+			input.Members = append(input.Members, store.DurableClusterMemberInput{ThreadID: threadID, Role: role, ScoreToRepresentative: scorePtr})
+		}
+		inputs = append(inputs, input)
+	}
+	return inputs, len(edges), nil
+}
+
+// durableClusterStableKey returns "members:" followed by the thread IDs joined
+// with commas, in the order given.
+func durableClusterStableKey(threadIDs []int64) string {
+	parts := make([]string, 0, len(threadIDs))
+	for _, id := range threadIDs {
+		parts = append(parts, strconv.FormatInt(id, 10))
+	}
+	return "members:" + strings.Join(parts, ",")
+}
+
+// durableClusterSlug returns "cluster-" plus the first 12 hex characters of
+// the SHA-256 digest of the stable key for threadIDs.
+func durableClusterSlug(threadIDs []int64) string {
+	sum := sha256.Sum256([]byte(durableClusterStableKey(threadIDs)))
+	return "cluster-" + hex.EncodeToString(sum[:])[:12]
+}
+
+// threadIDPairKey returns an order-independent map key for a pair of thread
+// IDs, formatted as "<smaller>:<larger>".
+func threadIDPairKey(left, right int64) string {
+	if left > right {
+		left, right = right, left
+	}
+	return strconv.FormatInt(left, 10) + ":" + strconv.FormatInt(right, 10)
+}
+
 func parseOptionalFloat(value string) (float64, error) {
 	if strings.TrimSpace(value) == "" {
 		return 0, nil
diff --git a/internal/cli/app_test.go b/internal/cli/app_test.go
index 7618c7f..c36396a 100644
--- a/internal/cli/app_test.go
+++ b/internal/cli/app_test.go
@@ -499,6 +499,119 @@ func TestClusterMemberOverrideCommands(t *testing.T) {
 	}
 }
 
+// TestClusterCommandPersistsDurableClusters seeds three threads, two with
+// near-parallel vectors and one orthogonal, runs "cluster" with a 0.90
+// threshold, and expects exactly one two-member cluster to be stored.
+func TestClusterCommandPersistsDurableClusters(t *testing.T) {
+	ctx := context.Background()
+	dir := t.TempDir()
+	configPath := filepath.Join(dir, "config.toml")
+	dbPath := filepath.Join(dir, "gitcrawl.db")
+	app := New()
+	if err := app.Run(ctx, []string{"--config", configPath, "init", "--db", dbPath}); err != nil {
+		t.Fatalf("init: %v", err)
+	}
+	st, err := store.Open(ctx, dbPath)
+	if err != nil {
+		t.Fatalf("open store: %v", err)
+	}
+	repoID, err := st.UpsertRepository(ctx, store.Repository{
+		Owner:     "openclaw",
+		Name:      "openclaw",
+		FullName:  "openclaw/openclaw",
+		UpdatedAt: time.Now().UTC().Format(time.RFC3339Nano),
+	})
+	if err != nil {
+		t.Fatalf("seed repository: %v", err)
+	}
+	firstID, err := st.UpsertThread(ctx, store.Thread{
+		RepoID:        repoID,
+		GitHubID:      "91",
+		Number:        91,
+		Kind:          "issue",
+		State:         "open",
+		Title:         "First duplicate",
+		HTMLURL:       "https://github.com/openclaw/openclaw/issues/91",
+		LabelsJSON:    "[]",
+		AssigneesJSON: "[]",
+		RawJSON:       "{}",
+		ContentHash:   "hash-91",
+		UpdatedAt:     time.Now().UTC().Format(time.RFC3339Nano),
+	})
+	if err != nil {
+		t.Fatalf("seed first thread: %v", err)
+	}
+	secondID, err := st.UpsertThread(ctx, store.Thread{
+		RepoID:        repoID,
+		GitHubID:      "92",
+		Number:        92,
+		Kind:          "issue",
+		State:         "open",
+		Title:         "Second duplicate",
+		HTMLURL:       "https://github.com/openclaw/openclaw/issues/92",
+		LabelsJSON:    "[]",
+		AssigneesJSON: "[]",
+		RawJSON:       "{}",
+		ContentHash:   "hash-92",
+		UpdatedAt:     time.Now().UTC().Format(time.RFC3339Nano),
+	})
+	if err != nil {
+		t.Fatalf("seed second thread: %v", err)
+	}
+	thirdID, err := st.UpsertThread(ctx, store.Thread{
+		RepoID:        repoID,
+		GitHubID:      "93",
+		Number:        93,
+		Kind:          "issue",
+		State:         "open",
+		Title:         "Unrelated issue",
+		HTMLURL:       "https://github.com/openclaw/openclaw/issues/93",
+		LabelsJSON:    "[]",
+		AssigneesJSON: "[]",
+		RawJSON:       "{}",
+		ContentHash:   "hash-93",
+		UpdatedAt:     time.Now().UTC().Format(time.RFC3339Nano),
+	})
+	if err != nil {
+		t.Fatalf("seed third thread: %v", err)
+	}
+	now := time.Now().UTC().Format(time.RFC3339Nano)
+	for _, vector := range []store.ThreadVector{
+		{ThreadID: firstID, Basis: "title_original", Model: "text-embedding-3-small", Dimensions: 2, ContentHash: "hash-91", Vector: []float64{1, 0}, CreatedAt: now, UpdatedAt: now},
+		{ThreadID: secondID, Basis: "title_original", Model: "text-embedding-3-small", Dimensions: 2, ContentHash: "hash-92", Vector: []float64{0.95, 0.05}, CreatedAt: now, UpdatedAt: now},
+		{ThreadID: thirdID, Basis: "title_original", Model: "text-embedding-3-small", Dimensions: 2, ContentHash: "hash-93", Vector: []float64{0, 1}, CreatedAt: now, UpdatedAt: now},
+	} {
+		if err := st.UpsertThreadVector(ctx, vector); err != nil {
+			t.Fatalf("upsert vector: %v", err)
+		}
+	}
+	if err := st.Close(); err != nil {
+		t.Fatalf("close store: %v", err)
+	}
+
+	run := New()
+	var stdout bytes.Buffer
+	run.Stdout = &stdout
+	if err := run.Run(ctx, []string{"--config", configPath, "cluster", "openclaw/openclaw", "--threshold", "0.90", "--json"}); err != nil {
+		t.Fatalf("cluster: %v", err)
+	}
+	if !strings.Contains(stdout.String(), `"cluster_count": 1`) {
+		t.Fatalf("cluster output = %q", stdout.String())
+	}
+	st, err = store.Open(ctx, dbPath)
+	if err != nil {
+		t.Fatalf("reopen store: %v", err)
+	}
+	defer st.Close()
+	clusters, err := st.ListClusterSummaries(ctx, store.ClusterSummaryOptions{RepoID: repoID, IncludeClosed: false, MinSize: 1, Limit: 20})
+	if err != nil {
+		t.Fatalf("list clusters: %v", err)
+	}
+	if len(clusters) != 1 || clusters[0].MemberCount != 2 {
+		t.Fatalf("expected one durable cluster, got %#v", clusters)
+	}
+}
+
 func TestTUIHelp(t *testing.T) {
 	app := New()
 	var stdout bytes.Buffer