GoBroke/runner/funcs.go
2025-02-04 12:14:08 -05:00

334 lines
8.8 KiB
Go

package runner
import (
`bytes`
`os`
`os/exec`
`sync`
`time`
`github.com/chigopher/pathlib`
`github.com/google/uuid`
`r00t2.io/gobroke/conf`
`r00t2.io/gobroke/tunnelbroker`
`r00t2.io/goutils/logging`
`r00t2.io/goutils/multierr`
)
// Run takes a conf.Config, applies checks/updates, and any templating/commands if needed.
//
// If cfg.Tunnels is empty, Run returns immediately with zero values.
//
// When cfg.SingleTunnel is false, every tunnel is processed in its own goroutine (via runAsync);
// otherwise the tunnels are processed one at a time (via run).
//
// results holds the TunnelResult of every tunnel that completed without error.
// changed is true if any of those results has Changed set, and updated is true if any has Updated set.
//
// If one or more tunnels returned an error, those errors are collected into a multierr.MultiError,
// logged, and returned as err; cfg.Cmds are NOT executed in that case.
//
// Otherwise each non-nil entry of cfg.Cmds is executed if its OnChanges is nil or equal to changed.
// A command that fails to execute does not stop the remaining commands; the failures are collected
// and returned together as err. A command that cannot be built (ToCmd) aborts Run immediately.
func Run(cfg *conf.Config, log logging.Logger) (results []*TunnelResult, changed bool, updated bool, err error) {

	var wg sync.WaitGroup
	var errChan chan error
	var tunChan chan *TunnelResult
	var numJobs int
	var tmpTun *TunnelResult
	var cmd *exec.Cmd
	var cmdId uuid.UUID
	var stdout *bytes.Buffer = new(bytes.Buffer)
	var stderr *bytes.Buffer = new(bytes.Buffer)
	var mErr *multierr.MultiError = multierr.NewMultiError(nil)

	// len() of a nil slice/map is 0, so this covers the nil case as well.
	if len(cfg.Tunnels) == 0 {
		return
	}

	log.Debug("runner.Run: Running check/update for %d tunnels.", len(cfg.Tunnels))

	if !cfg.SingleTunnel {
		numJobs = len(cfg.Tunnels)
		// Each worker sends at most one value in total (an error OR a result), so a buffer of
		// numJobs per channel guarantees no worker ever blocks on send.
		errChan = make(chan error, numJobs)
		tunChan = make(chan *TunnelResult, numJobs)

		log.Debug("runner.Run: Single-tunnel disabled; running async tunnel checks/updates.")

		for _, tun := range cfg.Tunnels {
			wg.Add(1)
			go runAsync(tun, &wg, tunChan, log, errChan)
		}

		// All senders are finished once Wait returns, so it's safe to close and then drain.
		wg.Wait()
		close(tunChan)
		close(errChan)

		for tunErr := range errChan {
			if tunErr != nil {
				mErr.AddError(tunErr)
			}
		}
		for tmpTun = range tunChan {
			if tmpTun == nil {
				continue
			}
			results = append(results, tmpTun)
			if tmpTun.Changed {
				changed = true
			}
			if tmpTun.Updated {
				updated = true
			}
		}
	} else {
		log.Debug("runner.Run: Single-tunnel enabled; running sequential tunnel checks/updates.")

		for _, tun := range cfg.Tunnels {
			if tmpTun, err = run(tun, log); err != nil {
				mErr.AddError(err)
				err = nil
			}
			if tmpTun == nil {
				continue
			}
			results = append(results, tmpTun)
			if tmpTun.Changed {
				changed = true
			}
			if tmpTun.Updated {
				updated = true
			}
		}
	}

	// Tunnel failures short-circuit the top-level commands.
	if !mErr.IsEmpty() {
		log.Err("runner.Run: Received error(s) running tunnels:\n%v", mErr.Error())
		err = mErr
		return
	}

	if len(cfg.Cmds) > 0 {
		log.Debug("runner.Run: Running %d commands.", len(cfg.Cmds))

		for _, cmdSpec := range cfg.Cmds {
			if cmdSpec == nil {
				continue
			}
			// A nil OnChanges means "always run"; otherwise it must match whether anything changed.
			if cmdSpec.OnChanges != nil && *cmdSpec.OnChanges != changed {
				continue
			}
			if cmd, err = cmdSpec.ToCmd(); err != nil {
				return
			}
			// The UUID only exists to correlate the log lines belonging to one command invocation.
			cmdId = uuid.New()
			stdout.Reset()
			stderr.Reset()
			cmd.Stdout = stdout
			cmd.Stderr = stderr
			log.Debug("runner.Run: Command '%s': %s", cmdId.String(), cmd.String())
			if err = cmd.Run(); err != nil {
				// Keep going; failures are reported together after all commands have run.
				mErr.AddError(err)
				err = nil
			}
			if stdout.Len() > 0 {
				log.Debug("runner.Run: Command '%s' STDOUT:\n%s", cmdId.String(), stdout.String())
			}
			if stderr.Len() > 0 {
				log.Err("runner.Run: Command '%s' STDERR:\n%s", cmdId.String(), stderr.String())
			}
		}
	}

	if !mErr.IsEmpty() {
		log.Err("runner.Run: Received error(s) running commands:\n%v", mErr.Error())
		err = mErr
		return
	}

	log.Debug("runner.Run: Finished check/update successfully.")

	return
}
// run actually does the thing. This is used if conf.Config.SingleTunnel is true, and wrapped by runAsync.
//
// For the tunnel t, it:
//
//  1. Fetches the upstream tunnel configuration and calls Update on it; if Update reports a change,
//     the upstream configuration is fetched again as the "after" state.
//  2. Executes every non-nil template in t.TemplateConfigs, (re)writing the destination file only if it
//     is missing or its contents differ, then applies ownership/mode to the file and its parent directory
//     and runs the template's commands (subject to each command's OnChanges vs. whether THIS template's
//     file was written).
//  3. Runs every non-nil command in t.Cmds (subject to each command's OnChanges vs. whether ANY template
//     file was written).
//
// result is only set when every step succeeded; on the first error, run returns a nil result and that error.
// Any output captured from a command is logged even if that command failed.
func run(t *conf.Tunnel, log logging.Logger) (result *TunnelResult, err error) {

	var b []byte
	var cmd *exec.Cmd
	var destPath *pathlib.Path
	var destDir *pathlib.Path
	var destExists bool
	var dirExists bool
	var tplChanged bool
	var cmdId uuid.UUID
	var stdout *bytes.Buffer = new(bytes.Buffer)
	var stderr *bytes.Buffer = new(bytes.Buffer)
	var tplBuf *bytes.Buffer = new(bytes.Buffer)
	var res TunnelResult = TunnelResult{
		Config:       t,
		TunnelBefore: nil,
		TunnelAfter:  nil,
		Updated:      false,
		Changed:      false,
		RunTimestamp: time.Now(),
	}

	// execCmd runs c with its STDOUT/STDERR captured into the shared buffers and logs whatever was
	// captured under the correlation ID id. The output is logged BEFORE the run error is handed back
	// so that the diagnostics of a failing command are not lost.
	var execCmd = func(c *exec.Cmd, id uuid.UUID) (runErr error) {
		stdout.Reset()
		stderr.Reset()
		c.Stdout = stdout
		c.Stderr = stderr
		runErr = c.Run()
		if stdout.Len() > 0 {
			log.Debug("runner.run: Command '%s' STDOUT:\n%s", id.String(), stdout.String())
		}
		if stderr.Len() > 0 {
			log.Err("runner.run: Command '%s' STDERR:\n%s", id.String(), stderr.String())
		}
		return
	}

	log.Debug("runner.run: Running tunnel ID %d.", t.TunnelID)

	if res.TunnelBefore, err = tunnelbroker.GetTunnel(t, t.IsDebug()); err != nil {
		log.Err("runner.run: Received error getting upstream tunnel configuration for tunnel %d: %v", t.TunnelID, err)
		return
	}

	if res.Updated, err = res.TunnelBefore.Update(); err != nil {
		log.Err("runner.run: Received error checking/updating tunnel configuration for tunnel %d: %v", t.TunnelID, err)
		return
	}

	if res.Updated {
		log.Debug("runner.run: Tunnel %d is changed.", t.TunnelID)
		// Re-fetch so that TunnelAfter reflects the upstream state following the update.
		if res.TunnelAfter, err = tunnelbroker.GetTunnel(t, t.IsDebug()); err != nil {
			log.Err("runner.run: Received error getting upstream tunnel configuration for tunnel %d (post-update): %v", t.TunnelID, err)
			return
		}
	} else {
		log.Debug("runner.run: Tunnel %d is not changed.", t.TunnelID)
		res.TunnelAfter = res.TunnelBefore
	}

	if len(t.TemplateConfigs) > 0 {
		log.Debug("runner.run: Running %d templates for tunnel %d.", len(t.TemplateConfigs), t.TunnelID)

		for tplIdx, tplSpec := range t.TemplateConfigs {
			if tplSpec == nil {
				continue
			}
			log.Debug("runner.run: Running template %d ('%s') for tunnel %d.", tplIdx, tplSpec.Template, t.TunnelID)

			// Per-template state.
			tplBuf.Reset()
			b = nil
			tplChanged = false

			if err = tplSpec.Tpl.Execute(tplBuf, res); err != nil {
				return
			}

			destPath = pathlib.NewPath(tplSpec.Dest)
			destDir = destPath.Parent()
			if destExists, err = destPath.Exists(); err != nil {
				return
			}
			if dirExists, err = destDir.Exists(); err != nil {
				return
			}
			if destExists {
				if b, err = os.ReadFile(tplSpec.Dest); err != nil {
					return
				}
			}

			if !destExists || !bytes.Equal(b, tplBuf.Bytes()) {
				// Doesn't exist or it's a mismatch.
				if !dirExists {
					// Parent doesn't exist.
					if err = destDir.MkdirAllMode(*tplSpec.Perms.ParentDir.Mode); err != nil {
						return
					}
				}
				if err = os.WriteFile(tplSpec.Dest, tplBuf.Bytes(), *tplSpec.Perms.File.Mode); err != nil {
					return
				}
				// res.Changed is sticky across templates; tplChanged is for this template only.
				res.Changed = true
				tplChanged = true
			}

			// This is safe to blindly do, as "no-change" support is cooked in.
			if err = tplSpec.Perms.Chown(destPath.String()); err != nil {
				return
			}
			if err = tplSpec.Perms.Chmod(destPath.String(), !destExists); err != nil {
				return
			}
			if err = tplSpec.Perms.Chown(destDir.String()); err != nil {
				return
			}
			if err = tplSpec.Perms.Chmod(destDir.String(), !dirExists); err != nil {
				return
			}

			if len(tplSpec.Cmds) > 0 {
				log.Debug(
					"runner.run: Running %d commands for template %d ('%s') for tunnel %d.",
					len(tplSpec.Cmds), tplIdx, tplSpec.Template, t.TunnelID,
				)
				for cmdIdx, cmdSpec := range tplSpec.Cmds {
					if cmdSpec == nil {
						continue
					}
					log.Debug(
						"runner.run: Command %d for template %d ('%s') for tunnel %d",
						cmdIdx, tplIdx, tplSpec.Template, t.TunnelID,
					)
					// A nil OnChanges means "always run"; otherwise it must match whether this template's file was written.
					if cmdSpec.OnChanges == nil || *cmdSpec.OnChanges == tplChanged {
						if cmd, err = cmdSpec.ToCmd(&res); err != nil {
							return
						}
						cmdId = uuid.New()
						log.Debug("runner.run: Tunnel %d, Template %d '%s': Command '%s': %s", t.TunnelID, tplIdx, tplSpec.Template, cmdId.String(), cmd.String())
						if err = execCmd(cmd, cmdId); err != nil {
							return
						}
					}
				}
			}
		}
	}

	if len(t.Cmds) > 0 {
		log.Debug("runner.run: Running %d commands for tunnel %d.", len(t.Cmds), t.TunnelID)

		for _, cmdSpec := range t.Cmds {
			if cmdSpec == nil {
				continue
			}
			// A nil OnChanges means "always run"; otherwise it must match whether any template file was written.
			if cmdSpec.OnChanges == nil || *cmdSpec.OnChanges == res.Changed {
				if cmd, err = cmdSpec.ToCmd(&res); err != nil {
					return
				}
				cmdId = uuid.New()
				log.Debug("runner.run: Tunnel %d: Command '%s': %s", t.TunnelID, cmdId.String(), cmd.String())
				if err = execCmd(cmd, cmdId); err != nil {
					return
				}
			}
		}
	}

	result = &res

	log.Debug("runner.run: Finished tunnel %d successfully.", t.TunnelID)

	return
}
// runAsync is the goroutine entry point for a single tunnel; it is used when conf.Config.SingleTunnel is false.
//
// It calls run for t and delivers the outcome on exactly one channel: the error on errChan if run failed,
// otherwise the result on tunChan. wg is marked done when the function exits, on either path.
func runAsync(t *conf.Tunnel, wg *sync.WaitGroup, tunChan chan *TunnelResult, log logging.Logger, errChan chan error) {

	defer wg.Done()

	tunRes, runErr := run(t, log)

	switch {
	case runErr != nil:
		errChan <- runErr
	default:
		tunChan <- tunRes
	}
}