fix: sync islo workspaces before run
This commit is contained in:
parent
6a45e46b1b
commit
c3c111ba35
@ -7,6 +7,7 @@
|
||||
- Fixed Tailscale exit-node bootstrap paths to prefer tailnet metadata and fail clearly when remote exit-node egress is not active.
|
||||
- Fixed `run --no-sync` timing summaries so they report `sync_skipped=true`.
|
||||
- Fixed native Windows command output so first-use PowerShell progress records do not leak CLIXML into run logs.
|
||||
- Fixed Islo provider sync so `crabbox run --provider islo` uploads the local workspace, uses the correct `/workspace/<workdir>`, and falls back to chunked exec upload while the archive API returns server errors.
|
||||
|
||||
## 0.6.0 - 2026-05-07
|
||||
|
||||
|
||||
@ -299,8 +299,9 @@ short-lived Daytona SSH tokens and redacts those tokens from output. Daytona
|
||||
auth can come from `DAYTONA_API_KEY` / `DAYTONA_JWT_TOKEN` env or an
|
||||
authenticated Daytona CLI profile created by `daytona login --api-key`. With
|
||||
`provider: islo`, Crabbox delegates sandbox setup and command execution to the
|
||||
Islo Go SDK; sync is delegated and `--sync-only`, `--checksum`, and
|
||||
`--force-sync-large` are unsupported.
|
||||
Islo Go SDK, uploads the Crabbox sync manifest as a gzipped archive into the
|
||||
Islo workdir, and rejects only the SSH/rsync-specific `--sync-only` and
|
||||
`--checksum` modes.
|
||||
|
||||
## Exit Codes
|
||||
|
||||
|
||||
@ -7,10 +7,11 @@ Read when:
|
||||
- reviewing delegated provider behavior.
|
||||
|
||||
`provider: islo` delegates sandbox setup and command execution to Islo. Crabbox
|
||||
uses the Islo Go SDK for auth, sandbox lifecycle, list, status, and stop. The
|
||||
SDK's current exec stream helper coalesces output, so Crabbox keeps a small SSE
|
||||
reader for `POST /sandboxes/{name}/exec/stream` while still using the SDK auth
|
||||
provider.
|
||||
uses the Islo Go SDK for auth, sandbox lifecycle, list, status, and stop. It
|
||||
builds the normal Crabbox sync manifest and uploads it as a gzipped archive into
|
||||
the sandbox workdir before executing the command. The SDK's current exec stream
|
||||
helper coalesces output, so Crabbox keeps a small SSE reader for
|
||||
`POST /sandboxes/{name}/exec/stream` while still using the SDK auth provider.
|
||||
|
||||
## Auth
|
||||
|
||||
@ -49,11 +50,12 @@ crabbox stop --provider islo <slug>
|
||||
|
||||
- `warmup` creates a `crabbox-...` Islo sandbox and stores a local lease ID of
|
||||
the form `isb_<crabbox-sandbox-name>` plus a Crabbox slug.
|
||||
- `run` creates or reuses a sandbox, streams stdout/stderr from Islo's SSE exec
|
||||
- `run` creates or reuses a sandbox, syncs the local Git-managed working set
|
||||
into `/workspace/<islo.workdir>`, streams stdout/stderr from Islo's SSE exec
|
||||
endpoint, and returns the remote exit code.
|
||||
- Sync is delegated to Islo. `--sync-only`, `--checksum`, and
|
||||
`--force-sync-large` are rejected because Crabbox cannot honor those local
|
||||
rsync options.
|
||||
- `--sync-only` and `--checksum` are rejected because Islo does not expose a
|
||||
Crabbox SSH/rsync target. Large-sync guardrails still apply, and
|
||||
`--force-sync-large` is honored for intentional large archive syncs.
|
||||
- `list`, `status`, and `stop` use the Islo SDK and return core-rendered
|
||||
Crabbox views for Crabbox-created sandboxes only.
|
||||
|
||||
|
||||
@ -8,8 +8,9 @@ Read when:
|
||||
|
||||
Islo is a delegated run provider. Crabbox uses the Islo SDK for sandbox
|
||||
lifecycle and a streaming exec endpoint for command output. Islo owns sandbox
|
||||
state and workspace setup; Crabbox owns local config, repo claims, slugs,
|
||||
timing summaries, and normalized list/status rendering.
|
||||
state and command transport; Crabbox owns local config, repo claims, sync
|
||||
manifests and guardrails, slugs, timing summaries, and normalized list/status
|
||||
rendering.
|
||||
|
||||
## When To Use
|
||||
|
||||
@ -69,23 +70,27 @@ Provider flags:
|
||||
|
||||
1. Create or resolve a Crabbox-owned Islo sandbox.
|
||||
2. Store a local lease ID with the `isb_` prefix and a friendly slug.
|
||||
3. Execute commands through Islo's streaming exec endpoint.
|
||||
4. Require an exit event before treating a stream as successful.
|
||||
5. Delete the sandbox on release unless kept.
|
||||
3. Build the Crabbox sync manifest and upload a gzipped archive into
|
||||
`/workspace/<islo.workdir>`.
|
||||
4. Execute commands through Islo's streaming exec endpoint in that workdir.
|
||||
5. Require an exit event before treating a stream as successful.
|
||||
6. Delete the sandbox on release unless kept.
|
||||
|
||||
## Capabilities
|
||||
|
||||
- SSH: no.
|
||||
- Crabbox sync: no.
|
||||
- Provider sync: yes, Islo-owned.
|
||||
- Crabbox sync: yes, archive sync through the Islo API or chunked exec fallback.
|
||||
- Provider sync: no separate Islo CLI sync.
|
||||
- Desktop/browser/code: no Crabbox VNC/code surface.
|
||||
- Actions hydration: no.
|
||||
- Coordinator: no.
|
||||
|
||||
## Gotchas
|
||||
|
||||
- `--sync-only`, `--checksum`, and `--force-sync-large` are rejected because
|
||||
Crabbox cannot apply local rsync semantics.
|
||||
- `--sync-only` and `--checksum` are rejected because Islo does not expose a
|
||||
Crabbox SSH/rsync target.
|
||||
- Large-sync guardrails still apply. Use `--force-sync-large` when a large Islo
|
||||
archive sync is intentional.
|
||||
- `--shell` passes the raw shell string to the remote shell path.
|
||||
- IDs can be Crabbox slugs, `isb_...` lease IDs, or Crabbox-created sandbox
|
||||
names. Non-Crabbox Islo sandboxes are rejected.
|
||||
|
||||
@ -142,7 +142,7 @@ func (b *isloBackend) Warmup(ctx context.Context, req WarmupRequest) error {
|
||||
}
|
||||
|
||||
func (b *isloBackend) Run(ctx context.Context, req RunRequest) (RunResult, error) {
|
||||
if err := rejectDelegatedSyncOptions(isloProvider, req); err != nil {
|
||||
if err := rejectIsloSyncOptions(req); err != nil {
|
||||
return RunResult{}, err
|
||||
}
|
||||
started := b.now()
|
||||
@ -176,8 +176,21 @@ func (b *isloBackend) Run(ctx context.Context, req RunRequest) (RunResult, error
|
||||
}()
|
||||
}
|
||||
fmt.Fprintf(b.rt.Stderr, "provider=islo lease=%s sandbox=%s\n", leaseID, name)
|
||||
syncDuration := time.Duration(0)
|
||||
syncPhases := []timingPhase{{Name: "sync", Skipped: true, Reason: "--no-sync"}}
|
||||
workspace := isloWorkspacePath(b.cfg)
|
||||
if !req.NoSync {
|
||||
var err error
|
||||
syncPhases, syncDuration, err = b.syncWorkspace(ctx, client, name, req)
|
||||
if err != nil {
|
||||
return RunResult{}, err
|
||||
}
|
||||
fmt.Fprintf(b.rt.Stderr, "sync complete in %s\n", syncDuration.Round(time.Millisecond))
|
||||
} else if err := b.prepareWorkspace(ctx, client, name, workspace); err != nil {
|
||||
return RunResult{}, err
|
||||
}
|
||||
commandStart := b.now()
|
||||
exitCode, runErr := b.exec(ctx, client, name, req.Command, req.ShellMode)
|
||||
exitCode, runErr := b.exec(ctx, client, name, workspace, req.Command, req.ShellMode)
|
||||
commandDuration := b.now().Sub(commandStart)
|
||||
result := RunResult{
|
||||
ExitCode: exitCode,
|
||||
@ -185,13 +198,19 @@ func (b *isloBackend) Run(ctx context.Context, req RunRequest) (RunResult, error
|
||||
Total: b.now().Sub(started),
|
||||
SyncDelegated: true,
|
||||
}
|
||||
fmt.Fprintf(b.rt.Stderr, "islo run summary command=%s total=%s exit=%d\n", result.Command.Round(time.Millisecond), result.Total.Round(time.Millisecond), exitCode)
|
||||
if req.NoSync {
|
||||
fmt.Fprintf(b.rt.Stderr, "islo run summary sync_skipped=true command=%s total=%s exit=%d\n", result.Command.Round(time.Millisecond), result.Total.Round(time.Millisecond), exitCode)
|
||||
} else {
|
||||
fmt.Fprintf(b.rt.Stderr, "islo run summary sync=%s command=%s total=%s exit=%d\n", syncDuration.Round(time.Millisecond), result.Command.Round(time.Millisecond), result.Total.Round(time.Millisecond), exitCode)
|
||||
}
|
||||
if req.TimingJSON {
|
||||
if err := writeTimingJSON(b.rt.Stderr, timingReport{
|
||||
Provider: isloProvider,
|
||||
LeaseID: leaseID,
|
||||
SyncDelegated: true,
|
||||
SyncPhases: []timingPhase{{Name: "delegated", Skipped: true, Reason: "islo owns sandbox state"}},
|
||||
SyncMs: syncDuration.Milliseconds(),
|
||||
SyncPhases: syncPhases,
|
||||
SyncSkipped: req.NoSync,
|
||||
CommandMs: result.Command.Milliseconds(),
|
||||
TotalMs: result.Total.Milliseconds(),
|
||||
ExitCode: exitCode,
|
||||
@ -318,14 +337,14 @@ func (b *isloBackend) createSandbox(ctx context.Context, client isloAPI, repo Re
|
||||
return leaseID, sandbox.GetName(), slug, nil
|
||||
}
|
||||
|
||||
func (b *isloBackend) exec(ctx context.Context, client isloAPI, name string, command []string, shellMode bool) (int, error) {
|
||||
func (b *isloBackend) exec(ctx context.Context, client isloAPI, name, workdir string, command []string, shellMode bool) (int, error) {
|
||||
execCommand, err := isloExecCommand(command, shellMode)
|
||||
if err != nil {
|
||||
return 2, err
|
||||
}
|
||||
req := &gosdk.ExecRequest{Command: execCommand}
|
||||
if b.cfg.Islo.Workdir != "" {
|
||||
req.Workdir = stringValue(b.cfg.Islo.Workdir)
|
||||
if workdir != "" {
|
||||
req.Workdir = stringValue(workdir)
|
||||
}
|
||||
return client.ExecStream(ctx, name, req, b.rt.Stdout, b.rt.Stderr)
|
||||
}
|
||||
|
||||
@ -1,14 +1,23 @@
|
||||
package islo
|
||||
|
||||
import (
|
||||
"archive/tar"
|
||||
"bytes"
|
||||
"compress/gzip"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"os"
|
||||
"os/exec"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
gosdk "github.com/islo-labs/go-sdk"
|
||||
)
|
||||
|
||||
func TestParseIsloSSE(t *testing.T) {
|
||||
@ -114,6 +123,100 @@ func TestResolveIsloLeaseIDRejectsUnclaimedRawSandbox(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestIsloWorkspacePathDefaultsUnderWorkspace(t *testing.T) {
|
||||
if got := isloWorkspacePath(Config{}); got != "/workspace/crabbox" {
|
||||
t.Fatalf("workspace=%q", got)
|
||||
}
|
||||
if got := isloWorkspacePath(Config{Islo: IsloConfig{Workdir: "repo"}}); got != "/workspace/repo" {
|
||||
t.Fatalf("workspace=%q", got)
|
||||
}
|
||||
if got := isloWorkspacePath(Config{Islo: IsloConfig{Workdir: "/work/repo"}}); got != "/work/repo" {
|
||||
t.Fatalf("workspace=%q", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestIsloSyncWorkspaceUploadsRepoArchive(t *testing.T) {
|
||||
if _, err := exec.LookPath("git"); err != nil {
|
||||
t.Skip("git not available")
|
||||
}
|
||||
if _, err := exec.LookPath("tar"); err != nil {
|
||||
t.Skip("tar not available")
|
||||
}
|
||||
root := t.TempDir()
|
||||
if err := os.WriteFile(root+"/go.mod", []byte("module example.test/repo\n"), 0o644); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
cmd := exec.Command("git", "init")
|
||||
cmd.Dir = root
|
||||
if out, err := cmd.CombinedOutput(); err != nil {
|
||||
t.Fatalf("git init: %v\n%s", err, out)
|
||||
}
|
||||
client := &fakeIsloSyncClient{}
|
||||
backend := &isloBackend{
|
||||
cfg: Config{Islo: IsloConfig{Workdir: "repo"}},
|
||||
rt: Runtime{Stderr: io.Discard},
|
||||
}
|
||||
_, _, err := backend.syncWorkspace(context.Background(), client, "crabbox-test", RunRequest{
|
||||
Repo: Repo{Root: root, Name: "repo"},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if client.uploadPath != "/workspace/repo" {
|
||||
t.Fatalf("upload path=%q", client.uploadPath)
|
||||
}
|
||||
if len(client.prepareCommands) != 1 || !strings.Contains(client.prepareCommands[0], "mkdir -p '/workspace/repo'") {
|
||||
t.Fatalf("prepare commands=%#v", client.prepareCommands)
|
||||
}
|
||||
if !tarGzipContains(t, client.uploaded.Bytes(), "go.mod") {
|
||||
t.Fatal("uploaded archive missing go.mod")
|
||||
}
|
||||
}
|
||||
|
||||
func TestIsloSyncWorkspaceFallsBackToExecUpload(t *testing.T) {
|
||||
if _, err := exec.LookPath("git"); err != nil {
|
||||
t.Skip("git not available")
|
||||
}
|
||||
if _, err := exec.LookPath("tar"); err != nil {
|
||||
t.Skip("tar not available")
|
||||
}
|
||||
root := t.TempDir()
|
||||
if err := os.WriteFile(root+"/go.mod", []byte("module example.test/repo\n"), 0o644); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
cmd := exec.Command("git", "init")
|
||||
cmd.Dir = root
|
||||
if out, err := cmd.CombinedOutput(); err != nil {
|
||||
t.Fatalf("git init: %v\n%s", err, out)
|
||||
}
|
||||
client := &fakeIsloSyncClient{uploadErr: errors.New("api upload failed"), closeUploadReader: true}
|
||||
backend := &isloBackend{
|
||||
cfg: Config{Islo: IsloConfig{Workdir: "repo"}},
|
||||
rt: Runtime{Stderr: io.Discard},
|
||||
}
|
||||
_, _, err := backend.syncWorkspace(context.Background(), client, "crabbox-test", RunRequest{
|
||||
Repo: Repo{Root: root, Name: "repo"},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !client.commandContains("base64 -d") || !client.commandContains("tar -xzf") {
|
||||
t.Fatalf("fallback commands=%#v", client.prepareCommands)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRejectIsloSyncOptionsAllowsForceSyncLarge(t *testing.T) {
|
||||
if err := rejectIsloSyncOptions(RunRequest{ForceSyncLarge: true}); err != nil {
|
||||
t.Fatalf("force sync large should be honored by Islo archive sync: %v", err)
|
||||
}
|
||||
if err := rejectIsloSyncOptions(RunRequest{SyncOnly: true}); err == nil || !strings.Contains(err.Error(), "--sync-only") {
|
||||
t.Fatalf("sync-only err=%v", err)
|
||||
}
|
||||
if err := rejectIsloSyncOptions(RunRequest{ChecksumSync: true}); err == nil || !strings.Contains(err.Error(), "--checksum") {
|
||||
t.Fatalf("checksum err=%v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestNewIsloSandboxNameUsesCrabboxPrefix(t *testing.T) {
|
||||
name := newIsloSandboxName(Repo{Name: "repo"})
|
||||
if !strings.HasPrefix(name, "crabbox-repo-") {
|
||||
@ -179,3 +282,125 @@ func TestIsloSDKClientListUsesInjectedHTTPAndPaginates(t *testing.T) {
|
||||
t.Fatalf("authHits=%d listHits=%d", authHits, listHits)
|
||||
}
|
||||
}
|
||||
|
||||
func TestIsloSDKClientUploadArchiveStreamsRawTarball(t *testing.T) {
|
||||
authHits := 0
|
||||
uploadHits := 0
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
switch r.URL.Path {
|
||||
case "/auth/token":
|
||||
authHits++
|
||||
_ = json.NewEncoder(w).Encode(map[string]any{
|
||||
"session_token": "jwt-from-test",
|
||||
"cookie_max_age": 3600,
|
||||
})
|
||||
case "/sandboxes/crabbox-test/files-archive":
|
||||
uploadHits++
|
||||
if got := r.Header.Get("Authorization"); got != "Bearer jwt-from-test" {
|
||||
t.Fatalf("Authorization=%q", got)
|
||||
}
|
||||
if got := r.Header.Get("Content-Type"); got != "application/gzip" {
|
||||
t.Fatalf("Content-Type=%q", got)
|
||||
}
|
||||
if got := r.URL.Query().Get("path"); got != "/workspace/repo" {
|
||||
t.Fatalf("path=%q", got)
|
||||
}
|
||||
body, err := io.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if string(body) != "archive" {
|
||||
t.Fatalf("body=%q", string(body))
|
||||
}
|
||||
_, _ = w.Write([]byte(`{}`))
|
||||
default:
|
||||
http.NotFound(w, r)
|
||||
}
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
api, err := newIsloClient(Config{Islo: IsloConfig{APIKey: "ak_test", BaseURL: srv.URL}}, Runtime{HTTP: srv.Client()})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := api.UploadArchive(t.Context(), "crabbox-test", "/workspace/repo", strings.NewReader("archive")); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if authHits != 1 || uploadHits != 1 {
|
||||
t.Fatalf("authHits=%d uploadHits=%d", authHits, uploadHits)
|
||||
}
|
||||
}
|
||||
|
||||
type fakeIsloSyncClient struct {
|
||||
prepareCommands []string
|
||||
uploadPath string
|
||||
uploaded bytes.Buffer
|
||||
uploadErr error
|
||||
closeUploadReader bool
|
||||
}
|
||||
|
||||
func (f *fakeIsloSyncClient) CreateSandbox(context.Context, *gosdk.SandboxCreate) (*gosdk.SandboxResponse, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (f *fakeIsloSyncClient) GetSandbox(context.Context, string) (*gosdk.SandboxResponse, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (f *fakeIsloSyncClient) ListSandboxes(context.Context) ([]*gosdk.SandboxResponse, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (f *fakeIsloSyncClient) DeleteSandbox(context.Context, string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *fakeIsloSyncClient) UploadArchive(_ context.Context, _ string, targetPath string, archive io.Reader) error {
|
||||
f.uploadPath = targetPath
|
||||
_, err := io.Copy(&f.uploaded, archive)
|
||||
if f.closeUploadReader {
|
||||
if closer, ok := archive.(io.Closer); ok {
|
||||
_ = closer.Close()
|
||||
}
|
||||
}
|
||||
if f.uploadErr != nil {
|
||||
return f.uploadErr
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (f *fakeIsloSyncClient) ExecStream(_ context.Context, _ string, req *gosdk.ExecRequest, _, _ io.Writer) (int, error) {
|
||||
f.prepareCommands = append(f.prepareCommands, strings.Join(req.GetCommand(), " "))
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
func (f *fakeIsloSyncClient) commandContains(value string) bool {
|
||||
for _, command := range f.prepareCommands {
|
||||
if strings.Contains(command, value) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func tarGzipContains(t *testing.T, data []byte, name string) bool {
|
||||
t.Helper()
|
||||
gz, err := gzip.NewReader(bytes.NewReader(data))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer gz.Close()
|
||||
tr := tar.NewReader(gz)
|
||||
for {
|
||||
header, err := tr.Next()
|
||||
if err == io.EOF {
|
||||
return false
|
||||
}
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if header.Name == name {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -8,6 +8,7 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
@ -23,6 +24,7 @@ type isloAPI interface {
|
||||
GetSandbox(context.Context, string) (*gosdk.SandboxResponse, error)
|
||||
ListSandboxes(context.Context) ([]*gosdk.SandboxResponse, error)
|
||||
DeleteSandbox(context.Context, string) error
|
||||
UploadArchive(context.Context, string, string, io.Reader) error
|
||||
ExecStream(context.Context, string, *gosdk.ExecRequest, io.Writer, io.Writer) (int, error)
|
||||
}
|
||||
|
||||
@ -93,6 +95,38 @@ func (c *isloSDKClient) DeleteSandbox(ctx context.Context, name string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *isloSDKClient) UploadArchive(ctx context.Context, name, targetPath string, archive io.Reader) error {
|
||||
u, err := url.Parse(c.baseURL + "/sandboxes/" + url.PathEscape(name) + "/files-archive")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
q := u.Query()
|
||||
q.Set("path", targetPath)
|
||||
u.RawQuery = q.Encode()
|
||||
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, u.String(), archive)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
token, err := c.auth.Token(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("islo auth: %w", err)
|
||||
}
|
||||
httpReq.Header.Set("Authorization", "Bearer "+token)
|
||||
httpReq.Header.Set("Content-Type", "application/gzip")
|
||||
httpReq.Header.Set("Accept", "application/json")
|
||||
resp, err := c.httpClient.Do(httpReq)
|
||||
if err != nil {
|
||||
return fmt.Errorf("islo upload archive: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
if resp.StatusCode >= 400 {
|
||||
snippet, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))
|
||||
return fmt.Errorf("islo upload archive %s: %s", resp.Status, strings.TrimSpace(string(snippet)))
|
||||
}
|
||||
_, _ = io.Copy(io.Discard, resp.Body)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *isloSDKClient) ExecStream(ctx context.Context, name string, req *gosdk.ExecRequest, stdout, stderr io.Writer) (int, error) {
|
||||
body, err := json.Marshal(req)
|
||||
if err != nil {
|
||||
|
||||
@ -57,3 +57,19 @@ func resolveLeaseClaim(identifier string) (core.LeaseClaim, bool, error) {
|
||||
func removeLeaseClaim(leaseID string) {
|
||||
core.RemoveLeaseClaim(leaseID)
|
||||
}
|
||||
|
||||
func syncExcludes(root string, cfg Config) ([]string, error) {
|
||||
return core.SyncExcludes(root, cfg)
|
||||
}
|
||||
|
||||
func syncManifest(root string, excludes []string) (core.SyncManifest, error) {
|
||||
return core.BuildSyncManifest(root, excludes)
|
||||
}
|
||||
|
||||
func checkSyncPreflight(manifest core.SyncManifest, cfg Config, force bool, stderr io.Writer) error {
|
||||
return core.CheckSyncPreflight(manifest, cfg, force, stderr)
|
||||
}
|
||||
|
||||
func shellQuote(s string) string {
|
||||
return core.ShellQuote(s)
|
||||
}
|
||||
|
||||
174
internal/providers/islo/sync.go
Normal file
174
internal/providers/islo/sync.go
Normal file
@ -0,0 +1,174 @@
|
||||
package islo
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
gosdk "github.com/islo-labs/go-sdk"
|
||||
core "github.com/openclaw/crabbox/internal/cli"
|
||||
)
|
||||
|
||||
type SyncManifest = core.SyncManifest
|
||||
|
||||
func rejectIsloSyncOptions(req RunRequest) error {
|
||||
if req.SyncOnly {
|
||||
return exit(2, "%s uses Islo archive sync; --sync-only is not supported", isloProvider)
|
||||
}
|
||||
if req.ChecksumSync {
|
||||
return exit(2, "%s uses Islo archive sync; --checksum is not supported", isloProvider)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *isloBackend) syncWorkspace(ctx context.Context, client isloAPI, name string, req RunRequest) ([]timingPhase, time.Duration, error) {
|
||||
start := b.now()
|
||||
excludes, err := syncExcludes(req.Repo.Root, b.cfg)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
manifestStarted := b.now()
|
||||
manifest, err := syncManifest(req.Repo.Root, excludes)
|
||||
if err != nil {
|
||||
return nil, 0, exit(6, "build sync file list: %v", err)
|
||||
}
|
||||
manifestDuration := b.now().Sub(manifestStarted)
|
||||
preflightStarted := b.now()
|
||||
if err := checkSyncPreflight(manifest, b.cfg, req.ForceSyncLarge, b.rt.Stderr); err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
preflightDuration := b.now().Sub(preflightStarted)
|
||||
workspace := isloWorkspacePath(b.cfg)
|
||||
prepareStarted := b.now()
|
||||
if err := b.prepareWorkspace(ctx, client, name, workspace); err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
prepareDuration := b.now().Sub(prepareStarted)
|
||||
archiveStarted := b.now()
|
||||
archive, err := createIsloSyncArchive(ctx, req.Repo, manifest, b.rt.Stderr)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
defer os.Remove(archive.Name())
|
||||
defer archive.Close()
|
||||
archiveDuration := b.now().Sub(archiveStarted)
|
||||
uploadStarted := b.now()
|
||||
if _, err := archive.Seek(0, 0); err != nil {
|
||||
return nil, 0, fmt.Errorf("islo rewind archive: %w", err)
|
||||
}
|
||||
if err := client.UploadArchive(ctx, name, workspace, struct{ io.Reader }{archive}); err != nil {
|
||||
fmt.Fprintf(b.rt.Stderr, "warning: islo archive API upload failed; falling back to exec upload: %v\n", err)
|
||||
if _, seekErr := archive.Seek(0, 0); seekErr != nil {
|
||||
return nil, 0, fmt.Errorf("islo rewind archive for fallback: %w", seekErr)
|
||||
}
|
||||
if fallbackErr := b.uploadArchiveViaExec(ctx, client, name, workspace, archive); fallbackErr != nil {
|
||||
return nil, 0, fallbackErr
|
||||
}
|
||||
}
|
||||
uploadDuration := b.now().Sub(uploadStarted)
|
||||
total := b.now().Sub(start)
|
||||
return []timingPhase{
|
||||
{Name: "manifest", Ms: manifestDuration.Milliseconds()},
|
||||
{Name: "preflight", Ms: preflightDuration.Milliseconds()},
|
||||
{Name: "prepare", Ms: prepareDuration.Milliseconds()},
|
||||
{Name: "archive", Ms: archiveDuration.Milliseconds()},
|
||||
{Name: "upload", Ms: uploadDuration.Milliseconds()},
|
||||
{Name: "islo_sync", Ms: total.Milliseconds()},
|
||||
}, total, nil
|
||||
}
|
||||
|
||||
func (b *isloBackend) prepareWorkspace(ctx context.Context, client isloAPI, name, workspace string) error {
|
||||
command := "mkdir -p " + shellQuote(workspace)
|
||||
if b.cfg.Sync.Delete {
|
||||
command = "rm -rf " + shellQuote(workspace) + " && " + command
|
||||
}
|
||||
return b.execShell(ctx, client, name, command, io.Discard)
|
||||
}
|
||||
|
||||
func (b *isloBackend) uploadArchiveViaExec(ctx context.Context, client isloAPI, name, workspace string, archive io.Reader) error {
|
||||
suffix := isloRandomSuffix()
|
||||
remoteB64 := path.Join("/tmp", "crabbox-"+suffix+".tgz.b64")
|
||||
remoteArchive := path.Join("/tmp", "crabbox-"+suffix+".tgz")
|
||||
if err := b.execShell(ctx, client, name, "rm -f "+shellQuote(remoteB64)+" "+shellQuote(remoteArchive), io.Discard); err != nil {
|
||||
return err
|
||||
}
|
||||
buf := make([]byte, 48*1024)
|
||||
for {
|
||||
n, readErr := archive.Read(buf)
|
||||
if n > 0 {
|
||||
chunk := base64.StdEncoding.EncodeToString(buf[:n])
|
||||
command := "printf %s " + shellQuote(chunk) + " >> " + shellQuote(remoteB64)
|
||||
if err := b.execShell(ctx, client, name, command, io.Discard); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if readErr == io.EOF {
|
||||
break
|
||||
}
|
||||
if readErr != nil {
|
||||
return fmt.Errorf("islo read archive for fallback upload: %w", readErr)
|
||||
}
|
||||
}
|
||||
extract := strings.Join([]string{
|
||||
"if base64 -d " + shellQuote(remoteB64) + " > " + shellQuote(remoteArchive) + " 2>/dev/null; then :; else base64 --decode " + shellQuote(remoteB64) + " > " + shellQuote(remoteArchive) + "; fi",
|
||||
"tar -xzf " + shellQuote(remoteArchive) + " -C " + shellQuote(workspace),
|
||||
"rm -f " + shellQuote(remoteB64) + " " + shellQuote(remoteArchive),
|
||||
}, " && ")
|
||||
return b.execShell(ctx, client, name, extract, io.Discard)
|
||||
}
|
||||
|
||||
func (b *isloBackend) execShell(ctx context.Context, client isloAPI, name, command string, stdout io.Writer) error {
|
||||
code, err := client.ExecStream(ctx, name, &gosdk.ExecRequest{Command: []string{"bash", "-lc", command}}, stdout, b.rt.Stderr)
|
||||
if err != nil {
|
||||
return fmt.Errorf("islo exec %q: %w", command, err)
|
||||
}
|
||||
if code != 0 {
|
||||
return exit(code, "islo exec %q exited %d", command, code)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func createIsloSyncArchive(ctx context.Context, repo Repo, manifest SyncManifest, stderr io.Writer) (*os.File, error) {
|
||||
var input bytes.Buffer
|
||||
input.Write(manifest.NUL())
|
||||
archive, err := os.CreateTemp("", "crabbox-islo-sync-*.tgz")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create sync archive temp file: %w", err)
|
||||
}
|
||||
keep := false
|
||||
defer func() {
|
||||
if !keep {
|
||||
name := archive.Name()
|
||||
_ = archive.Close()
|
||||
_ = os.Remove(name)
|
||||
}
|
||||
}()
|
||||
cmd := exec.CommandContext(ctx, "tar", "--no-xattrs", "-czf", "-", "-C", repo.Root, "--null", "-T", "-")
|
||||
cmd.Stdin = &input
|
||||
cmd.Env = append(os.Environ(), "COPYFILE_DISABLE=1")
|
||||
cmd.Stdout = archive
|
||||
cmd.Stderr = stderr
|
||||
if err := cmd.Run(); err != nil {
|
||||
return nil, exit(6, "create sync archive: %v", err)
|
||||
}
|
||||
keep = true
|
||||
return archive, nil
|
||||
}
|
||||
|
||||
func isloWorkspacePath(cfg Config) string {
|
||||
workdir := strings.TrimSpace(cfg.Islo.Workdir)
|
||||
if workdir == "" {
|
||||
workdir = "crabbox"
|
||||
}
|
||||
if strings.HasPrefix(workdir, "/") {
|
||||
return path.Clean(workdir)
|
||||
}
|
||||
return path.Join("/workspace", workdir)
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user