From cfc1aed13a228e01386558a5d6511399e74ce5df Mon Sep 17 00:00:00 2001 From: Ben McClelland Date: Tue, 21 May 2019 13:56:28 -0700 Subject: [PATCH] add support for data waiters interface --- examples/datawaiters/main.go | 45 +++++++++++++++++ scoutfs.go | 95 +++++++++++++++++++++++++++++++++--- scoutfsdefs.go | 73 ++++++++++++++++++++++----- 3 files changed, 194 insertions(+), 19 deletions(-) create mode 100644 examples/datawaiters/main.go diff --git a/examples/datawaiters/main.go b/examples/datawaiters/main.go new file mode 100644 index 0000000..587e956 --- /dev/null +++ b/examples/datawaiters/main.go @@ -0,0 +1,45 @@ +// Copyright (c) 2018 Versity Software, Inc. +// +// Use of this source code is governed by a BSD-3-Clause license +// that can be found in the LICENSE file in the root of the source +// tree. + +package main + +import ( + "fmt" + "log" + "os" + "time" + + "github.com/versity/scoutfs-go" +) + +func main() { + if len(os.Args) != 2 || os.Args[1] == "-h" { + fmt.Fprintln(os.Stderr, "usage:", os.Args[0], "") + os.Exit(1) + } + + f, err := os.Open(os.Args[1]) + if err != nil { + log.Fatalf("Open %v: %v", os.Args[1], err) + } + defer f.Close() + + w := scoutfs.NewWaiters(f) + + for { + ents, err := w.Next() + if err != nil { + log.Fatalf("next(): %v", err) + } + if ents == nil { + time.Sleep(time.Second) + continue + } + for _, ent := range ents { + log.Printf("%+v", ent) + } + } +} diff --git a/scoutfs.go b/scoutfs.go index 240e119..db41597 100644 --- a/scoutfs.go +++ b/scoutfs.go @@ -34,7 +34,7 @@ type Time struct { Nsec uint32 } -// NewQuery creates a new scoutfs WalkHandle +// NewQuery creates a new scoutfs Query // Specify query type with By*() option // (only 1 allowed, last one wins) // and specify batching with WithBatchSize() @@ -95,16 +95,16 @@ func (q *Query) Next() ([]InodesEntry, error) { pack, err := query.pack() if err != nil { - return []InodesEntry{}, err + return nil, err } n, err := scoutfsctl(q.fsfd.Fd(), IOCQUERYINODES, uintptr(unsafe.Pointer(&pack))) if err != nil { - return []InodesEntry{}, err + return nil, err } if n == 0 { - return []InodesEntry{}, nil + return nil, nil } rbuf := bytes.NewReader(buf) @@ -116,15 +116,15 @@ func (q *Query) Next() ([]InodesEntry, error) { //unpacking each member individually err := binary.Read(rbuf, binary.LittleEndian, &e.Major) if err != nil { - return []InodesEntry{}, err + return nil, err } err = binary.Read(rbuf, binary.LittleEndian, &e.Minor) if err != nil { - return []InodesEntry{}, err + return nil, err } err = binary.Read(rbuf, binary.LittleEndian, &e.Ino) if err != nil { - return []InodesEntry{}, err + return nil, err } inodes = append(inodes, e) @@ -245,3 +245,84 @@ func FStageFile(f *os.File, version, offset uint64, b []byte) error { _, err := scoutfsctl(f.Fd(), IOCSTAGE, uintptr(unsafe.Pointer(&r))) return err } + +// Waiters to keep track of data waiters +type Waiters struct { + ino uint64 + iblock uint64 + batch uint16 + fsfd *os.File +} + +// NewWaiters creates a new scoutfs Waiters +// An open file within scoutfs is supplied for ioctls +// (usually just the base mount point directory) +func NewWaiters(f *os.File, opts ...WOption) *Waiters { + w := &Waiters{ + //default batch size is 128 + batch: 128, + fsfd: f, + } + + for _, opt := range opts { + opt(w) + } + + return w +} + +// WOption sets various options for NewWaiters +type WOption func(*Waiters) + +// WithWaitersCount sets the max number of inodes to be returned at a time +func WithWaitersCount(size uint16) WOption { + return func(w *Waiters) { + w.batch = size + } +} + +// Next gets the next batch of data waiters, returns nil, nil if no waiters +func (w *Waiters) Next() ([]DataWaitingEntry, error) { + buf := make([]byte, int(unsafe.Sizeof(DataWaitingEntry{}))*int(w.batch)) + dataWaiting := dataWaiting{ + afterIno: w.ino, + afterIblock: w.iblock, + entries: uintptr(unsafe.Pointer(&buf[0])), + count: w.batch, + } + + n, err := scoutfsctl(w.fsfd.Fd(), IOCDATAWAITING, uintptr(unsafe.Pointer(&dataWaiting))) + if err != nil { + return nil, err + } + + if n == 0 { + return nil, nil + } + + rbuf := bytes.NewReader(buf) + var inodes []DataWaitingEntry + + var e DataWaitingEntry + for i := 0; i < n; i++ { + err := binary.Read(rbuf, binary.LittleEndian, &e.Ino) + if err != nil { + return nil, err + } + err = binary.Read(rbuf, binary.LittleEndian, &e.Iblock) + if err != nil { + return nil, err + } + err = binary.Read(rbuf, binary.LittleEndian, &e.Op) + if err != nil { + return nil, err + } + + inodes = append(inodes, e) + } + + w.ino = inodes[n-1].Ino + w.iblock = inodes[n-1].Iblock + + return inodes, nil +} diff --git a/scoutfsdefs.go b/scoutfsdefs.go index 77be355..b3e6faa 100644 --- a/scoutfsdefs.go +++ b/scoutfsdefs.go @@ -13,24 +13,33 @@ import ( ) const ( - //IOCQUERYINODES scoutfs ioctl + // IOCQUERYINODES scoutfs ioctl IOCQUERYINODES = 0x40357301 - //IOCINOPATH scoutfs ioctl + // IOCINOPATH scoutfs ioctl IOCINOPATH = 0x40227302 - //IOCDATAVERSION scoutfs ioctl + // IOCDATAVERSION scoutfs ioctl IOCDATAVERSION = 0x40087304 - //IOCRELEASE scoutfs ioctl + // IOCRELEASE scoutfs ioctl IOCRELEASE = 0x40187305 - //IOCSTAGE scoutfs ioctl + // IOCSTAGE scoutfs ioctl IOCSTAGE = 0x401c7306 - //IOCSTATMORE scoutfs ioctl + // IOCSTATMORE scoutfs ioctl IOCSTATMORE = 0x40307307 + // IOCDATAWAITING scoutfs ioctl + IOCDATAWAITING = 0x40227309 - //QUERYINODESMETASEQ find inodes by metadata sequence + // QUERYINODESMETASEQ find inodes by metadata sequence QUERYINODESMETASEQ = '\u0000' - //QUERYINODESDATASEQ find inodes by data sequence + // QUERYINODESDATASEQ find inodes by data sequence QUERYINODESDATASEQ = '\u0001' + // DATAWAITOPREAD waiting operation read + DATAWAITOPREAD = 1 << 0 + // DATAWAITOPWRITE waiting operation write + DATAWAITOPWRITE = 1 << 1 + // DATAWAITOPCHANGESIZE waiting operation truncate + DATAWAITOPCHANGESIZE = 1 << 2 + pathmax = 1024 ) @@ -45,7 +54,7 @@ struct scoutfs_ioctl_walk_inodes_entry { }; */ -//InodesEntry is scoutfs entry for inode iteration +// InodesEntry is scoutfs entry for inode iteration type InodesEntry struct { Major uint64 Minor uint32 @@ -65,7 +74,7 @@ struct scoutfs_ioctl_walk_inodes { }; */ -//queryInodes is scoutfs request structure for IOCQUERYINODES +// queryInodes is scoutfs request structure for IOCQUERYINODES type queryInodes struct { first InodesEntry last InodesEntry @@ -74,8 +83,8 @@ type queryInodes struct { index uint8 } -//packed scoutfs_ioctl_walk_inodes requires packing queryInodes byhand -//the 53 size comes from pahole output above +// packed scoutfs_ioctl_walk_inodes requires packing queryInodes byhand +// the 53 size comes from pahole output above func (q queryInodes) pack() ([53]byte, error) { var pack [53]byte var pbuf = bytes.NewBuffer(make([]byte, 0, len(pack))) @@ -252,3 +261,43 @@ type FileHandle struct { HandleType int32 FID FileID } + +/* pahole for scoutfs_ioctl_data_waiting_entry +struct scoutfs_ioctl_data_waiting_entry { + __u64 ino; // 0 8 + __u64 iblock; // 8 8 + __u8 op; // 16 1 + + // size: 17, cachelines: 1, members: 3 + // last cacheline: 17 bytes +}; +*/ + +// DataWaitingEntry is an entry returned when a process is waiting on +// access of offline block +type DataWaitingEntry struct { + Ino uint64 + Iblock uint64 + Op uint8 +} + +/* pahole for scoutfs_ioctl_data_waiting +struct scoutfs_ioctl_data_waiting { + __u64 flags; // 0 8 + __u64 after_ino; // 8 8 + __u64 after_iblock; // 16 8 + __u64 ents_ptr; // 24 8 + __u16 ents_nr; // 32 2 + + / size: 34, cachelines: 1, members: 5 + / last cacheline: 34 bytes +}; +*/ + +type dataWaiting struct { + flags uint64 + afterIno uint64 + afterIblock uint64 + entries uintptr + count uint16 +}