feat: implement local cluster command

This commit is contained in:
Vincent Koc 2026-04-27 14:49:35 -07:00
parent 3da9d30dfd
commit efa1be066f
No known key found for this signature in database
2 changed files with 293 additions and 1 deletions

View File

@ -2,6 +2,8 @@ package cli
import (
"context"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"errors"
"flag"
@ -10,10 +12,12 @@ import (
"os"
"os/exec"
"path/filepath"
"sort"
"strconv"
"strings"
"time"
clusterer "github.com/openclaw/gitcrawl/internal/cluster"
"github.com/openclaw/gitcrawl/internal/config"
gh "github.com/openclaw/gitcrawl/internal/github"
"github.com/openclaw/gitcrawl/internal/store"
@ -140,11 +144,13 @@ func (a *App) Run(ctx context.Context, args []string) error {
return a.runClusterDetail(ctx, rest[1:])
case "neighbors":
return a.runNeighbors(ctx, rest[1:])
case "cluster":
return a.runCluster(ctx, rest[1:])
case "portable":
return a.runPortable(ctx, rest[1:])
case "tui":
return a.runTUI(ctx, rest[1:])
case "refresh", "summarize", "key-summaries", "embed", "cluster", "cluster-experiment", "durable-clusters", "cluster-explain", "merge-clusters", "split-cluster", "export-sync", "import-sync", "validate-sync", "portable-size", "sync-status", "optimize", "completion":
case "refresh", "summarize", "key-summaries", "embed", "cluster-experiment", "durable-clusters", "cluster-explain", "merge-clusters", "split-cluster", "export-sync", "import-sync", "validate-sync", "portable-size", "sync-status", "optimize", "completion":
_ = ctx
return notImplemented(rest[0])
default:
@ -366,6 +372,91 @@ func (a *App) runNeighbors(ctx context.Context, args []string) error {
}, true)
}
func (a *App) runCluster(ctx context.Context, args []string) error {
fs := flag.NewFlagSet("cluster", flag.ContinueOnError)
fs.SetOutput(io.Discard)
thresholdRaw := fs.String("threshold", "0.82", "minimum cosine score")
minSizeRaw := fs.String("min-size", "2", "minimum cluster member count")
limitRaw := fs.String("limit", "", "maximum vector rows to cluster")
model := fs.String("model", "", "embedding model")
basis := fs.String("basis", "", "embedding basis")
jsonOut := fs.Bool("json", false, "write JSON output")
if err := fs.Parse(normalizeCommandArgs(args, map[string]bool{"threshold": true, "min-size": true, "limit": true, "model": true, "basis": true})); err != nil {
return usageErr(err)
}
a.applyCommandJSON(*jsonOut)
if fs.NArg() != 1 {
return usageErr(fmt.Errorf("cluster requires owner/repo"))
}
owner, repoName, err := parseOwnerRepo(fs.Arg(0))
if err != nil {
return usageErr(err)
}
threshold, err := parseOptionalFloat(*thresholdRaw)
if err != nil {
return usageErr(err)
}
if threshold <= 0 || threshold > 1 {
return usageErr(fmt.Errorf("cluster requires --threshold between 0 and 1"))
}
minSize, err := parseOptionalPositiveInt(*minSizeRaw)
if err != nil {
return usageErr(err)
}
if minSize <= 0 {
minSize = 2
}
limit, err := parseOptionalPositiveInt(*limitRaw)
if err != nil {
return usageErr(err)
}
rt, err := a.openLocalRuntime(ctx)
if err != nil {
return err
}
defer rt.Store.Close()
repo, err := rt.repository(ctx, owner, repoName)
if err != nil {
return err
}
query := store.ThreadVectorQuery{
RepoID: repo.ID,
Model: firstNonEmpty(strings.TrimSpace(*model), rt.Config.OpenAI.EmbedModel),
Basis: firstNonEmpty(strings.TrimSpace(*basis), rt.Config.EmbeddingBasis),
}
vectors, err := rt.Store.ListThreadVectorsFiltered(ctx, query)
if err != nil {
return err
}
if len(vectors) == 0 && strings.TrimSpace(*model) == "" && strings.TrimSpace(*basis) == "" {
vectors, err = rt.Store.ListThreadVectorsFiltered(ctx, store.ThreadVectorQuery{RepoID: repo.ID})
if err != nil {
return err
}
}
if limit > 0 && len(vectors) > limit {
vectors = vectors[:limit]
}
inputs, edgeCount, err := buildDurableClusterInputs(ctx, rt.Store, repo.ID, vectors, threshold, minSize)
if err != nil {
return err
}
saveResult, err := rt.Store.SaveDurableClusters(ctx, repo.ID, inputs)
if err != nil {
return err
}
return a.writeOutput("cluster", map[string]any{
"repository": repo.FullName,
"threshold": threshold,
"min_size": minSize,
"vector_count": len(vectors),
"edge_count": edgeCount,
"cluster_count": saveResult.ClusterCount,
"member_count": saveResult.MemberCount,
"run_id": saveResult.RunID,
}, true)
}
func (a *App) runClusters(ctx context.Context, args []string) error {
fs := flag.NewFlagSet("clusters", flag.ContinueOnError)
fs.SetOutput(io.Discard)
@ -1400,6 +1491,97 @@ func parseClusterMemberCommandIDs(command, clusterIDRaw, numberRaw string) (int,
return clusterID, number, nil
}
func buildDurableClusterInputs(ctx context.Context, st *store.Store, repoID int64, storedVectors []store.ThreadVector, threshold float64, minSize int) ([]store.DurableClusterInput, int, error) {
if minSize <= 0 {
minSize = 2
}
threadIDs := make([]int64, 0, len(storedVectors))
vectorByThreadID := make(map[int64][]float64, len(storedVectors))
for _, stored := range storedVectors {
threadIDs = append(threadIDs, stored.ThreadID)
vectorByThreadID[stored.ThreadID] = stored.Vector
}
threads, err := st.ThreadsByIDs(ctx, repoID, threadIDs)
if err != nil {
return nil, 0, err
}
nodes := make([]clusterer.Node, 0, len(storedVectors))
for _, stored := range storedVectors {
thread, ok := threads[stored.ThreadID]
if !ok {
continue
}
nodes = append(nodes, clusterer.Node{ThreadID: stored.ThreadID, Number: thread.Number, Title: thread.Title})
}
edges := make([]clusterer.Edge, 0)
pairScores := map[string]float64{}
for left := 0; left < len(nodes); left++ {
for right := left + 1; right < len(nodes); right++ {
leftID := nodes[left].ThreadID
rightID := nodes[right].ThreadID
score := vector.Cosine(vectorByThreadID[leftID], vectorByThreadID[rightID])
if score < threshold {
continue
}
edges = append(edges, clusterer.Edge{LeftThreadID: leftID, RightThreadID: rightID, Score: score})
pairScores[threadIDPairKey(leftID, rightID)] = score
}
}
built := clusterer.Build(nodes, edges)
inputs := make([]store.DurableClusterInput, 0, len(built))
for _, builtCluster := range built {
if len(builtCluster.Members) < minSize {
continue
}
sort.Slice(builtCluster.Members, func(i, j int) bool {
left := threads[builtCluster.Members[i]]
right := threads[builtCluster.Members[j]]
return left.Number < right.Number
})
rep := threads[builtCluster.RepresentativeThreadID]
input := store.DurableClusterInput{
StableKey: durableClusterStableKey(builtCluster.Members),
StableSlug: durableClusterSlug(builtCluster.Members),
RepresentativeThreadID: builtCluster.RepresentativeThreadID,
Title: rep.Title,
Members: make([]store.DurableClusterMemberInput, 0, len(builtCluster.Members)),
}
for _, threadID := range builtCluster.Members {
role := "member"
var scorePtr *float64
if threadID == builtCluster.RepresentativeThreadID {
role = "representative"
} else if score, ok := pairScores[threadIDPairKey(threadID, builtCluster.RepresentativeThreadID)]; ok {
scoreCopy := score
scorePtr = &scoreCopy
}
input.Members = append(input.Members, store.DurableClusterMemberInput{ThreadID: threadID, Role: role, ScoreToRepresentative: scorePtr})
}
inputs = append(inputs, input)
}
return inputs, len(edges), nil
}
func durableClusterStableKey(threadIDs []int64) string {
parts := make([]string, 0, len(threadIDs))
for _, id := range threadIDs {
parts = append(parts, strconv.FormatInt(id, 10))
}
return "members:" + strings.Join(parts, ",")
}
func durableClusterSlug(threadIDs []int64) string {
sum := sha256.Sum256([]byte(durableClusterStableKey(threadIDs)))
return "cluster-" + hex.EncodeToString(sum[:])[:12]
}
func threadIDPairKey(left, right int64) string {
if left > right {
left, right = right, left
}
return strconv.FormatInt(left, 10) + ":" + strconv.FormatInt(right, 10)
}
func parseOptionalFloat(value string) (float64, error) {
if strings.TrimSpace(value) == "" {
return 0, nil

View File

@ -499,6 +499,116 @@ func TestClusterMemberOverrideCommands(t *testing.T) {
}
}
func TestClusterCommandPersistsDurableClusters(t *testing.T) {
ctx := context.Background()
dir := t.TempDir()
configPath := filepath.Join(dir, "config.toml")
dbPath := filepath.Join(dir, "gitcrawl.db")
app := New()
if err := app.Run(ctx, []string{"--config", configPath, "init", "--db", dbPath}); err != nil {
t.Fatalf("init: %v", err)
}
st, err := store.Open(ctx, dbPath)
if err != nil {
t.Fatalf("open store: %v", err)
}
repoID, err := st.UpsertRepository(ctx, store.Repository{
Owner: "openclaw",
Name: "openclaw",
FullName: "openclaw/openclaw",
UpdatedAt: time.Now().UTC().Format(time.RFC3339Nano),
})
if err != nil {
t.Fatalf("seed repository: %v", err)
}
firstID, err := st.UpsertThread(ctx, store.Thread{
RepoID: repoID,
GitHubID: "91",
Number: 91,
Kind: "issue",
State: "open",
Title: "First duplicate",
HTMLURL: "https://github.com/openclaw/openclaw/issues/91",
LabelsJSON: "[]",
AssigneesJSON: "[]",
RawJSON: "{}",
ContentHash: "hash-91",
UpdatedAt: time.Now().UTC().Format(time.RFC3339Nano),
})
if err != nil {
t.Fatalf("seed first thread: %v", err)
}
secondID, err := st.UpsertThread(ctx, store.Thread{
RepoID: repoID,
GitHubID: "92",
Number: 92,
Kind: "issue",
State: "open",
Title: "Second duplicate",
HTMLURL: "https://github.com/openclaw/openclaw/issues/92",
LabelsJSON: "[]",
AssigneesJSON: "[]",
RawJSON: "{}",
ContentHash: "hash-92",
UpdatedAt: time.Now().UTC().Format(time.RFC3339Nano),
})
if err != nil {
t.Fatalf("seed second thread: %v", err)
}
thirdID, err := st.UpsertThread(ctx, store.Thread{
RepoID: repoID,
GitHubID: "93",
Number: 93,
Kind: "issue",
State: "open",
Title: "Unrelated issue",
HTMLURL: "https://github.com/openclaw/openclaw/issues/93",
LabelsJSON: "[]",
AssigneesJSON: "[]",
RawJSON: "{}",
ContentHash: "hash-93",
UpdatedAt: time.Now().UTC().Format(time.RFC3339Nano),
})
if err != nil {
t.Fatalf("seed third thread: %v", err)
}
now := time.Now().UTC().Format(time.RFC3339Nano)
for _, vector := range []store.ThreadVector{
{ThreadID: firstID, Basis: "title_original", Model: "text-embedding-3-small", Dimensions: 2, ContentHash: "hash-91", Vector: []float64{1, 0}, CreatedAt: now, UpdatedAt: now},
{ThreadID: secondID, Basis: "title_original", Model: "text-embedding-3-small", Dimensions: 2, ContentHash: "hash-92", Vector: []float64{0.95, 0.05}, CreatedAt: now, UpdatedAt: now},
{ThreadID: thirdID, Basis: "title_original", Model: "text-embedding-3-small", Dimensions: 2, ContentHash: "hash-93", Vector: []float64{0, 1}, CreatedAt: now, UpdatedAt: now},
} {
if err := st.UpsertThreadVector(ctx, vector); err != nil {
t.Fatalf("upsert vector: %v", err)
}
}
if err := st.Close(); err != nil {
t.Fatalf("close store: %v", err)
}
run := New()
var stdout bytes.Buffer
run.Stdout = &stdout
if err := run.Run(ctx, []string{"--config", configPath, "cluster", "openclaw/openclaw", "--threshold", "0.90", "--json"}); err != nil {
t.Fatalf("cluster: %v", err)
}
if !strings.Contains(stdout.String(), `"cluster_count": 1`) {
t.Fatalf("cluster output = %q", stdout.String())
}
st, err = store.Open(ctx, dbPath)
if err != nil {
t.Fatalf("reopen store: %v", err)
}
defer st.Close()
clusters, err := st.ListClusterSummaries(ctx, store.ClusterSummaryOptions{RepoID: repoID, IncludeClosed: false, MinSize: 1, Limit: 20})
if err != nil {
t.Fatalf("list clusters: %v", err)
}
if len(clusters) != 1 || clusters[0].MemberCount != 2 {
t.Fatalf("expected one durable cluster, got %#v", clusters)
}
}
func TestTUIHelp(t *testing.T) {
app := New()
var stdout bytes.Buffer