fix: improve cluster graph quality

This commit is contained in:
Peter Steinberger 2026-04-28 10:53:50 +01:00
parent 269a50d536
commit 746eda48c6
No known key found for this signature in database
8 changed files with 718 additions and 33 deletions

View File

@ -7,3 +7,5 @@
- Default `gitcrawl sync` to `--state all`, keeping closed issue and pull request state fresh unless a narrower state is requested.
- Let `gitcrawl search` fall back to compact thread title/body data when portable stores have pruned generated document indexes.
- Refresh clean portable-store checkouts before read-only commands so `search`, `threads`, clusters, and the TUI see freshly published GitHub backup data automatically.
- Show active primary cluster memberships by default in `clusters`, `durable-clusters`, and the TUI, with `--include-closed` reserved for historical audit views.
- Split generated clusters with bounded nearest-neighbor graph safeguards, GitHub reference evidence, and cross-kind score pruning so weak similarity bridges stop merging unrelated reports into one mega-cluster.

View File

@ -34,9 +34,11 @@ gitcrawl tui
gitcrawl tui owner/repo
```
`gitcrawl clusters`, `gitcrawl durable-clusters`, and `gitcrawl tui` show active primary cluster memberships by default. Pass `--include-closed` to inspect closed rows and historical secondary memberships.
`gitcrawl cluster` and `gitcrawl refresh` build bounded nearest-neighbor clusters by default (`--max-cluster-size 40`, `--k 16`, `--cross-kind-threshold 0.93`) and add deterministic GitHub reference evidence for direct issue/PR links such as `#123`, `issues/123`, and `pull/123`.
`gitcrawl tui` infers the most recently updated local repository when `owner/repo` is omitted. `serve` is intentionally not part of `gitcrawl`.
`gitcrawl sync` fetches issues and pull requests in every GitHub state by default. Pass `--state open` or `--state closed` to limit a sync to one state.
The TUI starts at `--min-size 5` so maintainer-significant clusters are visible first; pass `--min-size 1` to include singletons. Mouse support is built in: click rows, wheel panes, and right-click for copy, sort, filter, jump, link, neighbor, local close/reopen, and member triage actions. Press `a` to open the same action menu from the keyboard, `#` to jump directly to an issue or PR number, `p` to switch between repositories already present in the local store, or `n` to load neighbors for the selected issue or PR. Enter from the members pane also loads neighbors before opening detail. The TUI quietly refreshes from the local store every 15 seconds.
The TUI starts at `--min-size 5` so maintainer-significant active clusters are visible first; pass `--min-size 1` to include singletons. Mouse support is built in: click rows, scroll panes with the mouse wheel, and right-click for copy, sort, filter, jump, link, neighbor, local close/reopen, and member triage actions. Press `a` to open the same action menu from the keyboard, `#` to jump directly to an issue or PR number, `p` to switch between repositories already present in the local store, or `n` to load neighbors for the selected issue or PR. Pressing Enter in the members pane also loads neighbors before opening detail. The TUI quietly refreshes from the local store every 15 seconds.
## Local Defaults

View File

