From 08c94bb564709d87fe62a094d07f30578b16f594 Mon Sep 17 00:00:00 2001 From: "Bo-Yi.Wu" Date: Sat, 29 Oct 2022 12:26:27 +0800 Subject: [PATCH] chore(runner): cancel task if get the cancel from server Signed-off-by: Bo-Yi.Wu --- go.mod | 2 +- go.sum | 2 ++ runtime/reporter.go | 28 +++++++++++++++---------- runtime/task.go | 50 +++++++++++++++------------------------------ runtime/taskmap.go | 12 ----------- 5 files changed, 36 insertions(+), 58 deletions(-) delete mode 100644 runtime/taskmap.go diff --git a/go.mod b/go.mod index 4fb7716..4407184 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module gitea.com/gitea/act_runner go 1.18 require ( - gitea.com/gitea/proto-go v0.0.0-20221014123629-9116865c883b + gitea.com/gitea/proto-go v0.0.0-20221028125601-35c4f6b05835 github.com/appleboy/com v0.1.6 github.com/avast/retry-go/v4 v4.1.0 github.com/bufbuild/connect-go v0.5.0 diff --git a/go.sum b/go.sum index 0d39d5c..dd34da3 100644 --- a/go.sum +++ b/go.sum @@ -27,6 +27,8 @@ gitea.com/gitea/act v0.0.0-20220922135643-52a5bba9e7fa h1:HHqlvfIvqFlny3sgJgAM1B gitea.com/gitea/act v0.0.0-20220922135643-52a5bba9e7fa/go.mod h1:9W/Nz16tjfnWp7O5DUo3EjZBnZFBI/5rlWstX4o7+hU= gitea.com/gitea/proto-go v0.0.0-20221014123629-9116865c883b h1:TSz7VRHfnM/5JwGPgIAjSlDIvcr4pTGfuRMtgMxttmg= gitea.com/gitea/proto-go v0.0.0-20221014123629-9116865c883b/go.mod h1:hD8YwSHusjwjEEgubW6XFvnZuNhMZTHz6lwjfltEt/Y= +gitea.com/gitea/proto-go v0.0.0-20221028125601-35c4f6b05835 h1:27PhT7Nli/pgRo1bDYVZ+hlCKuF9cfFuo+y9muaPVJY= +gitea.com/gitea/proto-go v0.0.0-20221028125601-35c4f6b05835/go.mod h1:hD8YwSHusjwjEEgubW6XFvnZuNhMZTHz6lwjfltEt/Y= github.com/Azure/azure-sdk-for-go v16.2.1+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78 h1:w+iIsaOQNcT7OZ575w+acHgRric5iCyQh+xv+KJ4HB8= github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78/go.mod h1:LmzpDX56iTiv29bbRTIsUNlaFfuhWRQBWjQdVyAevI8= diff --git a/runtime/reporter.go b/runtime/reporter.go index b7f244e..0196421 100644 --- a/runtime/reporter.go +++ b/runtime/reporter.go @@ -18,7 +18,9 @@ import ( ) type Reporter struct { - ctx context.Context + ctx context.Context + cancel context.CancelFunc + closed bool client client.Client clientM sync.Mutex @@ -29,9 +31,10 @@ type Reporter struct { stateM sync.RWMutex } -func NewReporter(ctx context.Context, client client.Client, taskID int64) *Reporter { +func NewReporter(ctx context.Context, cancel context.CancelFunc, client client.Client, taskID int64) *Reporter { return &Reporter{ ctx: ctx, + cancel: cancel, client: client, state: &runnerv1.TaskState{ Id: taskID, @@ -218,9 +221,14 @@ func (r *Reporter) ReportState() error { state := proto.Clone(r.state).(*runnerv1.TaskState) r.stateM.RUnlock() - _, err := r.client.UpdateTask(r.ctx, connect.NewRequest(&runnerv1.UpdateTaskRequest{ + resp, err := r.client.UpdateTask(r.ctx, connect.NewRequest(&runnerv1.UpdateTaskRequest{ State: state, })) + + if resp.Msg.State.Result == runnerv1.Result_RESULT_CANCELLED { + r.cancel() + } + return err } @@ -235,14 +243,12 @@ func (r *Reporter) duringSteps() bool { return true } -var ( - stringToResult = map[string]runnerv1.Result{ - "success": runnerv1.Result_RESULT_SUCCESS, - "failure": runnerv1.Result_RESULT_FAILURE, - "skipped": runnerv1.Result_RESULT_SKIPPED, - "cancelled": runnerv1.Result_RESULT_CANCELLED, - } -) +var stringToResult = map[string]runnerv1.Result{ + "success": runnerv1.Result_RESULT_SUCCESS, + "failure": runnerv1.Result_RESULT_FAILURE, + "skipped": runnerv1.Result_RESULT_SKIPPED, + "cancelled": runnerv1.Result_RESULT_CANCELLED, +} func (r *Reporter) parseResult(result interface{}) (runnerv1.Result, bool) { str := "" diff --git a/runtime/task.go b/runtime/task.go index fb15b5e..820374e 100644 --- a/runtime/task.go +++ b/runtime/task.go @@ -7,6 +7,7 @@ import ( "fmt" "os" "path/filepath" + "sync" "gitea.com/gitea/act_runner/client" runnerv1 "gitea.com/gitea/proto-go/runner/v1" @@ -18,9 +19,11 @@ import ( log "github.com/sirupsen/logrus" ) +var globalTaskMap sync.Map + type TaskInput struct { repoDirectory string - actor string + // actor string // workdir string // workflowsPath string // autodetectEvent bool @@ -57,30 +60,10 @@ type TaskInput struct { EnvFile string } -type TaskState int - -const ( - // TaskStateUnknown is the default state - TaskStateUnknown TaskState = iota - // TaskStatePending is the pending state - // pending means task is received, parsing actions and preparing to run - TaskStatePending - // TaskStateRunning is the state when the task is running - // running means task is running - TaskStateRunning - // TaskStateSuccess is the state when the task is successful - // success means task is successful without any error - TaskStateSuccess - // TaskStateFailure is the state when the task is failed - // failure means task is failed with error - TaskStateFailure -) - type Task struct { BuildID int64 Input *TaskInput - state TaskState client client.Client log *log.Entry } @@ -94,7 +77,6 @@ func NewTask(forgeInstance string, buildID int64, client client.Client) *Task { }, BuildID: buildID, - state: TaskStatePending, client: client, log: log.WithField("buildID", buildID), } @@ -124,6 +106,8 @@ func demoPlatforms() map[string]string { } func (t *Task) Run(ctx context.Context, task *runnerv1.Task) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() _, exist := globalTaskMap.Load(task.Id) if exist { return fmt.Errorf("task %d already exists", task.Id) @@ -135,11 +119,10 @@ func (t *Task) Run(ctx context.Context, task *runnerv1.Task) error { defer globalTaskMap.Delete(task.Id) lastWords := "" - reporter := NewReporter(ctx, t.client, task.Id) + reporter := NewReporter(ctx, cancel, t.client, task.Id) defer func() { _ = reporter.Close(lastWords) }() - reporter.RunDaemon() reporter.Logf("received task %v of job %v", task.Id, task.Context.Fields["job"].GetStringValue()) @@ -157,17 +140,16 @@ func (t *Task) Run(ctx context.Context, task *runnerv1.Task) error { } var plan *model.Plan - if jobIDs := workflow.GetJobIDs(); len(jobIDs) != 1 { - err := fmt.Errorf("multiple jobs fould: %v", jobIDs) + jobIDs := workflow.GetJobIDs() + if len(jobIDs) != 1 { + err := fmt.Errorf("multiple jobs found: %v", jobIDs) lastWords = err.Error() return err - } else { - jobID := jobIDs[0] - plan = model.CombineWorkflowPlanner(workflow).PlanJob(jobID) - - job := workflow.GetJob(jobID) - reporter.ResetSteps(len(job.Steps)) } + jobID := jobIDs[0] + plan = model.CombineWorkflowPlanner(workflow).PlanJob(jobID) + job := workflow.GetJob(jobID) + reporter.ResetSteps(len(job.Steps)) log.Infof("plan: %+v", plan.Stages[0].Runs) @@ -234,11 +216,11 @@ func (t *Task) Run(ctx context.Context, task *runnerv1.Task) error { return err } - cancel := artifacts.Serve(ctx, input.artifactServerPath, input.artifactServerPort) + artifactCancel := artifacts.Serve(ctx, input.artifactServerPath, input.artifactServerPort) t.log.Debugf("artifacts server started at %s:%s", input.artifactServerPath, input.artifactServerPort) executor := r.NewPlanExecutor(plan).Finally(func(ctx context.Context) error { - cancel() + artifactCancel() return nil }) diff --git a/runtime/taskmap.go b/runtime/taskmap.go deleted file mode 100644 index ca79aa5..0000000 --- a/runtime/taskmap.go +++ /dev/null @@ -1,12 +0,0 @@ -package runtime - -import ( - "sync" -) - -var globalTaskMap sync.Map - -// finishTask removes the task from global map -func finishTask(buildID int64) { - globalTaskMap.Delete(buildID) -}