Cron jobs with Ticker & WaitGroup in Go
I wrote a simple cron job and inserted it into the agent module of Telegraf. Here is the code:
package cron

import (
	"context"
	"runtime"
	"sync"
	"time"

	"github.com/influxdata/telegraf/internal/config"
	"github.com/influxdata/telegraf/testutil"
)

// wg tracks the running job goroutines so StartCron can wait for all of them.
var wg sync.WaitGroup

var logger = testutil.Logger{Name: "cron"}

// JobFunc is the signature every cron job implements.
type JobFunc func(*config.Config) error

// intervalMap holds each job's interval in seconds.
var intervalMap = map[string]int{
	"Job1": 600,
	"Job2": 60,
}

// StartCron launches the jobs and blocks until all of them have stopped.
func StartCron(ctx context.Context, config *config.Config) {
	if config.Agent.EnvTag == "product" {
		logger.Infof("")
	}

	wg.Add(1)
	go RunJob(ctx, "Job1", job1, config)

	if runtime.GOOS != "windows" {
		wg.Add(1)
		go RunJob(ctx, "Job2", job2, config)
	}

	wg.Wait()
	logger.Info("All cron jobs stopped gracefully.")
}

// RunJob calls fn on every tick until ctx is cancelled.
func RunJob(ctx context.Context, name string, fn JobFunc, c *config.Config) {
	interval := intervalMap[name]
	logger.Infof("Initialized cron job: %s. Interval: %d.", name, interval)

	ticker := time.NewTicker(time.Duration(interval) * time.Second)
	defer func() {
		ticker.Stop()
		wg.Done()
		logger.Infof("Job `%s` ticker/wg stopped.", name)
	}()

	for {
		select {
		case <-ctx.Done():
			logger.Infof("Job `%s` received `ctx.Done` signal.", name)
			return
		case <-ticker.C:
			err := fn(c)
			if err != nil {
				logger.Errorf("Job `%s` threw error: %s", name, err)
			}
		}
	}
}
And in github.com/influxdata/telegraf/agent.go:
func (a *Agent) Run(ctx context.Context) error {
	// ...
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		// Blocks until every cron job has returned.
		cron.StartCron(ctx, a.Config)
	}()
	// ...
	wg.Wait()
	// ...
}
The outer WaitGroup is necessary: it makes Run wait for the StartCron goroutine, which in turn waits for its own jobs to finish. With this code, the jobs can be gracefully shut down and restarted along with the agent itself.
This post is licensed under CC BY 4.0 by the author.