522 lines
14 KiB
Go
522 lines
14 KiB
Go
package syncer
|
|
|
|
import (
|
|
"context"
|
|
"crypto/sha256"
|
|
"encoding/hex"
|
|
"encoding/json"
|
|
"fmt"
|
|
"log/slog"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/openclaw/gitcrawl/internal/documents"
|
|
gh "github.com/openclaw/gitcrawl/internal/github"
|
|
"github.com/openclaw/gitcrawl/internal/store"
|
|
"github.com/vincentkoc/crawlkit/progress"
|
|
)
|
|
|
|
type GitHubClient interface {
|
|
GetRepo(ctx context.Context, owner, repo string, reporter gh.Reporter) (map[string]any, error)
|
|
GetIssue(ctx context.Context, owner, repo string, number int, reporter gh.Reporter) (map[string]any, error)
|
|
GetPull(ctx context.Context, owner, repo string, number int, reporter gh.Reporter) (map[string]any, error)
|
|
ListRepositoryIssues(ctx context.Context, owner, repo string, options gh.ListIssuesOptions, reporter gh.Reporter) ([]map[string]any, error)
|
|
ListIssueComments(ctx context.Context, owner, repo string, number int, reporter gh.Reporter) ([]map[string]any, error)
|
|
ListPullReviews(ctx context.Context, owner, repo string, number int, reporter gh.Reporter) ([]map[string]any, error)
|
|
ListPullReviewComments(ctx context.Context, owner, repo string, number int, reporter gh.Reporter) ([]map[string]any, error)
|
|
ListPullFiles(ctx context.Context, owner, repo string, number int, reporter gh.Reporter) ([]map[string]any, error)
|
|
ListPullCommits(ctx context.Context, owner, repo string, number int, reporter gh.Reporter) ([]map[string]any, error)
|
|
ListCommitCheckRuns(ctx context.Context, owner, repo, ref string, reporter gh.Reporter) ([]map[string]any, error)
|
|
ListWorkflowRuns(ctx context.Context, owner, repo string, options gh.ListWorkflowRunsOptions, reporter gh.Reporter) ([]map[string]any, error)
|
|
}
|
|
|
|
type Syncer struct {
|
|
client GitHubClient
|
|
store *store.Store
|
|
now func() time.Time
|
|
}
|
|
|
|
type Options struct {
|
|
Owner string
|
|
Repo string
|
|
State string
|
|
Since string
|
|
Limit int
|
|
Numbers []int
|
|
IncludeComments bool
|
|
IncludePRDetails bool
|
|
Reporter gh.Reporter
|
|
Logger *slog.Logger
|
|
}
|
|
|
|
type Stats struct {
|
|
Repository string `json:"repository"`
|
|
ThreadsSynced int `json:"threads_synced"`
|
|
IssuesSynced int `json:"issues_synced"`
|
|
PullRequestsSynced int `json:"pull_requests_synced"`
|
|
CommentsSynced int `json:"comments_synced"`
|
|
PRDetailsSynced int `json:"pr_details_synced"`
|
|
PRFilesSynced int `json:"pr_files_synced"`
|
|
PRCommitsSynced int `json:"pr_commits_synced"`
|
|
PRChecksSynced int `json:"pr_checks_synced"`
|
|
WorkflowRunsSynced int `json:"workflow_runs_synced"`
|
|
ThreadsClosed int `json:"threads_closed"`
|
|
RequestedSince string `json:"requested_since,omitempty"`
|
|
Limit int `json:"limit,omitempty"`
|
|
Numbers []int `json:"numbers,omitempty"`
|
|
MetadataOnly bool `json:"metadata_only"`
|
|
StartedAt string `json:"started_at"`
|
|
FinishedAt string `json:"finished_at"`
|
|
}
|
|
|
|
func New(client GitHubClient, st *store.Store) *Syncer {
|
|
return &Syncer{
|
|
client: client,
|
|
store: st,
|
|
now: func() time.Time { return time.Now().UTC() },
|
|
}
|
|
}
|
|
|
|
func (s *Syncer) Sync(ctx context.Context, options Options) (Stats, error) {
|
|
started := s.now().Format(time.RFC3339Nano)
|
|
since, err := normalizeSince(options.Since, s.now())
|
|
if err != nil {
|
|
return Stats{}, err
|
|
}
|
|
state, err := normalizeState(options.State)
|
|
if err != nil {
|
|
return Stats{}, err
|
|
}
|
|
repoRaw, err := s.client.GetRepo(ctx, options.Owner, options.Repo, options.Reporter)
|
|
if err != nil {
|
|
return Stats{}, err
|
|
}
|
|
repoID, err := s.store.UpsertRepository(ctx, store.Repository{
|
|
Owner: options.Owner,
|
|
Name: options.Repo,
|
|
FullName: options.Owner + "/" + options.Repo,
|
|
GitHubRepoID: jsonID(repoRaw["id"]),
|
|
RawJSON: mustJSON(repoRaw),
|
|
UpdatedAt: s.now().Format(time.RFC3339Nano),
|
|
})
|
|
if err != nil {
|
|
return Stats{}, err
|
|
}
|
|
|
|
numbers := uniquePositiveNumbers(options.Numbers)
|
|
rows := make([]map[string]any, 0, len(numbers))
|
|
if len(numbers) > 0 {
|
|
for _, number := range numbers {
|
|
row, err := s.client.GetIssue(ctx, options.Owner, options.Repo, number, options.Reporter)
|
|
if err != nil {
|
|
return Stats{}, err
|
|
}
|
|
rows = append(rows, row)
|
|
}
|
|
} else {
|
|
var err error
|
|
rows, err = s.client.ListRepositoryIssues(ctx, options.Owner, options.Repo, gh.ListIssuesOptions{
|
|
State: state,
|
|
Since: since,
|
|
Limit: options.Limit,
|
|
ExpectedTotal: expectedIssueTotal(repoRaw, state, since, options.Limit),
|
|
}, options.Reporter)
|
|
if err != nil {
|
|
return Stats{}, err
|
|
}
|
|
}
|
|
|
|
stats := Stats{
|
|
Repository: options.Owner + "/" + options.Repo,
|
|
RequestedSince: since,
|
|
Limit: options.Limit,
|
|
Numbers: numbers,
|
|
MetadataOnly: !options.IncludeComments,
|
|
StartedAt: started,
|
|
}
|
|
tracker := progress.New(options.Logger, progress.Options{
|
|
Name: "sync",
|
|
Unit: "threads",
|
|
Total: int64(len(rows)),
|
|
Attrs: []any{
|
|
"repository", stats.Repository,
|
|
"state", state,
|
|
},
|
|
})
|
|
persist := func(st *store.Store) error {
|
|
for _, row := range rows {
|
|
thread := mapIssueToThread(repoID, row, s.now().Format(time.RFC3339Nano))
|
|
threadID, err := st.UpsertThread(ctx, thread)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
thread.ID = threadID
|
|
var comments []store.Comment
|
|
if options.IncludeComments {
|
|
var err error
|
|
comments, err = s.syncComments(ctx, options, thread)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
stats.CommentsSynced += len(comments)
|
|
}
|
|
if options.IncludePRDetails && thread.Kind == "pull_request" {
|
|
detailStats, err := s.syncPullRequestDetails(ctx, st, options, thread)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
stats.PRDetailsSynced++
|
|
stats.PRFilesSynced += detailStats.files
|
|
stats.PRCommitsSynced += detailStats.commits
|
|
stats.PRChecksSynced += detailStats.checks
|
|
stats.WorkflowRunsSynced += detailStats.runs
|
|
}
|
|
if _, err := st.UpsertDocument(ctx, documents.BuildWithComments(thread, comments)); err != nil {
|
|
return err
|
|
}
|
|
stats.ThreadsSynced++
|
|
if thread.Kind == "pull_request" {
|
|
stats.PullRequestsSynced++
|
|
} else {
|
|
stats.IssuesSynced++
|
|
}
|
|
tracker.Add(1,
|
|
"number", thread.Number,
|
|
"kind", thread.Kind,
|
|
"thread_state", thread.State,
|
|
)
|
|
}
|
|
if len(numbers) == 0 && state == "open" && since != "" && options.Limit <= 0 {
|
|
closed, err := s.applyClosedOverlapSweep(ctx, st, repoID, options, since)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
stats.ThreadsClosed = closed
|
|
}
|
|
stats.FinishedAt = s.now().Format(time.RFC3339Nano)
|
|
if _, err := st.RecordRun(ctx, store.RunRecord{
|
|
RepoID: repoID,
|
|
Kind: "sync",
|
|
Scope: syncRunScope(state, numbers),
|
|
Status: "success",
|
|
StartedAt: stats.StartedAt,
|
|
FinishedAt: stats.FinishedAt,
|
|
StatsJSON: mustJSON(stats),
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
if !options.IncludeComments {
|
|
if err := s.store.WithTx(ctx, persist); err != nil {
|
|
tracker.Finish(err)
|
|
return Stats{}, err
|
|
}
|
|
tracker.Finish(nil)
|
|
return stats, nil
|
|
}
|
|
if err := persist(s.store); err != nil {
|
|
tracker.Finish(err)
|
|
return Stats{}, err
|
|
}
|
|
tracker.Finish(nil)
|
|
return stats, nil
|
|
}
|
|
|
|
func uniquePositiveNumbers(numbers []int) []int {
|
|
if len(numbers) == 0 {
|
|
return nil
|
|
}
|
|
seen := make(map[int]struct{}, len(numbers))
|
|
out := make([]int, 0, len(numbers))
|
|
for _, number := range numbers {
|
|
if number <= 0 {
|
|
continue
|
|
}
|
|
if _, ok := seen[number]; ok {
|
|
continue
|
|
}
|
|
seen[number] = struct{}{}
|
|
out = append(out, number)
|
|
}
|
|
return out
|
|
}
|
|
|
|
func syncRunScope(state string, numbers []int) string {
|
|
if len(numbers) == 0 {
|
|
return state
|
|
}
|
|
parts := make([]string, 0, len(numbers))
|
|
for _, number := range numbers {
|
|
parts = append(parts, strconv.Itoa(number))
|
|
}
|
|
return "numbers:" + strings.Join(parts, ",")
|
|
}
|
|
|
|
func normalizeState(value string) (string, error) {
|
|
value = strings.TrimSpace(strings.ToLower(value))
|
|
if value == "" {
|
|
return "open", nil
|
|
}
|
|
switch value {
|
|
case "open", "closed", "all":
|
|
return value, nil
|
|
default:
|
|
return "", fmt.Errorf("invalid state %q: use open, closed, or all", value)
|
|
}
|
|
}
|
|
|
|
func (s *Syncer) applyClosedOverlapSweep(ctx context.Context, st *store.Store, repoID int64, options Options, since string) (int, error) {
|
|
rows, err := s.client.ListRepositoryIssues(ctx, options.Owner, options.Repo, gh.ListIssuesOptions{
|
|
State: "closed",
|
|
Since: since,
|
|
}, options.Reporter)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
closed := 0
|
|
for _, row := range rows {
|
|
thread := mapIssueToThread(repoID, row, s.now().Format(time.RFC3339Nano))
|
|
updated, err := st.MarkOpenThreadClosedFromGitHub(ctx, thread)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
if updated {
|
|
closed++
|
|
}
|
|
}
|
|
if closed > 0 {
|
|
options.Reporter.Printf("[sync] closed overlap sweep matched %d stale open thread(s)", closed)
|
|
}
|
|
return closed, nil
|
|
}
|
|
|
|
func normalizeSince(value string, now time.Time) (string, error) {
|
|
value = strings.TrimSpace(value)
|
|
if value == "" {
|
|
return "", nil
|
|
}
|
|
if parsed, err := time.Parse(time.RFC3339Nano, value); err == nil {
|
|
return parsed.UTC().Format(time.RFC3339Nano), nil
|
|
}
|
|
units := []struct {
|
|
suffix string
|
|
scale time.Duration
|
|
}{
|
|
{"mo", 30 * 24 * time.Hour},
|
|
{"w", 7 * 24 * time.Hour},
|
|
{"d", 24 * time.Hour},
|
|
{"h", time.Hour},
|
|
{"m", time.Minute},
|
|
{"s", time.Second},
|
|
}
|
|
for _, unit := range units {
|
|
if !strings.HasSuffix(value, unit.suffix) {
|
|
continue
|
|
}
|
|
raw := strings.TrimSuffix(value, unit.suffix)
|
|
amount, err := strconv.Atoi(raw)
|
|
if err != nil || amount <= 0 {
|
|
return "", fmt.Errorf("invalid --since %q: expected ISO timestamp or relative duration like 15m, 2h, 7d, 1mo", value)
|
|
}
|
|
return now.Add(-time.Duration(amount) * unit.scale).UTC().Format(time.RFC3339Nano), nil
|
|
}
|
|
return "", fmt.Errorf("invalid --since %q: expected ISO timestamp or relative duration like 15m, 2h, 7d, 1mo", value)
|
|
}
|
|
|
|
func mapIssueToThread(repoID int64, row map[string]any, pulledAt string) store.Thread {
|
|
kind := "issue"
|
|
if _, ok := row["pull_request"]; ok {
|
|
kind = "pull_request"
|
|
}
|
|
labelsJSON := mustJSON(row["labels"])
|
|
if labelsJSON == "null" {
|
|
labelsJSON = "[]"
|
|
}
|
|
assigneesJSON := mustJSON(row["assignees"])
|
|
if assigneesJSON == "null" {
|
|
assigneesJSON = "[]"
|
|
}
|
|
title := stringValue(row["title"])
|
|
body := stringValue(row["body"])
|
|
return store.Thread{
|
|
RepoID: repoID,
|
|
GitHubID: jsonID(row["id"]),
|
|
Number: intValue(row["number"]),
|
|
Kind: kind,
|
|
State: stringValue(row["state"]),
|
|
Title: title,
|
|
Body: body,
|
|
AuthorLogin: loginFromUser(row["user"]),
|
|
AuthorType: typeFromUser(row["user"]),
|
|
HTMLURL: stringValue(row["html_url"]),
|
|
LabelsJSON: labelsJSON,
|
|
AssigneesJSON: assigneesJSON,
|
|
RawJSON: mustJSON(row),
|
|
ContentHash: contentHash(title, body, labelsJSON),
|
|
CreatedAtGitHub: stringValue(row["created_at"]),
|
|
UpdatedAtGitHub: stringValue(row["updated_at"]),
|
|
ClosedAtGitHub: stringValue(row["closed_at"]),
|
|
FirstPulledAt: pulledAt,
|
|
LastPulledAt: pulledAt,
|
|
UpdatedAt: pulledAt,
|
|
}
|
|
}
|
|
|
|
func (s *Syncer) syncComments(ctx context.Context, options Options, thread store.Thread) ([]store.Comment, error) {
|
|
var rows []commentRow
|
|
issueComments, err := s.client.ListIssueComments(ctx, options.Owner, options.Repo, thread.Number, options.Reporter)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
for _, row := range issueComments {
|
|
rows = append(rows, commentRow{kind: "issue_comment", raw: row})
|
|
}
|
|
if thread.Kind == "pull_request" {
|
|
reviews, err := s.client.ListPullReviews(ctx, options.Owner, options.Repo, thread.Number, options.Reporter)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
for _, row := range reviews {
|
|
rows = append(rows, commentRow{kind: "pull_review", raw: row})
|
|
}
|
|
reviewComments, err := s.client.ListPullReviewComments(ctx, options.Owner, options.Repo, thread.Number, options.Reporter)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
for _, row := range reviewComments {
|
|
rows = append(rows, commentRow{kind: "pull_review_comment", raw: row})
|
|
}
|
|
}
|
|
var comments []store.Comment
|
|
for _, row := range rows {
|
|
comment := mapComment(thread.ID, row.kind, row.raw)
|
|
if comment.Body == "" {
|
|
continue
|
|
}
|
|
if _, err := s.store.UpsertComment(ctx, comment); err != nil {
|
|
return nil, err
|
|
}
|
|
comments = append(comments, comment)
|
|
}
|
|
return comments, nil
|
|
}
|
|
|
|
type commentRow struct {
|
|
kind string
|
|
raw map[string]any
|
|
}
|
|
|
|
func mapComment(threadID int64, kind string, row map[string]any) store.Comment {
|
|
authorLogin := loginFromUser(row["user"])
|
|
authorType := typeFromUser(row["user"])
|
|
return store.Comment{
|
|
ThreadID: threadID,
|
|
GitHubID: jsonID(row["id"]),
|
|
CommentType: kind,
|
|
AuthorLogin: authorLogin,
|
|
AuthorType: authorType,
|
|
Body: stringValue(row["body"]),
|
|
IsBot: isBot(authorLogin, authorType),
|
|
RawJSON: mustJSON(row),
|
|
CreatedAtGitHub: stringValue(row["created_at"]),
|
|
UpdatedAtGitHub: stringValue(row["updated_at"]),
|
|
}
|
|
}
|
|
|
|
func isBot(login, authorType string) bool {
|
|
return strings.EqualFold(authorType, "Bot") || strings.HasSuffix(strings.ToLower(login), "[bot]")
|
|
}
|
|
|
|
func loginFromUser(value any) string {
|
|
user, ok := value.(map[string]any)
|
|
if !ok {
|
|
return ""
|
|
}
|
|
return stringValue(user["login"])
|
|
}
|
|
|
|
func typeFromUser(value any) string {
|
|
user, ok := value.(map[string]any)
|
|
if !ok {
|
|
return ""
|
|
}
|
|
return stringValue(user["type"])
|
|
}
|
|
|
|
func contentHash(values ...string) string {
|
|
hash := sha256.New()
|
|
for _, value := range values {
|
|
_, _ = hash.Write([]byte(value))
|
|
_, _ = hash.Write([]byte{0})
|
|
}
|
|
return hex.EncodeToString(hash.Sum(nil))
|
|
}
|
|
|
|
func mustJSON(value any) string {
|
|
data, err := json.Marshal(value)
|
|
if err != nil {
|
|
return "{}"
|
|
}
|
|
return string(data)
|
|
}
|
|
|
|
func jsonID(value any) string {
|
|
switch typed := value.(type) {
|
|
case string:
|
|
return typed
|
|
case float64:
|
|
return strconv.FormatInt(int64(typed), 10)
|
|
case int:
|
|
return strconv.Itoa(typed)
|
|
case int64:
|
|
return strconv.FormatInt(typed, 10)
|
|
case json.Number:
|
|
return typed.String()
|
|
default:
|
|
return ""
|
|
}
|
|
}
|
|
|
|
func expectedIssueTotal(repoRaw map[string]any, state, since string, limit int) int {
|
|
if state != "open" || since != "" {
|
|
return 0
|
|
}
|
|
count := intValue(repoRaw["open_issues_count"])
|
|
if count <= 0 {
|
|
return 0
|
|
}
|
|
if limit > 0 && limit < count {
|
|
return limit
|
|
}
|
|
return count
|
|
}
|
|
|
|
func intValue(value any) int {
|
|
switch typed := value.(type) {
|
|
case float64:
|
|
return int(typed)
|
|
case int:
|
|
return typed
|
|
case int64:
|
|
return int(typed)
|
|
case json.Number:
|
|
parsed, _ := strconv.Atoi(typed.String())
|
|
return parsed
|
|
default:
|
|
return 0
|
|
}
|
|
}
|
|
|
|
func stringValue(value any) string {
|
|
switch typed := value.(type) {
|
|
case string:
|
|
return typed
|
|
case fmt.Stringer:
|
|
return typed.String()
|
|
default:
|
|
return ""
|
|
}
|
|
}
|