diff --git a/go.mod b/go.mod index 1a4062a..881faf2 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module gitea.com/gitea/act_runner go 1.20 require ( - code.gitea.io/actions-proto-go v0.3.0 + code.gitea.io/actions-proto-go v0.3.1 code.gitea.io/gitea-vet v0.2.3-0.20230113022436-2b1561217fa5 github.com/avast/retry-go/v4 v4.3.1 github.com/bufbuild/connect-go v1.3.1 diff --git a/go.sum b/go.sum index 01521fd..6b82b6a 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,5 @@ -code.gitea.io/actions-proto-go v0.3.0 h1:9Tvg8+TaaCXPKi6EnWl9vVgs2VZsj1Cs5afnsHa4AmM= -code.gitea.io/actions-proto-go v0.3.0/go.mod h1:00ys5QDo1iHN1tHNvvddAcy2W/g+425hQya1cCSvq9A= +code.gitea.io/actions-proto-go v0.3.1 h1:PMyiQtBKb8dNnpEO2R5rcZdXSis+UQZVo/SciMtR1aU= +code.gitea.io/actions-proto-go v0.3.1/go.mod h1:00ys5QDo1iHN1tHNvvddAcy2W/g+425hQya1cCSvq9A= code.gitea.io/gitea-vet v0.2.3-0.20230113022436-2b1561217fa5 h1:daBEK2GQeqGikJESctP5Cu1i33z5ztAD4kyQWiw185M= code.gitea.io/gitea-vet v0.2.3-0.20230113022436-2b1561217fa5/go.mod h1:zcNbT/aJEmivCAhfmkHOlT645KNOf9W2KnkLgFjGGfE= gitea.com/gitea/act v0.246.2-0.20230717034634-cdc6d4bc6a38 h1:whUEO/qPkYfpbL1he9TuIIzz2P4v6xEwb2lT6E/4F7A= diff --git a/internal/app/poll/poller.go b/internal/app/poll/poller.go index 2ee2b8a..f79e98e 100644 --- a/internal/app/poll/poller.go +++ b/internal/app/poll/poller.go @@ -8,6 +8,7 @@ import ( "errors" "fmt" "sync" + "sync/atomic" runnerv1 "code.gitea.io/actions-proto-go/runner/v1" "github.com/bufbuild/connect-go" @@ -20,9 +21,10 @@ import ( ) type Poller struct { - client client.Client - runner *run.Runner - cfg *config.Config + client client.Client + runner *run.Runner + cfg *config.Config + tasksVersion atomic.Int64 // tasksVersion used to store the version of the last task fetched from the Gitea. } func New(cfg *config.Config, client client.Client, runner *run.Runner) *Poller { @@ -77,7 +79,11 @@ func (p *Poller) fetchTask(ctx context.Context) (*runnerv1.Task, bool) { reqCtx, cancel := context.WithTimeout(ctx, p.cfg.Runner.FetchTimeout) defer cancel() - resp, err := p.client.FetchTask(reqCtx, connect.NewRequest(&runnerv1.FetchTaskRequest{})) + // Load the version value that was in the cache when the request was sent. + v := p.tasksVersion.Load() + resp, err := p.client.FetchTask(reqCtx, connect.NewRequest(&runnerv1.FetchTaskRequest{ + TasksVersion: v, + })) if errors.Is(err, context.DeadlineExceeded) { err = nil } @@ -86,8 +92,20 @@ func (p *Poller) fetchTask(ctx context.Context) (*runnerv1.Task, bool) { return nil, false } - if resp == nil || resp.Msg == nil || resp.Msg.Task == nil { + if resp == nil || resp.Msg == nil { return nil, false } + + if resp.Msg.TasksVersion > v { + p.tasksVersion.CompareAndSwap(v, resp.Msg.TasksVersion) + } + + if resp.Msg.Task == nil { + return nil, false + } + + // got a task, set `tasksVersion` to zero to focre query db in next request. + p.tasksVersion.CompareAndSwap(resp.Msg.TasksVersion, 0) + return resp.Msg.Task, true }