add support for data waiters interface

This commit is contained in:
Ben McClelland
2019-05-21 13:56:28 -07:00
parent 7074d96d13
commit cfc1aed13a
3 changed files with 194 additions and 19 deletions

View File

@@ -0,0 +1,45 @@
// Copyright (c) 2018 Versity Software, Inc.
//
// Use of this source code is governed by a BSD-3-Clause license
// that can be found in the LICENSE file in the root of the source
// tree.
package main
import (
"fmt"
"log"
"os"
"time"
"github.com/versity/scoutfs-go"
)
func main() {
if len(os.Args) != 2 || os.Args[1] == "-h" {
fmt.Fprintln(os.Stderr, "usage:", os.Args[0], "<scoutfs mount point>")
os.Exit(1)
}
f, err := os.Open(os.Args[1])
if err != nil {
log.Fatalf("Open %v: %v", os.Args[1], err)
}
defer f.Close()
w := scoutfs.NewWaiters(f)
for {
ents, err := w.Next()
if err != nil {
log.Fatalf("next(): %v", err)
}
if ents == nil {
time.Sleep(time.Second)
continue
}
for _, ent := range ents {
log.Printf("%+v", ent)
}
}
}

View File

@@ -34,7 +34,7 @@ type Time struct {
Nsec uint32
}
// NewQuery creates a new scoutfs WalkHandle
// NewQuery creates a new scoutfs Query
// Specify query type with By*() option
// (only 1 allowed, last one wins)
// and specify batching with WithBatchSize()
@@ -95,16 +95,16 @@ func (q *Query) Next() ([]InodesEntry, error) {
pack, err := query.pack()
if err != nil {
return []InodesEntry{}, err
return nil, err
}
n, err := scoutfsctl(q.fsfd.Fd(), IOCQUERYINODES, uintptr(unsafe.Pointer(&pack)))
if err != nil {
return []InodesEntry{}, err
return nil, err
}
if n == 0 {
return []InodesEntry{}, nil
return nil, nil
}
rbuf := bytes.NewReader(buf)
@@ -116,15 +116,15 @@ func (q *Query) Next() ([]InodesEntry, error) {
//unpacking each member individually
err := binary.Read(rbuf, binary.LittleEndian, &e.Major)
if err != nil {
return []InodesEntry{}, err
return nil, err
}
err = binary.Read(rbuf, binary.LittleEndian, &e.Minor)
if err != nil {
return []InodesEntry{}, err
return nil, err
}
err = binary.Read(rbuf, binary.LittleEndian, &e.Ino)
if err != nil {
return []InodesEntry{}, err
return nil, err
}
inodes = append(inodes, e)
@@ -245,3 +245,84 @@ func FStageFile(f *os.File, version, offset uint64, b []byte) error {
_, err := scoutfsctl(f.Fd(), IOCSTAGE, uintptr(unsafe.Pointer(&r)))
return err
}
// Waiters to keep track of data waiters
type Waiters struct {
ino uint64
iblock uint64
batch uint16
fsfd *os.File
}
// NewWaiters creates a new scoutfs Waiters
// An open file within scoutfs is supplied for ioctls
// (usually just the base mount point directory)
func NewWaiters(f *os.File, opts ...WOption) *Waiters {
w := &Waiters{
//default batch size is 128
batch: 128,
fsfd: f,
}
for _, opt := range opts {
opt(w)
}
return w
}
// WOption sets various options for NewWaiters
type WOption func(*Waiters)
// WithWaitersCount sets the max number of inodes to be returned at a time
func WithWaitersCount(size uint16) WOption {
return func(w *Waiters) {
w.batch = size
}
}
// Next gets the next batch of data waiters, returns nil, nil if no waiters
func (w *Waiters) Next() ([]DataWaitingEntry, error) {
buf := make([]byte, int(unsafe.Sizeof(DataWaitingEntry{}))*int(w.batch))
dataWaiting := dataWaiting{
afterIno: w.ino,
afterIblock: w.iblock,
entries: uintptr(unsafe.Pointer(&buf[0])),
count: w.batch,
}
n, err := scoutfsctl(w.fsfd.Fd(), IOCDATAWAITING, uintptr(unsafe.Pointer(&dataWaiting)))
if err != nil {
return nil, err
}
if n == 0 {
return nil, nil
}
rbuf := bytes.NewReader(buf)
var inodes []DataWaitingEntry
var e DataWaitingEntry
for i := 0; i < n; i++ {
err := binary.Read(rbuf, binary.LittleEndian, &e.Ino)
if err != nil {
return nil, err
}
err = binary.Read(rbuf, binary.LittleEndian, &e.Iblock)
if err != nil {
return nil, err
}
err = binary.Read(rbuf, binary.LittleEndian, &e.Op)
if err != nil {
return nil, err
}
inodes = append(inodes, e)
}
w.ino = inodes[n-1].Ino
w.iblock = inodes[n-1].Iblock
return inodes, nil
}

View File

@@ -13,24 +13,33 @@ import (
)
const (
//IOCQUERYINODES scoutfs ioctl
// IOCQUERYINODES scoutfs ioctl
IOCQUERYINODES = 0x40357301
//IOCINOPATH scoutfs ioctl
// IOCINOPATH scoutfs ioctl
IOCINOPATH = 0x40227302
//IOCDATAVERSION scoutfs ioctl
// IOCDATAVERSION scoutfs ioctl
IOCDATAVERSION = 0x40087304
//IOCRELEASE scoutfs ioctl
// IOCRELEASE scoutfs ioctl
IOCRELEASE = 0x40187305
//IOCSTAGE scoutfs ioctl
// IOCSTAGE scoutfs ioctl
IOCSTAGE = 0x401c7306
//IOCSTATMORE scoutfs ioctl
// IOCSTATMORE scoutfs ioctl
IOCSTATMORE = 0x40307307
// IOCDATAWAITING scoutfs ioctl
IOCDATAWAITING = 0x40227309
//QUERYINODESMETASEQ find inodes by metadata sequence
// QUERYINODESMETASEQ find inodes by metadata sequence
QUERYINODESMETASEQ = '\u0000'
//QUERYINODESDATASEQ find inodes by data sequence
// QUERYINODESDATASEQ find inodes by data sequence
QUERYINODESDATASEQ = '\u0001'
// DATAWAITOPREAD waiting operation read
DATAWAITOPREAD = 1 << 0
// DATAWAITOPWRITE waiting operation write
DATAWAITOPWRITE = 1 << 1
// DATAWAITOPCHANGESIZE waiting operation truncate
DATAWAITOPCHANGESIZE = 1 << 2
pathmax = 1024
)
@@ -45,7 +54,7 @@ struct scoutfs_ioctl_walk_inodes_entry {
};
*/
//InodesEntry is scoutfs entry for inode iteration
// InodesEntry is scoutfs entry for inode iteration
type InodesEntry struct {
Major uint64
Minor uint32
@@ -65,7 +74,7 @@ struct scoutfs_ioctl_walk_inodes {
};
*/
//queryInodes is scoutfs request structure for IOCQUERYINODES
// queryInodes is scoutfs request structure for IOCQUERYINODES
type queryInodes struct {
first InodesEntry
last InodesEntry
@@ -74,8 +83,8 @@ type queryInodes struct {
index uint8
}
//packed scoutfs_ioctl_walk_inodes requires packing queryInodes byhand
//the 53 size comes from pahole output above
// packed scoutfs_ioctl_walk_inodes requires packing queryInodes byhand
// the 53 size comes from pahole output above
func (q queryInodes) pack() ([53]byte, error) {
var pack [53]byte
var pbuf = bytes.NewBuffer(make([]byte, 0, len(pack)))
@@ -252,3 +261,43 @@ type FileHandle struct {
HandleType int32
FID FileID
}
/* pahole for scoutfs_ioctl_data_waiting_entry
struct scoutfs_ioctl_data_waiting_entry {
__u64 ino; // 0 8
__u64 iblock; // 8 8
__u8 op; // 16 1
// size: 17, cachelines: 1, members: 3
// last cacheline: 17 bytes
};
*/
// DataWaitingEntry is an entry returned when a process is waiting on
// access of offline block
type DataWaitingEntry struct {
Ino uint64
Iblock uint64
Op uint8
}
/* pahole for scoutfs_ioctl_data_waiting
struct scoutfs_ioctl_data_waiting {
__u64 flags; // 0 8
__u64 after_ino; // 8 8
__u64 after_iblock; // 16 8
__u64 ents_ptr; // 24 8
__u16 ents_nr; // 32 2
/ size: 34, cachelines: 1, members: 5
/ last cacheline: 34 bytes
};
*/
type dataWaiting struct {
flags uint64
afterIno uint64
afterIblock uint64
entries uintptr
count uint16
}