@ -12,6 +12,7 @@ import (
"os"
"os/exec"
"path/filepath"
"regexp"
"sort"
"strconv"
"strings"
@ -29,8 +30,15 @@ import (
// Defaults shared by the TUI and the cluster-building commands.
const (
// defaultTUIMinSize is the TUI's documented starting --min-size.
defaultTUIMinSize = 5
// defaultTUIWorkingSetLimit is the lower bound on the row limit used when the
// interactive TUI loads its working set of clusters.
defaultTUIWorkingSetLimit = 500
// defaultClusterMaxSize is the default for --max-cluster-size.
defaultClusterMaxSize = 40
// defaultClusterFanout is the default for --k (nearest-neighbor fanout).
defaultClusterFanout = 16
// defaultCrossKindMinScore is the default for --cross-kind-threshold.
defaultCrossKindMinScore = 0.93
// deterministicRefScore is the score given to edges derived from explicit
// issue/PR references; it sits just above defaultCrossKindMinScore.
deterministicRefScore = 0.94
// sharedRefMaxBucketSize caps how many threads may reference the same number
// before that shared reference is ignored as linking evidence.
sharedRefMaxBucketSize = 8
)
// threadReferencePattern matches, case-insensitively, a "#", "issues/", or
// "pull/" prefix immediately followed by decimal digits and captures the
// digits as submatch 1.
// NOTE(review): the prefix is not anchored, so "#" + digits is matched in any
// context (for example inside an HTML numeric entity such as "&#39;") and
// "issues/N" is matched regardless of which repository the URL points to.
// Confirm whether that over-matching is acceptable for reference evidence.
var threadReferencePattern = regexp.MustCompile(`(?i)(?:#|issues/|pull/)(\d+)`)
type App struct {
Stdout io.Writer
Stderr io.Writer
@ -236,8 +244,11 @@ func (a *App) runRefresh(ctx context.Context, args []string) error {
limitRaw := fs.String("limit", "", "maximum sync or embedding rows")
thresholdRaw := fs.String("threshold", "0.82", "minimum cluster cosine score")
minSizeRaw := fs.String("min-size", "2", "minimum cluster member count")
maxClusterSizeRaw := fs.String("max-cluster-size", strconv.Itoa(defaultClusterMaxSize), "maximum members per generated cluster")
fanoutRaw := fs.String("k", strconv.Itoa(defaultClusterFanout), "nearest-neighbor fanout per thread")
crossKindThresholdRaw := fs.String("cross-kind-threshold", fmt.Sprintf("%.2f", defaultCrossKindMinScore), "minimum score for issue/pull request edges")
jsonOut := fs.Bool("json", false, "write JSON output")
if err := fs.Parse(normalizeCommandArgs(args, map[string]bool{"since": true, "state": true, "limit": true, "threshold": true, "min-size": true})); err != nil {
if err := fs.Parse(normalizeCommandArgs(args, map[string]bool{"since": true, "state": true, "limit": true, "threshold": true, "min-size": true, "max-cluster-size": true, "k": true, "cross-kind-threshold": true})); err != nil {
return usageErr(err)
}
a.applyCommandJSON(*jsonOut)
@ -269,6 +280,10 @@ func (a *App) runRefresh(ctx context.Context, args []string) error {
if minSize <= 0 {
minSize = 2
}
maxClusterSize, fanout, crossKindThreshold, err := parseClusterShapeOptions("refresh", *maxClusterSizeRaw, *fanoutRaw, *crossKindThresholdRaw)
if err != nil {
return err
}
result := refreshResult{
Repository: owner + "/" + repoName,
@ -325,7 +340,13 @@ func (a *App) runRefresh(ctx context.Context, args []string) error {
return err
}
}
clusterResult, err := clusterRepository(ctx, rt.Store, repo.ID, vectors, threshold, minSize)
clusterResult, err := clusterRepository(ctx, rt.Store, repo.ID, vectors, clusterBuildOptions{
Threshold: threshold,
MinSize: minSize,
MaxClusterSize: maxClusterSize,
Fanout: fanout,
CrossKindThreshold: crossKindThreshold,
})
_ = rt.Store.Close()
if err != nil {
return err
@ -333,7 +354,10 @@ func (a *App) runRefresh(ctx context.Context, args []string) error {
result.Repository = repo.FullName
result.Cluster = map[string]any{
"threshold": threshold,
"cross_kind": crossKindThreshold,
"min_size": minSize,
"max_size": maxClusterSize,
"k": fanout,
"vector_count": len(vectors),
"edge_count": clusterResult.EdgeCount,
"cluster_count": clusterResult.ClusterCount,
@ -515,11 +539,14 @@ func (a *App) runCluster(ctx context.Context, args []string) error {
fs.SetOutput(io.Discard)
thresholdRaw := fs.String("threshold", "0.82", "minimum cosine score")
minSizeRaw := fs.String("min-size", "2", "minimum cluster member count")
maxClusterSizeRaw := fs.String("max-cluster-size", strconv.Itoa(defaultClusterMaxSize), "maximum members per generated cluster")
fanoutRaw := fs.String("k", strconv.Itoa(defaultClusterFanout), "nearest-neighbor fanout per thread")
crossKindThresholdRaw := fs.String("cross-kind-threshold", fmt.Sprintf("%.2f", defaultCrossKindMinScore), "minimum score for issue/pull request edges")
limitRaw := fs.String("limit", "", "maximum vector rows to cluster")
model := fs.String("model", "", "embedding model")
basis := fs.String("basis", "", "embedding basis")
jsonOut := fs.Bool("json", false, "write JSON output")
if err := fs.Parse(normalizeCommandArgs(args, map[string]bool{"threshold": true, "min-size": true, "limit": true, "model": true, "basis": true})); err != nil {
if err := fs.Parse(normalizeCommandArgs(args, map[string]bool{"threshold": true, "min-size": true, "max-cluster-size": true, "k": true, "cross-kind-threshold": true, "limit": true, "model": true, "basis": true})); err != nil {
return usageErr(err)
}
a.applyCommandJSON(*jsonOut)
@ -544,6 +571,10 @@ func (a *App) runCluster(ctx context.Context, args []string) error {
if minSize <= 0 {
minSize = 2
}
maxClusterSize, fanout, crossKindThreshold, err := parseClusterShapeOptions("cluster", *maxClusterSizeRaw, *fanoutRaw, *crossKindThresholdRaw)
if err != nil {
return err
}
limit, err := parseOptionalPositiveInt(*limitRaw)
if err != nil {
return usageErr(err)
@ -575,14 +606,23 @@ func (a *App) runCluster(ctx context.Context, args []string) error {
if limit > 0 && len(vectors) > limit {
vectors = vectors[:limit]
}
clusterResult, err := clusterRepository(ctx, rt.Store, repo.ID, vectors, threshold, minSize)
clusterResult, err := clusterRepository(ctx, rt.Store, repo.ID, vectors, clusterBuildOptions{
Threshold: threshold,
MinSize: minSize,
MaxClusterSize: maxClusterSize,
Fanout: fanout,
CrossKindThreshold: crossKindThreshold,
})
if err != nil {
return err
}
return a.writeOutput("cluster", map[string]any{
"repository": repo.FullName,
"threshold": threshold,
"cross_kind": crossKindThreshold,
"min_size": minSize,
"max_size": maxClusterSize,
"k": fanout,
"vector_count": len(vectors),
"edge_count": clusterResult.EdgeCount,
"cluster_count": clusterResult.ClusterCount,
@ -768,7 +808,8 @@ func (a *App) runClusterList(ctx context.Context, command string, args []string,
minSizeRaw := fs.String("min-size", "", "minimum active member count")
limitRaw := fs.String("limit", "", "maximum cluster rows")
sortMode := fs.String("sort", "recent", "sort mode: recent|size")
hideClosed := fs.Bool("hide-closed", false, "hide non-active or closed clusters")
includeClosed := fs.Bool("include-closed", false, "include closed and secondary historical cluster memberships")
hideClosed := fs.Bool("hide-closed", false, "deprecated; active primary clusters are the default")
jsonOut := fs.Bool("json", false, "write JSON output")
if err := fs.Parse(normalizeCommandArgs(args, map[string]bool{"min-size": true, "limit": true, "sort": true})); err != nil {
return usageErr(err)
@ -805,7 +846,7 @@ func (a *App) runClusterList(ctx context.Context, command string, args []string,
}
options := store.ClusterSummaryOptions{
RepoID: repo.ID,
IncludeClosed: !*hideClosed,
IncludeClosed: *includeClosed && !*hideClosed,
MinSize: minSize,
Limit: limit,
Sort: sort,
@ -831,7 +872,8 @@ func (a *App) runTUI(ctx context.Context, args []string) error {
minSizeRaw := fs.String("min-size", "", "minimum active member count")
limitRaw := fs.String("limit", "20", "maximum cluster rows")
sortMode := fs.String("sort", "", "sort mode: recent|size")
hideClosed := fs.Bool("hide-closed", false, "hide non-active or closed clusters")
includeClosed := fs.Bool("include-closed", false, "include closed and secondary historical cluster memberships")
hideClosed := fs.Bool("hide-closed", false, "deprecated; active primary clusters are the default")
jsonOut := fs.Bool("json", false, "write JSON output")
if err := fs.Parse(normalizeCommandArgs(args, map[string]bool{"min-size": true, "limit": true, "sort": true})); err != nil {
if errors.Is(err, flag.ErrHelp) {
@ -882,10 +924,11 @@ func (a *App) runTUI(ctx context.Context, args []string) error {
if sort != "recent" && sort != "size" {
return usageErr(fmt.Errorf("unsupported sort %q", sort))
}
showClosed := *includeClosed && !*hideClosed
clusters, err := rt.Store.ListDisplayClusterSummaries(ctx, store.ClusterSummaryOptions{
RepoID: repo.ID,
IncludeClosed: !*hideClosed,
IncludeClosed: showClosed,
MinSize: minSize,
Limit: limit,
Sort: sort,
@ -896,7 +939,7 @@ func (a *App) runTUI(ctx context.Context, args []string) error {
if interactive {
workingSet, err := rt.Store.ListDisplayClusterSummaries(ctx, store.ClusterSummaryOptions{
RepoID: repo.ID,
IncludeClosed: !*hideClosed,
IncludeClosed: showClosed,
MinSize: 1,
Limit: maxInt(defaultTUIWorkingSetLimit, limit),
Sort: sort,
@ -916,7 +959,7 @@ func (a *App) runTUI(ctx context.Context, args []string) error {
Sort: sort,
MinSize: minSize,
Limit: limit,
HideClosed: *hideClosed,
HideClosed: !showClosed,
EmbedModel: rt.Config.OpenAI.EmbedModel,
EmbeddingBasis: rt.Config.EmbeddingBasis,
Clusters: clusters,
@ -1847,9 +1890,54 @@ func parseClusterMemberCommandIDs(command, clusterIDRaw, numberRaw string) (int,
return clusterID, number, nil
}
func buildDurableClusterInputs(ctx context.Context, st *store.Store, repoID int64, storedVectors []store.ThreadVector, threshold float64, minSize int) ([]store.DurableClusterInput, int, error) {
if minSize <= 0 {
minSize = 2
// clusterBuildOptions bundles the tunables passed to clusterRepository and
// buildDurableClusterInputs. Non-positive MinSize, MaxClusterSize, Fanout, and
// CrossKindThreshold values are replaced with defaults by
// buildDurableClusterInputs.
type clusterBuildOptions struct {
// Threshold is the minimum cosine score for a pair to become a candidate edge.
Threshold float64
// MinSize is the minimum member count for a built cluster to be emitted.
MinSize int
// MaxClusterSize is forwarded to the clusterer as its MaxSize bound.
MaxClusterSize int
// Fanout is the per-thread neighbor count handed to keepTopEdges.
Fanout int
// CrossKindThreshold is the minimum score for an edge between threads whose
// Kind differs.
CrossKindThreshold float64
}
// parseClusterShapeOptions converts the raw --max-cluster-size, --k, and
// --cross-kind-threshold flag values into typed settings.
//
// A parsed value of zero selects the corresponding package default. The
// returned values are, in order: maximum cluster size, nearest-neighbor
// fanout, and cross-kind threshold. Parse failures from the helper functions
// are returned unchanged; a cross-kind threshold outside [0, 1] yields an
// error that names the invoking command.
func parseClusterShapeOptions(command, maxClusterSizeRaw, fanoutRaw, crossKindThresholdRaw string) (int, int, float64, error) {
	sizeCap, err := parseOptionalPositiveInt(maxClusterSizeRaw)
	if err != nil {
		return 0, 0, 0, err
	}
	neighborCount, err := parseOptionalPositiveInt(fanoutRaw)
	if err != nil {
		return 0, 0, 0, err
	}
	crossKind, err := parseOptionalFloat(crossKindThresholdRaw)
	if err != nil {
		return 0, 0, 0, err
	}

	if sizeCap == 0 {
		sizeCap = defaultClusterMaxSize
	}
	if neighborCount == 0 {
		neighborCount = defaultClusterFanout
	}
	switch {
	case crossKind == 0:
		// Zero means "not provided": fall back to the default, which is in range.
		crossKind = defaultCrossKindMinScore
	case crossKind < 0 || crossKind > 1:
		return 0, 0, 0, fmt.Errorf("%s requires --cross-kind-threshold between 0 and 1", command)
	}
	return sizeCap, neighborCount, crossKind, nil
}
func buildDurableClusterInputs(ctx context.Context, st *store.Store, repoID int64, storedVectors []store.ThreadVector, options clusterBuildOptions) ([]store.DurableClusterInput, int, error) {
if options.MinSize <= 0 {
options.MinSize = 2
}
if options.MaxClusterSize <= 0 {
options.MaxClusterSize = defaultClusterMaxSize
}
if options.Fanout <= 0 {
options.Fanout = defaultClusterFanout
}
if options.CrossKindThreshold <= 0 {
options.CrossKindThreshold = defaultCrossKindMinScore
}
threadIDs := make([]int64, 0, len(storedVectors))
vectorByThreadID := make(map[int64][]float64, len(storedVectors))
@ -1869,24 +1957,35 @@ func buildDurableClusterInputs(ctx context.Context, st *store.Store, repoID int6
}
nodes = append(nodes, clusterer.Node{ThreadID: stored.ThreadID, Number: thread.Number, Title: thread.Title})
}
edges := make([]clusterer.Edge, 0)
pairScores := map[string]float64{}
candidateByPair := map[string]clusterer.Edge{}
for left := 0; left < len(nodes); left++ {
for right := left + 1; right < len(nodes); right++ {
leftID := nodes[left].ThreadID
rightID := nodes[right].ThreadID
score := vector.Cosine(vectorByThreadID[leftID], vectorByThreadID[rightID])
if score < threshold {
if score < options.Threshold {
continue
}
edges = append(edges, clusterer.Edge{LeftThreadID: leftID, RightThreadID: rightID, Score: score})
pairScores[threadIDPairKey(leftID, rightID)] = score
if threads[leftID].Kind != threads[rightID].Kind && score < options.CrossKindThreshold {
continue
}
upsertClusterEdge(candidateByPair, leftID, rightID, score)
}
}
built := clusterer.Build(nodes, edges)
addDeterministicReferenceEdges(candidateByPair, nodes, threads)
candidates := make([]clusterer.Edge, 0, len(candidateByPair))
for _, edge := range candidateByPair {
candidates = append(candidates, edge)
}
edges := keepTopEdges(candidates, options.Fanout)
pairScores := map[string]float64{}
for _, edge := range edges {
pairScores[threadIDPairKey(edge.LeftThreadID, edge.RightThreadID)] = edge.Score
}
built := clusterer.BuildWithOptions(nodes, edges, clusterer.Options{MaxSize: options.MaxClusterSize})
inputs := make([]store.DurableClusterInput, 0, len(built))
for _, builtCluster := range built {
if len(builtCluster.Members) < minSize {
if len(builtCluster.Members) < options.MinSize {
continue
}
sort.Slice(builtCluster.Members, func(i, j int) bool {
@ -1918,6 +2017,116 @@ func buildDurableClusterInputs(ctx context.Context, st *store.Store, repoID int6
return inputs, len(edges), nil
}
// upsertClusterEdge records an edge between two distinct threads in edges,
// keyed by threadIDPairKey. An existing entry is kept when its score is at
// least as high as the new one. Stored edges always have the smaller thread
// ID on the left. Self-pairs are ignored.
func upsertClusterEdge(edges map[string]clusterer.Edge, leftID, rightID int64, score float64) {
	if leftID == rightID {
		return
	}
	pairKey := threadIDPairKey(leftID, rightID)
	if current, found := edges[pairKey]; found && current.Score >= score {
		return
	}
	lowID, highID := leftID, rightID
	if lowID > highID {
		lowID, highID = highID, lowID
	}
	edges[pairKey] = clusterer.Edge{LeftThreadID: lowID, RightThreadID: highID, Score: score}
}
// addDeterministicReferenceEdges adds evidence edges, scored at
// deterministicRefScore, derived from issue/PR numbers mentioned in each
// node's thread text:
//
//   - a direct edge from a thread to every other node it references, and
//   - pairwise edges between threads that mention the same number, provided
//     between 2 and sharedRefMaxBucketSize threads mention it.
//
// Edges are merged into edges through upsertClusterEdge, so an existing
// higher-scoring edge for the same pair is left untouched.
func addDeterministicReferenceEdges(edges map[string]clusterer.Edge, nodes []clusterer.Node, threads map[int64]store.Thread) {
	// Index every node by its thread number first so references can resolve
	// to nodes appearing later in the slice.
	idByNumber := make(map[int]int64, len(nodes))
	for _, node := range nodes {
		idByNumber[threads[node.ThreadID].Number] = node.ThreadID
	}

	mentionersByNumber := map[int][]int64{}
	for _, node := range nodes {
		sourceID := node.ThreadID
		for number := range referencedThreadNumbers(threads[sourceID]) {
			if targetID, known := idByNumber[number]; known && targetID != sourceID {
				upsertClusterEdge(edges, sourceID, targetID, deterministicRefScore)
			}
			mentionersByNumber[number] = append(mentionersByNumber[number], sourceID)
		}
	}

	for _, bucket := range mentionersByNumber {
		if count := len(bucket); count < 2 || count > sharedRefMaxBucketSize {
			continue
		}
		sort.Slice(bucket, func(i, j int) bool { return bucket[i] < bucket[j] })
		for index, first := range bucket {
			for _, second := range bucket[index+1:] {
				upsertClusterEdge(edges, first, second, deterministicRefScore)
			}
		}
	}
}
// referencedThreadNumbers returns the set of positive thread numbers matched
// by threadReferencePattern in the thread's title and body, excluding the
// thread's own number. Matches whose digits do not parse as an int are
// dropped. The result is never nil.
func referencedThreadNumbers(thread store.Thread) map[int]bool {
	found := make(map[int]bool)
	text := thread.Title + "\n" + thread.Body
	matches := threadReferencePattern.FindAllStringSubmatch(text, -1)
	for _, groups := range matches {
		if len(groups) < 2 {
			continue
		}
		parsed, err := strconv.Atoi(groups[1])
		if err == nil && parsed > 0 && parsed != thread.Number {
			found[parsed] = true
		}
	}
	return found
}
// keepTopEdges prunes edges to a nearest-neighbor graph. For every thread it
// ranks the incident edges by descending score (ties broken by the smaller
// neighbor ID) and remembers the first fanout neighbors. An edge survives
// when either endpoint lists the other among those neighbors, so one-sided
// nearest neighbors are kept. Surviving edges keep their input order.
//
// When fanout is not positive or edges is empty, edges is returned as is.
func keepTopEdges(edges []clusterer.Edge, fanout int) []clusterer.Edge {
	if len(edges) == 0 || fanout <= 0 {
		return edges
	}

	incident := make(map[int64][]clusterer.Edge)
	for _, e := range edges {
		incident[e.LeftThreadID] = append(incident[e.LeftThreadID], e)
		incident[e.RightThreadID] = append(incident[e.RightThreadID], e)
	}

	strongest := make(map[int64]map[int64]bool, len(incident))
	for id, ranked := range incident {
		sort.SliceStable(ranked, func(a, b int) bool {
			if ranked[a].Score != ranked[b].Score {
				return ranked[a].Score > ranked[b].Score
			}
			return edgeOtherThreadID(ranked[a], id) < edgeOtherThreadID(ranked[b], id)
		})
		limit := len(ranked)
		if limit > fanout {
			limit = fanout
		}
		keep := make(map[int64]bool, limit)
		for _, e := range ranked[:limit] {
			keep[edgeOtherThreadID(e, id)] = true
		}
		strongest[id] = keep
	}

	kept := make([]clusterer.Edge, 0, len(edges))
	for _, e := range edges {
		if strongest[e.LeftThreadID][e.RightThreadID] || strongest[e.RightThreadID][e.LeftThreadID] {
			kept = append(kept, e)
		}
	}
	return kept
}
// edgeOtherThreadID returns the endpoint of edge opposite threadID: the right
// endpoint when threadID is the left one, otherwise the left endpoint.
func edgeOtherThreadID(edge clusterer.Edge, threadID int64) int64 {
	if edge.LeftThreadID != threadID {
		return edge.LeftThreadID
	}
	return edge.RightThreadID
}
type clusterRepositoryResult struct {
EdgeCount int
ClusterCount int
@ -1925,8 +2134,8 @@ type clusterRepositoryResult struct {
RunID int64
}
func clusterRepository(ctx context.Context, st *store.Store, repoID int64, storedVectors []store.ThreadVector, threshold float64, minSize int) (clusterRepositoryResult, error) {
inputs, edgeCount, err := buildDurableClusterInputs(ctx, st, repoID, storedVectors, threshold, minSize)
func clusterRepository(ctx context.Context, st *store.Store, repoID int64, storedVectors []store.ThreadVector, options clusterBuildOptions) (clusterRepositoryResult, error) {
inputs, edgeCount, err := buildDurableClusterInputs(ctx, st, repoID, storedVectors, options)
if err != nil {
return clusterRepositoryResult{}, err
}
@ -2109,10 +2318,10 @@ No API server is provided. There is intentionally no serve command.
const tuiUsageText = `gitcrawl tui opens the local terminal cluster browser.
Usage:
gitcrawl tui [owner/repo] [--limit N] [--min-size N] [--sort recent|size] [--hide-closed]
gitcrawl tui [owner/repo] [--limit N] [--min-size N] [--sort recent|size] [--include-closed]
If owner/repo is omitted, gitcrawl uses the most recently updated repository in the local database.
The TUI starts at --min-size 5 by default; pass --min-size 1 to show singleton clusters.
The TUI starts with active primary clusters at --min-size 5 by default; pass --min-size 1 to show singleton clusters, or --include-closed to inspect historical memberships.
Mouse is supported: click rows, wheel panes, right-click for actions, and use the menu for copy/sort/filter/jump/member triage controls.
Press a to open the same action menu from the keyboard.
Press # to jump directly to an issue or PR number.

View File

@ -14,6 +14,7 @@ import (
"testing"
"time"
clusterer "github.com/openclaw/gitcrawl/internal/cluster"
"github.com/openclaw/gitcrawl/internal/store"
)
@ -532,6 +533,112 @@ func TestCloseClusterCommandLocallyClosesCluster(t *testing.T) {
}
}
// TestClustersDefaultShowsActivePrimaryMembers seeds one active cluster that
// holds an open issue and a closed issue, then checks that `clusters` counts
// only the open member by default and both members with --include-closed.
func TestClustersDefaultShowsActivePrimaryMembers(t *testing.T) {
ctx := context.Background()
dir := t.TempDir()
configPath := filepath.Join(dir, "config.toml")
dbPath := filepath.Join(dir, "gitcrawl.db")
app := New()
if err := app.Run(ctx, []string{"--config", configPath, "init", "--db", dbPath}); err != nil {
t.Fatalf("init: %v", err)
}
// Seed the store directly: one repository with an open and a closed issue.
st, err := store.Open(ctx, dbPath)
if err != nil {
t.Fatalf("open store: %v", err)
}
repoID, err := st.UpsertRepository(ctx, store.Repository{
Owner: "openclaw",
Name: "openclaw",
FullName: "openclaw/openclaw",
UpdatedAt: time.Now().UTC().Format(time.RFC3339Nano),
})
if err != nil {
t.Fatalf("seed repository: %v", err)
}
openID, err := st.UpsertThread(ctx, store.Thread{
RepoID: repoID,
GitHubID: "90",
Number: 90,
Kind: "issue",
State: "open",
Title: "Open member",
HTMLURL: "https://github.com/openclaw/openclaw/issues/90",
LabelsJSON: "[]",
AssigneesJSON: "[]",
RawJSON: "{}",
ContentHash: "hash-90",
UpdatedAt: time.Now().UTC().Format(time.RFC3339Nano),
})
if err != nil {
t.Fatalf("seed open thread: %v", err)
}
closedID, err := st.UpsertThread(ctx, store.Thread{
RepoID: repoID,
GitHubID: "91",
Number: 91,
Kind: "issue",
State: "closed",
Title: "Closed historical member",
HTMLURL: "https://github.com/openclaw/openclaw/issues/91",
LabelsJSON: "[]",
AssigneesJSON: "[]",
RawJSON: "{}",
ContentHash: "hash-91",
UpdatedAt: time.Now().UTC().Format(time.RFC3339Nano),
})
if err != nil {
t.Fatalf("seed closed thread: %v", err)
}
// Insert the cluster group and both memberships with raw SQL; both rows are
// active 'member' memberships of cluster 90.
if _, err := st.DB().ExecContext(ctx, `
insert into cluster_groups(id, repo_id, stable_key, stable_slug, status, representative_thread_id, title, created_at, updated_at)
values(90, ?, 'cluster-90', 'cluster-90', 'active', ?, 'Cluster 90', '2026-04-27T00:00:00Z', '2026-04-27T00:00:00Z');
`, repoID, openID); err != nil {
t.Fatalf("seed cluster group: %v", err)
}
if _, err := st.DB().ExecContext(ctx, `
insert into cluster_memberships(cluster_id, thread_id, role, state, added_by, added_reason_json, created_at, updated_at)
values(90, ?, 'member', 'active', 'system', '{}', '2026-04-27T00:00:00Z', '2026-04-27T00:00:00Z'),
(90, ?, 'member', 'active', 'system', '{}', '2026-04-27T00:00:00Z', '2026-04-27T00:00:00Z');
`, openID, closedID); err != nil {
t.Fatalf("seed cluster memberships: %v", err)
}
// Close the seeding handle before running the CLI against the same database.
if err := st.Close(); err != nil {
t.Fatalf("close store: %v", err)
}
// Default invocation: only the open member should be counted.
run := New()
var stdout bytes.Buffer
run.Stdout = &stdout
if err := run.Run(ctx, []string{"--config", configPath, "--json", "clusters", "openclaw/openclaw", "--sort", "size", "--min-size", "1"}); err != nil {
t.Fatalf("clusters: %v", err)
}
var active struct {
Clusters []store.ClusterSummary `json:"clusters"`
}
if err := json.Unmarshal(stdout.Bytes(), &active); err != nil {
t.Fatalf("decode active clusters: %v\n%s", err, stdout.String())
}
if len(active.Clusters) != 1 || active.Clusters[0].MemberCount != 1 {
t.Fatalf("default clusters should show active primary members, got %#v", active.Clusters)
}
// With --include-closed the closed member is counted as well.
stdout.Reset()
withClosed := New()
withClosed.Stdout = &stdout
if err := withClosed.Run(ctx, []string{"--config", configPath, "--json", "clusters", "openclaw/openclaw", "--sort", "size", "--min-size", "1", "--include-closed"}); err != nil {
t.Fatalf("clusters include closed: %v", err)
}
var all struct {
Clusters []store.ClusterSummary `json:"clusters"`
}
if err := json.Unmarshal(stdout.Bytes(), &all); err != nil {
t.Fatalf("decode all clusters: %v\n%s", err, stdout.String())
}
if len(all.Clusters) != 1 || all.Clusters[0].MemberCount != 2 {
t.Fatalf("include-closed should preserve historical members, got %#v", all.Clusters)
}
}
func TestClusterMemberOverrideCommands(t *testing.T) {
ctx := context.Background()
dir := t.TempDir()
@ -761,6 +868,171 @@ func TestClusterCommandPersistsDurableClusters(t *testing.T) {
}
}
// TestBuildDurableClusterInputsPrunesWeakCrossKindEdges seeds one issue and
// one pull request whose vectors are similar enough to pass Threshold, then
// checks that the pair is dropped at CrossKindThreshold 0.93 and kept at 0.89.
func TestBuildDurableClusterInputsPrunesWeakCrossKindEdges(t *testing.T) {
ctx := context.Background()
st, err := store.Open(ctx, filepath.Join(t.TempDir(), "gitcrawl.db"))
if err != nil {
t.Fatalf("open store: %v", err)
}
defer st.Close()
repoID, err := st.UpsertRepository(ctx, store.Repository{
Owner: "openclaw",
Name: "openclaw",
FullName: "openclaw/openclaw",
RawJSON: "{}",
UpdatedAt: time.Now().UTC().Format(time.RFC3339Nano),
})
if err != nil {
t.Fatalf("seed repository: %v", err)
}
issueID, err := st.UpsertThread(ctx, store.Thread{
RepoID: repoID,
GitHubID: "201",
Number: 201,
Kind: "issue",
State: "open",
Title: "Slack zero inbound events",
HTMLURL: "https://github.com/openclaw/openclaw/issues/201",
LabelsJSON: "[]",
AssigneesJSON: "[]",
RawJSON: "{}",
ContentHash: "hash-201",
UpdatedAt: time.Now().UTC().Format(time.RFC3339Nano),
})
if err != nil {
t.Fatalf("seed issue: %v", err)
}
prID, err := st.UpsertThread(ctx, store.Thread{
RepoID: repoID,
GitHubID: "202",
Number: 202,
Kind: "pull_request",
State: "open",
Title: "Slack socket mode import fix",
HTMLURL: "https://github.com/openclaw/openclaw/pull/202",
LabelsJSON: "[]",
AssigneesJSON: "[]",
RawJSON: "{}",
ContentHash: "hash-202",
UpdatedAt: time.Now().UTC().Format(time.RFC3339Nano),
})
if err != nil {
t.Fatalf("seed pull request: %v", err)
}
// Both vectors have (approximately) unit length, so their cosine is about
// 0.9: above Threshold 0.82, below 0.93, and above 0.89.
vectors := []store.ThreadVector{
{ThreadID: issueID, Vector: []float64{1, 0}},
{ThreadID: prID, Vector: []float64{0.9, 0.435889894}},
}
// Strict cross-kind threshold: the issue/PR pair must not produce an edge.
inputs, edgeCount, err := buildDurableClusterInputs(ctx, st, repoID, vectors, clusterBuildOptions{
Threshold: 0.82,
MinSize: 2,
MaxClusterSize: defaultClusterMaxSize,
Fanout: 16,
CrossKindThreshold: 0.93,
})
if err != nil {
t.Fatalf("build inputs: %v", err)
}
if edgeCount != 0 || len(inputs) != 0 {
t.Fatalf("weak cross-kind edge should be pruned, edges=%d inputs=%#v", edgeCount, inputs)
}
// Relaxed cross-kind threshold: the same pair now forms one edge and cluster.
inputs, edgeCount, err = buildDurableClusterInputs(ctx, st, repoID, vectors, clusterBuildOptions{
Threshold: 0.82,
MinSize: 2,
MaxClusterSize: defaultClusterMaxSize,
Fanout: 16,
CrossKindThreshold: 0.89,
})
if err != nil {
t.Fatalf("build relaxed inputs: %v", err)
}
if edgeCount != 1 || len(inputs) != 1 {
t.Fatalf("relaxed cross-kind threshold should keep edge, edges=%d inputs=%#v", edgeCount, inputs)
}
}
// TestBuildDurableClusterInputsKeepsDeterministicReferenceEdges checks that a
// pull request whose body mentions "#301" is clustered with issue 301 even
// though their vectors are orthogonal and both score thresholds are 0.99.
func TestBuildDurableClusterInputsKeepsDeterministicReferenceEdges(t *testing.T) {
ctx := context.Background()
st, err := store.Open(ctx, filepath.Join(t.TempDir(), "gitcrawl.db"))
if err != nil {
t.Fatalf("open store: %v", err)
}
defer st.Close()
repoID, err := st.UpsertRepository(ctx, store.Repository{
Owner: "openclaw",
Name: "openclaw",
FullName: "openclaw/openclaw",
RawJSON: "{}",
UpdatedAt: time.Now().UTC().Format(time.RFC3339Nano),
})
if err != nil {
t.Fatalf("seed repository: %v", err)
}
issueID, err := st.UpsertThread(ctx, store.Thread{
RepoID: repoID,
GitHubID: "301",
Number: 301,
Kind: "issue",
State: "open",
Title: "Gateway token regression",
Body: "Users cannot authorize device tokens.",
HTMLURL: "https://github.com/openclaw/openclaw/issues/301",
LabelsJSON: "[]",
AssigneesJSON: "[]",
RawJSON: "{}",
ContentHash: "hash-301",
UpdatedAt: time.Now().UTC().Format(time.RFC3339Nano),
})
if err != nil {
t.Fatalf("seed issue: %v", err)
}
// The pull request body carries the explicit "#301" reference under test.
prID, err := st.UpsertThread(ctx, store.Thread{
RepoID: repoID,
GitHubID: "302",
Number: 302,
Kind: "pull_request",
State: "open",
Title: "Repair auth scope migration",
Body: "Fixes #301 by preserving the device-token scope during upgrade.",
HTMLURL: "https://github.com/openclaw/openclaw/pull/302",
LabelsJSON: "[]",
AssigneesJSON: "[]",
RawJSON: "{}",
ContentHash: "hash-302",
UpdatedAt: time.Now().UTC().Format(time.RFC3339Nano),
})
if err != nil {
t.Fatalf("seed pull request: %v", err)
}
// Orthogonal vectors: similarity alone cannot link the two threads.
vectors := []store.ThreadVector{
{ThreadID: issueID, Vector: []float64{1, 0}},
{ThreadID: prID, Vector: []float64{0, 1}},
}
inputs, edgeCount, err := buildDurableClusterInputs(ctx, st, repoID, vectors, clusterBuildOptions{
Threshold: 0.99,
MinSize: 2,
MaxClusterSize: defaultClusterMaxSize,
Fanout: 16,
CrossKindThreshold: 0.99,
})
if err != nil {
t.Fatalf("build inputs: %v", err)
}
if edgeCount != 1 || len(inputs) != 1 {
t.Fatalf("direct issue/PR reference should form an evidence edge, edges=%d inputs=%#v", edgeCount, inputs)
}
}
func TestKeepTopEdgesKeepsOneSidedNearestNeighbors(t *testing.T) {
edges := keepTopEdges([]clusterer.Edge{
{LeftThreadID: 1, RightThreadID: 2, Score: 0.95},
{LeftThreadID: 1, RightThreadID: 3, Score: 0.90},
}, 1)
if len(edges) != 2 {
t.Fatalf("one-sided top-k edges should be kept, got %#v", edges)
}
}
func TestRefreshEmbedsAndClustersWithoutSync(t *testing.T) {
ctx := context.Background()
dir := t.TempDir()

View File

@ -19,13 +19,41 @@ type Cluster struct {
Members []int64 `json:"members"`
}
// Options tunes BuildWithOptions.
type Options struct {
// MaxSize, when greater than zero, makes BuildWithOptions process edges in
// descending score order and reject any merge that would grow a component
// beyond MaxSize members. Zero or negative leaves component size unbounded.
MaxSize int
}
// Build clusters nodes by the given edges using the zero-value Options,
// i.e. it delegates to BuildWithOptions without a size bound.
func Build(nodes []Node, edges []Edge) []Cluster {
	var unbounded Options
	return BuildWithOptions(nodes, edges, unbounded)
}
func BuildWithOptions(nodes []Node, edges []Edge, options Options) []Cluster {
uf := newUnionFind()
for _, node := range nodes {
uf.add(node.ThreadID)
}
for _, edge := range edges {
uf.union(edge.LeftThreadID, edge.RightThreadID)
keptEdges := edges
if options.MaxSize > 0 {
keptEdges = make([]Edge, 0, len(edges))
sortedEdges := append([]Edge(nil), edges...)
sort.SliceStable(sortedEdges, func(i, j int) bool {
if sortedEdges[i].Score == sortedEdges[j].Score {
if sortedEdges[i].LeftThreadID == sortedEdges[j].LeftThreadID {
return sortedEdges[i].RightThreadID < sortedEdges[j].RightThreadID
}
return sortedEdges[i].LeftThreadID < sortedEdges[j].LeftThreadID
}
return sortedEdges[i].Score > sortedEdges[j].Score
})
for _, edge := range sortedEdges {
if uf.unionBounded(edge.LeftThreadID, edge.RightThreadID, options.MaxSize) {
keptEdges = append(keptEdges, edge)
}
}
} else {
for _, edge := range edges {
uf.union(edge.LeftThreadID, edge.RightThreadID)
}
}
byRoot := map[int64][]int64{}
@ -33,7 +61,7 @@ func Build(nodes []Node, edges []Edge) []Cluster {
root := uf.find(node.ThreadID)
byRoot[root] = append(byRoot[root], node.ThreadID)
}
return format(nodes, edges, byRoot)
return format(nodes, keptEdges, byRoot)
}
func format(nodes []Node, edges []Edge, byRoot map[int64][]int64) []Cluster {
@ -123,3 +151,20 @@ func (u *unionFind) union(left, right int64) {
u.parent[rightRoot] = leftRoot
u.size[leftRoot] += u.size[rightRoot]
}
// unionBounded merges the sets containing left and right unless the combined
// set would have more than maxSize members. It reports true when the two
// elements end up in the same set (including when they already were) and
// false when the merge was refused. The smaller set is attached to the
// larger one.
func (u *unionFind) unionBounded(left, right int64, maxSize int) bool {
	big, small := u.find(left), u.find(right)
	if big == small {
		return true
	}
	if u.size[big] < u.size[small] {
		big, small = small, big
	}
	merged := u.size[big] + u.size[small]
	if merged > maxSize {
		return false
	}
	u.parent[small] = big
	u.size[big] = merged
	return true
}

View File

@ -20,3 +20,29 @@ func TestBuildConnectedComponents(t *testing.T) {
t.Fatalf("representative: got %d want 1", clusters[0].RepresentativeThreadID)
}
}
// TestBuildWithOptionsKeepsStrongBoundedComponents feeds a six-node chain
// with descending edge scores into BuildWithOptions with MaxSize 3 and
// expects the chain to split into members {1,2,3} and {4,5,6}: the 3-4 edge
// is refused because it would exceed the size bound.
func TestBuildWithOptionsKeepsStrongBoundedComponents(t *testing.T) {
clusters := BuildWithOptions([]Node{
{ThreadID: 1, Number: 10},
{ThreadID: 2, Number: 11},
{ThreadID: 3, Number: 12},
{ThreadID: 4, Number: 13},
{ThreadID: 5, Number: 14},
{ThreadID: 6, Number: 15},
}, []Edge{
{LeftThreadID: 1, RightThreadID: 2, Score: 0.95},
{LeftThreadID: 2, RightThreadID: 3, Score: 0.94},
{LeftThreadID: 3, RightThreadID: 4, Score: 0.82},
{LeftThreadID: 4, RightThreadID: 5, Score: 0.81},
{LeftThreadID: 5, RightThreadID: 6, Score: 0.80},
}, Options{MaxSize: 3})
if len(clusters) != 2 {
t.Fatalf("clusters: got %d want 2: %#v", len(clusters), clusters)
}
if got := clusters[0].Members; len(got) != 3 || got[0] != 1 || got[1] != 2 || got[2] != 3 {
t.Fatalf("first cluster members: %#v", got)
}
if got := clusters[1].Members; len(got) != 3 || got[0] != 4 || got[1] != 5 || got[2] != 6 {
t.Fatalf("second cluster members: %#v", got)
}
}

View File

@ -221,7 +221,7 @@ func (s *Store) listDurableClusterSummaries(ctx context.Context, options Cluster
args = append(args, minSize, limit)
memberThreadJoin := `left join threads mt on mt.id = cm.thread_id`
if !options.IncludeClosed {
memberThreadJoin += ` and mt.closed_at_local is null`
memberThreadJoin += ` and ` + durableVisibleMemberPredicate("cg", "cm", "mt")
}
rows, err := s.db.QueryContext(ctx, `
@ -395,6 +395,26 @@ func parseIDSet(value string) map[int64]bool {
return out
}
// durableVisibleMemberPredicate returns a SQL boolean expression, built from
// the given table aliases, that selects the memberships shown by default.
//
// The expression is true when the thread row has state = 'open' and a null
// closed_at_local, and either the membership's role is 'canonical' or
// 'representative', or no other active canonical/representative membership
// exists for the same thread in a different cluster group that belongs to the
// same repo, has status 'active', and has a null closed_at.
//
// The aliases are spliced into the SQL verbatim, so callers must pass only
// trusted, hard-coded alias names.
func durableVisibleMemberPredicate(clusterAlias, membershipAlias, threadAlias string) string {
	// The thread must be open by both the state column and the local close marker.
	openThread := threadAlias + ".state = 'open'\n" +
		"and " + threadAlias + ".closed_at_local is null\n"

	// Body of the sub-select that looks for a primary membership of the same
	// thread in some other live cluster group of the same repository.
	primaryElsewhere := "select 1\n" +
		"from cluster_memberships visible_cm\n" +
		"join cluster_groups visible_cg on visible_cg.id = visible_cm.cluster_id\n" +
		"where visible_cm.thread_id = " + membershipAlias + ".thread_id\n" +
		"and visible_cm.cluster_id <> " + membershipAlias + ".cluster_id\n" +
		"and visible_cm.state = 'active'\n" +
		"and visible_cm.role in ('canonical', 'representative')\n" +
		"and visible_cg.repo_id = " + clusterAlias + ".repo_id\n" +
		"and visible_cg.status = 'active'\n" +
		"and visible_cg.closed_at is null\n"

	return openThread +
		"and (\n" +
		membershipAlias + ".role in ('canonical', 'representative')\n" +
		"or not exists (\n" +
		primaryElsewhere +
		")\n" +
		")"
}
func idSetOverlapRatio(left, right map[int64]bool) float64 {
smaller := len(left)
if len(right) < smaller {
@ -435,7 +455,7 @@ func (s *Store) DurableClusterDetail(ctx context.Context, options ClusterDetailO
where := `cm.cluster_id = ?`
args := []any{options.ClusterID}
if !options.IncludeClosed {
where += ` and cm.state = 'active' and t.closed_at_local is null`
where += ` and cm.state = 'active' and ` + durableVisibleMemberPredicate("cg", "cm", "t")
}
args = append(args, limit)
rows, err := s.db.QueryContext(ctx, `
@ -445,6 +465,7 @@ func (s *Store) DurableClusterDetail(ctx context.Context, options ClusterDetailO
t.created_at_gh, t.updated_at_gh, t.closed_at_gh, t.merged_at_gh,
t.first_pulled_at, t.last_pulled_at, t.updated_at, t.closed_at_local, t.close_reason_local
from cluster_memberships cm
join cluster_groups cg on cg.id = cm.cluster_id
join threads t on t.id = cm.thread_id
where `+where+`
order by case cm.role when 'canonical' then 0 when 'representative' then 1 else 2 end,
@ -550,7 +571,7 @@ func (s *Store) ClusterIDForThreadNumber(ctx context.Context, repoID int64, numb
where := `t.repo_id = ? and t.number = ?`
args := []any{repoID, number}
if !includeClosed {
where += ` and t.closed_at_local is null and cm.state = 'active' and cg.status = 'active' and cg.closed_at is null`
where += ` and cm.state = 'active' and cg.status = 'active' and cg.closed_at is null and ` + durableVisibleMemberPredicate("cg", "cm", "t")
}
row := s.db.QueryRowContext(ctx, `
select cg.id
@ -560,6 +581,7 @@ func (s *Store) ClusterIDForThreadNumber(ctx context.Context, repoID int64, numb
where `+where+`
order by case cm.state when 'active' then 0 else 1 end,
case cg.status when 'active' then 0 else 1 end,
case cm.role when 'canonical' then 0 when 'representative' then 1 else 2 end,
coalesce(cg.updated_at, '') desc,
cg.id desc
limit 1
@ -1090,19 +1112,28 @@ func nullableFloat(value *float64) sql.NullFloat64 {
func (s *Store) clusterSummaryByID(ctx context.Context, repoID, clusterID int64, includeClosed bool) (ClusterSummary, error) {
where := `cg.repo_id = ? and cg.id = ?`
args := []any{repoID, clusterID}
memberCountExpr := `count(cm.thread_id)`
closedMemberCountExpr := `sum(case when t.closed_at_local is not null or t.state <> 'open' then 1 else 0 end)`
memberThreadJoin := ``
if !includeClosed {
where += ` and cg.status = 'active' and cg.closed_at is null`
memberCountExpr = `count(mt.id)`
closedMemberCountExpr = `0`
memberThreadJoin = `
left join threads mt on mt.id = cm.thread_id
and (` + durableVisibleMemberPredicate("cg", "cm", "mt") + `)`
}
row := s.db.QueryRowContext(ctx, `
select cg.id, cg.stable_slug, cg.status, cg.title, cg.representative_thread_id,
rt.number, rt.kind, rt.title,
count(cm.thread_id) as member_count,
`+memberCountExpr+` as member_count,
cg.updated_at, coalesce(cc.updated_at, cg.closed_at) as closed_at,
sum(case when t.closed_at_local is not null or t.state <> 'open' then 1 else 0 end) as closed_member_count
`+closedMemberCountExpr+` as closed_member_count
from cluster_groups cg
left join cluster_closures cc on cc.cluster_id = cg.id
left join cluster_memberships cm on cm.cluster_id = cg.id and cm.state = 'active'
left join threads t on t.id = cm.thread_id
`+memberThreadJoin+`
left join threads rt on rt.id = cg.representative_thread_id
where `+where+`
group by cg.id

View File

@ -63,6 +63,104 @@ func TestListClusterSummaries(t *testing.T) {
}
}
// TestDurableClusterSummariesUsePrimaryOpenMembers checks the default
// (IncludeClosed: false) views of durable clusters against a seeded store.
//
// Seed data: threads #101, #103 and #104 with State "open" and #102 with State
// "closed"; cluster group 1000 ("broad") holding #101 as canonical and #102,
// #103, #104 as related; cluster group 1001 ("specific") holding #103 as
// canonical.
//
// Expectations: summaries report 2 members for cluster 1000 and 1 for cluster
// 1001, both clusters keep status "active", the detail of cluster 1000 lists
// only #101 and #104, thread #103 resolves to cluster 1001, and the
// IncludeClosed detail of cluster 1000 still returns all four memberships.
func TestDurableClusterSummariesUsePrimaryOpenMembers(t *testing.T) {
	ctx := context.Background()
	dbPath := filepath.Join(t.TempDir(), "gitcrawl.db")
	st, err := Open(ctx, dbPath)
	if err != nil {
		t.Fatalf("open store: %v", err)
	}
	defer st.Close()

	repo := Repository{Owner: "openclaw", Name: "openclaw", FullName: "openclaw/openclaw", RawJSON: "{}", UpdatedAt: "2026-04-26T00:00:00Z"}
	repoID, err := st.UpsertRepository(ctx, repo)
	if err != nil {
		t.Fatalf("repo: %v", err)
	}

	// Threads are inserted in the same order as their issue numbers.
	broadThread := Thread{
		RepoID: repoID, GitHubID: "101", Number: 101, Kind: "issue", State: "open",
		Title: "broad canonical", HTMLURL: "https://github.com/openclaw/openclaw/issues/101",
		LabelsJSON: "[]", AssigneesJSON: "[]", RawJSON: "{}", ContentHash: "hash-101", UpdatedAt: "2026-04-26T00:00:00Z",
	}
	broadID, err := st.UpsertThread(ctx, broadThread)
	if err != nil {
		t.Fatalf("canonical thread: %v", err)
	}

	staleThread := Thread{
		RepoID: repoID, GitHubID: "102", Number: 102, Kind: "issue", State: "closed",
		Title: "closed stale related", HTMLURL: "https://github.com/openclaw/openclaw/issues/102",
		LabelsJSON: "[]", AssigneesJSON: "[]", RawJSON: "{}", ContentHash: "hash-102", UpdatedAt: "2026-04-26T00:00:00Z",
	}
	staleID, err := st.UpsertThread(ctx, staleThread)
	if err != nil {
		t.Fatalf("closed thread: %v", err)
	}

	narrowThread := Thread{
		RepoID: repoID, GitHubID: "103", Number: 103, Kind: "issue", State: "open",
		Title: "specific canonical elsewhere", HTMLURL: "https://github.com/openclaw/openclaw/issues/103",
		LabelsJSON: "[]", AssigneesJSON: "[]", RawJSON: "{}", ContentHash: "hash-103", UpdatedAt: "2026-04-26T00:00:00Z",
	}
	narrowID, err := st.UpsertThread(ctx, narrowThread)
	if err != nil {
		t.Fatalf("specific thread: %v", err)
	}

	relatedThread := Thread{
		RepoID: repoID, GitHubID: "104", Number: 104, Kind: "issue", State: "open",
		Title: "real related member", HTMLURL: "https://github.com/openclaw/openclaw/issues/104",
		LabelsJSON: "[]", AssigneesJSON: "[]", RawJSON: "{}", ContentHash: "hash-104", UpdatedAt: "2026-04-26T00:00:00Z",
	}
	relatedID, err := st.UpsertThread(ctx, relatedThread)
	if err != nil {
		t.Fatalf("related-only thread: %v", err)
	}

	// Two active groups: 1000 is represented by #101, 1001 by #103.
	_, err = st.DB().ExecContext(ctx, `
insert into cluster_groups(id, repo_id, stable_key, stable_slug, status, representative_thread_id, title, created_at, updated_at)
values(1000, ?, 'broad', 'broad', 'active', ?, 'Broad cluster', '2026-04-26T00:00:00Z', '2026-04-26T00:10:00Z'),
(1001, ?, 'specific', 'specific', 'active', ?, 'Specific cluster', '2026-04-26T00:00:00Z', '2026-04-26T00:20:00Z');
`, repoID, broadID, repoID, narrowID)
	if err != nil {
		t.Fatalf("seed cluster groups: %v", err)
	}

	// #103 is a related member of 1000 and the canonical member of 1001.
	_, err = st.DB().ExecContext(ctx, `
insert into cluster_memberships(cluster_id, thread_id, role, state, added_by, added_reason_json, created_at, updated_at)
values(1000, ?, 'canonical', 'active', 'algo', '{}', '2026-04-26T00:00:00Z', '2026-04-26T00:00:00Z'),
(1000, ?, 'related', 'active', 'algo', '{}', '2026-04-26T00:00:00Z', '2026-04-26T00:00:00Z'),
(1000, ?, 'related', 'active', 'algo', '{}', '2026-04-26T00:00:00Z', '2026-04-26T00:00:00Z'),
(1000, ?, 'related', 'active', 'algo', '{}', '2026-04-26T00:00:00Z', '2026-04-26T00:00:00Z'),
(1001, ?, 'canonical', 'active', 'algo', '{}', '2026-04-26T00:00:00Z', '2026-04-26T00:00:00Z');
`, broadID, staleID, narrowID, relatedID, narrowID)
	if err != nil {
		t.Fatalf("seed cluster memberships: %v", err)
	}

	summaryOptions := ClusterSummaryOptions{RepoID: repoID, IncludeClosed: false, MinSize: 1, Limit: 10, Sort: "size"}
	active, err := st.ListClusterSummaries(ctx, summaryOptions)
	if err != nil {
		t.Fatalf("list active clusters: %v", err)
	}
	// The length check comes first so the index expressions cannot panic.
	summariesOK := len(active) == 2 &&
		active[0].ID == 1000 && active[0].MemberCount == 2 &&
		active[1].ID == 1001 && active[1].MemberCount == 1
	if !summariesOK {
		t.Fatalf("active summaries should count primary open members, got %#v", active)
	}
	if status := active[0].Status; status != "active" {
		t.Fatalf("active summary status should not be derived from hidden historical members, got %#v", active[0])
	}

	visibleOptions := ClusterDetailOptions{RepoID: repoID, ClusterID: 1000, IncludeClosed: false, MemberLimit: 10}
	detail, err := st.ClusterDetail(ctx, visibleOptions)
	if err != nil {
		t.Fatalf("active detail: %v", err)
	}
	if status := detail.Cluster.Status; status != "active" {
		t.Fatalf("active detail status should not be derived from hidden historical members, got %#v", detail.Cluster)
	}
	membersOK := len(detail.Members) == 2 &&
		detail.Members[0].Thread.Number == 101 &&
		detail.Members[1].Thread.Number == 104
	if !membersOK {
		t.Fatalf("active detail should hide closed and secondary related members, got %#v", detail.Members)
	}

	clusterID, err := st.ClusterIDForThreadNumber(ctx, repoID, 103, false)
	if err != nil {
		t.Fatalf("cluster id for specific thread: %v", err)
	}
	if clusterID != 1001 {
		t.Fatalf("specific canonical cluster id = %d, want 1001", clusterID)
	}

	historyOptions := ClusterDetailOptions{RepoID: repoID, ClusterID: 1000, IncludeClosed: true, MemberLimit: 10}
	all, err := st.ClusterDetail(ctx, historyOptions)
	if err != nil {
		t.Fatalf("all detail: %v", err)
	}
	if memberTotal := len(all.Members); memberTotal != 4 {
		t.Fatalf("include closed should preserve all durable memberships, got %#v", all.Members)
	}
}
func TestListDisplayClusterSummariesPrefersLatestRawRun(t *testing.T) {
ctx := context.Background()
st, err := Open(ctx, filepath.Join(t.TempDir(), "gitcrawl.db"))