見出し画像

FluentbitとTWSNMP FCの連携開発3日目:FluentbitのInputプラグインが何となく動いて嬉しい

今朝は3時半から開発開始です。助手の猫さんは4時過ぎに起きてきましたが、ご飯を食べた後寝ています。

さて、昨日、TWSNMP FCの質問に回答したのでFluentbitの連携の開発はお休みしました。今朝は続きです。
いよいよ、Fluentbitのプラグインの開発です。

にあるサンプルを参考にしてInput Pluginを作っています。
2日前に作ったTWSNMP FC側のSSHサーバーに接続してログを受信する
プログラムです。

package main

/*
#include <stdlib.h>
*/
import "C"
import (
	"encoding/json"
	"fmt"
	"net"
	"os"
	"strconv"
	"strings"
	"time"
	"unsafe"

	"github.com/fluent/fluent-bit-go/input"
	"golang.org/x/crypto/ssh"
)

// Params
var twsnmp = ""
var privateKeyPath = ""
var hostKeyPath = ""
var logType = "syslog"

// local vars
var lastTime int64
var lastHostKey = ""

type logEnt struct {
	TimeStamp int64
	Data      *map[string]interface{}
}

var logList = []*logEnt{}

//export FLBPluginRegister
func FLBPluginRegister(def unsafe.Pointer) int {
	return input.FLBPluginRegister(def, "twsnmp", "Input plugin for TWSNMP")
}

// (fluentbit will call this)
// plugin (context) pointer to fluentbit context (state/ c code)
//
//export FLBPluginInit
func FLBPluginInit(plugin unsafe.Pointer) int {
	twsnmp = input.FLBPluginConfigKey(plugin, "twsnmp")
	privateKeyPath = input.FLBPluginConfigKey(plugin, "private_key")
	hostKeyPath = input.FLBPluginConfigKey(plugin, "host_key")
	logType = input.FLBPluginConfigKey(plugin, "log_type")
	if p := input.FLBPluginConfigKey(plugin, "send_all"); strings.ToLower(p) != "true" {
		lastTime = time.Now().UnixNano()
	}
	fmt.Printf("[in_twsnmp] twsnmp= '%s'\n", twsnmp)
	fmt.Printf("[in_twsnmp] privateKeyPath= '%s'\n", privateKeyPath)
	fmt.Printf("[in_twsnmp] hostKeyPath= '%s'\n", hostKeyPath)
	fmt.Printf("[in_twsnmp] logType= '%s'\n", logType)
	fmt.Printf("[in_twsnmp] lastTime = '%s'\n", time.Unix(0, lastTime).Format(time.RFC3339))
	if err := getTWSNMPLogs(); err != nil {
		fmt.Println(err)
		return input.FLB_ERROR
	}
	return input.FLB_OK
}

//export FLBPluginInputCallback
func FLBPluginInputCallback(data *unsafe.Pointer, size *C.size_t) int {
	if len(logList) < 1 {
		if err := getTWSNMPLogs(); err != nil || len(logList) < 1 {
			time.Sleep(time.Second * 5)
			fmt.Println("no data retry")
			return input.FLB_RETRY
		}
	}
	l := logList[0]
	logList = logList[1:]
	flb_time := input.FLBTime{time.Unix(0, l.TimeStamp)}

	entry := []interface{}{flb_time, l.Data}
	enc := input.NewEncoder()
	packed, err := enc.Encode(entry)
	if err != nil {
		fmt.Println(err)
		return input.FLB_ERROR
	}

	length := len(packed)
	*data = C.CBytes(packed)
	*size = C.size_t(length)
	return input.FLB_OK
}

//export FLBPluginInputCleanupCallback
func FLBPluginInputCleanupCallback(data unsafe.Pointer) int {
	return input.FLB_OK
}

//export FLBPluginExit
func FLBPluginExit() int {
	return input.FLB_OK
}

func getTWSNMPLogs() error {
	privateKey, err := os.ReadFile(privateKeyPath)
	if err != nil {
		return err
	}
	signer, err := ssh.ParsePrivateKey(privateKey)
	if err != nil {
		return err
	}
	sshConfig := &ssh.ClientConfig{
		User:    "twsnmp",
		Auth:    []ssh.AuthMethod{},
		Timeout: time.Second * 1,
	}
	sshConfig.Auth = append(sshConfig.Auth, ssh.PublicKeys(signer))
	if hostKeyPath != "" {
		hostKey, err := os.ReadFile(hostKeyPath)
		if err != nil {
			return err
		}
		pubkey, _, _, _, err := ssh.ParseAuthorizedKey([]byte(hostKey))
		if err != nil {
			return err
		}
		sshConfig.HostKeyCallback = ssh.FixedHostKey(pubkey)
	} else {
		sshConfig.HostKeyCallback =
			func(hostname string, remote net.Addr, key ssh.PublicKey) error {
				newKey := strings.TrimSpace(string(ssh.MarshalAuthorizedKey(key)))
				if lastHostKey == "" {
					lastHostKey = newKey
				}
				if lastHostKey != newKey {
					return fmt.Errorf("host key changed")
				}
				return nil
			}
	}
	client, err := ssh.Dial("tcp", twsnmp, sshConfig)
	if err != nil {
		return err
	}
	defer client.Close()

	session, err := client.NewSession()
	if err != nil {
		return err
	}
	defer session.Close()

	out, err := session.Output(fmt.Sprintf("get %s %d 1000", logType, lastTime))
	if err != nil {
		return err
	}
	for _, l := range strings.Split(string(out), "\n") {
		l = strings.TrimSpace(l)
		a := strings.SplitN(l, "\t", 2)
		if len(a) != 2 {
			continue
		}
		t, err := strconv.ParseInt(a[0], 10, 64)
		if err != nil {
			continue
		}
		d := new(map[string]interface{})
		err = json.Unmarshal([]byte(a[1]), d)
		if err != nil {
			continue
		}
		e := &logEnt{
			TimeStamp: t,
			Data:      d,
		}
		logList = append(logList, e)
		lastTime = t + 1
	}
	return nil
}

func main() {
}

のように作って、試してみたら、なんとなく動作しているようです。

かなり嬉しいです。もう少しテストしてからOutputプラグインも作ってみようと思いますが、今朝は時間切れです。

明日に続く

開発のための諸経費(機材、Appleの開発者、サーバー運用)に利用します。 ソフトウェアのマニュアルをnoteの記事で提供しています。 サポートによりnoteの運営にも貢献できるのでよろしくお願います。