feat: add local cluster member overrides

This commit is contained in:
Vincent Koc 2026-04-27 14:25:30 -07:00
parent 5283c85776
commit e7d3c327fd
No known key found for this signature in database
2 changed files with 333 additions and 0 deletions

View File

@ -3,6 +3,7 @@ package store
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"strings"
"time"
@ -53,6 +54,14 @@ type ClusterDetail struct {
Members []ClusterMemberDetail `json:"members"`
}
type ClusterMemberOverride struct {
ClusterID int64 `json:"cluster_id"`
ThreadID int64 `json:"thread_id"`
Number int `json:"number"`
Action string `json:"action"`
Reason string `json:"reason,omitempty"`
}
func (s *Store) ListClusterSummaries(ctx context.Context, options ClusterSummaryOptions) ([]ClusterSummary, error) {
where := `cg.repo_id = ?`
args := []any{options.RepoID}
@ -284,6 +293,230 @@ func (s *Store) ReopenClusterLocally(ctx context.Context, repoID, clusterID int6
return nil
}
func (s *Store) ExcludeClusterMemberLocally(ctx context.Context, repoID, clusterID int64, number int, reason string) (ClusterMemberOverride, error) {
if repoID <= 0 {
return ClusterMemberOverride{}, fmt.Errorf("repo id must be positive")
}
if clusterID <= 0 {
return ClusterMemberOverride{}, fmt.Errorf("cluster id must be positive")
}
if number <= 0 {
return ClusterMemberOverride{}, fmt.Errorf("thread number must be positive")
}
reason = strings.TrimSpace(reason)
if reason == "" {
reason = "local exclude"
}
var result ClusterMemberOverride
err := s.WithTx(ctx, func(tx *Store) error {
threadID, err := tx.clusterMemberThreadID(ctx, repoID, clusterID, number, false)
if err != nil {
return err
}
now := time.Now().UTC().Format(timeLayout)
reasonJSON, err := json.Marshal(map[string]string{"reason": reason})
if err != nil {
return fmt.Errorf("encode override reason: %w", err)
}
if _, err := tx.q().ExecContext(ctx, `
update cluster_memberships
set state = 'excluded', removed_by = 'local', removed_reason_json = ?, removed_at = ?, updated_at = ?
where cluster_id = ? and thread_id = ?
`, string(reasonJSON), now, now, clusterID, threadID); err != nil {
return fmt.Errorf("exclude cluster member: %w", err)
}
if _, err := tx.q().ExecContext(ctx, `delete from cluster_overrides where cluster_id = ? and thread_id = ? and action in ('include', 'canonical')`, clusterID, threadID); err != nil {
return fmt.Errorf("clear stale member overrides: %w", err)
}
if err := tx.upsertClusterOverride(ctx, repoID, clusterID, threadID, "exclude", reason, now); err != nil {
return err
}
if err := tx.ensureActiveClusterRepresentative(ctx, repoID, clusterID, now); err != nil {
return err
}
result = ClusterMemberOverride{ClusterID: clusterID, ThreadID: threadID, Number: number, Action: "exclude", Reason: reason}
return nil
})
if err != nil {
return ClusterMemberOverride{}, err
}
return result, nil
}
func (s *Store) IncludeClusterMemberLocally(ctx context.Context, repoID, clusterID int64, number int, reason string) (ClusterMemberOverride, error) {
if repoID <= 0 {
return ClusterMemberOverride{}, fmt.Errorf("repo id must be positive")
}
if clusterID <= 0 {
return ClusterMemberOverride{}, fmt.Errorf("cluster id must be positive")
}
if number <= 0 {
return ClusterMemberOverride{}, fmt.Errorf("thread number must be positive")
}
reason = strings.TrimSpace(reason)
if reason == "" {
reason = "local include"
}
var result ClusterMemberOverride
err := s.WithTx(ctx, func(tx *Store) error {
threadID, err := tx.clusterMemberThreadID(ctx, repoID, clusterID, number, false)
if err != nil {
return err
}
now := time.Now().UTC().Format(timeLayout)
update, err := tx.q().ExecContext(ctx, `
update cluster_memberships
set state = 'active', removed_by = null, removed_reason_json = null, removed_at = null, updated_at = ?
where cluster_id = ? and thread_id = ?
`, now, clusterID, threadID)
if err != nil {
return fmt.Errorf("include cluster member: %w", err)
}
if affected, err := update.RowsAffected(); err == nil && affected == 0 {
return fmt.Errorf("thread #%d is not in cluster %d", number, clusterID)
}
if _, err := tx.q().ExecContext(ctx, `delete from cluster_overrides where cluster_id = ? and thread_id = ? and action = 'exclude'`, clusterID, threadID); err != nil {
return fmt.Errorf("clear exclude override: %w", err)
}
if err := tx.upsertClusterOverride(ctx, repoID, clusterID, threadID, "include", reason, now); err != nil {
return err
}
if err := tx.ensureActiveClusterRepresentative(ctx, repoID, clusterID, now); err != nil {
return err
}
result = ClusterMemberOverride{ClusterID: clusterID, ThreadID: threadID, Number: number, Action: "include", Reason: reason}
return nil
})
if err != nil {
return ClusterMemberOverride{}, err
}
return result, nil
}
func (s *Store) SetClusterCanonicalLocally(ctx context.Context, repoID, clusterID int64, number int, reason string) (ClusterMemberOverride, error) {
if repoID <= 0 {
return ClusterMemberOverride{}, fmt.Errorf("repo id must be positive")
}
if clusterID <= 0 {
return ClusterMemberOverride{}, fmt.Errorf("cluster id must be positive")
}
if number <= 0 {
return ClusterMemberOverride{}, fmt.Errorf("thread number must be positive")
}
reason = strings.TrimSpace(reason)
if reason == "" {
reason = "local canonical"
}
var result ClusterMemberOverride
err := s.WithTx(ctx, func(tx *Store) error {
threadID, err := tx.clusterMemberThreadID(ctx, repoID, clusterID, number, true)
if err != nil {
return err
}
now := time.Now().UTC().Format(timeLayout)
if _, err := tx.q().ExecContext(ctx, `
update cluster_memberships
set role = case when thread_id = ? then 'canonical' else 'member' end,
updated_at = ?
where cluster_id = ? and state = 'active'
`, threadID, now, clusterID); err != nil {
return fmt.Errorf("set canonical member roles: %w", err)
}
update, err := tx.q().ExecContext(ctx, `
update cluster_groups
set representative_thread_id = ?, updated_at = ?
where repo_id = ? and id = ?
`, threadID, now, repoID, clusterID)
if err != nil {
return fmt.Errorf("set cluster canonical: %w", err)
}
if affected, err := update.RowsAffected(); err == nil && affected == 0 {
return fmt.Errorf("cluster %d was not found", clusterID)
}
if _, err := tx.q().ExecContext(ctx, `delete from cluster_overrides where cluster_id = ? and action = 'canonical'`, clusterID); err != nil {
return fmt.Errorf("clear canonical overrides: %w", err)
}
if _, err := tx.q().ExecContext(ctx, `delete from cluster_overrides where cluster_id = ? and thread_id = ? and action = 'exclude'`, clusterID, threadID); err != nil {
return fmt.Errorf("clear exclude override: %w", err)
}
if err := tx.upsertClusterOverride(ctx, repoID, clusterID, threadID, "canonical", reason, now); err != nil {
return err
}
result = ClusterMemberOverride{ClusterID: clusterID, ThreadID: threadID, Number: number, Action: "canonical", Reason: reason}
return nil
})
if err != nil {
return ClusterMemberOverride{}, err
}
return result, nil
}
func (s *Store) clusterMemberThreadID(ctx context.Context, repoID, clusterID int64, number int, requireActive bool) (int64, error) {
where := `cg.repo_id = ? and cg.id = ? and t.repo_id = ? and t.number = ?`
if requireActive {
where += ` and cm.state = 'active'`
}
row := s.q().QueryRowContext(ctx, `
select t.id
from cluster_groups cg
join cluster_memberships cm on cm.cluster_id = cg.id
join threads t on t.id = cm.thread_id
where `+where+`
limit 1
`, repoID, clusterID, repoID, number)
var threadID int64
if err := row.Scan(&threadID); err != nil {
if err == sql.ErrNoRows {
if requireActive {
return 0, fmt.Errorf("active thread #%d is not in cluster %d", number, clusterID)
}
return 0, fmt.Errorf("thread #%d is not in cluster %d", number, clusterID)
}
return 0, fmt.Errorf("find cluster member: %w", err)
}
return threadID, nil
}
func (s *Store) upsertClusterOverride(ctx context.Context, repoID, clusterID, threadID int64, action, reason, now string) error {
if _, err := s.q().ExecContext(ctx, `
insert into cluster_overrides(repo_id, cluster_id, thread_id, action, reason, created_at)
values(?, ?, ?, ?, ?, ?)
on conflict(cluster_id, thread_id, action) do update set
reason = excluded.reason,
created_at = excluded.created_at
`, repoID, clusterID, threadID, action, reason, now); err != nil {
return fmt.Errorf("record cluster override: %w", err)
}
return nil
}
func (s *Store) ensureActiveClusterRepresentative(ctx context.Context, repoID, clusterID int64, now string) error {
if _, err := s.q().ExecContext(ctx, `
update cluster_groups
set representative_thread_id = (
select cm.thread_id
from cluster_memberships cm
join threads t on t.id = cm.thread_id
where cm.cluster_id = cluster_groups.id and cm.state = 'active'
order by case cm.role when 'canonical' then 0 when 'representative' then 1 else 2 end,
coalesce(cm.score_to_representative, 0) desc,
t.number asc
limit 1
),
updated_at = ?
where repo_id = ? and id = ?
and (
representative_thread_id is null
or representative_thread_id not in (
select thread_id from cluster_memberships where cluster_id = ? and state = 'active'
)
)
`, now, repoID, clusterID, clusterID); err != nil {
return fmt.Errorf("refresh cluster representative: %w", err)
}
return nil
}
func (s *Store) clusterSummaryByID(ctx context.Context, repoID, clusterID int64, includeClosed bool) (ClusterSummary, error) {
where := `cg.repo_id = ? and cg.id = ?`
args := []any{repoID, clusterID}

View File

@ -121,3 +121,103 @@ func TestCloseAndReopenClusterLocally(t *testing.T) {
t.Fatalf("reopened cluster not visible/cleared: %#v", active)
}
}
func TestClusterMemberLocalOverrides(t *testing.T) {
ctx := context.Background()
st, err := Open(ctx, filepath.Join(t.TempDir(), "gitcrawl.db"))
if err != nil {
t.Fatalf("open store: %v", err)
}
defer st.Close()
repoID, err := st.UpsertRepository(ctx, Repository{Owner: "openclaw", Name: "gitcrawl", FullName: "openclaw/gitcrawl", RawJSON: "{}", UpdatedAt: "2026-04-26T00:00:00Z"})
if err != nil {
t.Fatalf("repo: %v", err)
}
firstID, err := st.UpsertThread(ctx, Thread{
RepoID: repoID, GitHubID: "31", Number: 31, Kind: "issue", State: "open",
Title: "first member", HTMLURL: "https://github.com/openclaw/gitcrawl/issues/31",
LabelsJSON: "[]", AssigneesJSON: "[]", RawJSON: "{}", ContentHash: "hash-31", UpdatedAt: "2026-04-26T00:00:00Z",
})
if err != nil {
t.Fatalf("first thread: %v", err)
}
secondID, err := st.UpsertThread(ctx, Thread{
RepoID: repoID, GitHubID: "32", Number: 32, Kind: "issue", State: "open",
Title: "second member", HTMLURL: "https://github.com/openclaw/gitcrawl/issues/32",
LabelsJSON: "[]", AssigneesJSON: "[]", RawJSON: "{}", ContentHash: "hash-32", UpdatedAt: "2026-04-26T00:00:00Z",
})
if err != nil {
t.Fatalf("second thread: %v", err)
}
if _, err := st.DB().ExecContext(ctx, `
insert into cluster_groups(id, repo_id, stable_key, stable_slug, status, representative_thread_id, title, created_at, updated_at)
values(30, ?, 'key-30', 'slug-30', 'active', ?, 'Cluster title', '2026-04-26T00:00:00Z', '2026-04-26T00:00:01Z')
`, repoID, firstID); err != nil {
t.Fatalf("seed cluster: %v", err)
}
if _, err := st.DB().ExecContext(ctx, `
insert into cluster_memberships(cluster_id, thread_id, role, state, added_by, added_reason_json, created_at, updated_at)
values(30, ?, 'representative', 'active', 'system', '{}', '2026-04-26T00:00:00Z', '2026-04-26T00:00:00Z')
`, firstID); err != nil {
t.Fatalf("seed first member: %v", err)
}
if _, err := st.DB().ExecContext(ctx, `
insert into cluster_memberships(cluster_id, thread_id, role, state, added_by, added_reason_json, created_at, updated_at)
values(30, ?, 'member', 'active', 'system', '{}', '2026-04-26T00:00:00Z', '2026-04-26T00:00:00Z')
`, secondID); err != nil {
t.Fatalf("seed second member: %v", err)
}
excluded, err := st.ExcludeClusterMemberLocally(ctx, repoID, 30, 31, "not related")
if err != nil {
t.Fatalf("exclude member: %v", err)
}
if excluded.ThreadID != firstID || excluded.Action != "exclude" {
t.Fatalf("unexpected exclude result: %#v", excluded)
}
detail, err := st.ClusterDetail(ctx, ClusterDetailOptions{RepoID: repoID, ClusterID: 30, IncludeClosed: false, MemberLimit: 10})
if err != nil {
t.Fatalf("cluster detail after exclude: %v", err)
}
if len(detail.Members) != 1 || detail.Members[0].Thread.Number != 32 || detail.Cluster.RepresentativeThreadID != secondID {
t.Fatalf("excluded member should be hidden and representative refreshed: %#v", detail)
}
included, err := st.IncludeClusterMemberLocally(ctx, repoID, 30, 31, "belongs here")
if err != nil {
t.Fatalf("include member: %v", err)
}
if included.ThreadID != firstID || included.Action != "include" {
t.Fatalf("unexpected include result: %#v", included)
}
detail, err = st.ClusterDetail(ctx, ClusterDetailOptions{RepoID: repoID, ClusterID: 30, IncludeClosed: false, MemberLimit: 10})
if err != nil {
t.Fatalf("cluster detail after include: %v", err)
}
if len(detail.Members) != 2 {
t.Fatalf("included member should be visible again: %#v", detail)
}
canonical, err := st.SetClusterCanonicalLocally(ctx, repoID, 30, 31, "best duplicate")
if err != nil {
t.Fatalf("set canonical: %v", err)
}
if canonical.ThreadID != firstID || canonical.Action != "canonical" {
t.Fatalf("unexpected canonical result: %#v", canonical)
}
detail, err = st.ClusterDetail(ctx, ClusterDetailOptions{RepoID: repoID, ClusterID: 30, IncludeClosed: false, MemberLimit: 10})
if err != nil {
t.Fatalf("cluster detail after canonical: %v", err)
}
if detail.Cluster.RepresentativeThreadID != firstID || detail.Members[0].Thread.Number != 31 || detail.Members[0].Role != "canonical" {
t.Fatalf("canonical member should become representative and sort first: %#v", detail)
}
var excludeOverrides int
if err := st.DB().QueryRowContext(ctx, `select count(*) from cluster_overrides where cluster_id = 30 and action = 'exclude'`).Scan(&excludeOverrides); err != nil {
t.Fatalf("count exclude overrides: %v", err)
}
if excludeOverrides != 0 {
t.Fatalf("include/canonical should clear stale exclude overrides, got %d", excludeOverrides)
}
}