Telegraf Input插件开发
自定义插件开发
Telegraf 是 InfluxData 的一款开源的采集器,Telegraf 是面向 InfluxDB 生态的,采集的监控数据推给 InfluxDB 非常合适。
Telegraf 的采集 CPU、内存、中间件等数据是通过 Input 插件完成的,推送数据给 Prometheus、Victoriametrics、Thanos 这些时序库是通过 Output 插件完成的。
开发说明
此处自定义开发 input 插件 demo,就是开发采集插件
首先要先从社区拉下 telegraf 代码
git clone git@github.com:influxdata/telegraf.git
通过 ls 命令查看,有一个 plugins/ 文件夹里面有一个 inputs/ 文件夹,开发任务主要在 inputs 里完成,并且里面有许多已经完成的输入插件。
# ls
-rw-r--r-- 1 54225 UsersGrp 430278 Jul 12 14:10 CHANGELOG.md
-rw-r--r-- 1 54225 UsersGrp 3059 Jul 12 14:10 CODE_OF_CONDUCT.md
-rw-r--r-- 1 54225 UsersGrp 5934 Jul 12 14:10 CONTRIBUTING.md
-rw-r--r-- 1 54225 UsersGrp 5330 Jul 12 14:10 EXTERNAL_PLUGINS.md
-rw-r--r-- 1 54225 UsersGrp 1088 Jul 12 14:10 LICENSE
-rw-r--r-- 1 54225 UsersGrp 16203 Jul 12 14:10 Makefile
-rw-r--r-- 1 54225 UsersGrp 6794 Jul 12 14:10 README.md
-rw-r--r-- 1 54225 UsersGrp 524 Jul 12 14:10 SECURITY.md
-rw-r--r-- 1 54225 UsersGrp 2650 Jul 12 14:10 accumulator.go
drwxr-xr-x 1 54225 UsersGrp 0 Jul 12 14:10 agent
-rw-r--r-- 1 54225 UsersGrp 530 Jul 12 14:10 aggregator.go
drwxr-xr-x 1 54225 UsersGrp 0 Jul 12 14:10 assets
-rw-r--r-- 1 54225 UsersGrp 6 Jul 12 14:10 build_version.txt
drwxr-xr-x 1 54225 UsersGrp 0 Jul 12 14:10 cmd
drwxr-xr-x 1 54225 UsersGrp 0 Jul 12 14:10 config
drwxr-xr-x 1 54225 UsersGrp 0 Jul 12 14:10 docs
drwxr-xr-x 1 54225 UsersGrp 0 Jul 12 14:10 etc
drwxr-xr-x 1 54225 UsersGrp 0 Jul 12 14:10 filter
-rw-r--r-- 1 54225 UsersGrp 24718 Jul 12 14:10 go.mod
-rw-r--r-- 1 54225 UsersGrp 216681 Jul 12 14:10 go.sum
-rw-r--r-- 1 54225 UsersGrp 463 Jul 12 14:10 info.plist
-rw-r--r-- 1 54225 UsersGrp 656 Jul 12 14:10 input.go
drwxr-xr-x 1 54225 UsersGrp 0 Jul 12 14:10 internal
drwxr-xr-x 1 54225 UsersGrp 0 Jul 12 14:10 logger
drwxr-xr-x 1 54225 UsersGrp 0 Jul 12 14:10 metric
-rw-r--r-- 1 54225 UsersGrp 3839 Jul 12 14:10 metric.go
drwxr-xr-x 1 54225 UsersGrp 0 Jul 12 14:10 migrations
drwxr-xr-x 1 54225 UsersGrp 0 Jul 12 14:10 models
-rw-r--r-- 1 54225 UsersGrp 1001 Jul 12 14:10 output.go
-rw-r--r-- 1 54225 UsersGrp 1160 Jul 12 14:10 parser.go
drwxr-xr-x 1 54225 UsersGrp 0 Jul 12 14:10 persister
-rw-r--r-- 1 54225 UsersGrp 3594 Jul 12 14:10 plugin.go
drwxr-xr-x 1 54225 UsersGrp 0 Jul 12 14:10 plugins
-rw-r--r-- 1 54225 UsersGrp 1822 Jul 12 14:10 processor.go
drwxr-xr-x 1 54225 UsersGrp 0 Jul 12 14:10 scripts
-rw-r--r-- 1 54225 UsersGrp 797 Jul 12 14:10 secretstore.go
drwxr-xr-x 1 54225 UsersGrp 0 Jul 12 14:10 selfstat
-rw-r--r-- 1 54225 UsersGrp 1071 Jul 12 14:10 serializer.go
drwxr-xr-x 1 54225 UsersGrp 0 Jul 12 14:10 testutil
drwxr-xr-x 1 54225 UsersGrp 0 Jul 12 14:10 tools
按照开发文档的格式进行开发 myexample 插件
- 需要在 inputs 文件夹下创建一个 myexample 插件文件夹
- 文件夹下需要最基本的三个文件,README.md、myexample.go、sample.conf
- 需要在
github.com/influxdata/telegraf/plugins/inputs/all
文件夹下创建一个跟插件同名的文件进行注册操作 - myexample.go 里必须要有 SampleConfig、Gather、Init 方法,必须要有 init 初始化函数
- 该有的代码生成器不能少
开发内容
此处开发一个 example 的插件,使插件输入为:
在 inputs 文件夹下创建 myexample 文件夹,创建 myexample.go
//go:generate ../../../tools/readme_config_includer/generator
package myexample
import (
_ "embed"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
)
// 此处的代码生成器意思是启动是把 sample.conf 里的配置加载到代码里,所以上面的代码生成器不能丢。
//go:embed sample.conf
var sampleConfig string
// 此处是 toml 格式,对应的是 sample.conf 里的 toml 格式,注意因为要被 all 里注册,所以结构体都需要大写。
type Simple struct {
MyName string `toml:"myname"`
Age int `toml:"age"`
Ok bool `toml:"ok"`
Log telegraf.Logger `toml:"-"`
}
func (*Simple) SampleConfig() string {
return sampleConfig
}
// Init is for setup, and validating config.
func (s *Simple) Init() error {
return nil
}
// Gather 是展示输入的具体信息
// 如果想开发完采集器后把数据发送给时序数据库需要注意,值不能是 string 或者 bool
// 需要注意的是 acc 有多种监控指标类型 AddGauge、AddCounter 需要根据实际情况使用
func (s *Simple) Gather(acc telegraf.Accumulator) error {
if s.Ok {
acc.AddFields("myexample", map[string]interface{}{"姓名": s.MyName}, nil)
} else {
acc.AddFields("myexample", map[string]interface{}{"年龄": s.Age}, nil)
}
return nil
}
// init 函数是固定格式,目的是注册插件,add 里面名字与结构体可根据详细情况修改
func init() {
inputs.Add("myexample", func() telegraf.Input { return &Simple{} })
}
上面代码注册了配置文件 sample.conf 里的结构,所以需要创建 sample.conf 文件并填入 toml 格式文件
# cat sample.conf
[[inputs.myexample]]
ok = true
myname = "季星星"
age = 18
必须要有三个文件,所以要创建 README.md,可以什么都不写
在github.com/influxdata/telegraf/plugins/inputs/all
下注册插件,创建 myexample.go,代码生成器不能掉
//go:build !custom || inputs || inputs.myexample
package all
import _ "github.com/influxdata/telegraf/plugins/inputs/myexample" // register plugin
编写完成后到根目录执行 make all
构建项目
[root@localhost telegraf]# make all
fatal: Not a git repository (or any of the parent directories): .git
fatal: Not a git repository (or any of the parent directories): .git
go mod download -x
env -u GOOS -u GOARCH -u GOARM -- go build -o ./tools/custom_builder/custom_builder ./tools/custom_builder
env -u GOOS -u GOARCH -u GOARM -- go build -o ./tools/license_checker/license_checker ./tools/license_checker
env -u GOOS -u GOARCH -u GOARM -- go build -o ./tools/readme_config_includer/generator ./tools/readme_config_includer/generator.go
env -u GOOS -u GOARCH -u GOARM -- go build -o ./tools/config_includer/generator ./tools/config_includer/generator.go
env -u GOOS -u GOARCH -u GOARM -- go build -o ./tools/readme_linter/readme_linter ./tools/readme_linter
go generate -run="tools/config_includer/generator" ./plugins/inputs/...
go generate -run="tools/readme_config_includer/generator" ./plugins/inputs/...
go generate -run="tools/config_includer/generator" ./plugins/outputs/...
go generate -run="tools/readme_config_includer/generator" ./plugins/outputs/...
go generate -run="tools/config_includer/generator" ./plugins/processors/...
go generate -run="tools/readme_config_includer/generator" ./plugins/processors/...
go generate -run="tools/config_includer/generator" ./plugins/aggregators/...
go generate -run="tools/readme_config_includer/generator" ./plugins/aggregators/...
go generate -run="tools/config_includer/generator" ./plugins/secretstores/...
go generate -run="tools/readme_config_includer/generator" ./plugins/secretstores/...
CGO_ENABLED=0 go build -tags "" -ldflags " -X github.com/influxdata/telegraf/internal.Commit= -X github.com/influxdata/telegraf/internal.Branch= -X github.com/influxdata/telegraf/internal.Version=1.28.0-" ./cmd/telegraf
构建完成后通过--input-list
查看
[root@localhost telegraf]# ./telegraf --input-list |grep myex
myexample
通过 config 命令生成 conf 文件,此时打开文件 conf 文件发现存在插件选项,可以添加参数也可以不加,参数就是开发时的结构体
[root@localhost telegraf]# ./telegraf config > telegraf.conf
[root@localhost telegraf]# more telegraf.conf | grep myexamp
[[inputs.myexample]]
通过 --test 执行测试,查看结果
[root@localhost telegraf]# ./telegraf --config telegraf.conf --input-filter myexample --output-filter influxdb --test
2023-07-12T07:04:18Z I! Loading config: telegraf.conf
2023-07-12T07:04:18Z I! Starting Telegraf 1.28.0-
2023-07-12T07:04:18Z I! Available plugins: 238 inputs, 9 aggregators, 28 processors, 24 parsers, 59 outputs, 4 secret-stores
2023-07-12T07:04:18Z I! Loaded inputs: myexample
2023-07-12T07:04:18Z I! Loaded aggregators:
2023-07-12T07:04:18Z I! Loaded processors:
2023-07-12T07:04:18Z I! Loaded secretstores:
2023-07-12T07:04:18Z W! Outputs are not used in testing mode!
2023-07-12T07:04:18Z I! Tags enabled: host=localhost.localdomain
> myexample,host=localhost.localdomain 姓名="季星星" 1689145458000000000
FAQ
修改代码追加全局 tag,config 文件夹下 config.go 文件最后添加两个函数,LoadConfigData
方法中追加 c.Tags["instancd_id"] = instanceId
// LoadConfigData loads TOML-formatted config data
func (c *Config) LoadConfigData(data []byte) error {
...
if !c.Agent.OmitHostname {
if c.Agent.Hostname == "" {
hostname, err := os.Hostname()
if err != nil {
return err
}
c.Agent.Hostname = hostname
}
c.Tags["host"] = c.Agent.Hostname
}
if checkPath() {
instanceId, err := getInstanceId()
if err != nil {
log.Printf("Error getting InstanceId from Hosts %d", err)
}
c.Tags["instance_id"] = instanceId
}
...
}
func checkPath() bool {
instance := "/var/lib/cloud/instance"
instances := "/var/lib/cloud/instances"
// 检查文件或目录是否存在
status := 0
if _, err := os.Stat(instance); !os.IsNotExist(err) {
status += 1
}
if _, err := os.Stat(instances); !os.IsNotExist(err) {
status += 1
}
if status != 2 {
return false
}
return true
}
func getInstanceId() (string, error) {
cmdStr := "ls -l /var/lib/cloud/instance | awk -F '/var/lib/cloud/instances/' '{print $2}'"
// 使用bash -c执行命令字符串
cmd := exec.Command("bash", "-c", cmdStr)
// 执行命令并获取输出结果
output, err := cmd.Output()
if err != nil {
return "", err
}
trimmedOutput := strings.TrimRight(string(output), "\n")
return trimmedOutput, nil
}