Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 154 additions & 0 deletions agent_runtime.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package main

import (
"log"
"path"
"sync"
"time"

"flashcat.cloud/categraf/agent"
"flashcat.cloud/categraf/config"
"flashcat.cloud/categraf/pkg/reloadwatcher"
"flashcat.cloud/categraf/writer"
)

const configReloadDebounce = time.Second

type managedAgent interface {
Start()
Stop()
}

type configWatcher interface {
Close() error
}

type agentRuntimeDeps struct {
loadConfig func() (*config.ConfigType, error)
buildWriters func(*config.ConfigType) (map[string]writer.Writer, error)
applyConfig func(*config.ConfigType)
applyWriters func(map[string]writer.Writer)
newAgent func() (managedAgent, error)
initLog func(string)
startWatcher func(string, time.Duration, func()) (configWatcher, error)
}

type agentRuntime struct {
mu sync.Mutex
agent managedAgent
watcher configWatcher
deps agentRuntimeDeps
}

func newDefaultAgentRuntime(ag *agent.Agent, configDir string, debugLevel int, debugMode, testMode bool, interval int64, inputFilters string) *agentRuntime {
return &agentRuntime{
agent: ag,
deps: agentRuntimeDeps{
loadConfig: func() (*config.ConfigType, error) {
return config.LoadConfig(configDir, debugLevel, debugMode, testMode, interval, inputFilters)
},
buildWriters: func(conf *config.ConfigType) (map[string]writer.Writer, error) {
return writer.BuildWriters(conf.Writers)
},
applyConfig: func(conf *config.ConfigType) {
config.Config = conf
},
applyWriters: writer.ApplyWriters,
newAgent: func() (managedAgent, error) {
return agent.NewAgent()
},
initLog: initLog,
startWatcher: func(target string, debounce time.Duration, onChange func()) (configWatcher, error) {
return reloadwatcher.Start(target, debounce, onChange)
},
},
}
}

func (rt *agentRuntime) Start() {
rt.mu.Lock()
defer rt.mu.Unlock()

rt.agent.Start()
rt.reconcileWatcherLocked()
}

func (rt *agentRuntime) Stop() {
rt.mu.Lock()
defer rt.mu.Unlock()

rt.stopWatcherLocked()
rt.agent.Stop()
}

func (rt *agentRuntime) Reload(reason string) error {
return rt.reload(reason)
}

func (rt *agentRuntime) reload(reason string) error {
rt.mu.Lock()
defer rt.mu.Unlock()

log.Println("I! agent runtime reloading:", reason)
nextConfig, err := rt.deps.loadConfig()
if err != nil {
return err
}

nextWriters, err := rt.deps.buildWriters(nextConfig)
if err != nil {
return err
}

currentConfig := config.Config
currentAgent := rt.agent
rt.deps.applyConfig(nextConfig)

nextAgent, err := rt.deps.newAgent()
if err != nil {
rt.deps.applyConfig(currentConfig)
return err
}

currentAgent.Stop()
rt.deps.applyWriters(nextWriters)
rt.agent = nextAgent
rt.deps.initLog(nextConfig.Log.FileName)
rt.agent.Start()
rt.reconcileWatcherLocked()
Comment on lines +103 to +118
log.Println("I! agent runtime reloaded:", reason)
return nil
}

func (rt *agentRuntime) reconcileWatcherLocked() {
if config.Config == nil || !config.Config.Global.ReloadOnChange {
rt.stopWatcherLocked()
return
}
if rt.watcher != nil {
return
}

target := path.Join(config.Config.ConfigDir, "config.toml")
watcher, err := rt.deps.startWatcher(target, configReloadDebounce, func() {
if err := rt.Reload("config.toml changed"); err != nil {
log.Println("E! reload config.toml failed:", err)
}
})
if err != nil {
log.Println("E! watch config.toml failed:", err)
return
}
rt.watcher = watcher
log.Println("I! watching config.toml for changes:", target)
}

func (rt *agentRuntime) stopWatcherLocked() {
if rt.watcher == nil {
return
}
if err := rt.watcher.Close(); err != nil {
log.Println("E! stop config.toml watcher failed:", err)
}
rt.watcher = nil
}
136 changes: 136 additions & 0 deletions agent_runtime_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package main

