diff --git a/output/file-transport/config.go b/output/file-transport/config.go deleted file mode 100644 index c494cfef..00000000 --- a/output/file-transport/config.go +++ /dev/null @@ -1,16 +0,0 @@ -package file_transport - -//Config filelog-Transporter所需配置 -type Config struct { - Dir string - File string - Expire int - Period LogPeriod -} - -func (c *Config) IsUpdate(cfg *Config) bool { - if cfg.File != c.File || cfg.Dir != c.Dir || cfg.Period != c.Period || cfg.Expire != c.Expire { - return true - } - return false -} diff --git a/output/file-transport/file.go b/output/file-transport/file.go deleted file mode 100644 index 823227e7..00000000 --- a/output/file-transport/file.go +++ /dev/null @@ -1,49 +0,0 @@ -package file_transport - -import ( - "fmt" - "os" - "path/filepath" - "time" -) - -type FileController struct { - expire time.Duration - dir string - file string - period LogPeriod -} - -func (w *FileController) timeTag(t time.Time) string { - - tag := t.Format(w.period.FormatLayout()) - - return filepath.Join(w.dir, fmt.Sprintf("%s-%s.log", w.file, tag)) -} -func (w *FileController) fileName() string { - return filepath.Join(w.dir, fmt.Sprintf("%s.log", w.file)) -} -func (w *FileController) history(history string) { - - path := w.fileName() - os.Rename(path, history) - -} - -func (w *FileController) dropHistory() { - - expireTime := time.Now().Add(-w.expire) - pathPatten := filepath.Join(w.dir, fmt.Sprintf("%s-*", w.file)) - files, err := filepath.Glob(pathPatten) - if err == nil { - for _, f := range files { - if info, e := os.Stat(f); e == nil { - - if expireTime.After(info.ModTime()) { - _ = os.Remove(f) - } - } - - } - } -} diff --git a/output/file-transport/period.go b/output/file-transport/period.go deleted file mode 100644 index 61c872cb..00000000 --- a/output/file-transport/period.go +++ /dev/null @@ -1,71 +0,0 @@ -package file_transport - -import ( - "strings" -) - -//LogPeriod 日志周期 -type LogPeriod interface { - String() string - FormatLayout() string -} - -//LogPeriodType 日志周期类型 -type LogPeriodType int - -//ParsePeriod 解析周期 -func ParsePeriod(v string) LogPeriod { - switch strings.ToLower(v) { - //case "month": - // return PeriodMonth, nil - case "day": - return PeriodDay - case "hour": - return PeriodHour - default: - return PeriodDay - } - - //return nil, fmt.Errorf("not a valid period: %q", v) -} -func (period LogPeriodType) String() string { - switch period { - //case PeriodMonth: - // return "month" - case PeriodDay: - return "day" - case PeriodHour: - return "hour" - default: - return "unknown" - } -} - -const ( - //PeriodMonth 月 - PeriodMonth LogPeriodType = iota - //PeriodDay 日 - PeriodDay - //PeriodHour 时 - PeriodHour -) - -//FormatLayout 格式化 -func (period LogPeriodType) FormatLayout() string { - switch period { - case PeriodHour: - { - return "2006-01-02-15" - } - case PeriodDay: - { - return "2006-01-02" - } - case PeriodMonth: - { - return "2006-01" - } - default: - return "2006-01-02-15" - } -} diff --git a/output/file-transport/writer.go b/output/file-transport/writer.go deleted file mode 100644 index 21415e88..00000000 --- a/output/file-transport/writer.go +++ /dev/null @@ -1,241 +0,0 @@ -package file_transport - -import ( - "bufio" - "bytes" - "context" - "github.com/eolinker/eosc/log" - "os" - "sync" - "time" -) - -// MaxBuffer buffer最大值 -const MaxBuffer = 1024 * 500 - -var ( - bufferPool = &sync.Pool{ - New: func() interface{} { - return new(bytes.Buffer) - }, - } -) - -// FileWriterByPeriod 文件周期写入 -type FileWriterByPeriod struct { - wC chan *bytes.Buffer - - enable bool - cancelFunc context.CancelFunc - locker sync.RWMutex - wg sync.WaitGroup - resetChan chan FileController -} - -// NewFileWriteByPeriod 获取新的FileWriterByPeriod -func NewFileWriteByPeriod(cfg *Config) *FileWriterByPeriod { - w := &FileWriterByPeriod{ - locker: sync.RWMutex{}, - wg: sync.WaitGroup{}, - enable: false, - resetChan: make(chan FileController), - } - - w.Open(&FileController{ - dir: cfg.Dir, - file: cfg.File, - period: cfg.Period, - expire: time.Duration(cfg.Expire) * 24 * time.Hour, - }) - return w -} - -func (w *FileWriterByPeriod) Reset(cfg *Config) { - w.resetChan <- FileController{ - dir: cfg.Dir, - file: cfg.File, - period: cfg.Period, - expire: time.Duration(cfg.Expire) * 24 * time.Hour, - } -} - -// Open 打开 -func (w *FileWriterByPeriod) Open(config *FileController) { - w.locker.Lock() - defer w.locker.Unlock() - - if w.enable { - return - } - - ctx, cancel := context.WithCancel(context.Background()) - w.cancelFunc = cancel - w.wC = make(chan *bytes.Buffer, 100) - - w.enable = true - go func() { - w.wg.Add(1) - w.do(ctx, config) - w.wg.Done() - }() -} - -// Close 关闭 -func (w *FileWriterByPeriod) Close() { - - isClose := false - w.locker.Lock() - if !w.enable { - w.locker.Unlock() - return - } - - if w.cancelFunc != nil { - isClose = true - w.cancelFunc() - w.cancelFunc = nil - } - w.enable = false - w.locker.Unlock() - if isClose { - w.wg.Wait() - } -} -func (w *FileWriterByPeriod) isEnable() bool { - w.locker.Lock() - defer w.locker.Unlock() - return w.enable -} -func (w *FileWriterByPeriod) Write(p []byte) (n int, err error) { - - l := len(p) - - if l == 0 { - return - } - if !w.isEnable() { - return l, nil - } - buffer := bufferPool.Get().(*bytes.Buffer) - buffer.Reset() - buffer.Write(p) - if p[l-1] != '\n' { - buffer.WriteByte('\n') - } - w.wC <- buffer - return l, nil -} - -func (w *FileWriterByPeriod) do(ctx context.Context, config *FileController) { - fileController := *config - fileController.initFile() - f, lastTag, e := fileController.openFile() - if e != nil { - log.Errorf("open log file:%s\n", e.Error()) - return - } - - buf := bufio.NewWriter(f) - t := time.NewTicker(time.Second * 5) - defer t.Stop() - tFlush := time.NewTimer(time.Second) - - resetFunc := func(controller FileController) { - if lastTag != fileController.timeTag(time.Now()) { - if buf.Buffered() > 0 { - buf.Flush() - tFlush.Reset(time.Second) - } - f.Close() - fileController.history(lastTag) - fnew, tag, err := fileController.openFile() - if err != nil { - return - } - lastTag = tag - f = fnew - buf.Reset(f) - - go fileController.dropHistory() - } - } - - for { - select { - case <-ctx.Done(): - { - for len(w.wC) > 0 { - p := <-w.wC - buf.Write(p.Bytes()) - bufferPool.Put(p) - } - buf.Flush() - f.Close() - t.Stop() - //w.wg.Done() - return - } - - case <-t.C: - { - - resetFunc(fileController) - - } - case <-tFlush.C: - { - if buf.Buffered() > 0 { - buf.Flush() - } - tFlush.Reset(time.Second) - } - case p := <-w.wC: - { - buf.Write(p.Bytes()) - bufferPool.Put(p) - if buf.Buffered() > MaxBuffer { - buf.Flush() - } - tFlush.Reset(time.Second) - } - case controller, ok := <-w.resetChan: - { - if ok { - resetFunc(controller) - fileController = controller - } - } - } - } -} - -func (w *FileController) initFile() { - err := os.MkdirAll(w.dir, 0666) - if err != nil { - log.Error(err) - } - path := w.fileName() - nowHistoryName := w.timeTag(time.Now()) - if info, e := os.Stat(path); e == nil { - - timeTag := w.timeTag(info.ModTime()) - if timeTag != nowHistoryName { - w.history(timeTag) - } - } - - w.dropHistory() - -} - -func (w *FileController) openFile() (*os.File, string, error) { - path := w.fileName() - nowTag := w.timeTag(time.Now()) - f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666) - - if err != nil { - return nil, "", err - } - return f, nowTag, err - -}