当今世界,任何计算机系统每天都会生成大量日志或数据。 随着系统的发展,将调试数据存储到数据库中是不可行的,因为它们是不可变的,仅用于分析和故障排除。 因此,组织倾向于将其存储在驻留在本地磁盘存储中的文件中。
我们将使用Golang从大小为16 GB的.txt或.log文件中提取日志,其中包含数百万行。
上代码…!
首先打开文件。 我们将对任何文件 io 使用标准的Go os.File。
f, err := os.Open(fileName)
defer file.Close() //Do not forget to close the file
if err != nil {
fmt.Println("cannot able to read the file", err)
return
}
打开文件后,我们有以下两个选项可继续进行
· 逐行读取文件,这有助于减少内存负担,但在IO中会花费更多时间。
· 立即将整个文件读取到内存中并处理该文件,这将消耗更多的内存,但会大大增加时间。
由于文件太大(即16 GB),因此无法将整个文件加载到内存中。 但是第一种选择对我们也不可行,因为我们想在几秒钟内处理文件。
但是你猜怎么着,还有第三种选择。 瞧…! 取而代之的是,使用Go中可用的bufio.NewReader()将整个文件加载到内存中。
r := bufio.NewReader(f)
for {
buf := make([] byte ,4*1024) //the chunk size
n, err := r.Read(buf) //loading chunk into buffer
buf = buf[:n]
if n == 0 {
if err == nil {
fmt.Println(err)
break
}
if err == io.EOF {
break
}
return err
}
}
一旦有了块,我们将派生一个线程,即Go例程,以与其他块同时处理每个块。 上面的代码将更改为-
//sync pools to reuse the memory and decrease the preassure on //Garbage Collector
linesPool := sync.Pool{New: func() interface{} {
lines := make([]byte, 500*1024)
return lines
}}
stringPool := sync.Pool{New: func() interface{} {
lines := ""
return lines
}}
slicePool := sync.Pool{New: func() interface{} {
lines := make([]string, 100)
return lines
}}
r := bufio.NewReader(f)
var wg sync.WaitGroup //wait group to keep track off all threads
for {
buf := linesPool.Get().([]byte)
n, err := r.Read(buf)
buf = buf[:n]
if n == 0 {
if err == nil {
fmt.Println(err)
break
}
if err == io.EOF {
break
}
return err
}
nextUntillNewline, err := r.ReadBytes('\n')//read entire line
if err != io.EOF {
buf = append(buf, nextUntillNewline...)
}
wg.Add(1)
go func() {
//process each chunk concurrently
//start -> log start time, end -> log end time
ProcessChunk(buf, &linesPool, &stringPool, &slicePool, start, end)
wg.Done()
}()
}
wg.Wait()
}
上面的代码引入了两个新的优化:
· sync.Pool是一个强大的实例池,可以重复使用这些实例以减轻垃圾收集器的压力。 我们将继续分配给各个片的内存。 它可以帮助我们减少内存消耗,并使我们的工作大大加快。
· Go例程有助于我们同时处理缓冲区块,从而显着提高了处理速度。
现在,我们实现ProcessChunk函数,该函数将处理日志行,其格式为
2020-01-31T20:12:38.1234Z, Some Field, Other Field, And so on, Till new line,…\n
我们将根据命令行提供的时间戳提取日志。
func ProcessChunk(chunk []byte, linesPool *sync.Pool, stringPool *sync.Pool, slicePool *sync.Pool, start time.Time, end time.Time) {
//another wait group to process every chunk further
var wg2 sync.WaitGroup
logs := stringPool.Get().(string)
logs = string(chunk)
linesPool.Put(chunk) //put back the chunk in pool
//split the string by "\n", so that we have slice of logs
logsSlice := strings.Split(logs, "\n")
stringPool.Put(logs) //put back the string pool
chunkSize := 100 //process the bunch of 100 logs in thread
n := len(logsSlice)
noOfThread := n / chunkSize
if n%chunkSize != 0 { //check for overflow
noOfThread++
}
length := len(logsSlice)
//traverse the chunk
for i := 0; i < length; i += chunkSize {
wg2.Add(1)
//process each chunk in saperate chunk
go func(s int, e int) {
for i:= s; i<e;i++{
text := logsSlice[i]
if len(text) == 0 {
continue
}
logParts := strings.SplitN(text, ",", 2)
logCreationTimeString := logParts[0]
logCreationTime, err := time.Parse("2006-01- 02T15:04:05.0000Z", logCreationTimeString)
if err != nil {
fmt.Printf("\n Could not able to parse the time :%s for log : %v", logCreationTimeString, text)
return
}
// check if log's timestamp is inbetween our desired period
if logCreationTime.After(start) && logCreationTime.Before(end) {
fmt.Println(text)
}
}
textSlice = nil
wg2.Done()
}(i*chunkSize, int(math.Min(float64((i+1)*chunkSize), float64(len(logsSlice)))))
//passing the indexes for processing
}
wg2.Wait() //wait for a chunk to finish
logsSlice = nil
}
上面的代码使用16 GB的日志文件进行基准测试。
提取日志所需的时间约为25秒。
完整的工作代码如下。
您可以通过ohm.patel1997@gmail.com与我联系。
如有任何疑问和改进,。
(本文翻译自Ohm Patel的文章《Reading 16GB file in seconds, Golang》,参考: