gitcrawl/internal/store/threads.go
Vincent Koc a418ffaca6
fix(cluster): align sync and clustering parity
## Summary
- align gitcrawl sync, portable read support, embeddings, durable clusters, and TUI cluster display with ghcrawl behavior
- add clearer TUI cluster state text for open vs closed rows

## Validation
- go test ./...
- GitHub CI run 25082322968 passed on ubuntu-latest and macos-latest
2026-04-28 16:09:20 -07:00

329 lines
10 KiB
Go

package store
import (
"context"
"database/sql"
"fmt"
"strings"
"time"
)
// Thread is one row of the threads table: a GitHub issue or pull request
// (distinguished by Kind) together with local bookkeeping columns.
//
// Every timestamp is carried as a string. The store writes UpdatedAt and
// ClosedAtLocal using its timeLayout when closing or reopening locally; the
// format of the *GitHub timestamps is whatever the caller supplied
// (presumably the GitHub API representation; confirm against the sync code).
type Thread struct {
	// Identity.
	ID       int64  `json:"id"`        // database row id, as returned by UpsertThread
	RepoID   int64  `json:"repo_id"`   // owning repository row
	GitHubID string `json:"github_id"` // identifier assigned by GitHub
	Number   int    `json:"number"`    // thread number within the repository
	Kind     string `json:"kind"`      // part of the (repo_id, kind, number) upsert key
	State    string `json:"state"`     // e.g. "open" or "closed"

	// Content.
	Title         string `json:"title"`
	Body          string `json:"body,omitempty"` // empty when the column is NULL
	AuthorLogin   string `json:"author_login,omitempty"`
	AuthorType    string `json:"author_type,omitempty"`
	HTMLURL       string `json:"html_url"`
	LabelsJSON    string `json:"labels_json"`    // stored as text; presumably a JSON-encoded list
	AssigneesJSON string `json:"assignees_json"` // stored as text; presumably a JSON-encoded list
	RawJSON       string `json:"-"`              // never included in JSON output
	ContentHash   string `json:"content_hash"`
	IsDraft       bool   `json:"is_draft"` // persisted as 0/1

	// Timestamps reported by GitHub.
	CreatedAtGitHub string `json:"created_at_gh,omitempty"`
	UpdatedAtGitHub string `json:"updated_at_gh,omitempty"`
	ClosedAtGitHub  string `json:"closed_at_gh,omitempty"`
	MergedAtGitHub  string `json:"merged_at_gh,omitempty"`

	// Local sync bookkeeping.
	FirstPulledAt string `json:"first_pulled_at,omitempty"` // written on insert only
	LastPulledAt  string `json:"last_pulled_at,omitempty"`
	UpdatedAt     string `json:"updated_at"`

	// Local close marker, set by CloseThreadLocally and cleared by
	// ReopenThreadLocally. Independent of State.
	ClosedAtLocal    string `json:"closed_at_local,omitempty"`
	CloseReasonLocal string `json:"close_reason_local,omitempty"`
}
// UpsertThread inserts thread into the threads table, or, when a row with the
// same (repo_id, kind, number) already exists, overwrites that row's
// GitHub-sourced columns plus last_pulled_at and updated_at.
//
// The conflict branch does not touch first_pulled_at, closed_at_local or
// close_reason_local, so the original first-pull time and any local close
// marker survive a re-sync.
//
// It returns the id of the inserted or updated row.
func (s *Store) UpsertThread(ctx context.Context, thread Thread) (int64, error) {
	const query = `
insert into threads(
repo_id, github_id, number, kind, state, title, body, author_login, author_type, html_url,
labels_json, assignees_json, raw_json, content_hash, is_draft,
created_at_gh, updated_at_gh, closed_at_gh, merged_at_gh,
first_pulled_at, last_pulled_at, updated_at
)
values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
on conflict(repo_id, kind, number) do update set
github_id=excluded.github_id,
state=excluded.state,
title=excluded.title,
body=excluded.body,
author_login=excluded.author_login,
author_type=excluded.author_type,
html_url=excluded.html_url,
labels_json=excluded.labels_json,
assignees_json=excluded.assignees_json,
raw_json=excluded.raw_json,
content_hash=excluded.content_hash,
is_draft=excluded.is_draft,
created_at_gh=excluded.created_at_gh,
updated_at_gh=excluded.updated_at_gh,
closed_at_gh=excluded.closed_at_gh,
merged_at_gh=excluded.merged_at_gh,
last_pulled_at=excluded.last_pulled_at,
updated_at=excluded.updated_at
returning id
`
	// One bind value per placeholder, in column order. Optional text columns
	// go through nullString (presumably mapping "" to NULL; see its definition).
	args := []any{
		thread.RepoID,
		thread.GitHubID,
		thread.Number,
		thread.Kind,
		thread.State,
		thread.Title,
		nullString(thread.Body),
		nullString(thread.AuthorLogin),
		nullString(thread.AuthorType),
		thread.HTMLURL,
		thread.LabelsJSON,
		thread.AssigneesJSON,
		thread.RawJSON,
		thread.ContentHash,
		boolInt(thread.IsDraft),
		nullString(thread.CreatedAtGitHub),
		nullString(thread.UpdatedAtGitHub),
		nullString(thread.ClosedAtGitHub),
		nullString(thread.MergedAtGitHub),
		nullString(thread.FirstPulledAt),
		nullString(thread.LastPulledAt),
		thread.UpdatedAt,
	}

	var rowID int64
	if err := s.q().QueryRowContext(ctx, query, args...).Scan(&rowID); err != nil {
		return 0, fmt.Errorf("upsert thread: %w", err)
	}
	return rowID, nil
}
// MarkOpenThreadClosedFromGitHub overwrites the GitHub-sourced columns of an
// existing thread row with the values in thread, but only when the stored row
// currently has state 'open' and no local close marker (closed_at_local is
// null). Rows that do not match are left untouched; no row is ever inserted.
//
// thread.RepoID and thread.Number must be positive and thread.Kind must be
// non-empty. An empty thread.State defaults to "closed". first_pulled_at and
// the local close columns are not modified.
//
// It returns true when a row was updated. An error is returned for invalid
// input, when the update fails, or when the driver cannot report how many rows
// were affected.
func (s *Store) MarkOpenThreadClosedFromGitHub(ctx context.Context, thread Thread) (bool, error) {
	if thread.RepoID <= 0 {
		return false, fmt.Errorf("repo id must be positive")
	}
	if thread.Number <= 0 {
		return false, fmt.Errorf("thread number must be positive")
	}
	if thread.Kind == "" {
		return false, fmt.Errorf("thread kind is required")
	}
	if thread.State == "" {
		thread.State = "closed"
	}
	result, err := s.q().ExecContext(ctx, `
update threads
set github_id = ?,
state = ?,
title = ?,
body = ?,
author_login = ?,
author_type = ?,
html_url = ?,
labels_json = ?,
assignees_json = ?,
raw_json = ?,
content_hash = ?,
is_draft = ?,
created_at_gh = ?,
updated_at_gh = ?,
closed_at_gh = ?,
merged_at_gh = ?,
last_pulled_at = ?,
updated_at = ?
where repo_id = ?
and kind = ?
and number = ?
and state = 'open'
and closed_at_local is null
`, thread.GitHubID, thread.State, thread.Title, nullString(thread.Body), nullString(thread.AuthorLogin),
		nullString(thread.AuthorType), thread.HTMLURL, thread.LabelsJSON, thread.AssigneesJSON, thread.RawJSON,
		thread.ContentHash, boolInt(thread.IsDraft), nullString(thread.CreatedAtGitHub), nullString(thread.UpdatedAtGitHub),
		nullString(thread.ClosedAtGitHub), nullString(thread.MergedAtGitHub), nullString(thread.LastPulledAt), thread.UpdatedAt,
		thread.RepoID, thread.Kind, thread.Number)
	if err != nil {
		return false, fmt.Errorf("mark open thread closed from github: %w", err)
	}
	affected, err := result.RowsAffected()
	if err != nil {
		// The update statement itself ran; we only failed to learn whether it
		// matched a row. Surface that instead of reporting "nothing updated"
		// with a nil error, which would silently mislead the caller.
		return false, fmt.Errorf("mark open thread closed from github: rows affected: %w", err)
	}
	return affected > 0, nil
}
// ListThreads returns the threads of repository repoID. It is a convenience
// wrapper around ListThreadsFiltered with no number filter and no limit;
// includeClosed is forwarded as ThreadListOptions.IncludeClosed.
func (s *Store) ListThreads(ctx context.Context, repoID int64, includeClosed bool) ([]Thread, error) {
	var opts ThreadListOptions
	opts.RepoID = repoID
	opts.IncludeClosed = includeClosed
	return s.ListThreadsFiltered(ctx, opts)
}
// ThreadListOptions selects which threads ListThreadsFiltered returns.
type ThreadListOptions struct {
	// RepoID restricts the listing to one repository; it is always applied.
	RepoID int64
	// IncludeClosed, when false, excludes rows that carry a local close
	// marker (closed_at_local is set).
	IncludeClosed bool
	// Numbers, when non-empty, restricts the listing to these thread numbers.
	Numbers []int
	// Limit caps the number of rows when positive; zero or negative means
	// no limit.
	Limit int
}
// ListThreadsFiltered returns the threads of options.RepoID ordered by
// thread number.
//
// Filters applied:
//   - unless options.IncludeClosed is set, rows with a local close marker
//     (closed_at_local not null) are skipped; the state column is not consulted;
//   - a non-empty options.Numbers restricts the result to those numbers;
//   - a positive options.Limit caps the number of rows.
//
// The result is nil when no row matches.
func (s *Store) ListThreadsFiltered(ctx context.Context, options ThreadListOptions) ([]Thread, error) {
	// args collects bind values in the same order their placeholders are
	// written into the query text.
	args := make([]any, 0, len(options.Numbers)+2)

	var where strings.Builder
	where.WriteString(`repo_id = ?`)
	args = append(args, options.RepoID)

	if !options.IncludeClosed {
		where.WriteString(` and closed_at_local is null`)
	}

	if len(options.Numbers) > 0 {
		where.WriteString(` and number in (`)
		for i, number := range options.Numbers {
			if i > 0 {
				where.WriteString(",")
			}
			where.WriteString("?")
			args = append(args, number)
		}
		where.WriteString(`)`)
	}

	var limitClause string
	if options.Limit > 0 {
		limitClause = ` limit ?`
		args = append(args, options.Limit)
	}

	db := s.q()
	query := `
select ` + s.threadSelectColumns(ctx, "") + `
from threads
where ` + where.String() + `
order by number` + limitClause

	rows, err := db.QueryContext(ctx, query, args...)
	if err != nil {
		return nil, fmt.Errorf("list threads: %w", err)
	}
	defer rows.Close()

	var threads []Thread
	for rows.Next() {
		item, scanErr := scanThread(rows)
		if scanErr != nil {
			return nil, scanErr
		}
		threads = append(threads, item)
	}
	if iterErr := rows.Err(); iterErr != nil {
		return nil, fmt.Errorf("iterate threads: %w", iterErr)
	}
	return threads, nil
}
// CloseThreadLocally stamps the thread identified by repoID and number with a
// local close marker: closed_at_local and updated_at are set to the current
// UTC time (formatted with timeLayout) and close_reason_local to reason.
//
// reason is trimmed of surrounding whitespace; when nothing remains it
// defaults to "local close". The match is on repo_id and number only.
//
// An error is returned for non-positive repoID or number, when the update
// fails, or when the driver reports that no row matched.
func (s *Store) CloseThreadLocally(ctx context.Context, repoID int64, number int, reason string) error {
	switch {
	case repoID <= 0:
		return fmt.Errorf("repo id must be positive")
	case number <= 0:
		return fmt.Errorf("thread number must be positive")
	}

	if reason = strings.TrimSpace(reason); reason == "" {
		reason = "local close"
	}

	const query = `
update threads
set closed_at_local = ?, close_reason_local = ?, updated_at = ?
where repo_id = ? and number = ?
`
	now := time.Now().UTC().Format(timeLayout)
	res, err := s.q().ExecContext(ctx, query, now, reason, now, repoID, number)
	if err != nil {
		return fmt.Errorf("close thread locally: %w", err)
	}

	// The not-found check is best-effort: if the driver cannot report a row
	// count, the successful exec is taken at face value.
	count, countErr := res.RowsAffected()
	if countErr != nil || count != 0 {
		return nil
	}
	return fmt.Errorf("thread #%d was not found", number)
}
// ReopenThreadLocally removes the local close marker from the thread
// identified by repoID and number: closed_at_local and close_reason_local are
// set to null and updated_at to the current UTC time (formatted with
// timeLayout). The match is on repo_id and number only.
//
// An error is returned for non-positive repoID or number, when the update
// fails, or when the driver reports that no row matched.
func (s *Store) ReopenThreadLocally(ctx context.Context, repoID int64, number int) error {
	switch {
	case repoID <= 0:
		return fmt.Errorf("repo id must be positive")
	case number <= 0:
		return fmt.Errorf("thread number must be positive")
	}

	const query = `
update threads
set closed_at_local = null, close_reason_local = null, updated_at = ?
where repo_id = ? and number = ?
`
	now := time.Now().UTC().Format(timeLayout)
	res, err := s.q().ExecContext(ctx, query, now, repoID, number)
	if err != nil {
		return fmt.Errorf("reopen thread locally: %w", err)
	}

	// The not-found check is best-effort: if the driver cannot report a row
	// count, the successful exec is taken at face value.
	count, countErr := res.RowsAffected()
	if countErr != nil || count != 0 {
		return nil
	}
	return fmt.Errorf("thread #%d was not found", number)
}
// scanThread reads one thread from rows, which must yield 25 columns in the
// order produced by threadSelectColumns. Anything with a compatible Scan
// method is accepted.
//
// Nullable text columns are read through sql.NullString and stored as the
// empty string when NULL; is_draft is read as an integer and treated as true
// when non-zero. On a scan failure the zero Thread is returned with a wrapped
// error.
func scanThread(rows interface {
	Scan(dest ...any) error
}) (Thread, error) {
	var out Thread
	var draft int
	var body, authorLogin, authorType, rawJSON sql.NullString
	var createdGH, updatedGH, closedGH, mergedGH sql.NullString
	var firstPulled, lastPulled sql.NullString
	var closedLocal, closeReason sql.NullString

	// Destinations listed in select-column order.
	dest := []any{
		&out.ID,
		&out.RepoID,
		&out.GitHubID,
		&out.Number,
		&out.Kind,
		&out.State,
		&out.Title,
		&body,
		&authorLogin,
		&authorType,
		&out.HTMLURL,
		&out.LabelsJSON,
		&out.AssigneesJSON,
		&rawJSON,
		&out.ContentHash,
		&draft,
		&createdGH,
		&updatedGH,
		&closedGH,
		&mergedGH,
		&firstPulled,
		&lastPulled,
		&out.UpdatedAt,
		&closedLocal,
		&closeReason,
	}
	if err := rows.Scan(dest...); err != nil {
		return Thread{}, fmt.Errorf("scan thread: %w", err)
	}

	// NullString.String is "" for NULL, which is exactly the fallback wanted.
	out.Body = body.String
	out.AuthorLogin = authorLogin.String
	out.AuthorType = authorType.String
	out.RawJSON = rawJSON.String
	out.IsDraft = draft != 0
	out.CreatedAtGitHub = createdGH.String
	out.UpdatedAtGitHub = updatedGH.String
	out.ClosedAtGitHub = closedGH.String
	out.MergedAtGitHub = mergedGH.String
	out.FirstPulledAt = firstPulled.String
	out.LastPulledAt = lastPulled.String
	out.ClosedAtLocal = closedLocal.String
	out.CloseReasonLocal = closeReason.String
	return out, nil
}
// threadSelectColumns returns the comma-separated select list for the threads
// table, in the column order scanThread expects. When alias is non-empty every
// plain column is written as "alias.column".
//
// The body and raw_json positions are filled by threadBodyExpr and
// threadRawJSONExpr, which may substitute another column or an empty-string
// literal.
func (s *Store) threadSelectColumns(ctx context.Context, alias string) string {
	prefix := ""
	if alias != "" {
		prefix = alias + "."
	}

	cols := make([]string, 0, 25)
	plain := func(names ...string) {
		for _, name := range names {
			cols = append(cols, prefix+name)
		}
	}

	plain("id", "repo_id", "github_id", "number", "kind", "state", "title")
	cols = append(cols, s.threadBodyExpr(ctx, alias))
	plain("author_login", "author_type", "html_url", "labels_json", "assignees_json")
	cols = append(cols, s.threadRawJSONExpr(ctx, alias))
	plain(
		"content_hash",
		"is_draft",
		"created_at_gh",
		"updated_at_gh",
		"closed_at_gh",
		"merged_at_gh",
		"first_pulled_at",
		"last_pulled_at",
		"updated_at",
		"closed_at_local",
		"close_reason_local",
	)
	return strings.Join(cols, ", ")
}
// threadBodyExpr returns the SQL expression to select for a thread's body:
// the body column if hasColumn reports it on the threads table, otherwise
// body_excerpt if that is reported, otherwise the empty-string literal ''.
// A column result is qualified with alias when alias is non-empty.
func (s *Store) threadBodyExpr(ctx context.Context, alias string) string {
	// Candidates in order of preference; the first one present wins.
	for _, candidate := range [...]string{"body", "body_excerpt"} {
		if s.hasColumn(ctx, "threads", candidate) {
			return qualifiedColumn(alias, candidate)
		}
	}
	return "''"
}
// threadRawJSONExpr returns the SQL expression to select for a thread's raw
// JSON: the raw_json column (qualified with alias when alias is non-empty) if
// hasColumn reports it on the threads table, otherwise the empty-string
// literal ''.
func (s *Store) threadRawJSONExpr(ctx context.Context, alias string) string {
	if !s.hasColumn(ctx, "threads", "raw_json") {
		return "''"
	}
	return qualifiedColumn(alias, "raw_json")
}
// qualifiedColumn returns name prefixed with "alias." when alias is non-empty,
// and name unchanged otherwise.
func qualifiedColumn(alias, name string) string {
	if alias != "" {
		return alias + "." + name
	}
	return name
}
// boolInt converts a bool to the integer form stored in the database:
// 1 for true, 0 for false.
func boolInt(value bool) int {
	var n int
	if value {
		n = 1
	}
	return n
}