import (
"errors"
"testing"

"flashcat.cloud/categraf/config"
"flashcat.cloud/categraf/writer"
)

type fakeRuntimeAgent struct {
starts int
stops int
}

func (a *fakeRuntimeAgent) Start() {
a.starts++
}

func (a *fakeRuntimeAgent) Stop() {
a.stops++
}

func TestAgentRuntimeReloadStartsNewAgentAfterSuccessfulConfigLoad(t *testing.T) {
oldConfig := config.Config
defer func() {
config.Config = oldConfig
}()

currentConfig := &config.ConfigType{Log: config.Log{FileName: "stdout"}}
nextConfig := &config.ConfigType{Log: config.Log{FileName: "stderr"}}
config.Config = currentConfig

oldAgent := &fakeRuntimeAgent{}
nextAgent := &fakeRuntimeAgent{}
initLogFile := ""
appliedWriters := false

rt := &agentRuntime{
agent: oldAgent,
deps: agentRuntimeDeps{
loadConfig: func() (*config.ConfigType, error) {
return nextConfig, nil
},
buildWriters: func(*config.ConfigType) (map[string]writer.Writer, error) {
return map[string]writer.Writer{}, nil
},
applyConfig: func(c *config.ConfigType) {
config.Config = c
},
applyWriters: func(map[string]writer.Writer) {
appliedWriters = true
},
newAgent: func() (managedAgent, error) {
return nextAgent, nil
},
initLog: func(file string) {
initLogFile = file
},
},
}

if err := rt.reload("test"); err != nil {
t.Fatalf("reload error = %v", err)
}
if oldAgent.stops != 1 {
t.Fatalf("old agent stops = %d, want 1", oldAgent.stops)
}
if nextAgent.starts != 1 {
t.Fatalf("next agent starts = %d, want 1", nextAgent.starts)
}
if rt.agent != nextAgent {
t.Fatal("runtime did not install next agent")
}
if config.Config != nextConfig {
t.Fatal("runtime did not install next config")
}
if !appliedWriters {
t.Fatal("runtime did not apply writers")
}
if initLogFile != "stderr" {
t.Fatalf("initLog file = %q, want stderr", initLogFile)
}
}

func TestAgentRuntimeReloadKeepsOldAgentAndConfigWhenNewAgentFails(t *testing.T) {
oldConfig := config.Config
defer func() {
config.Config = oldConfig
}()

currentConfig := &config.ConfigType{Log: config.Log{FileName: "stdout"}}
nextConfig := &config.ConfigType{Log: config.Log{FileName: "stderr"}}
config.Config = currentConfig

oldAgent := &fakeRuntimeAgent{}
appliedWriters := false

rt := &agentRuntime{
agent: oldAgent,
deps: agentRuntimeDeps{
loadConfig: func() (*config.ConfigType, error) {
return nextConfig, nil
},
buildWriters: func(*config.ConfigType) (map[string]writer.Writer, error) {
return map[string]writer.Writer{}, nil
},
applyConfig: func(c *config.ConfigType) {
config.Config = c
},
applyWriters: func(map[string]writer.Writer) {
appliedWriters = true
},
newAgent: func() (managedAgent, error) {
return nil, errors.New("new agent failed")
},
initLog: func(string) {},
},
}

if err := rt.reload("test"); err == nil {
t.Fatal("reload error = nil, want error")
}
if oldAgent.stops != 0 {
t.Fatalf("old agent stops = %d, want 0", oldAgent.stops)
}
if rt.agent != oldAgent {
t.Fatal("runtime replaced old agent after failed reload")
}
if config.Config != currentConfig {
t.Fatal("runtime did not restore current config after failed reload")
}
if appliedWriters {
t.Fatal("runtime applied writers after failed reload")
}
}
3 changes: 3 additions & 0 deletions conf/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
# whether print configs
print_configs = false

# reload config.toml and restart agent modules when config.toml changes
reload_on_change = false

# add label(agent_hostname) to series
# "" -> auto detect hostname
# "xx" -> use specified string xx
Expand Down
Loading
Loading