From e7d3c327fd44ed96664601ee8111c19dc1412285 Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Mon, 27 Apr 2026 14:25:30 -0700 Subject: [PATCH] feat: add local cluster member overrides --- internal/store/clusters.go | 233 ++++++++++++++++++++++++++++++++ internal/store/clusters_test.go | 100 ++++++++++++++ 2 files changed, 333 insertions(+) diff --git a/internal/store/clusters.go b/internal/store/clusters.go index 54d3b72..ff4e7db 100644 --- a/internal/store/clusters.go +++ b/internal/store/clusters.go @@ -3,6 +3,7 @@ package store import ( "context" "database/sql" + "encoding/json" "fmt" "strings" "time" @@ -53,6 +54,14 @@ type ClusterDetail struct { Members []ClusterMemberDetail `json:"members"` } +type ClusterMemberOverride struct { + ClusterID int64 `json:"cluster_id"` + ThreadID int64 `json:"thread_id"` + Number int `json:"number"` + Action string `json:"action"` + Reason string `json:"reason,omitempty"` +} + func (s *Store) ListClusterSummaries(ctx context.Context, options ClusterSummaryOptions) ([]ClusterSummary, error) { where := `cg.repo_id = ?` args := []any{options.RepoID} @@ -284,6 +293,230 @@ func (s *Store) ReopenClusterLocally(ctx context.Context, repoID, clusterID int6 return nil } +func (s *Store) ExcludeClusterMemberLocally(ctx context.Context, repoID, clusterID int64, number int, reason string) (ClusterMemberOverride, error) { + if repoID <= 0 { + return ClusterMemberOverride{}, fmt.Errorf("repo id must be positive") + } + if clusterID <= 0 { + return ClusterMemberOverride{}, fmt.Errorf("cluster id must be positive") + } + if number <= 0 { + return ClusterMemberOverride{}, fmt.Errorf("thread number must be positive") + } + reason = strings.TrimSpace(reason) + if reason == "" { + reason = "local exclude" + } + var result ClusterMemberOverride + err := s.WithTx(ctx, func(tx *Store) error { + threadID, err := tx.clusterMemberThreadID(ctx, repoID, clusterID, number, false) + if err != nil { + return err + } + now := time.Now().UTC().Format(timeLayout) + reasonJSON, err := json.Marshal(map[string]string{"reason": reason}) + if err != nil { + return fmt.Errorf("encode override reason: %w", err) + } + if _, err := tx.q().ExecContext(ctx, ` + update cluster_memberships + set state = 'excluded', removed_by = 'local', removed_reason_json = ?, removed_at = ?, updated_at = ? + where cluster_id = ? and thread_id = ? + `, string(reasonJSON), now, now, clusterID, threadID); err != nil { + return fmt.Errorf("exclude cluster member: %w", err) + } + if _, err := tx.q().ExecContext(ctx, `delete from cluster_overrides where cluster_id = ? and thread_id = ? and action in ('include', 'canonical')`, clusterID, threadID); err != nil { + return fmt.Errorf("clear stale member overrides: %w", err) + } + if err := tx.upsertClusterOverride(ctx, repoID, clusterID, threadID, "exclude", reason, now); err != nil { + return err + } + if err := tx.ensureActiveClusterRepresentative(ctx, repoID, clusterID, now); err != nil { + return err + } + result = ClusterMemberOverride{ClusterID: clusterID, ThreadID: threadID, Number: number, Action: "exclude", Reason: reason} + return nil + }) + if err != nil { + return ClusterMemberOverride{}, err + } + return result, nil +} + +func (s *Store) IncludeClusterMemberLocally(ctx context.Context, repoID, clusterID int64, number int, reason string) (ClusterMemberOverride, error) { + if repoID <= 0 { + return ClusterMemberOverride{}, fmt.Errorf("repo id must be positive") + } + if clusterID <= 0 { + return ClusterMemberOverride{}, fmt.Errorf("cluster id must be positive") + } + if number <= 0 { + return ClusterMemberOverride{}, fmt.Errorf("thread number must be positive") + } + reason = strings.TrimSpace(reason) + if reason == "" { + reason = "local include" + } + var result ClusterMemberOverride + err := s.WithTx(ctx, func(tx *Store) error { + threadID, err := tx.clusterMemberThreadID(ctx, repoID, clusterID, number, false) + if err != nil { + return err + } + now := time.Now().UTC().Format(timeLayout) + update, err := tx.q().ExecContext(ctx, ` + update cluster_memberships + set state = 'active', removed_by = null, removed_reason_json = null, removed_at = null, updated_at = ? + where cluster_id = ? and thread_id = ? + `, now, clusterID, threadID) + if err != nil { + return fmt.Errorf("include cluster member: %w", err) + } + if affected, err := update.RowsAffected(); err == nil && affected == 0 { + return fmt.Errorf("thread #%d is not in cluster %d", number, clusterID) + } + if _, err := tx.q().ExecContext(ctx, `delete from cluster_overrides where cluster_id = ? and thread_id = ? and action = 'exclude'`, clusterID, threadID); err != nil { + return fmt.Errorf("clear exclude override: %w", err) + } + if err := tx.upsertClusterOverride(ctx, repoID, clusterID, threadID, "include", reason, now); err != nil { + return err + } + if err := tx.ensureActiveClusterRepresentative(ctx, repoID, clusterID, now); err != nil { + return err + } + result = ClusterMemberOverride{ClusterID: clusterID, ThreadID: threadID, Number: number, Action: "include", Reason: reason} + return nil + }) + if err != nil { + return ClusterMemberOverride{}, err + } + return result, nil +} + +func (s *Store) SetClusterCanonicalLocally(ctx context.Context, repoID, clusterID int64, number int, reason string) (ClusterMemberOverride, error) { + if repoID <= 0 { + return ClusterMemberOverride{}, fmt.Errorf("repo id must be positive") + } + if clusterID <= 0 { + return ClusterMemberOverride{}, fmt.Errorf("cluster id must be positive") + } + if number <= 0 { + return ClusterMemberOverride{}, fmt.Errorf("thread number must be positive") + } + reason = strings.TrimSpace(reason) + if reason == "" { + reason = "local canonical" + } + var result ClusterMemberOverride + err := s.WithTx(ctx, func(tx *Store) error { + threadID, err := tx.clusterMemberThreadID(ctx, repoID, clusterID, number, true) + if err != nil { + return err + } + now := time.Now().UTC().Format(timeLayout) + if _, err := tx.q().ExecContext(ctx, ` + update cluster_memberships + set role = case when thread_id = ? then 'canonical' else 'member' end, + updated_at = ? + where cluster_id = ? and state = 'active' + `, threadID, now, clusterID); err != nil { + return fmt.Errorf("set canonical member roles: %w", err) + } + update, err := tx.q().ExecContext(ctx, ` + update cluster_groups + set representative_thread_id = ?, updated_at = ? + where repo_id = ? and id = ? + `, threadID, now, repoID, clusterID) + if err != nil { + return fmt.Errorf("set cluster canonical: %w", err) + } + if affected, err := update.RowsAffected(); err == nil && affected == 0 { + return fmt.Errorf("cluster %d was not found", clusterID) + } + if _, err := tx.q().ExecContext(ctx, `delete from cluster_overrides where cluster_id = ? and action = 'canonical'`, clusterID); err != nil { + return fmt.Errorf("clear canonical overrides: %w", err) + } + if _, err := tx.q().ExecContext(ctx, `delete from cluster_overrides where cluster_id = ? and thread_id = ? and action = 'exclude'`, clusterID, threadID); err != nil { + return fmt.Errorf("clear exclude override: %w", err) + } + if err := tx.upsertClusterOverride(ctx, repoID, clusterID, threadID, "canonical", reason, now); err != nil { + return err + } + result = ClusterMemberOverride{ClusterID: clusterID, ThreadID: threadID, Number: number, Action: "canonical", Reason: reason} + return nil + }) + if err != nil { + return ClusterMemberOverride{}, err + } + return result, nil +} + +func (s *Store) clusterMemberThreadID(ctx context.Context, repoID, clusterID int64, number int, requireActive bool) (int64, error) { + where := `cg.repo_id = ? and cg.id = ? and t.repo_id = ? and t.number = ?` + if requireActive { + where += ` and cm.state = 'active'` + } + row := s.q().QueryRowContext(ctx, ` + select t.id + from cluster_groups cg + join cluster_memberships cm on cm.cluster_id = cg.id + join threads t on t.id = cm.thread_id + where `+where+` + limit 1 + `, repoID, clusterID, repoID, number) + var threadID int64 + if err := row.Scan(&threadID); err != nil { + if err == sql.ErrNoRows { + if requireActive { + return 0, fmt.Errorf("active thread #%d is not in cluster %d", number, clusterID) + } + return 0, fmt.Errorf("thread #%d is not in cluster %d", number, clusterID) + } + return 0, fmt.Errorf("find cluster member: %w", err) + } + return threadID, nil +} + +func (s *Store) upsertClusterOverride(ctx context.Context, repoID, clusterID, threadID int64, action, reason, now string) error { + if _, err := s.q().ExecContext(ctx, ` + insert into cluster_overrides(repo_id, cluster_id, thread_id, action, reason, created_at) + values(?, ?, ?, ?, ?, ?) + on conflict(cluster_id, thread_id, action) do update set + reason = excluded.reason, + created_at = excluded.created_at + `, repoID, clusterID, threadID, action, reason, now); err != nil { + return fmt.Errorf("record cluster override: %w", err) + } + return nil +} + +func (s *Store) ensureActiveClusterRepresentative(ctx context.Context, repoID, clusterID int64, now string) error { + if _, err := s.q().ExecContext(ctx, ` + update cluster_groups + set representative_thread_id = ( + select cm.thread_id + from cluster_memberships cm + join threads t on t.id = cm.thread_id + where cm.cluster_id = cluster_groups.id and cm.state = 'active' + order by case cm.role when 'canonical' then 0 when 'representative' then 1 else 2 end, + coalesce(cm.score_to_representative, 0) desc, + t.number asc + limit 1 + ), + updated_at = ? + where repo_id = ? and id = ? + and ( + representative_thread_id is null + or representative_thread_id not in ( + select thread_id from cluster_memberships where cluster_id = ? and state = 'active' + ) + ) + `, now, repoID, clusterID, clusterID); err != nil { + return fmt.Errorf("refresh cluster representative: %w", err) + } + return nil +} + func (s *Store) clusterSummaryByID(ctx context.Context, repoID, clusterID int64, includeClosed bool) (ClusterSummary, error) { where := `cg.repo_id = ? and cg.id = ?` args := []any{repoID, clusterID} diff --git a/internal/store/clusters_test.go b/internal/store/clusters_test.go index 9453a6f..226b266 100644 --- a/internal/store/clusters_test.go +++ b/internal/store/clusters_test.go @@ -121,3 +121,103 @@ func TestCloseAndReopenClusterLocally(t *testing.T) { t.Fatalf("reopened cluster not visible/cleared: %#v", active) } } + +func TestClusterMemberLocalOverrides(t *testing.T) { + ctx := context.Background() + st, err := Open(ctx, filepath.Join(t.TempDir(), "gitcrawl.db")) + if err != nil { + t.Fatalf("open store: %v", err) + } + defer st.Close() + + repoID, err := st.UpsertRepository(ctx, Repository{Owner: "openclaw", Name: "gitcrawl", FullName: "openclaw/gitcrawl", RawJSON: "{}", UpdatedAt: "2026-04-26T00:00:00Z"}) + if err != nil { + t.Fatalf("repo: %v", err) + } + firstID, err := st.UpsertThread(ctx, Thread{ + RepoID: repoID, GitHubID: "31", Number: 31, Kind: "issue", State: "open", + Title: "first member", HTMLURL: "https://github.com/openclaw/gitcrawl/issues/31", + LabelsJSON: "[]", AssigneesJSON: "[]", RawJSON: "{}", ContentHash: "hash-31", UpdatedAt: "2026-04-26T00:00:00Z", + }) + if err != nil { + t.Fatalf("first thread: %v", err) + } + secondID, err := st.UpsertThread(ctx, Thread{ + RepoID: repoID, GitHubID: "32", Number: 32, Kind: "issue", State: "open", + Title: "second member", HTMLURL: "https://github.com/openclaw/gitcrawl/issues/32", + LabelsJSON: "[]", AssigneesJSON: "[]", RawJSON: "{}", ContentHash: "hash-32", UpdatedAt: "2026-04-26T00:00:00Z", + }) + if err != nil { + t.Fatalf("second thread: %v", err) + } + if _, err := st.DB().ExecContext(ctx, ` + insert into cluster_groups(id, repo_id, stable_key, stable_slug, status, representative_thread_id, title, created_at, updated_at) + values(30, ?, 'key-30', 'slug-30', 'active', ?, 'Cluster title', '2026-04-26T00:00:00Z', '2026-04-26T00:00:01Z') + `, repoID, firstID); err != nil { + t.Fatalf("seed cluster: %v", err) + } + if _, err := st.DB().ExecContext(ctx, ` + insert into cluster_memberships(cluster_id, thread_id, role, state, added_by, added_reason_json, created_at, updated_at) + values(30, ?, 'representative', 'active', 'system', '{}', '2026-04-26T00:00:00Z', '2026-04-26T00:00:00Z') + `, firstID); err != nil { + t.Fatalf("seed first member: %v", err) + } + if _, err := st.DB().ExecContext(ctx, ` + insert into cluster_memberships(cluster_id, thread_id, role, state, added_by, added_reason_json, created_at, updated_at) + values(30, ?, 'member', 'active', 'system', '{}', '2026-04-26T00:00:00Z', '2026-04-26T00:00:00Z') + `, secondID); err != nil { + t.Fatalf("seed second member: %v", err) + } + + excluded, err := st.ExcludeClusterMemberLocally(ctx, repoID, 30, 31, "not related") + if err != nil { + t.Fatalf("exclude member: %v", err) + } + if excluded.ThreadID != firstID || excluded.Action != "exclude" { + t.Fatalf("unexpected exclude result: %#v", excluded) + } + detail, err := st.ClusterDetail(ctx, ClusterDetailOptions{RepoID: repoID, ClusterID: 30, IncludeClosed: false, MemberLimit: 10}) + if err != nil { + t.Fatalf("cluster detail after exclude: %v", err) + } + if len(detail.Members) != 1 || detail.Members[0].Thread.Number != 32 || detail.Cluster.RepresentativeThreadID != secondID { + t.Fatalf("excluded member should be hidden and representative refreshed: %#v", detail) + } + + included, err := st.IncludeClusterMemberLocally(ctx, repoID, 30, 31, "belongs here") + if err != nil { + t.Fatalf("include member: %v", err) + } + if included.ThreadID != firstID || included.Action != "include" { + t.Fatalf("unexpected include result: %#v", included) + } + detail, err = st.ClusterDetail(ctx, ClusterDetailOptions{RepoID: repoID, ClusterID: 30, IncludeClosed: false, MemberLimit: 10}) + if err != nil { + t.Fatalf("cluster detail after include: %v", err) + } + if len(detail.Members) != 2 { + t.Fatalf("included member should be visible again: %#v", detail) + } + + canonical, err := st.SetClusterCanonicalLocally(ctx, repoID, 30, 31, "best duplicate") + if err != nil { + t.Fatalf("set canonical: %v", err) + } + if canonical.ThreadID != firstID || canonical.Action != "canonical" { + t.Fatalf("unexpected canonical result: %#v", canonical) + } + detail, err = st.ClusterDetail(ctx, ClusterDetailOptions{RepoID: repoID, ClusterID: 30, IncludeClosed: false, MemberLimit: 10}) + if err != nil { + t.Fatalf("cluster detail after canonical: %v", err) + } + if detail.Cluster.RepresentativeThreadID != firstID || detail.Members[0].Thread.Number != 31 || detail.Members[0].Role != "canonical" { + t.Fatalf("canonical member should become representative and sort first: %#v", detail) + } + var excludeOverrides int + if err := st.DB().QueryRowContext(ctx, `select count(*) from cluster_overrides where cluster_id = 30 and action = 'exclude'`).Scan(&excludeOverrides); err != nil { + t.Fatalf("count exclude overrides: %v", err) + } + if excludeOverrides != 0 { + t.Fatalf("include/canonical should clear stale exclude overrides, got %d", excludeOverrides) + } +}