1
0
mirror of https://gitea.com/gitea/act_runner synced 2024-11-09 18:05:52 +01:00

update protocol

This commit is contained in:
Lunny Xiao 2022-06-20 16:37:28 +08:00 committed by Jason Song
parent 2babadbc94
commit 5903c08c14
3 changed files with 105 additions and 11 deletions

@ -4,7 +4,10 @@ import (
"context"
"encoding/json"
"errors"
"os"
"os/signal"
"strings"
"syscall"
"time"
"github.com/gorilla/websocket"
@ -16,12 +19,22 @@ type Message struct {
Version int //
Type int // message type, 1 register 2 error
RunnerUUID string // runner uuid
BuildUUID string // build uuid
ErrCode int // error code
ErrContent string // errors message
EventName string
EventPayload string
JobID string // only run the special job, empty means run all the jobs
}
const (
MsgTypeRegister = iota + 1 // register
MsgTypeError // error
MsgTypeRequestBuild // request build task
MsgTypeIdle // no task
MsgTypeBuildResult // build result
)
func runDaemon(ctx context.Context, input *Input) func(cmd *cobra.Command, args []string) error {
log.Info().Msgf("Starting runner daemon")
@ -56,7 +69,7 @@ func runDaemon(ctx context.Context, input *Input) func(cmd *cobra.Command, args
// register the client
msg := Message{
Version: 1,
Type: 1,
Type: MsgTypeRegister,
RunnerUUID: "111111",
}
bs, err := json.Marshal(&msg)
@ -109,20 +122,80 @@ func runDaemon(ctx context.Context, input *Input) func(cmd *cobra.Command, args
switch msg.Version {
case 1:
switch msg.Type {
case 1:
log.Info().Msgf("received message: %s", message)
case 2:
case 4:
case MsgTypeRegister:
log.Info().Msgf("received registered success: %s", message)
conn.WriteJSON(&Message{
Version: 1,
Type: MsgTypeRequestBuild,
RunnerUUID: msg.RunnerUUID,
})
case MsgTypeError:
log.Info().Msgf("received error msessage: %s", message)
conn.WriteJSON(&Message{
Version: 1,
Type: MsgTypeRequestBuild,
RunnerUUID: msg.RunnerUUID,
})
case MsgTypeIdle:
log.Info().Msgf("received no task")
case 3:
conn.WriteJSON(&Message{
Version: 1,
Type: MsgTypeRequestBuild,
RunnerUUID: msg.RunnerUUID,
})
case MsgTypeRequestBuild:
switch msg.EventName {
case "push":
input := Input{
forgeInstance: "github.com",
forgeInstance: "github.com",
reuseContainers: true,
}
if err := runTask(context.Background(), &input, ""); err != nil {
log.Error().Msgf("run task failed: %v", err)
ctx, cancel := context.WithTimeout(context.Background(), time.Hour)
defer cancel()
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
done := make(chan error)
go func(chan error) {
done <- runTask(ctx, &input, "")
}(done)
c := time.NewTicker(time.Second)
defer c.Stop()
END:
for {
select {
case <-sigs:
cancel()
log.Info().Msgf("cancel task")
break END
case err := <-done:
if err != nil {
log.Error().Msgf("runTask failed: %v", err)
conn.WriteJSON(&Message{
Version: 1,
Type: MsgTypeBuildResult,
RunnerUUID: msg.RunnerUUID,
BuildUUID: msg.BuildUUID,
ErrCode: 1,
ErrContent: err.Error(),
})
} else {
log.Error().Msgf("runTask success")
conn.WriteJSON(&Message{
Version: 1,
Type: MsgTypeBuildResult,
RunnerUUID: msg.RunnerUUID,
BuildUUID: msg.BuildUUID,
})
}
break END
case <-c.C:
}
}
default:
log.Warn().Msgf("unknow event %s with payload %s", msg.EventName, msg.EventPayload)
}

@ -7,9 +7,11 @@ import (
"path/filepath"
"github.com/nektos/act/pkg/artifacts"
"github.com/nektos/act/pkg/common"
"github.com/nektos/act/pkg/model"
"github.com/nektos/act/pkg/runner"
"github.com/rs/zerolog/log"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)
@ -60,7 +62,10 @@ func (i *Input) newPlatforms() map[string]string {
}
func Execute(ctx context.Context) {
input := Input{}
input := Input{
reuseContainers: true,
forgeInstance: "gitea.com",
}
rootCmd := &cobra.Command{
Use: "act [event name to run]\nIf no event name passed, will default to \"on: push\"",
@ -103,6 +108,17 @@ func getWorkflowsPath() (string, error) {
return p, nil
}
type StepHook struct{}
func (hook *StepHook) Levels() []logrus.Level {
return logrus.AllLevels
}
func (hook *StepHook) Fire(entry *logrus.Entry) error {
fmt.Printf("====== %#v \n ", entry)
return nil
}
func runTask(ctx context.Context, input *Input, jobID string) error {
workflowsPath, err := getWorkflowsPath()
if err != nil {
@ -177,6 +193,11 @@ func runTask(ctx context.Context, input *Input, jobID string) error {
return fmt.Errorf("New config failed: %v", err)
}
log := logrus.StandardLogger()
log.AddHook(&StepHook{})
ctx = common.WithLogger(ctx, log)
cancel := artifacts.Serve(ctx, input.artifactServerPath, input.artifactServerPort)
executor := r.NewPlanExecutor(plan).Finally(func(ctx context.Context) error {

2
go.mod

@ -6,6 +6,7 @@ require (
github.com/gorilla/websocket v1.4.2
github.com/nektos/act v0.2.26
github.com/rs/zerolog v1.26.1
github.com/sirupsen/logrus v1.8.1
github.com/spf13/cobra v1.4.0
)
@ -56,7 +57,6 @@ require (
github.com/rivo/uniseg v0.2.0 // indirect
github.com/robfig/cron v1.2.0 // indirect
github.com/sergi/go-diff v1.2.0 // indirect
github.com/sirupsen/logrus v1.8.1 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/xanzy/ssh-agent v0.3.1 // indirect
go.opencensus.io v0.23.0 // indirect