perf(backup): parallelize drive collaboration scan
This commit is contained in:
parent
a4f8bed267
commit
198407c252
@ -5,6 +5,7 @@ import (
|
||||
"fmt"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"google.golang.org/api/drive/v3"
|
||||
gapi "google.golang.org/api/googleapi"
|
||||
@ -36,37 +37,57 @@ type driveBackupRevision struct {
|
||||
|
||||
func fetchBackupDriveCollaboration(ctx context.Context, svc *drive.Service, files []driveBackupFile) (driveBackupCollaboration, map[string]int) {
|
||||
var out driveBackupCollaboration
|
||||
jobs := make(chan string)
|
||||
results := make(chan driveBackupCollaboration, len(files))
|
||||
workers := 8
|
||||
if len(files) < workers {
|
||||
workers = len(files)
|
||||
}
|
||||
var wg sync.WaitGroup
|
||||
for range workers {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for fileID := range jobs {
|
||||
results <- fetchBackupDriveFileCollaboration(ctx, svc, fileID)
|
||||
}
|
||||
}()
|
||||
}
|
||||
go func() {
|
||||
defer close(jobs)
|
||||
for _, row := range files {
|
||||
if row.File == nil || strings.TrimSpace(row.File.Id) == "" {
|
||||
continue
|
||||
}
|
||||
select {
|
||||
case jobs <- row.File.Id:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
go func() {
|
||||
wg.Wait()
|
||||
close(results)
|
||||
}()
|
||||
errors := 0
|
||||
for _, row := range files {
|
||||
if row.File == nil || strings.TrimSpace(row.File.Id) == "" {
|
||||
continue
|
||||
}
|
||||
fileID := row.File.Id
|
||||
permissions, err := fetchBackupDrivePermissions(ctx, svc, fileID)
|
||||
if err != nil {
|
||||
errors++
|
||||
out.Permissions = append(out.Permissions, driveBackupPermission{FileID: fileID, Error: err.Error()})
|
||||
} else {
|
||||
for _, permission := range permissions {
|
||||
out.Permissions = append(out.Permissions, driveBackupPermission{FileID: fileID, Permission: permission})
|
||||
for result := range results {
|
||||
out.Permissions = append(out.Permissions, result.Permissions...)
|
||||
out.Comments = append(out.Comments, result.Comments...)
|
||||
out.Revisions = append(out.Revisions, result.Revisions...)
|
||||
for _, permission := range result.Permissions {
|
||||
if permission.Error != "" {
|
||||
errors++
|
||||
}
|
||||
}
|
||||
comments, err := fetchBackupDriveComments(ctx, svc, fileID)
|
||||
if err != nil {
|
||||
errors++
|
||||
out.Comments = append(out.Comments, driveBackupComment{FileID: fileID, Error: err.Error()})
|
||||
} else {
|
||||
for _, comment := range comments {
|
||||
out.Comments = append(out.Comments, driveBackupComment{FileID: fileID, Comment: comment})
|
||||
for _, comment := range result.Comments {
|
||||
if comment.Error != "" {
|
||||
errors++
|
||||
}
|
||||
}
|
||||
revisions, err := fetchBackupDriveRevisions(ctx, svc, fileID)
|
||||
if err != nil {
|
||||
errors++
|
||||
out.Revisions = append(out.Revisions, driveBackupRevision{FileID: fileID, Error: err.Error()})
|
||||
} else {
|
||||
for _, revision := range revisions {
|
||||
out.Revisions = append(out.Revisions, driveBackupRevision{FileID: fileID, Revision: revision})
|
||||
for _, revision := range result.Revisions {
|
||||
if revision.Error != "" {
|
||||
errors++
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -96,6 +117,35 @@ func fetchBackupDriveCollaboration(ctx context.Context, svc *drive.Service, file
|
||||
}
|
||||
}
|
||||
|
||||
func fetchBackupDriveFileCollaboration(ctx context.Context, svc *drive.Service, fileID string) driveBackupCollaboration {
|
||||
var out driveBackupCollaboration
|
||||
permissions, err := fetchBackupDrivePermissions(ctx, svc, fileID)
|
||||
if err != nil {
|
||||
out.Permissions = append(out.Permissions, driveBackupPermission{FileID: fileID, Error: err.Error()})
|
||||
} else {
|
||||
for _, permission := range permissions {
|
||||
out.Permissions = append(out.Permissions, driveBackupPermission{FileID: fileID, Permission: permission})
|
||||
}
|
||||
}
|
||||
comments, err := fetchBackupDriveComments(ctx, svc, fileID)
|
||||
if err != nil {
|
||||
out.Comments = append(out.Comments, driveBackupComment{FileID: fileID, Error: err.Error()})
|
||||
} else {
|
||||
for _, comment := range comments {
|
||||
out.Comments = append(out.Comments, driveBackupComment{FileID: fileID, Comment: comment})
|
||||
}
|
||||
}
|
||||
revisions, err := fetchBackupDriveRevisions(ctx, svc, fileID)
|
||||
if err != nil {
|
||||
out.Revisions = append(out.Revisions, driveBackupRevision{FileID: fileID, Error: err.Error()})
|
||||
} else {
|
||||
for _, revision := range revisions {
|
||||
out.Revisions = append(out.Revisions, driveBackupRevision{FileID: fileID, Revision: revision})
|
||||
}
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
func fetchBackupDrivePermissions(ctx context.Context, svc *drive.Service, fileID string) ([]*drive.Permission, error) {
|
||||
var out []*drive.Permission
|
||||
pageToken := ""
|
||||
|
||||
Loading…
Reference in New Issue
Block a user