libs: Call Flush() before rename #2428 (#2439)

* fix Group.RotateFile need call Flush() before rename. #2428
* fix some review issue. #2428
 refactor Group's config: replace  setting member with initial option
* fix a handwriting mistake
* fix a time window error between rename and write.
* fix a syntax mistake.
* change option name Get_ to With_
* fix review issue
* fix review issue
This commit is contained in:
goolAdapter
2018-09-25 19:22:45 +08:00
committed by Alexander Simmerl
parent 587116dae1
commit 110b07fb3f
7 changed files with 146 additions and 61 deletions

View File

@@ -39,13 +39,12 @@ func main() {
}
// Open Group
group, err := auto.OpenGroup(headPath)
group, err := auto.OpenGroup(headPath, auto.GroupHeadSizeLimit(chopSize), auto.GroupTotalSizeLimit(limitSize))
if err != nil {
fmt.Printf("logjack couldn't create output file %v\n", headPath)
os.Exit(1)
}
group.SetHeadSizeLimit(chopSize)
group.SetTotalSizeLimit(limitSize)
err = group.Start()
if err != nil {
fmt.Printf("logjack couldn't start with file %v\n", headPath)

View File

@@ -19,10 +19,10 @@ import (
)
const (
groupCheckDuration = 5000 * time.Millisecond
defaultHeadSizeLimit = 10 * 1024 * 1024 // 10MB
defaultTotalSizeLimit = 1 * 1024 * 1024 * 1024 // 1GB
maxFilesToRemove = 4 // needs to be greater than 1
defaultGroupCheckDuration = 5000 * time.Millisecond
defaultHeadSizeLimit = 10 * 1024 * 1024 // 10MB
defaultTotalSizeLimit = 1 * 1024 * 1024 * 1024 // 1GB
maxFilesToRemove = 4 // needs to be greater than 1
)
/*
@@ -56,16 +56,17 @@ assuming that marker lines are written occasionally.
type Group struct {
cmn.BaseService
ID string
Head *AutoFile // The head AutoFile to write to
headBuf *bufio.Writer
Dir string // Directory that contains .Head
ticker *time.Ticker
mtx sync.Mutex
headSizeLimit int64
totalSizeLimit int64
minIndex int // Includes head
maxIndex int // Includes head, where Head will move to
ID string
Head *AutoFile // The head AutoFile to write to
headBuf *bufio.Writer
Dir string // Directory that contains .Head
ticker *time.Ticker
mtx sync.Mutex
headSizeLimit int64
totalSizeLimit int64
groupCheckDuration time.Duration
minIndex int // Includes head
maxIndex int // Includes head, where Head will move to
// TODO: When we start deleting files, we need to start tracking GroupReaders
// and their dependencies.
@@ -73,7 +74,7 @@ type Group struct {
// OpenGroup creates a new Group with head at headPath. It returns an error if
// it fails to open head file.
func OpenGroup(headPath string) (g *Group, err error) {
func OpenGroup(headPath string, groupOptions ...func(*Group)) (g *Group, err error) {
dir := path.Dir(headPath)
head, err := OpenAutoFile(headPath)
if err != nil {
@@ -81,15 +82,21 @@ func OpenGroup(headPath string) (g *Group, err error) {
}
g = &Group{
ID: "group:" + head.ID,
Head: head,
headBuf: bufio.NewWriterSize(head, 4096*10),
Dir: dir,
headSizeLimit: defaultHeadSizeLimit,
totalSizeLimit: defaultTotalSizeLimit,
minIndex: 0,
maxIndex: 0,
ID: "group:" + head.ID,
Head: head,
headBuf: bufio.NewWriterSize(head, 4096*10),
Dir: dir,
headSizeLimit: defaultHeadSizeLimit,
totalSizeLimit: defaultTotalSizeLimit,
groupCheckDuration: defaultGroupCheckDuration,
minIndex: 0,
maxIndex: 0,
}
for _, option := range groupOptions {
option(g)
}
g.BaseService = *cmn.NewBaseService(nil, "Group", g)
gInfo := g.readGroupInfo()
@@ -98,10 +105,31 @@ func OpenGroup(headPath string) (g *Group, err error) {
return
}
// GroupCheckDuration allows you to overwrite default groupCheckDuration.
func GroupCheckDuration(duration time.Duration) func(*Group) {
return func(g *Group) {
g.groupCheckDuration = duration
}
}
// GroupHeadSizeLimit allows you to overwrite default head size limit - 10MB.
func GroupHeadSizeLimit(limit int64) func(*Group) {
return func(g *Group) {
g.headSizeLimit = limit
}
}
// GroupTotalSizeLimit allows you to overwrite default total size limit of the group - 1GB.
func GroupTotalSizeLimit(limit int64) func(*Group) {
return func(g *Group) {
g.totalSizeLimit = limit
}
}
// OnStart implements Service by starting the goroutine that checks file and
// group limits.
func (g *Group) OnStart() error {
g.ticker = time.NewTicker(groupCheckDuration)
g.ticker = time.NewTicker(g.groupCheckDuration)
go g.processTicks()
return nil
}
@@ -122,13 +150,6 @@ func (g *Group) Close() {
g.mtx.Unlock()
}
// SetHeadSizeLimit allows you to overwrite default head size limit - 10MB.
func (g *Group) SetHeadSizeLimit(limit int64) {
g.mtx.Lock()
g.headSizeLimit = limit
g.mtx.Unlock()
}
// HeadSizeLimit returns the current head size limit.
func (g *Group) HeadSizeLimit() int64 {
g.mtx.Lock()
@@ -136,14 +157,6 @@ func (g *Group) HeadSizeLimit() int64 {
return g.headSizeLimit
}
// SetTotalSizeLimit allows you to overwrite default total size limit of the
// group - 1GB.
func (g *Group) SetTotalSizeLimit(limit int64) {
g.mtx.Lock()
g.totalSizeLimit = limit
g.mtx.Unlock()
}
// TotalSizeLimit returns total size limit of the group.
func (g *Group) TotalSizeLimit() int64 {
g.mtx.Lock()
@@ -266,6 +279,14 @@ func (g *Group) RotateFile() {
headPath := g.Head.Path
if err := g.headBuf.Flush(); err != nil {
panic(err) //panic is used for consistent with below
}
if err := g.Head.Sync(); err != nil {
panic(err)
}
if err := g.Head.closeFile(); err != nil {
panic(err)
}
@@ -275,6 +296,12 @@ func (g *Group) RotateFile() {
panic(err)
}
//make sure head file exist, there is a window time between rename and next write
//when NewReader(maxIndex), lead to "open /tmp/wal058868562/wal: no such file or directory"
if err := g.Head.openFile(); err != nil {
panic(err)
}
g.maxIndex++
}

View File

@@ -23,12 +23,10 @@ func createTestGroupWithHeadSizeLimit(t *testing.T, headSizeLimit int64) *Group
require.NoError(t, err, "Error creating dir")
headPath := testDir + "/myfile"
g, err := OpenGroup(headPath)
g, err := OpenGroup(headPath, GroupHeadSizeLimit(headSizeLimit))
require.NoError(t, err, "Error opening Group")
require.NotEqual(t, nil, g, "Failed to create Group")
g.SetHeadSizeLimit(headSizeLimit)
return g
}