feat: persist durable clusters

This commit is contained in:
Vincent Koc 2026-04-27 14:45:51 -07:00
parent 2d37f9ec5a
commit 3da9d30dfd
No known key found for this signature in database
2 changed files with 311 additions and 0 deletions

View File

@ -62,6 +62,26 @@ type ClusterMemberOverride struct {
Reason string `json:"reason,omitempty"`
}
type DurableClusterInput struct {
StableKey string
StableSlug string
RepresentativeThreadID int64
Title string
Members []DurableClusterMemberInput
}
type DurableClusterMemberInput struct {
ThreadID int64
Role string
ScoreToRepresentative *float64
}
type SaveDurableClustersResult struct {
RunID int64 `json:"run_id"`
ClusterCount int `json:"cluster_count"`
MemberCount int `json:"member_count"`
}
func (s *Store) ListClusterSummaries(ctx context.Context, options ClusterSummaryOptions) ([]ClusterSummary, error) {
where := `cg.repo_id = ?`
args := []any{options.RepoID}
@ -293,6 +313,75 @@ func (s *Store) ReopenClusterLocally(ctx context.Context, repoID, clusterID int6
return nil
}
func (s *Store) SaveDurableClusters(ctx context.Context, repoID int64, inputs []DurableClusterInput) (SaveDurableClustersResult, error) {
if repoID <= 0 {
return SaveDurableClustersResult{}, fmt.Errorf("repo id must be positive")
}
now := time.Now().UTC().Format(timeLayout)
result := SaveDurableClustersResult{ClusterCount: len(inputs)}
err := s.WithTx(ctx, func(tx *Store) error {
runID, err := tx.insertClusterRun(ctx, repoID, now)
if err != nil {
return err
}
result.RunID = runID
for _, input := range inputs {
clusterID, err := tx.upsertDurableCluster(ctx, repoID, runID, input, now)
if err != nil {
return err
}
memberIDs := make([]int64, 0, len(input.Members))
for _, member := range input.Members {
if member.ThreadID <= 0 {
return fmt.Errorf("cluster %q has invalid member thread id", input.StableKey)
}
role := strings.TrimSpace(member.Role)
if role == "" {
role = "member"
}
if _, err := tx.q().ExecContext(ctx, `
insert into cluster_memberships(
cluster_id, thread_id, role, state, score_to_representative,
first_seen_run_id, last_seen_run_id, added_by, added_reason_json, created_at, updated_at
)
values(?, ?, ?, 'active', ?, ?, ?, 'cluster', '{}', ?, ?)
on conflict(cluster_id, thread_id) do update set
role = excluded.role,
state = 'active',
score_to_representative = excluded.score_to_representative,
last_seen_run_id = excluded.last_seen_run_id,
removed_by = null,
removed_reason_json = null,
removed_at = null,
updated_at = excluded.updated_at
`, clusterID, member.ThreadID, role, nullableFloat(member.ScoreToRepresentative), runID, runID, now, now); err != nil {
return fmt.Errorf("upsert durable cluster member: %w", err)
}
memberIDs = append(memberIDs, member.ThreadID)
result.MemberCount++
}
if err := tx.markMissingClusterMembersRemoved(ctx, clusterID, memberIDs, now); err != nil {
return err
}
if err := tx.applyClusterOverrides(ctx, repoID, clusterID, now); err != nil {
return err
}
}
if _, err := tx.q().ExecContext(ctx, `
update cluster_runs
set finished_at = ?, stats_json = ?
where id = ?
`, now, fmt.Sprintf(`{"cluster_count":%d,"member_count":%d}`, result.ClusterCount, result.MemberCount), runID); err != nil {
return fmt.Errorf("finish cluster run: %w", err)
}
return nil
})
if err != nil {
return SaveDurableClustersResult{}, err
}
return result, nil
}
func (s *Store) ExcludeClusterMemberLocally(ctx context.Context, repoID, clusterID int64, number int, reason string) (ClusterMemberOverride, error) {
if repoID <= 0 {
return ClusterMemberOverride{}, fmt.Errorf("repo id must be positive")
@ -477,6 +566,81 @@ func (s *Store) clusterMemberThreadID(ctx context.Context, repoID, clusterID int
return threadID, nil
}
func (s *Store) insertClusterRun(ctx context.Context, repoID int64, now string) (int64, error) {
var runID int64
if err := s.q().QueryRowContext(ctx, `
insert into cluster_runs(repo_id, scope, status, started_at)
values(?, 'durable', 'success', ?)
returning id
`, repoID, now).Scan(&runID); err != nil {
return 0, fmt.Errorf("insert cluster run: %w", err)
}
return runID, nil
}
func (s *Store) upsertDurableCluster(ctx context.Context, repoID, runID int64, input DurableClusterInput, now string) (int64, error) {
stableKey := strings.TrimSpace(input.StableKey)
if stableKey == "" {
return 0, fmt.Errorf("durable cluster stable key is required")
}
stableSlug := strings.TrimSpace(input.StableSlug)
if stableSlug == "" {
stableSlug = stableKey
}
var clusterID int64
if err := s.q().QueryRowContext(ctx, `
insert into cluster_groups(
repo_id, stable_key, stable_slug, status, cluster_type, representative_thread_id, title, created_at, updated_at
)
values(?, ?, ?, 'active', 'similarity', ?, ?, ?, ?)
on conflict(repo_id, stable_key) do update set
stable_slug = excluded.stable_slug,
cluster_type = excluded.cluster_type,
representative_thread_id = case
when cluster_groups.status = 'closed' then cluster_groups.representative_thread_id
else excluded.representative_thread_id
end,
title = excluded.title,
updated_at = excluded.updated_at
returning id
`, repoID, stableKey, stableSlug, nullInt(input.RepresentativeThreadID), nullString(input.Title), now, now).Scan(&clusterID); err != nil {
return 0, fmt.Errorf("upsert durable cluster: %w", err)
}
if _, err := s.q().ExecContext(ctx, `
insert into cluster_events(cluster_id, run_id, event_type, actor_kind, payload_json, created_at)
values(?, ?, 'seen', 'cluster', '{}', ?)
`, clusterID, runID, now); err != nil {
return 0, fmt.Errorf("record durable cluster event: %w", err)
}
return clusterID, nil
}
func (s *Store) markMissingClusterMembersRemoved(ctx context.Context, clusterID int64, memberIDs []int64, now string) error {
if len(memberIDs) == 0 {
return nil
}
placeholders := make([]string, 0, len(memberIDs))
args := []any{`{"reason":"not seen in latest cluster run"}`, now, now, clusterID}
for _, id := range memberIDs {
placeholders = append(placeholders, "?")
args = append(args, id)
}
if _, err := s.q().ExecContext(ctx, `
update cluster_memberships
set state = 'removed',
removed_by = 'cluster',
removed_reason_json = ?,
removed_at = ?,
updated_at = ?
where cluster_id = ?
and thread_id not in (`+strings.Join(placeholders, ",")+`)
and state = 'active'
`, args...); err != nil {
return fmt.Errorf("mark missing cluster members removed: %w", err)
}
return nil
}
func (s *Store) upsertClusterOverride(ctx context.Context, repoID, clusterID, threadID int64, action, reason, now string) error {
if _, err := s.q().ExecContext(ctx, `
insert into cluster_overrides(repo_id, cluster_id, thread_id, action, reason, created_at)
@ -490,6 +654,63 @@ func (s *Store) upsertClusterOverride(ctx context.Context, repoID, clusterID, th
return nil
}
func (s *Store) applyClusterOverrides(ctx context.Context, repoID, clusterID int64, now string) error {
if _, err := s.q().ExecContext(ctx, `
update cluster_memberships
set state = 'excluded',
removed_by = 'local',
removed_reason_json = coalesce(removed_reason_json, '{"reason":"local override"}'),
removed_at = coalesce(removed_at, ?),
updated_at = ?
where cluster_id = ?
and thread_id in (select thread_id from cluster_overrides where repo_id = ? and cluster_id = ? and action = 'exclude')
`, now, now, clusterID, repoID, clusterID); err != nil {
return fmt.Errorf("apply exclude overrides: %w", err)
}
if _, err := s.q().ExecContext(ctx, `
update cluster_memberships
set state = 'active',
removed_by = null,
removed_reason_json = null,
removed_at = null,
updated_at = ?
where cluster_id = ?
and thread_id in (select thread_id from cluster_overrides where repo_id = ? and cluster_id = ? and action = 'include')
`, now, clusterID, repoID, clusterID); err != nil {
return fmt.Errorf("apply include overrides: %w", err)
}
var canonicalThreadID sql.NullInt64
err := s.q().QueryRowContext(ctx, `
select thread_id
from cluster_overrides
where repo_id = ? and cluster_id = ? and action = 'canonical'
order by created_at desc, id desc
limit 1
`, repoID, clusterID).Scan(&canonicalThreadID)
if err != nil && err != sql.ErrNoRows {
return fmt.Errorf("find canonical override: %w", err)
}
if canonicalThreadID.Valid {
if _, err := s.q().ExecContext(ctx, `
update cluster_memberships
set role = case when thread_id = ? then 'canonical' else 'member' end,
updated_at = ?
where cluster_id = ? and state = 'active'
`, canonicalThreadID.Int64, now, clusterID); err != nil {
return fmt.Errorf("apply canonical override role: %w", err)
}
if _, err := s.q().ExecContext(ctx, `
update cluster_groups
set representative_thread_id = ?, updated_at = ?
where repo_id = ? and id = ?
`, canonicalThreadID.Int64, now, repoID, clusterID); err != nil {
return fmt.Errorf("apply canonical override representative: %w", err)
}
return nil
}
return s.ensureActiveClusterRepresentative(ctx, repoID, clusterID, now)
}
func (s *Store) ensureActiveClusterRepresentative(ctx context.Context, repoID, clusterID int64, now string) error {
if _, err := s.q().ExecContext(ctx, `
update cluster_groups
@ -517,6 +738,17 @@ func (s *Store) ensureActiveClusterRepresentative(ctx context.Context, repoID, c
return nil
}
func nullInt(value int64) sql.NullInt64 {
return sql.NullInt64{Int64: value, Valid: value != 0}
}
func nullableFloat(value *float64) sql.NullFloat64 {
if value == nil {
return sql.NullFloat64{}
}
return sql.NullFloat64{Float64: *value, Valid: true}
}
func (s *Store) clusterSummaryByID(ctx context.Context, repoID, clusterID int64, includeClosed bool) (ClusterSummary, error) {
where := `cg.repo_id = ? and cg.id = ?`
args := []any{repoID, clusterID}

View File

@ -221,3 +221,82 @@ func TestClusterMemberLocalOverrides(t *testing.T) {
t.Fatalf("include/canonical should clear stale exclude overrides, got %d", excludeOverrides)
}
}
func TestSaveDurableClustersAppliesLocalOverrides(t *testing.T) {
ctx := context.Background()
st, err := Open(ctx, filepath.Join(t.TempDir(), "gitcrawl.db"))
if err != nil {
t.Fatalf("open store: %v", err)
}
defer st.Close()
repoID, err := st.UpsertRepository(ctx, Repository{Owner: "openclaw", Name: "gitcrawl", FullName: "openclaw/gitcrawl", RawJSON: "{}", UpdatedAt: "2026-04-26T00:00:00Z"})
if err != nil {
t.Fatalf("repo: %v", err)
}
firstID, err := st.UpsertThread(ctx, Thread{
RepoID: repoID, GitHubID: "41", Number: 41, Kind: "issue", State: "open",
Title: "first duplicate", HTMLURL: "https://github.com/openclaw/gitcrawl/issues/41",
LabelsJSON: "[]", AssigneesJSON: "[]", RawJSON: "{}", ContentHash: "hash-41", UpdatedAt: "2026-04-26T00:00:00Z",
})
if err != nil {
t.Fatalf("first thread: %v", err)
}
secondID, err := st.UpsertThread(ctx, Thread{
RepoID: repoID, GitHubID: "42", Number: 42, Kind: "issue", State: "open",
Title: "second duplicate", HTMLURL: "https://github.com/openclaw/gitcrawl/issues/42",
LabelsJSON: "[]", AssigneesJSON: "[]", RawJSON: "{}", ContentHash: "hash-42", UpdatedAt: "2026-04-26T00:00:00Z",
})
if err != nil {
t.Fatalf("second thread: %v", err)
}
score := 0.93
input := DurableClusterInput{
StableKey: "members:41,42",
StableSlug: "cluster-4142",
RepresentativeThreadID: firstID,
Title: "first duplicate",
Members: []DurableClusterMemberInput{
{ThreadID: firstID, Role: "representative"},
{ThreadID: secondID, Role: "member", ScoreToRepresentative: &score},
},
}
result, err := st.SaveDurableClusters(ctx, repoID, []DurableClusterInput{input})
if err != nil {
t.Fatalf("save durable clusters: %v", err)
}
if result.ClusterCount != 1 || result.MemberCount != 2 || result.RunID == 0 {
t.Fatalf("unexpected save result: %#v", result)
}
detail, err := st.ClusterDetail(ctx, ClusterDetailOptions{RepoID: repoID, ClusterID: 1, IncludeClosed: false, MemberLimit: 10})
if err != nil {
t.Fatalf("cluster detail after save: %v", err)
}
if detail.Cluster.StableSlug != "cluster-4142" || len(detail.Members) != 2 {
t.Fatalf("unexpected saved cluster detail: %#v", detail)
}
if _, err := st.ExcludeClusterMemberLocally(ctx, repoID, detail.Cluster.ID, 41, "not related"); err != nil {
t.Fatalf("exclude member: %v", err)
}
if _, err := st.SetClusterCanonicalLocally(ctx, repoID, detail.Cluster.ID, 42, "best issue"); err != nil {
t.Fatalf("set canonical: %v", err)
}
if _, err := st.SaveDurableClusters(ctx, repoID, []DurableClusterInput{input}); err != nil {
t.Fatalf("resave durable clusters: %v", err)
}
detail, err = st.ClusterDetail(ctx, ClusterDetailOptions{RepoID: repoID, ClusterID: detail.Cluster.ID, IncludeClosed: false, MemberLimit: 10})
if err != nil {
t.Fatalf("cluster detail after overrides: %v", err)
}
if len(detail.Members) != 1 || detail.Members[0].Thread.ID != secondID || detail.Members[0].Role != "canonical" || detail.Cluster.RepresentativeThreadID != secondID {
t.Fatalf("saved cluster should replay local overrides: %#v", detail)
}
all, err := st.ClusterDetail(ctx, ClusterDetailOptions{RepoID: repoID, ClusterID: detail.Cluster.ID, IncludeClosed: true, MemberLimit: 10})
if err != nil {
t.Fatalf("cluster detail including excluded: %v", err)
}
if len(all.Members) != 2 || all.Members[1].State != "excluded" {
t.Fatalf("excluded member should remain visible with include closed: %#v", all)
}
}