fix(cluster): align gitcrawl sync and embedding parity
This commit is contained in:
parent
85ce5ced12
commit
832cd09dc0
@ -16,7 +16,7 @@ gitcrawl doctor
|
||||
gitcrawl sync owner/repo
|
||||
gitcrawl sync owner/repo --state open
|
||||
gitcrawl refresh owner/repo
|
||||
gitcrawl cluster owner/repo --threshold 0.82
|
||||
gitcrawl cluster owner/repo --threshold 0.80
|
||||
gitcrawl clusters owner/repo
|
||||
gitcrawl durable-clusters owner/repo
|
||||
gitcrawl cluster-detail owner/repo --id 123
|
||||
@ -35,9 +35,9 @@ gitcrawl tui owner/repo
|
||||
```
|
||||
|
||||
`gitcrawl clusters`, `gitcrawl durable-clusters`, and `gitcrawl tui` show active primary cluster memberships by default. Pass `--include-closed` to inspect closed rows and historical secondary memberships.
|
||||
`gitcrawl cluster` and `gitcrawl refresh` build bounded nearest-neighbor clusters by default (`--max-cluster-size 40`, `--k 16`, `--cross-kind-threshold 0.93`) and add deterministic GitHub reference evidence for direct issue/PR links such as `#123`, `issues/123`, and `pull/123`. Weak embedding edges also need concrete title-token overlap unless their similarity is already high, which keeps generic low-confidence bridges from forming unrelated clusters.
|
||||
`gitcrawl cluster` and `gitcrawl refresh` build bounded nearest-neighbor clusters by default (`--threshold 0.80`, `--max-cluster-size 40`, `--k 16`, `--cross-kind-threshold 0.93`) and add deterministic GitHub reference evidence for direct issue/PR links such as `#123`, `issues/123`, and `pull/123`. Weak embedding edges also need concrete title-token overlap unless their similarity is already high, which keeps generic low-confidence bridges from forming unrelated clusters.
|
||||
`gitcrawl tui` infers the most recently updated local repository when `owner/repo` is omitted. `serve` is intentionally not part of `gitcrawl`.
|
||||
`gitcrawl sync` fetches issues and pull requests in every GitHub state by default. Pass `--state open` or `--state closed` to limit a sync to one state.
|
||||
`gitcrawl sync` fetches open issues and pull requests by default. Pass `--state all` or `--state closed` for explicit backfill workflows; incremental open syncs with `--since` also sweep recently closed items so local open state does not rot.
|
||||
The TUI starts at `--min-size 5` so maintainer-significant active clusters are visible first; pass `--min-size 1` to include singletons. Mouse support is built in: click rows, wheel panes, and right-click for copy, sort, filter, jump, link, neighbor, local close/reopen, and member triage actions. Press `a` to open the same action menu from the keyboard, `#` to jump directly to an issue or PR number, `p` to switch between repositories already present in the local store, or `n` to load neighbors for the selected issue or PR. Enter from the members pane also loads neighbors before opening detail. The TUI quietly refreshes from the local store every 15 seconds.
|
||||
|
||||
## Local Defaults
|
||||
|
||||
@ -32,6 +32,7 @@ const (
|
||||
defaultTUIWorkingSetLimit = 500
|
||||
defaultClusterMaxSize = 40
|
||||
defaultClusterFanout = 16
|
||||
defaultClusterThreshold = 0.80
|
||||
defaultCrossKindMinScore = 0.93
|
||||
highConfidenceEdgeScore = 0.90
|
||||
weakEdgeMinTitleOverlap = 0.18
|
||||
@ -248,9 +249,9 @@ func (a *App) runRefresh(ctx context.Context, args []string) error {
|
||||
includeComments := fs.Bool("include-comments", false, "hydrate comments during sync")
|
||||
fs.Bool("include-code", false, "accepted for compatibility; code hydration is not implemented yet")
|
||||
since := fs.String("since", "", "GitHub since timestamp")
|
||||
state := fs.String("state", "", "GitHub issue state: open|closed|all; default all")
|
||||
state := fs.String("state", "", "GitHub issue state: open|closed|all; default open")
|
||||
limitRaw := fs.String("limit", "", "maximum sync or embedding rows")
|
||||
thresholdRaw := fs.String("threshold", "0.82", "minimum cluster cosine score")
|
||||
thresholdRaw := fs.String("threshold", fmt.Sprintf("%.2f", defaultClusterThreshold), "minimum cluster cosine score")
|
||||
minSizeRaw := fs.String("min-size", "2", "minimum cluster member count")
|
||||
maxClusterSizeRaw := fs.String("max-cluster-size", strconv.Itoa(defaultClusterMaxSize), "maximum members per generated cluster")
|
||||
fanoutRaw := fs.String("k", strconv.Itoa(defaultClusterFanout), "nearest-neighbor fanout per thread")
|
||||
@ -545,7 +546,7 @@ func (a *App) runNeighbors(ctx context.Context, args []string) error {
|
||||
func (a *App) runCluster(ctx context.Context, args []string) error {
|
||||
fs := flag.NewFlagSet("cluster", flag.ContinueOnError)
|
||||
fs.SetOutput(io.Discard)
|
||||
thresholdRaw := fs.String("threshold", "0.82", "minimum cosine score")
|
||||
thresholdRaw := fs.String("threshold", fmt.Sprintf("%.2f", defaultClusterThreshold), "minimum cosine score")
|
||||
minSizeRaw := fs.String("min-size", "2", "minimum cluster member count")
|
||||
maxClusterSizeRaw := fs.String("max-cluster-size", strconv.Itoa(defaultClusterMaxSize), "maximum members per generated cluster")
|
||||
fanoutRaw := fs.String("k", strconv.Itoa(defaultClusterFanout), "nearest-neighbor fanout per thread")
|
||||
@ -726,7 +727,7 @@ func (a *App) embedRepository(ctx context.Context, owner, repoName string, optio
|
||||
if batchSize <= 0 {
|
||||
batchSize = 64
|
||||
}
|
||||
client := openai.New(openai.Options{APIKey: token.Value, BaseURL: openAIBaseURL()})
|
||||
client := openai.New(openai.Options{APIKey: token.Value, BaseURL: openAIBaseURL(), Dimensions: rt.Config.OpenAI.EmbedDimensions})
|
||||
for start := 0; start < len(tasks); start += batchSize {
|
||||
end := start + batchSize
|
||||
if end > len(tasks) {
|
||||
@ -1484,7 +1485,7 @@ func (a *App) runSync(ctx context.Context, args []string) error {
|
||||
fs := flag.NewFlagSet("sync", flag.ContinueOnError)
|
||||
fs.SetOutput(io.Discard)
|
||||
since := fs.String("since", "", "GitHub since timestamp")
|
||||
state := fs.String("state", "", "GitHub issue state: open|closed|all; default all")
|
||||
state := fs.String("state", "", "GitHub issue state: open|closed|all; default open")
|
||||
limitRaw := fs.String("limit", "", "maximum issue/PR rows")
|
||||
jsonOut := fs.Bool("json", false, "write JSON output")
|
||||
includeComments := fs.Bool("include-comments", false, "hydrate issue comments, PR reviews, and PR review comments")
|
||||
|
||||
@ -32,11 +32,12 @@ type GitHubConfig struct {
|
||||
}
|
||||
|
||||
// OpenAIConfig groups the OpenAI-related settings. Each field is decoded from
// the TOML key named in its struct tag.
type OpenAIConfig struct {
	// APIKeyEnv is presumably the name of the environment variable that holds
	// the API key (the default value is a *Env constant) — confirm against the
	// token-loading code.
	APIKeyEnv string `toml:"api_key_env"`
	// SummaryModel is the model identifier used for summaries.
	SummaryModel string `toml:"summary_model"`
	// EmbedModel is the model identifier used for embeddings.
	EmbedModel string `toml:"embed_model"`
	// EmbedDimensions is the embedding vector size handed to the OpenAI client
	// and sent as the request's "dimensions" value. Non-positive values are
	// replaced with the default during Normalize.
	EmbedDimensions int `toml:"embed_dimensions"`
	// BatchSize is the embedding batch size. Non-positive values are replaced
	// with the default during Normalize.
	BatchSize int `toml:"batch_size"`
	// Concurrency is the configured concurrency level.
	Concurrency int `toml:"concurrency"`
}
|
||||
|
||||
type TUIConfig struct {
|
||||
@ -62,11 +63,12 @@ func Default() Config {
|
||||
TokenEnv: DefaultTokenEnv,
|
||||
},
|
||||
OpenAI: OpenAIConfig{
|
||||
APIKeyEnv: DefaultOpenAIEnv,
|
||||
SummaryModel: "gpt-5.4",
|
||||
EmbedModel: "text-embedding-3-small",
|
||||
BatchSize: 64,
|
||||
Concurrency: 2,
|
||||
APIKeyEnv: DefaultOpenAIEnv,
|
||||
SummaryModel: "gpt-5.4",
|
||||
EmbedModel: "text-embedding-3-small",
|
||||
EmbedDimensions: 1024,
|
||||
BatchSize: 64,
|
||||
Concurrency: 2,
|
||||
},
|
||||
TUI: TUIConfig{
|
||||
DefaultSort: "recent",
|
||||
@ -154,6 +156,9 @@ func (c *Config) Normalize() error {
|
||||
if c.OpenAI.EmbedModel == "" {
|
||||
c.OpenAI.EmbedModel = envOrDefault("GITCRAWL_EMBED_MODEL", def.OpenAI.EmbedModel)
|
||||
}
|
||||
if c.OpenAI.EmbedDimensions <= 0 {
|
||||
c.OpenAI.EmbedDimensions = def.OpenAI.EmbedDimensions
|
||||
}
|
||||
if c.OpenAI.BatchSize <= 0 {
|
||||
c.OpenAI.BatchSize = def.OpenAI.BatchSize
|
||||
}
|
||||
|
||||
@ -20,17 +20,20 @@ type Client struct {
|
||||
apiKey string
|
||||
baseURL string
|
||||
httpClient *http.Client
|
||||
dimensions int
|
||||
}
|
||||
|
||||
// Options carries the settings New uses to build a Client.
type Options struct {
	// APIKey is the credential for the API; New trims surrounding whitespace.
	APIKey string
	// BaseURL selects the API endpoint root.
	BaseURL string
	// Dimensions is copied onto the Client and sent as the "dimensions" field
	// of embedding requests; zero leaves the field out of the JSON payload.
	Dimensions int
	// HTTPClient is the transport used for requests.
	// NOTE(review): New presumably substitutes a default when nil — confirm.
	HTTPClient *http.Client
}
|
||||
|
||||
// embeddingRequest is the JSON body sent for an embeddings call.
type embeddingRequest struct {
	// Model is the embedding model identifier.
	Model string `json:"model"`
	// Input holds the texts to embed, one entry per requested vector.
	Input []string `json:"input"`
	// Dimensions is the requested vector size. Because of omitempty, a zero
	// value is left out of the payload entirely.
	Dimensions int `json:"dimensions,omitempty"`
}
|
||||
|
||||
type embeddingResponse struct {
|
||||
@ -57,6 +60,7 @@ func New(options Options) *Client {
|
||||
apiKey: strings.TrimSpace(options.APIKey),
|
||||
baseURL: baseURL,
|
||||
httpClient: httpClient,
|
||||
dimensions: options.Dimensions,
|
||||
}
|
||||
}
|
||||
|
||||
@ -71,7 +75,7 @@ func (c *Client) Embed(ctx context.Context, model string, texts []string) ([][]f
|
||||
if c.apiKey == "" {
|
||||
return nil, fmt.Errorf("OpenAI API key is required")
|
||||
}
|
||||
payload, err := json.Marshal(embeddingRequest{Model: model, Input: texts})
|
||||
payload, err := json.Marshal(embeddingRequest{Model: model, Input: texts, Dimensions: c.dimensions})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("marshal embeddings request: %w", err)
|
||||
}
|
||||
|
||||
@ -14,6 +14,9 @@ func TestEmbedAcceptsLargeBatchResponse(t *testing.T) {
|
||||
if err := json.NewDecoder(r.Body).Decode(&request); err != nil {
|
||||
t.Fatalf("decode request: %v", err)
|
||||
}
|
||||
if request.Dimensions != 1024 {
|
||||
t.Fatalf("dimensions = %d, want 1024", request.Dimensions)
|
||||
}
|
||||
response := embeddingResponse{}
|
||||
for index := range request.Input {
|
||||
vector := make([]float64, 1536)
|
||||
@ -36,7 +39,7 @@ func TestEmbedAcceptsLargeBatchResponse(t *testing.T) {
|
||||
for index := range inputs {
|
||||
inputs[index] = "thread text"
|
||||
}
|
||||
vectors, err := New(Options{APIKey: "test", BaseURL: server.URL}).Embed(context.Background(), "text-embedding-3-small", inputs)
|
||||
vectors, err := New(Options{APIKey: "test", BaseURL: server.URL, Dimensions: 1024}).Embed(context.Background(), "text-embedding-3-small", inputs)
|
||||
if err != nil {
|
||||
t.Fatalf("embed: %v", err)
|
||||
}
|
||||
|
||||
@ -48,10 +48,19 @@ func (s *Store) ListEmbeddingTasks(ctx context.Context, options EmbeddingTaskOpt
|
||||
args = append(args, options.Limit)
|
||||
}
|
||||
rows, err := s.q().QueryContext(ctx, `
|
||||
select t.id, t.number, t.kind, t.title, coalesce(d.body, ''), coalesce(d.raw_text, ''), coalesce(d.dedupe_text, ''),
|
||||
select t.id, t.number, t.kind, t.title, coalesce(d.body, t.body, ''), coalesce(d.raw_text, t.body, ''), coalesce(d.dedupe_text, t.title || ' ' || coalesce(t.body, '')),
|
||||
coalesce((
|
||||
select tks.key_text
|
||||
from thread_key_summaries tks
|
||||
join thread_revisions tr on tr.id = tks.thread_revision_id
|
||||
where tr.thread_id = t.id
|
||||
and tks.summary_kind in ('llm_key_summary', 'llm_key_3line')
|
||||
order by tks.created_at desc, tr.created_at desc, tks.id desc
|
||||
limit 1
|
||||
), ''),
|
||||
coalesce(tv.content_hash, '')
|
||||
from threads t
|
||||
join documents d on d.thread_id = t.id
|
||||
left join documents d on d.thread_id = t.id
|
||||
left join thread_vectors tv on tv.thread_id = t.id and tv.basis = ? and tv.model = ?
|
||||
where `+strings.Join(where, " and ")+`
|
||||
order by coalesce(t.updated_at_gh, t.updated_at) desc, t.number desc`+limitSQL,
|
||||
@ -64,14 +73,17 @@ func (s *Store) ListEmbeddingTasks(ctx context.Context, options EmbeddingTaskOpt
|
||||
out := make([]EmbeddingTask, 0)
|
||||
for rows.Next() {
|
||||
var task EmbeddingTask
|
||||
var body, rawText, dedupeText, existingHash string
|
||||
if err := rows.Scan(&task.ThreadID, &task.Number, &task.Kind, &task.Title, &body, &rawText, &dedupeText, &existingHash); err != nil {
|
||||
var body, rawText, dedupeText, keySummary, existingHash string
|
||||
if err := rows.Scan(&task.ThreadID, &task.Number, &task.Kind, &task.Title, &body, &rawText, &dedupeText, &keySummary, &existingHash); err != nil {
|
||||
return nil, fmt.Errorf("scan embedding task: %w", err)
|
||||
}
|
||||
text, err := embeddingTextForBasis(basis, task.Title, body, rawText, dedupeText)
|
||||
text, err := embeddingTextForBasis(basis, task.Title, body, rawText, dedupeText, keySummary)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if strings.TrimSpace(text) == "" {
|
||||
continue
|
||||
}
|
||||
task.Text = text
|
||||
task.ContentHash = embeddingContentHash(basis, model, text)
|
||||
if !options.Force && existingHash == task.ContentHash {
|
||||
@ -85,7 +97,7 @@ func (s *Store) ListEmbeddingTasks(ctx context.Context, options EmbeddingTaskOpt
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func embeddingTextForBasis(basis, title, body, rawText, dedupeText string) (string, error) {
|
||||
func embeddingTextForBasis(basis, title, body, rawText, dedupeText, keySummary string) (string, error) {
|
||||
switch basis {
|
||||
case "", "title_original":
|
||||
parts := []string{strings.TrimSpace(title)}
|
||||
@ -97,6 +109,12 @@ func embeddingTextForBasis(basis, title, body, rawText, dedupeText string) (stri
|
||||
return strings.TrimSpace(strings.Join(parts, "\n\n")), nil
|
||||
case "dedupe_text":
|
||||
return strings.TrimSpace(dedupeText), nil
|
||||
case "llm_key_summary":
|
||||
keySummary = strings.TrimSpace(keySummary)
|
||||
if keySummary == "" {
|
||||
return "", nil
|
||||
}
|
||||
return strings.TrimSpace("title: " + strings.TrimSpace(title) + "\n\nkey_summary:\n" + keySummary), nil
|
||||
default:
|
||||
return "", fmt.Errorf("embedding basis %q is not supported yet", basis)
|
||||
}
|
||||
|
||||
69
internal/store/embedding_tasks_test.go
Normal file
69
internal/store/embedding_tasks_test.go
Normal file
@ -0,0 +1,69 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"context"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestListEmbeddingTasksUsesLatestLLMKeySummary(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
st, err := Open(ctx, filepath.Join(t.TempDir(), "gitcrawl.db"))
|
||||
if err != nil {
|
||||
t.Fatalf("open store: %v", err)
|
||||
}
|
||||
defer st.Close()
|
||||
|
||||
repoID, err := st.UpsertRepository(ctx, Repository{
|
||||
Owner: "openclaw",
|
||||
Name: "gitcrawl",
|
||||
FullName: "openclaw/gitcrawl",
|
||||
RawJSON: "{}",
|
||||
UpdatedAt: "2026-04-26T00:00:00Z",
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("repo: %v", err)
|
||||
}
|
||||
threadID, err := st.UpsertThread(ctx, Thread{
|
||||
RepoID: repoID,
|
||||
GitHubID: "1",
|
||||
Number: 7,
|
||||
Kind: "issue",
|
||||
State: "open",
|
||||
Title: "Download stalls",
|
||||
Body: "Large download stalls near completion.",
|
||||
HTMLURL: "https://github.com/openclaw/gitcrawl/issues/7",
|
||||
LabelsJSON: "[]",
|
||||
AssigneesJSON: "[]",
|
||||
RawJSON: "{}",
|
||||
ContentHash: "hash",
|
||||
UpdatedAt: "2026-04-26T00:00:00Z",
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("thread: %v", err)
|
||||
}
|
||||
if _, err := st.DB().ExecContext(ctx, `
|
||||
insert into thread_revisions(id, thread_id, content_hash, title_hash, body_hash, labels_hash, created_at)
|
||||
values(1, ?, 'hash', 'title', 'body', 'labels', '2026-04-26T00:00:00Z');
|
||||
insert into thread_key_summaries(thread_revision_id, summary_kind, prompt_version, provider, model, input_hash, output_hash, key_text, created_at)
|
||||
values(1, 'llm_key_3line', 'v1', 'openai', 'gpt-5-mini', 'input', 'output', 'intent: fix downloads\nsurface: downloader\nmechanism: retry stalled stream', '2026-04-26T00:01:00Z');
|
||||
`, threadID); err != nil {
|
||||
t.Fatalf("seed summary: %v", err)
|
||||
}
|
||||
|
||||
tasks, err := st.ListEmbeddingTasks(ctx, EmbeddingTaskOptions{
|
||||
RepoID: repoID,
|
||||
Basis: "llm_key_summary",
|
||||
Model: "text-embedding-3-large",
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("tasks: %v", err)
|
||||
}
|
||||
if len(tasks) != 1 {
|
||||
t.Fatalf("tasks = %d, want 1", len(tasks))
|
||||
}
|
||||
if !strings.Contains(tasks[0].Text, "title: Download stalls") || !strings.Contains(tasks[0].Text, "key_summary:") {
|
||||
t.Fatalf("unexpected embedding text: %q", tasks[0].Text)
|
||||
}
|
||||
}
|
||||
@ -214,6 +214,20 @@ create table if not exists thread_fingerprints (
|
||||
unique(thread_revision_id, algorithm_version)
|
||||
);
|
||||
|
||||
create table if not exists thread_key_summaries (
|
||||
id integer primary key,
|
||||
thread_revision_id integer not null references thread_revisions(id) on delete cascade,
|
||||
summary_kind text not null,
|
||||
prompt_version text not null,
|
||||
provider text not null,
|
||||
model text not null,
|
||||
input_hash text not null,
|
||||
output_hash text not null,
|
||||
key_text text not null,
|
||||
created_at text not null,
|
||||
unique(thread_revision_id, summary_kind, prompt_version, provider, model)
|
||||
);
|
||||
|
||||
create table if not exists sync_runs (
|
||||
id integer primary key,
|
||||
repo_id integer references repositories(id) on delete cascade,
|
||||
|
||||
@ -77,6 +77,59 @@ func (s *Store) UpsertThread(ctx context.Context, thread Thread) (int64, error)
|
||||
return id, nil
|
||||
}
|
||||
|
||||
func (s *Store) MarkOpenThreadClosedFromGitHub(ctx context.Context, thread Thread) (bool, error) {
|
||||
if thread.RepoID <= 0 {
|
||||
return false, fmt.Errorf("repo id must be positive")
|
||||
}
|
||||
if thread.Number <= 0 {
|
||||
return false, fmt.Errorf("thread number must be positive")
|
||||
}
|
||||
if thread.Kind == "" {
|
||||
return false, fmt.Errorf("thread kind is required")
|
||||
}
|
||||
if thread.State == "" {
|
||||
thread.State = "closed"
|
||||
}
|
||||
result, err := s.q().ExecContext(ctx, `
|
||||
update threads
|
||||
set github_id = ?,
|
||||
state = ?,
|
||||
title = ?,
|
||||
body = ?,
|
||||
author_login = ?,
|
||||
author_type = ?,
|
||||
html_url = ?,
|
||||
labels_json = ?,
|
||||
assignees_json = ?,
|
||||
raw_json = ?,
|
||||
content_hash = ?,
|
||||
is_draft = ?,
|
||||
created_at_gh = ?,
|
||||
updated_at_gh = ?,
|
||||
closed_at_gh = ?,
|
||||
merged_at_gh = ?,
|
||||
last_pulled_at = ?,
|
||||
updated_at = ?
|
||||
where repo_id = ?
|
||||
and kind = ?
|
||||
and number = ?
|
||||
and state = 'open'
|
||||
and closed_at_local is null
|
||||
`, thread.GitHubID, thread.State, thread.Title, nullString(thread.Body), nullString(thread.AuthorLogin),
|
||||
nullString(thread.AuthorType), thread.HTMLURL, thread.LabelsJSON, thread.AssigneesJSON, thread.RawJSON,
|
||||
thread.ContentHash, boolInt(thread.IsDraft), nullString(thread.CreatedAtGitHub), nullString(thread.UpdatedAtGitHub),
|
||||
nullString(thread.ClosedAtGitHub), nullString(thread.MergedAtGitHub), nullString(thread.LastPulledAt), thread.UpdatedAt,
|
||||
thread.RepoID, thread.Kind, thread.Number)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("mark open thread closed from github: %w", err)
|
||||
}
|
||||
affected, err := result.RowsAffected()
|
||||
if err != nil {
|
||||
return false, nil
|
||||
}
|
||||
return affected > 0, nil
|
||||
}
|
||||
|
||||
func (s *Store) ListThreads(ctx context.Context, repoID int64, includeClosed bool) ([]Thread, error) {
|
||||
return s.ListThreadsFiltered(ctx, ThreadListOptions{RepoID: repoID, IncludeClosed: includeClosed})
|
||||
}
|
||||
|
||||
@ -45,6 +45,7 @@ type Stats struct {
|
||||
IssuesSynced int `json:"issues_synced"`
|
||||
PullRequestsSynced int `json:"pull_requests_synced"`
|
||||
CommentsSynced int `json:"comments_synced"`
|
||||
ThreadsClosed int `json:"threads_closed"`
|
||||
RequestedSince string `json:"requested_since,omitempty"`
|
||||
Limit int `json:"limit,omitempty"`
|
||||
MetadataOnly bool `json:"metadata_only"`
|
||||
@ -129,11 +130,18 @@ func (s *Syncer) Sync(ctx context.Context, options Options) (Stats, error) {
|
||||
stats.IssuesSynced++
|
||||
}
|
||||
}
|
||||
if state == "open" && since != "" && options.Limit <= 0 {
|
||||
closed, err := s.applyClosedOverlapSweep(ctx, st, repoID, options, since)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
stats.ThreadsClosed = closed
|
||||
}
|
||||
stats.FinishedAt = s.now().Format(time.RFC3339Nano)
|
||||
if _, err := st.RecordRun(ctx, store.RunRecord{
|
||||
RepoID: repoID,
|
||||
Kind: "sync",
|
||||
Scope: "open",
|
||||
Scope: state,
|
||||
Status: "success",
|
||||
StartedAt: stats.StartedAt,
|
||||
FinishedAt: stats.FinishedAt,
|
||||
@ -158,7 +166,7 @@ func (s *Syncer) Sync(ctx context.Context, options Options) (Stats, error) {
|
||||
func normalizeState(value string) (string, error) {
|
||||
value = strings.TrimSpace(strings.ToLower(value))
|
||||
if value == "" {
|
||||
return "all", nil
|
||||
return "open", nil
|
||||
}
|
||||
switch value {
|
||||
case "open", "closed", "all":
|
||||
@ -168,6 +176,31 @@ func normalizeState(value string) (string, error) {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Syncer) applyClosedOverlapSweep(ctx context.Context, st *store.Store, repoID int64, options Options, since string) (int, error) {
|
||||
rows, err := s.client.ListRepositoryIssues(ctx, options.Owner, options.Repo, gh.ListIssuesOptions{
|
||||
State: "closed",
|
||||
Since: since,
|
||||
}, options.Reporter)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
closed := 0
|
||||
for _, row := range rows {
|
||||
thread := mapIssueToThread(repoID, row, s.now().Format(time.RFC3339Nano))
|
||||
updated, err := st.MarkOpenThreadClosedFromGitHub(ctx, thread)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if updated {
|
||||
closed++
|
||||
}
|
||||
}
|
||||
if closed > 0 {
|
||||
options.Reporter.Printf("[sync] closed overlap sweep matched %d stale open thread(s)", closed)
|
||||
}
|
||||
return closed, nil
|
||||
}
|
||||
|
||||
func normalizeSince(value string, now time.Time) (string, error) {
|
||||
value = strings.TrimSpace(value)
|
||||
if value == "" {
|
||||
|
||||
@ -17,6 +17,9 @@ func (fakeGitHub) GetRepo(ctx context.Context, owner, repo string, reporter gh.R
|
||||
}
|
||||
|
||||
func (fakeGitHub) ListRepositoryIssues(ctx context.Context, owner, repo string, options gh.ListIssuesOptions, reporter gh.Reporter) ([]map[string]any, error) {
|
||||
if options.State == "closed" {
|
||||
return nil, nil
|
||||
}
|
||||
return []map[string]any{
|
||||
{
|
||||
"id": 1,
|
||||
@ -89,6 +92,30 @@ func (f *stateCaptureGitHub) ListRepositoryIssues(ctx context.Context, owner, re
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
type closedSweepGitHub struct {
|
||||
fakeGitHub
|
||||
}
|
||||
|
||||
func (closedSweepGitHub) ListRepositoryIssues(ctx context.Context, owner, repo string, options gh.ListIssuesOptions, reporter gh.Reporter) ([]map[string]any, error) {
|
||||
if options.State == "closed" {
|
||||
return []map[string]any{{
|
||||
"id": 1,
|
||||
"number": 7,
|
||||
"state": "closed",
|
||||
"title": "download stalls",
|
||||
"body": "large file download stalls",
|
||||
"html_url": "https://github.com/openclaw/gitcrawl/issues/7",
|
||||
"created_at": "2026-04-26T00:00:00Z",
|
||||
"updated_at": "2026-04-27T00:00:00Z",
|
||||
"closed_at": "2026-04-27T00:00:00Z",
|
||||
"labels": []map[string]any{{"name": "bug"}},
|
||||
"assignees": []map[string]any{},
|
||||
"user": map[string]any{"login": "vincentkoc", "type": "User"},
|
||||
}}, nil
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func TestSyncPersistsIssuesAndPullRequests(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
st, err := store.Open(ctx, filepath.Join(t.TempDir(), "gitcrawl.db"))
|
||||
@ -189,7 +216,7 @@ func TestSyncPassesRequestedState(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestSyncDefaultsToAllStates(t *testing.T) {
|
||||
func TestSyncDefaultsToOpenState(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
st, err := store.Open(ctx, filepath.Join(t.TempDir(), "gitcrawl.db"))
|
||||
if err != nil {
|
||||
@ -202,8 +229,8 @@ func TestSyncDefaultsToAllStates(t *testing.T) {
|
||||
if _, err := s.Sync(ctx, Options{Owner: "openclaw", Repo: "gitcrawl"}); err != nil {
|
||||
t.Fatalf("sync: %v", err)
|
||||
}
|
||||
if client.state != "all" {
|
||||
t.Fatalf("default state = %q, want all", client.state)
|
||||
if client.state != "open" {
|
||||
t.Fatalf("default state = %q, want open", client.state)
|
||||
}
|
||||
}
|
||||
|
||||
@ -220,3 +247,61 @@ func TestSyncRejectsInvalidState(t *testing.T) {
|
||||
t.Fatal("expected invalid state to fail")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSyncOpenSinceAppliesClosedOverlapSweep(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
st, err := store.Open(ctx, filepath.Join(t.TempDir(), "gitcrawl.db"))
|
||||
if err != nil {
|
||||
t.Fatalf("open store: %v", err)
|
||||
}
|
||||
defer st.Close()
|
||||
|
||||
repoID, err := st.UpsertRepository(ctx, store.Repository{
|
||||
Owner: "openclaw",
|
||||
Name: "gitcrawl",
|
||||
FullName: "openclaw/gitcrawl",
|
||||
RawJSON: "{}",
|
||||
UpdatedAt: "2026-04-26T00:00:00Z",
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("seed repo: %v", err)
|
||||
}
|
||||
if _, err := st.UpsertThread(ctx, store.Thread{
|
||||
RepoID: repoID,
|
||||
GitHubID: "1",
|
||||
Number: 7,
|
||||
Kind: "issue",
|
||||
State: "open",
|
||||
Title: "download stalls",
|
||||
Body: "large file download stalls",
|
||||
HTMLURL: "https://github.com/openclaw/gitcrawl/issues/7",
|
||||
LabelsJSON: "[]",
|
||||
AssigneesJSON: "[]",
|
||||
RawJSON: "{}",
|
||||
ContentHash: "old",
|
||||
CreatedAtGitHub: "2026-04-26T00:00:00Z",
|
||||
UpdatedAtGitHub: "2026-04-26T00:00:00Z",
|
||||
FirstPulledAt: "2026-04-26T00:00:00Z",
|
||||
LastPulledAt: "2026-04-26T00:00:00Z",
|
||||
UpdatedAt: "2026-04-26T00:00:00Z",
|
||||
}); err != nil {
|
||||
t.Fatalf("seed thread: %v", err)
|
||||
}
|
||||
|
||||
s := New(closedSweepGitHub{}, st)
|
||||
s.now = func() time.Time { return time.Date(2026, 4, 27, 1, 0, 0, 0, time.UTC) }
|
||||
stats, err := s.Sync(ctx, Options{Owner: "openclaw", Repo: "gitcrawl", Since: "1h"})
|
||||
if err != nil {
|
||||
t.Fatalf("sync: %v", err)
|
||||
}
|
||||
if stats.ThreadsClosed != 1 {
|
||||
t.Fatalf("threads closed = %d, want 1", stats.ThreadsClosed)
|
||||
}
|
||||
threads, err := st.ListThreads(ctx, repoID, true)
|
||||
if err != nil {
|
||||
t.Fatalf("threads: %v", err)
|
||||
}
|
||||
if len(threads) != 1 || threads[0].State != "closed" || threads[0].ClosedAtGitHub == "" {
|
||||
t.Fatalf("thread not closed from overlap sweep: %#v", threads)
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user