diff --git a/internal/store/clusters.go b/internal/store/clusters.go index ff4e7db..aa6c385 100644 --- a/internal/store/clusters.go +++ b/internal/store/clusters.go @@ -62,6 +62,26 @@ type ClusterMemberOverride struct { Reason string `json:"reason,omitempty"` } +type DurableClusterInput struct { + StableKey string + StableSlug string + RepresentativeThreadID int64 + Title string + Members []DurableClusterMemberInput +} + +type DurableClusterMemberInput struct { + ThreadID int64 + Role string + ScoreToRepresentative *float64 +} + +type SaveDurableClustersResult struct { + RunID int64 `json:"run_id"` + ClusterCount int `json:"cluster_count"` + MemberCount int `json:"member_count"` +} + func (s *Store) ListClusterSummaries(ctx context.Context, options ClusterSummaryOptions) ([]ClusterSummary, error) { where := `cg.repo_id = ?` args := []any{options.RepoID} @@ -293,6 +313,75 @@ func (s *Store) ReopenClusterLocally(ctx context.Context, repoID, clusterID int6 return nil } +func (s *Store) SaveDurableClusters(ctx context.Context, repoID int64, inputs []DurableClusterInput) (SaveDurableClustersResult, error) { + if repoID <= 0 { + return SaveDurableClustersResult{}, fmt.Errorf("repo id must be positive") + } + now := time.Now().UTC().Format(timeLayout) + result := SaveDurableClustersResult{ClusterCount: len(inputs)} + err := s.WithTx(ctx, func(tx *Store) error { + runID, err := tx.insertClusterRun(ctx, repoID, now) + if err != nil { + return err + } + result.RunID = runID + for _, input := range inputs { + clusterID, err := tx.upsertDurableCluster(ctx, repoID, runID, input, now) + if err != nil { + return err + } + memberIDs := make([]int64, 0, len(input.Members)) + for _, member := range input.Members { + if member.ThreadID <= 0 { + return fmt.Errorf("cluster %q has invalid member thread id", input.StableKey) + } + role := strings.TrimSpace(member.Role) + if role == "" { + role = "member" + } + if _, err := tx.q().ExecContext(ctx, ` + insert into cluster_memberships( + cluster_id, thread_id, role, state, score_to_representative, + first_seen_run_id, last_seen_run_id, added_by, added_reason_json, created_at, updated_at + ) + values(?, ?, ?, 'active', ?, ?, ?, 'cluster', '{}', ?, ?) + on conflict(cluster_id, thread_id) do update set + role = excluded.role, + state = 'active', + score_to_representative = excluded.score_to_representative, + last_seen_run_id = excluded.last_seen_run_id, + removed_by = null, + removed_reason_json = null, + removed_at = null, + updated_at = excluded.updated_at + `, clusterID, member.ThreadID, role, nullableFloat(member.ScoreToRepresentative), runID, runID, now, now); err != nil { + return fmt.Errorf("upsert durable cluster member: %w", err) + } + memberIDs = append(memberIDs, member.ThreadID) + result.MemberCount++ + } + if err := tx.markMissingClusterMembersRemoved(ctx, clusterID, memberIDs, now); err != nil { + return err + } + if err := tx.applyClusterOverrides(ctx, repoID, clusterID, now); err != nil { + return err + } + } + if _, err := tx.q().ExecContext(ctx, ` + update cluster_runs + set finished_at = ?, stats_json = ? + where id = ? + `, now, fmt.Sprintf(`{"cluster_count":%d,"member_count":%d}`, result.ClusterCount, result.MemberCount), runID); err != nil { + return fmt.Errorf("finish cluster run: %w", err) + } + return nil + }) + if err != nil { + return SaveDurableClustersResult{}, err + } + return result, nil +} + func (s *Store) ExcludeClusterMemberLocally(ctx context.Context, repoID, clusterID int64, number int, reason string) (ClusterMemberOverride, error) { if repoID <= 0 { return ClusterMemberOverride{}, fmt.Errorf("repo id must be positive") @@ -477,6 +566,81 @@ func (s *Store) clusterMemberThreadID(ctx context.Context, repoID, clusterID int return threadID, nil } +func (s *Store) insertClusterRun(ctx context.Context, repoID int64, now string) (int64, error) { + var runID int64 + if err := s.q().QueryRowContext(ctx, ` + insert into cluster_runs(repo_id, scope, status, started_at) + values(?, 'durable', 'success', ?) + returning id + `, repoID, now).Scan(&runID); err != nil { + return 0, fmt.Errorf("insert cluster run: %w", err) + } + return runID, nil +} + +func (s *Store) upsertDurableCluster(ctx context.Context, repoID, runID int64, input DurableClusterInput, now string) (int64, error) { + stableKey := strings.TrimSpace(input.StableKey) + if stableKey == "" { + return 0, fmt.Errorf("durable cluster stable key is required") + } + stableSlug := strings.TrimSpace(input.StableSlug) + if stableSlug == "" { + stableSlug = stableKey + } + var clusterID int64 + if err := s.q().QueryRowContext(ctx, ` + insert into cluster_groups( + repo_id, stable_key, stable_slug, status, cluster_type, representative_thread_id, title, created_at, updated_at + ) + values(?, ?, ?, 'active', 'similarity', ?, ?, ?, ?) + on conflict(repo_id, stable_key) do update set + stable_slug = excluded.stable_slug, + cluster_type = excluded.cluster_type, + representative_thread_id = case + when cluster_groups.status = 'closed' then cluster_groups.representative_thread_id + else excluded.representative_thread_id + end, + title = excluded.title, + updated_at = excluded.updated_at + returning id + `, repoID, stableKey, stableSlug, nullInt(input.RepresentativeThreadID), nullString(input.Title), now, now).Scan(&clusterID); err != nil { + return 0, fmt.Errorf("upsert durable cluster: %w", err) + } + if _, err := s.q().ExecContext(ctx, ` + insert into cluster_events(cluster_id, run_id, event_type, actor_kind, payload_json, created_at) + values(?, ?, 'seen', 'cluster', '{}', ?) + `, clusterID, runID, now); err != nil { + return 0, fmt.Errorf("record durable cluster event: %w", err) + } + return clusterID, nil +} + +func (s *Store) markMissingClusterMembersRemoved(ctx context.Context, clusterID int64, memberIDs []int64, now string) error { + if len(memberIDs) == 0 { + return nil + } + placeholders := make([]string, 0, len(memberIDs)) + args := []any{`{"reason":"not seen in latest cluster run"}`, now, now, clusterID} + for _, id := range memberIDs { + placeholders = append(placeholders, "?") + args = append(args, id) + } + if _, err := s.q().ExecContext(ctx, ` + update cluster_memberships + set state = 'removed', + removed_by = 'cluster', + removed_reason_json = ?, + removed_at = ?, + updated_at = ? + where cluster_id = ? + and thread_id not in (`+strings.Join(placeholders, ",")+`) + and state = 'active' + `, args...); err != nil { + return fmt.Errorf("mark missing cluster members removed: %w", err) + } + return nil +} + func (s *Store) upsertClusterOverride(ctx context.Context, repoID, clusterID, threadID int64, action, reason, now string) error { if _, err := s.q().ExecContext(ctx, ` insert into cluster_overrides(repo_id, cluster_id, thread_id, action, reason, created_at) @@ -490,6 +654,63 @@ func (s *Store) upsertClusterOverride(ctx context.Context, repoID, clusterID, th return nil } +func (s *Store) applyClusterOverrides(ctx context.Context, repoID, clusterID int64, now string) error { + if _, err := s.q().ExecContext(ctx, ` + update cluster_memberships + set state = 'excluded', + removed_by = 'local', + removed_reason_json = coalesce(removed_reason_json, '{"reason":"local override"}'), + removed_at = coalesce(removed_at, ?), + updated_at = ? + where cluster_id = ? + and thread_id in (select thread_id from cluster_overrides where repo_id = ? and cluster_id = ? and action = 'exclude') + `, now, now, clusterID, repoID, clusterID); err != nil { + return fmt.Errorf("apply exclude overrides: %w", err) + } + if _, err := s.q().ExecContext(ctx, ` + update cluster_memberships + set state = 'active', + removed_by = null, + removed_reason_json = null, + removed_at = null, + updated_at = ? + where cluster_id = ? + and thread_id in (select thread_id from cluster_overrides where repo_id = ? and cluster_id = ? and action = 'include') + `, now, clusterID, repoID, clusterID); err != nil { + return fmt.Errorf("apply include overrides: %w", err) + } + var canonicalThreadID sql.NullInt64 + err := s.q().QueryRowContext(ctx, ` + select thread_id + from cluster_overrides + where repo_id = ? and cluster_id = ? and action = 'canonical' + order by created_at desc, id desc + limit 1 + `, repoID, clusterID).Scan(&canonicalThreadID) + if err != nil && err != sql.ErrNoRows { + return fmt.Errorf("find canonical override: %w", err) + } + if canonicalThreadID.Valid { + if _, err := s.q().ExecContext(ctx, ` + update cluster_memberships + set role = case when thread_id = ? then 'canonical' else 'member' end, + updated_at = ? + where cluster_id = ? and state = 'active' + `, canonicalThreadID.Int64, now, clusterID); err != nil { + return fmt.Errorf("apply canonical override role: %w", err) + } + if _, err := s.q().ExecContext(ctx, ` + update cluster_groups + set representative_thread_id = ?, updated_at = ? + where repo_id = ? and id = ? + `, canonicalThreadID.Int64, now, repoID, clusterID); err != nil { + return fmt.Errorf("apply canonical override representative: %w", err) + } + return nil + } + return s.ensureActiveClusterRepresentative(ctx, repoID, clusterID, now) +} + func (s *Store) ensureActiveClusterRepresentative(ctx context.Context, repoID, clusterID int64, now string) error { if _, err := s.q().ExecContext(ctx, ` update cluster_groups @@ -517,6 +738,17 @@ func (s *Store) ensureActiveClusterRepresentative(ctx context.Context, repoID, c return nil } +func nullInt(value int64) sql.NullInt64 { + return sql.NullInt64{Int64: value, Valid: value != 0} +} + +func nullableFloat(value *float64) sql.NullFloat64 { + if value == nil { + return sql.NullFloat64{} + } + return sql.NullFloat64{Float64: *value, Valid: true} +} + func (s *Store) clusterSummaryByID(ctx context.Context, repoID, clusterID int64, includeClosed bool) (ClusterSummary, error) { where := `cg.repo_id = ? and cg.id = ?` args := []any{repoID, clusterID} diff --git a/internal/store/clusters_test.go b/internal/store/clusters_test.go index 226b266..02b97b2 100644 --- a/internal/store/clusters_test.go +++ b/internal/store/clusters_test.go @@ -221,3 +221,82 @@ func TestClusterMemberLocalOverrides(t *testing.T) { t.Fatalf("include/canonical should clear stale exclude overrides, got %d", excludeOverrides) } } + +func TestSaveDurableClustersAppliesLocalOverrides(t *testing.T) { + ctx := context.Background() + st, err := Open(ctx, filepath.Join(t.TempDir(), "gitcrawl.db")) + if err != nil { + t.Fatalf("open store: %v", err) + } + defer st.Close() + + repoID, err := st.UpsertRepository(ctx, Repository{Owner: "openclaw", Name: "gitcrawl", FullName: "openclaw/gitcrawl", RawJSON: "{}", UpdatedAt: "2026-04-26T00:00:00Z"}) + if err != nil { + t.Fatalf("repo: %v", err) + } + firstID, err := st.UpsertThread(ctx, Thread{ + RepoID: repoID, GitHubID: "41", Number: 41, Kind: "issue", State: "open", + Title: "first duplicate", HTMLURL: "https://github.com/openclaw/gitcrawl/issues/41", + LabelsJSON: "[]", AssigneesJSON: "[]", RawJSON: "{}", ContentHash: "hash-41", UpdatedAt: "2026-04-26T00:00:00Z", + }) + if err != nil { + t.Fatalf("first thread: %v", err) + } + secondID, err := st.UpsertThread(ctx, Thread{ + RepoID: repoID, GitHubID: "42", Number: 42, Kind: "issue", State: "open", + Title: "second duplicate", HTMLURL: "https://github.com/openclaw/gitcrawl/issues/42", + LabelsJSON: "[]", AssigneesJSON: "[]", RawJSON: "{}", ContentHash: "hash-42", UpdatedAt: "2026-04-26T00:00:00Z", + }) + if err != nil { + t.Fatalf("second thread: %v", err) + } + score := 0.93 + input := DurableClusterInput{ + StableKey: "members:41,42", + StableSlug: "cluster-4142", + RepresentativeThreadID: firstID, + Title: "first duplicate", + Members: []DurableClusterMemberInput{ + {ThreadID: firstID, Role: "representative"}, + {ThreadID: secondID, Role: "member", ScoreToRepresentative: &score}, + }, + } + result, err := st.SaveDurableClusters(ctx, repoID, []DurableClusterInput{input}) + if err != nil { + t.Fatalf("save durable clusters: %v", err) + } + if result.ClusterCount != 1 || result.MemberCount != 2 || result.RunID == 0 { + t.Fatalf("unexpected save result: %#v", result) + } + detail, err := st.ClusterDetail(ctx, ClusterDetailOptions{RepoID: repoID, ClusterID: 1, IncludeClosed: false, MemberLimit: 10}) + if err != nil { + t.Fatalf("cluster detail after save: %v", err) + } + if detail.Cluster.StableSlug != "cluster-4142" || len(detail.Members) != 2 { + t.Fatalf("unexpected saved cluster detail: %#v", detail) + } + + if _, err := st.ExcludeClusterMemberLocally(ctx, repoID, detail.Cluster.ID, 41, "not related"); err != nil { + t.Fatalf("exclude member: %v", err) + } + if _, err := st.SetClusterCanonicalLocally(ctx, repoID, detail.Cluster.ID, 42, "best issue"); err != nil { + t.Fatalf("set canonical: %v", err) + } + if _, err := st.SaveDurableClusters(ctx, repoID, []DurableClusterInput{input}); err != nil { + t.Fatalf("resave durable clusters: %v", err) + } + detail, err = st.ClusterDetail(ctx, ClusterDetailOptions{RepoID: repoID, ClusterID: detail.Cluster.ID, IncludeClosed: false, MemberLimit: 10}) + if err != nil { + t.Fatalf("cluster detail after overrides: %v", err) + } + if len(detail.Members) != 1 || detail.Members[0].Thread.ID != secondID || detail.Members[0].Role != "canonical" || detail.Cluster.RepresentativeThreadID != secondID { + t.Fatalf("saved cluster should replay local overrides: %#v", detail) + } + all, err := st.ClusterDetail(ctx, ClusterDetailOptions{RepoID: repoID, ClusterID: detail.Cluster.ID, IncludeClosed: true, MemberLimit: 10}) + if err != nil { + t.Fatalf("cluster detail including excluded: %v", err) + } + if len(all.Members) != 2 || all.Members[1].State != "excluded" { + t.Fatalf("excluded member should remain visible with include closed: %#v", all) + } +}