diff --git a/cmd/postpolicyform.go b/cmd/postpolicyform.go
index d4d1c214a..2ced4fbd0 100644
--- a/cmd/postpolicyform.go
+++ b/cmd/postpolicyform.go
@@ -29,10 +29,10 @@ import (
"strings"
"time"
- "github.com/bcicen/jstream"
"github.com/minio/minio-go/v7/pkg/encrypt"
"github.com/minio/minio-go/v7/pkg/set"
xhttp "github.com/minio/minio/internal/http"
+ "github.com/minio/minio/internal/s3select/jstream"
)
// startWithConds - map which indicates if a given condition supports starts-with policy operator
@@ -140,7 +140,7 @@ type PostPolicyForm struct {
func sanitizePolicy(r io.Reader) (io.Reader, error) {
var buf bytes.Buffer
e := json.NewEncoder(&buf)
- d := jstream.NewDecoder(r, 0).ObjectAsKVS()
+ d := jstream.NewDecoder(r, 0).ObjectAsKVS().MaxDepth(10)
sset := set.NewStringSet()
for mv := range d.Stream() {
var kvs jstream.KVS
diff --git a/go.mod b/go.mod
index 7d655fc34..98914f144 100644
--- a/go.mod
+++ b/go.mod
@@ -9,7 +9,6 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.4.0
github.com/IBM/sarama v1.43.3
github.com/alecthomas/participle v0.7.1
- github.com/bcicen/jstream v1.0.1
github.com/beevik/ntp v1.4.3
github.com/buger/jsonparser v1.1.1
github.com/cespare/xxhash/v2 v2.3.0
diff --git a/go.sum b/go.sum
index 2c09537a0..c58b70129 100644
--- a/go.sum
+++ b/go.sum
@@ -62,8 +62,6 @@ github.com/aymanbagabas/go-osc52/v2 v2.0.1 h1:HwpRHbFMcZLEVr42D4p7XBqjyuxQH5SMiE
github.com/aymanbagabas/go-osc52/v2 v2.0.1/go.mod h1:uYgXzlJ7ZpABp8OJ+exZzJJhRNQ2ASbcXHWsFqH8hp8=
github.com/aymanbagabas/go-udiff v0.2.0 h1:TK0fH4MteXUDspT88n8CKzvK0X9O2xu9yQjWpi6yML8=
github.com/aymanbagabas/go-udiff v0.2.0/go.mod h1:RE4Ex0qsGkTAJoQdQQCA0uG+nAzJO/pI/QwceO5fgrA=
-github.com/bcicen/jstream v1.0.1 h1:BXY7Cu4rdmc0rhyTVyT3UkxAiX3bnLpKLas9btbH5ck=
-github.com/bcicen/jstream v1.0.1/go.mod h1:9ielPxqFry7Y4Tg3j4BfjPocfJ3TbsRtXOAYXYmRuAQ=
github.com/beevik/ntp v1.4.3 h1:PlbTvE5NNy4QHmA4Mg57n7mcFTmr1W1j3gcK7L1lqho=
github.com/beevik/ntp v1.4.3/go.mod h1:Unr8Zg+2dRn7d8bHFuehIMSvvUYssHMxW3Q5Nx4RW5Q=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
diff --git a/internal/s3select/csv/record.go b/internal/s3select/csv/record.go
index 18e467834..8c38f6767 100644
--- a/internal/s3select/csv/record.go
+++ b/internal/s3select/csv/record.go
@@ -25,8 +25,8 @@ import (
"strconv"
"strings"
- "github.com/bcicen/jstream"
csv "github.com/minio/csvparser"
+ "github.com/minio/minio/internal/s3select/jstream"
"github.com/minio/minio/internal/s3select/sql"
)
diff --git a/internal/s3select/json/preader.go b/internal/s3select/json/preader.go
index fa2cf8481..d8d016f78 100644
--- a/internal/s3select/json/preader.go
+++ b/internal/s3select/json/preader.go
@@ -24,7 +24,7 @@ import (
"runtime"
"sync"
- "github.com/bcicen/jstream"
+ "github.com/minio/minio/internal/s3select/jstream"
"github.com/minio/minio/internal/s3select/sql"
)
@@ -185,7 +185,7 @@ func (r *PReader) startReaders() {
dst = make([]jstream.KVS, 0, 1000)
}
- d := jstream.NewDecoder(bytes.NewBuffer(in.input), 0).ObjectAsKVS()
+ d := jstream.NewDecoder(bytes.NewBuffer(in.input), 0).ObjectAsKVS().MaxDepth(100)
stream := d.Stream()
all := dst[:0]
for mv := range stream {
diff --git a/internal/s3select/json/reader.go b/internal/s3select/json/reader.go
index 52eda1404..70a758d92 100644
--- a/internal/s3select/json/reader.go
+++ b/internal/s3select/json/reader.go
@@ -21,9 +21,8 @@ import (
"io"
"sync"
+ "github.com/minio/minio/internal/s3select/jstream"
"github.com/minio/minio/internal/s3select/sql"
-
- "github.com/bcicen/jstream"
)
// Limit single document size to 10MiB, 10x the AWS limit:
@@ -84,7 +83,7 @@ func (r *Reader) Close() error {
// NewReader - creates new JSON reader using readCloser.
func NewReader(readCloser io.ReadCloser, args *ReaderArgs) *Reader {
readCloser = &syncReadCloser{rc: readCloser}
- d := jstream.NewDecoder(io.LimitReader(readCloser, maxDocumentSize), 0).ObjectAsKVS()
+ d := jstream.NewDecoder(io.LimitReader(readCloser, maxDocumentSize), 0).ObjectAsKVS().MaxDepth(100)
return &Reader{
args: args,
decoder: d,
diff --git a/internal/s3select/json/record.go b/internal/s3select/json/record.go
index 7b6ddad76..65462e863 100644
--- a/internal/s3select/json/record.go
+++ b/internal/s3select/json/record.go
@@ -26,8 +26,8 @@ import (
"strconv"
"strings"
- "github.com/bcicen/jstream"
csv "github.com/minio/csvparser"
+ "github.com/minio/minio/internal/s3select/jstream"
"github.com/minio/minio/internal/s3select/sql"
)
diff --git a/internal/s3select/jstream/LICENSE b/internal/s3select/jstream/LICENSE
new file mode 100644
index 000000000..1c5d82df6
--- /dev/null
+++ b/internal/s3select/jstream/LICENSE
@@ -0,0 +1,22 @@
+The MIT License (MIT)
+
+Copyright (c) 2018 Bradley Cicenas
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
+
diff --git a/internal/s3select/jstream/README.md b/internal/s3select/jstream/README.md
new file mode 100644
index 000000000..2797b3ba6
--- /dev/null
+++ b/internal/s3select/jstream/README.md
@@ -0,0 +1,116 @@
+

+
+#
+
+[](https://godoc.org/github.com/bcicen/jstream)
+
+
+`jstream` is a streaming JSON parser and value extraction library for Go.
+
+Unlike most JSON parsers, `jstream` is document position- and depth-aware -- this enables the extraction of values at a specified depth, eliminating the overhead of allocating encompassing arrays or objects; e.g:
+
+Using the below example document:
+
+
+we can choose to extract and act only on the objects within the top-level array:
+```go
+f, _ := os.Open("input.json")
+decoder := jstream.NewDecoder(f, 1) // extract JSON values at a depth level of 1
+for mv := range decoder.Stream() {
+ fmt.Printf("%v\n ", mv.Value)
+}
+```
+
+output:
+```
+map[desc:RGB colors:[red green blue]]
+map[desc:CMYK colors:[cyan magenta yellow black]]
+```
+
+likewise, increasing depth level to `3` yields:
+```
+red
+green
+blue
+cyan
+magenta
+yellow
+black
+```
+
+optionally, key:value pairs can be emitted as an individual struct:
+```go
+decoder := jstream.NewDecoder(f, 2).EmitKV() // enable KV streaming at a depth level of 2
+```
+
+```
+jstream.KV{desc RGB}
+jstream.KV{colors [red green blue]}
+jstream.KV{desc CMYK}
+jstream.KV{colors [cyan magenta yellow black]}
+```
+
+## Installing
+
+```bash
+go get github.com/bcicen/jstream
+```
+
+## Commandline
+
+`jstream` comes with a cli tool for quick viewing of parsed values from JSON input:
+
+```bash
+jstream -d 1 < input.json
+```
+
+```json
+{"colors":["red","green","blue"],"desc":"RGB"}
+{"colors":["cyan","magenta","yellow","black"],"desc":"CMYK"}
+```
+
+detailed output with `-v` option:
+```bash
+cat input.json | jstream -v -d -1
+
+depth start end type | value
+2 018 023 string | "RGB"
+3 041 046 string | "red"
+3 048 055 string | "green"
+3 057 063 string | "blue"
+2 039 065 array | ["red","green","blue"]
+1 004 069 object | {"colors":["red","green","blue"],"desc":"RGB"}
+2 087 093 string | "CMYK"
+3 111 117 string | "cyan"
+3 119 128 string | "magenta"
+3 130 138 string | "yellow"
+3 140 147 string | "black"
+2 109 149 array | ["cyan","magenta","yellow","black"]
+1 073 153 object | {"colors":["cyan","magenta","yellow","black"],"desc":"CMYK"}
+0 000 155 array | [{"colors":["red","green","blue"],"desc":"RGB"},{"colors":["cyan","magenta","yellow","black"],"desc":"CMYK"}]
+```
+
+### Options
+
+Opt | Description
+--- | ---
+-d \<n\> | emit values at depth n. if n < 0, all values will be emitted
+-kv | output inner key value pairs as newly formed objects
+-v | output depth and offset details for each value
+-h | display help dialog
+
+## Benchmarks
+
+Obligatory benchmarks performed on files with arrays of objects, where the decoded objects are to be extracted.
+
+Two file sizes are used -- regular (1.6mb, 1000 objects) and large (128mb, 100000 objects)
+
+input size | lib | MB/s | Allocated
+--- | --- | --- | ---
+regular | standard | 97 | 3.6MB
+regular | jstream | 175 | 2.1MB
+large | standard | 92 | 305MB
+large | jstream | 404 | 69MB
+
+In a real world scenario, including initialization and reader overhead from varying blob sizes, performance can be expected as below:
+
diff --git a/internal/s3select/jstream/decoder.go b/internal/s3select/jstream/decoder.go
new file mode 100644
index 000000000..abd2c5d51
--- /dev/null
+++ b/internal/s3select/jstream/decoder.go
@@ -0,0 +1,675 @@
+package jstream
+
+import (
+ "bytes"
+ "encoding/json"
+ "io"
+ "strconv"
+ "sync/atomic"
+ "unicode/utf16"
+)
+
+// ValueType - defines the type of each JSON value
+type ValueType int
+
+// Different types of JSON value
+const (
+ Unknown ValueType = iota
+ Null
+ String
+ Number
+ Boolean
+ Array
+ Object
+)
+
+// MetaValue wraps a decoded interface value with the document
+// position and depth at which the value was parsed
+type MetaValue struct {
+ Offset int
+ Length int
+ Depth int
+ Value interface{}
+ ValueType ValueType
+}
+
+// KV contains a key and value pair parsed from a decoded object
+type KV struct {
+ Key string `json:"key"`
+ Value interface{} `json:"value"`
+}
+
+// KVS - represents key values in an JSON object
+type KVS []KV
+
+// MarshalJSON - implements converting a KVS datastructure into a JSON
+// object with multiple keys and values.
+func (kvs KVS) MarshalJSON() ([]byte, error) {
+ b := new(bytes.Buffer)
+ b.Write([]byte("{"))
+ for i, kv := range kvs {
+ b.Write([]byte("\"" + kv.Key + "\"" + ":"))
+ valBuf, err := json.Marshal(kv.Value)
+ if err != nil {
+ return nil, err
+ }
+ b.Write(valBuf)
+ if i < len(kvs)-1 {
+ b.Write([]byte(","))
+ }
+ }
+ b.Write([]byte("}"))
+ return b.Bytes(), nil
+}
+
+// Decoder wraps an io.Reader to provide incremental decoding of
+// JSON values
+type Decoder struct {
+ *scanner
+ emitDepth int
+ maxDepth int
+ emitKV bool
+ emitRecursive bool
+ objectAsKVS bool
+
+ depth int
+ scratch *scratch
+ metaCh chan *MetaValue
+ err error
+
+ // follow line position to add context to errors
+ lineNo int
+ lineStart int64
+}
+
+// NewDecoder creates new Decoder to read JSON values at the provided
+// emitDepth from the provider io.Reader.
+// If emitDepth is < 0, values at every depth will be emitted.
+func NewDecoder(r io.Reader, emitDepth int) *Decoder {
+ d := &Decoder{
+ scanner: newScanner(r),
+ emitDepth: emitDepth,
+ scratch: &scratch{data: make([]byte, 1024)},
+ metaCh: make(chan *MetaValue, 128),
+ }
+ if emitDepth < 0 {
+ d.emitDepth = 0
+ d.emitRecursive = true
+ }
+ return d
+}
+
+// ObjectAsKVS - by default JSON returns map[string]interface{} this
+// is usually fine in most cases, but when you need to preserve the
+// input order it's not the right data structure. To preserve input
+// order please use this option.
+func (d *Decoder) ObjectAsKVS() *Decoder {
+ d.objectAsKVS = true
+ return d
+}
+
+// EmitKV enables emitting a jstream.KV struct when the items(s) parsed
+// at configured emit depth are within a JSON object. By default, only
+// the object values are emitted.
+func (d *Decoder) EmitKV() *Decoder {
+ d.emitKV = true
+ return d
+}
+
+// Recursive enables emitting all values at a depth higher than the
+// configured emit depth; e.g. if an array is found at emit depth, all
+// values within the array are emitted to the stream, then the array
+// containing those values is emitted.
+func (d *Decoder) Recursive() *Decoder {
+ d.emitRecursive = true
+ return d
+}
+
+// Stream begins decoding from the underlying reader and returns a
+// streaming MetaValue channel for JSON values at the configured emitDepth.
+func (d *Decoder) Stream() chan *MetaValue {
+ go d.decode()
+ return d.metaCh
+}
+
+// Pos returns the number of bytes consumed from the underlying reader
+func (d *Decoder) Pos() int { return int(d.pos) }
+
+// Err returns the most recent decoder error if any, or nil
+func (d *Decoder) Err() error { return d.err }
+
+// MaxDepth will set the maximum recursion depth.
+// If the maximum depth is exceeded, ErrMaxDepth is returned.
+// Less than or 0 means no limit (default).
+func (d *Decoder) MaxDepth(n int) *Decoder {
+ d.maxDepth = n
+ return d
+}
+
+// Decode parses the JSON-encoded data and returns an interface value
+func (d *Decoder) decode() {
+ defer close(d.metaCh)
+ d.skipSpaces()
+ for d.remaining() > 0 {
+ _, err := d.emitAny()
+ if err != nil {
+ d.err = err
+ break
+ }
+ d.skipSpaces()
+ }
+}
+
+func (d *Decoder) emitAny() (interface{}, error) {
+ if d.pos >= atomic.LoadInt64(&d.end) {
+ return nil, d.mkError(ErrUnexpectedEOF)
+ }
+ offset := d.pos - 1
+ i, t, err := d.any()
+ if d.willEmit() {
+ d.metaCh <- &MetaValue{
+ Offset: int(offset),
+ Length: int(d.pos - offset),
+ Depth: d.depth,
+ Value: i,
+ ValueType: t,
+ }
+ }
+ return i, err
+}
+
+// return whether, at the current depth, the value being decoded will
+// be emitted to stream
+func (d *Decoder) willEmit() bool {
+ if d.emitRecursive {
+ return d.depth >= d.emitDepth
+ }
+ return d.depth == d.emitDepth
+}
+
+// any used to decode any valid JSON value, and returns an
+// interface{} that holds the actual data
+func (d *Decoder) any() (interface{}, ValueType, error) {
+ c := d.cur()
+
+ switch c {
+ case '"':
+ i, err := d.string()
+ return i, String, err
+ case '0', '1', '2', '3', '4', '5', '6', '7', '8', '9':
+ i, err := d.number()
+ return i, Number, err
+ case '-':
+ if c = d.next(); c < '0' || c > '9' {
+ return nil, Unknown, d.mkError(ErrSyntax, "in negative numeric literal")
+ }
+ n, err := d.number()
+ if err != nil {
+ return nil, Unknown, err
+ }
+ return -n, Number, nil
+ case 'f':
+ if d.remaining() < 4 {
+ return nil, Unknown, d.mkError(ErrUnexpectedEOF)
+ }
+ //nolint:gocritic
+ if d.next() == 'a' && d.next() == 'l' && d.next() == 's' && d.next() == 'e' {
+ return false, Boolean, nil
+ }
+ return nil, Unknown, d.mkError(ErrSyntax, "in literal false")
+ case 't':
+ if d.remaining() < 3 {
+ return nil, Unknown, d.mkError(ErrUnexpectedEOF)
+ }
+ //nolint:gocritic
+ if d.next() == 'r' && d.next() == 'u' && d.next() == 'e' {
+ return true, Boolean, nil
+ }
+ return nil, Unknown, d.mkError(ErrSyntax, "in literal true")
+ case 'n':
+ if d.remaining() < 3 {
+ return nil, Unknown, d.mkError(ErrUnexpectedEOF)
+ }
+ //nolint:gocritic
+ if d.next() == 'u' && d.next() == 'l' && d.next() == 'l' {
+ return nil, Null, nil
+ }
+ return nil, Unknown, d.mkError(ErrSyntax, "in literal null")
+ case '[':
+ i, err := d.array()
+ return i, Array, err
+ case '{':
+ var i interface{}
+ var err error
+ if d.objectAsKVS {
+ i, err = d.objectOrdered()
+ } else {
+ i, err = d.object()
+ }
+ return i, Object, err
+ default:
+ return nil, Unknown, d.mkError(ErrSyntax, "looking for beginning of value")
+ }
+}
+
+// string called by `any` or `object`(for map keys) after reading `"`
+func (d *Decoder) string() (string, error) {
+ d.scratch.reset()
+ c := d.next()
+
+scan:
+ for {
+ switch {
+ case c == '"':
+ return string(d.scratch.bytes()), nil
+ case c == '\\':
+ c = d.next()
+ goto scan_esc
+ case c < 0x20:
+ return "", d.mkError(ErrSyntax, "in string literal")
+ // Coerce to well-formed UTF-8.
+ default:
+ d.scratch.add(c)
+ if d.remaining() == 0 {
+ return "", d.mkError(ErrSyntax, "in string literal")
+ }
+ c = d.next()
+ }
+ }
+
+scan_esc:
+ switch c {
+ case '"', '\\', '/', '\'':
+ d.scratch.add(c)
+ case 'u':
+ goto scan_u
+ case 'b':
+ d.scratch.add('\b')
+ case 'f':
+ d.scratch.add('\f')
+ case 'n':
+ d.scratch.add('\n')
+ case 'r':
+ d.scratch.add('\r')
+ case 't':
+ d.scratch.add('\t')
+ default:
+ return "", d.mkError(ErrSyntax, "in string escape code")
+ }
+ c = d.next()
+ goto scan
+
+scan_u:
+ r := d.u4()
+ if r < 0 {
+ return "", d.mkError(ErrSyntax, "in unicode escape sequence")
+ }
+
+ // check for proceeding surrogate pair
+ c = d.next()
+ if !utf16.IsSurrogate(r) || c != '\\' {
+ d.scratch.addRune(r)
+ goto scan
+ }
+ if c = d.next(); c != 'u' {
+ d.scratch.addRune(r)
+ goto scan_esc
+ }
+
+ r2 := d.u4()
+ if r2 < 0 {
+ return "", d.mkError(ErrSyntax, "in unicode escape sequence")
+ }
+
+ // write surrogate pair
+ d.scratch.addRune(utf16.DecodeRune(r, r2))
+ c = d.next()
+ goto scan
+}
+
+// u4 reads four bytes following a \u escape
+func (d *Decoder) u4() rune {
+ // logic taken from:
+ // github.com/buger/jsonparser/blob/master/escape.go#L20
+ var h [4]int
+ for i := 0; i < 4; i++ {
+ c := d.next()
+ switch {
+ case c >= '0' && c <= '9':
+ h[i] = int(c - '0')
+ case c >= 'A' && c <= 'F':
+ h[i] = int(c - 'A' + 10)
+ case c >= 'a' && c <= 'f':
+ h[i] = int(c - 'a' + 10)
+ default:
+ return -1
+ }
+ }
+ return rune(h[0]<<12 + h[1]<<8 + h[2]<<4 + h[3])
+}
+
+// number called by `any` after reading number between 0 to 9
+func (d *Decoder) number() (float64, error) {
+ d.scratch.reset()
+
+ var (
+ c = d.cur()
+ n float64
+ isFloat bool
+ )
+
+ // digits first
+ switch {
+ case c == '0':
+ d.scratch.add(c)
+ c = d.next()
+ case '1' <= c && c <= '9':
+ for ; c >= '0' && c <= '9'; c = d.next() {
+ n = 10*n + float64(c-'0')
+ d.scratch.add(c)
+ }
+ }
+
+ // . followed by 1 or more digits
+ if c == '.' {
+ isFloat = true
+ d.scratch.add(c)
+
+ // first char following must be digit
+ if c = d.next(); c < '0' || c > '9' {
+ return 0, d.mkError(ErrSyntax, "after decimal point in numeric literal")
+ }
+ d.scratch.add(c)
+
+ for {
+ if d.remaining() == 0 {
+ return 0, d.mkError(ErrUnexpectedEOF)
+ }
+ if c = d.next(); c < '0' || c > '9' {
+ break
+ }
+ d.scratch.add(c)
+ }
+ }
+
+ // e or E followed by an optional - or + and
+ // 1 or more digits.
+ if c == 'e' || c == 'E' {
+ isFloat = true
+ d.scratch.add(c)
+
+ if c = d.next(); c == '+' || c == '-' {
+ d.scratch.add(c)
+ if c = d.next(); c < '0' || c > '9' {
+ return 0, d.mkError(ErrSyntax, "in exponent of numeric literal")
+ }
+ d.scratch.add(c)
+ }
+ for ; c >= '0' && c <= '9'; c = d.next() {
+ d.scratch.add(c)
+ }
+ }
+
+ if isFloat {
+ var (
+ err error
+ sn string
+ )
+ sn = string(d.scratch.bytes())
+ if n, err = strconv.ParseFloat(sn, 64); err != nil {
+ return 0, err
+ }
+ }
+
+ d.back()
+ return n, nil
+}
+
+// array accepts a valid JSON array value
+func (d *Decoder) array() ([]interface{}, error) {
+ d.depth++
+ if d.maxDepth > 0 && d.depth > d.maxDepth {
+ return nil, ErrMaxDepth
+ }
+
+ var (
+ c byte
+ v interface{}
+ err error
+ array = make([]interface{}, 0)
+ )
+
+ // look ahead for ] - if the array is empty.
+ if c = d.skipSpaces(); c == ']' {
+ goto out
+ }
+
+scan:
+ if v, err = d.emitAny(); err != nil {
+ goto out
+ }
+
+ if d.depth > d.emitDepth { // skip alloc for array if it won't be emitted
+ array = append(array, v)
+ }
+
+ // next token must be ',' or ']'
+ switch c = d.skipSpaces(); c {
+ case ',':
+ d.skipSpaces()
+ goto scan
+ case ']':
+ goto out
+ default:
+ err = d.mkError(ErrSyntax, "after array element")
+ }
+
+out:
+ d.depth--
+ return array, err
+}
+
+// object accepts a valid JSON object value
+func (d *Decoder) object() (map[string]interface{}, error) {
+ d.depth++
+ if d.maxDepth > 0 && d.depth > d.maxDepth {
+ return nil, ErrMaxDepth
+ }
+
+ var (
+ c byte
+ k string
+ v interface{}
+ t ValueType
+ err error
+ obj map[string]interface{}
+ )
+
+ // skip allocating map if it will not be emitted
+ if d.depth > d.emitDepth {
+ obj = make(map[string]interface{})
+ }
+
+ // if the object has no keys
+ if c = d.skipSpaces(); c == '}' {
+ goto out
+ }
+
+scan:
+ for {
+ offset := d.pos - 1
+
+ // read string key
+ if c != '"' {
+ err = d.mkError(ErrSyntax, "looking for beginning of object key string")
+ break
+ }
+ if k, err = d.string(); err != nil {
+ break
+ }
+
+ // read colon before value
+ if c = d.skipSpaces(); c != ':' {
+ err = d.mkError(ErrSyntax, "after object key")
+ break
+ }
+
+ // read value
+ d.skipSpaces()
+ if d.emitKV {
+ if v, t, err = d.any(); err != nil {
+ break
+ }
+ if d.willEmit() {
+ d.metaCh <- &MetaValue{
+ Offset: int(offset),
+ Length: int(d.pos - offset),
+ Depth: d.depth,
+ Value: KV{k, v},
+ ValueType: t,
+ }
+ }
+ } else {
+ if v, err = d.emitAny(); err != nil {
+ break
+ }
+ }
+
+ if obj != nil {
+ obj[k] = v
+ }
+
+ // next token must be ',' or '}'
+ switch c = d.skipSpaces(); c {
+ case '}':
+ goto out
+ case ',':
+ c = d.skipSpaces()
+ goto scan
+ default:
+ err = d.mkError(ErrSyntax, "after object key:value pair")
+ goto out
+ }
+ }
+
+out:
+ d.depth--
+ return obj, err
+}
+
+// object (ordered) accepts a valid JSON object value
+func (d *Decoder) objectOrdered() (KVS, error) {
+ d.depth++
+ if d.maxDepth > 0 && d.depth > d.maxDepth {
+ return nil, ErrMaxDepth
+ }
+
+ var (
+ c byte
+ k string
+ v interface{}
+ t ValueType
+ err error
+ obj KVS
+ )
+
+ // skip allocating map if it will not be emitted
+ if d.depth > d.emitDepth {
+ obj = make(KVS, 0)
+ }
+
+ // if the object has no keys
+ if c = d.skipSpaces(); c == '}' {
+ goto out
+ }
+
+scan:
+ for {
+ offset := d.pos - 1
+
+ // read string key
+ if c != '"' {
+ err = d.mkError(ErrSyntax, "looking for beginning of object key string")
+ break
+ }
+ if k, err = d.string(); err != nil {
+ break
+ }
+
+ // read colon before value
+ if c = d.skipSpaces(); c != ':' {
+ err = d.mkError(ErrSyntax, "after object key")
+ break
+ }
+
+ // read value
+ d.skipSpaces()
+ if d.emitKV {
+ if v, t, err = d.any(); err != nil {
+ break
+ }
+ if d.willEmit() {
+ d.metaCh <- &MetaValue{
+ Offset: int(offset),
+ Length: int(d.pos - offset),
+ Depth: d.depth,
+ Value: KV{k, v},
+ ValueType: t,
+ }
+ }
+ } else {
+ if v, err = d.emitAny(); err != nil {
+ break
+ }
+ }
+
+ if obj != nil {
+ obj = append(obj, KV{k, v})
+ }
+
+ // next token must be ',' or '}'
+ switch c = d.skipSpaces(); c {
+ case '}':
+ goto out
+ case ',':
+ c = d.skipSpaces()
+ goto scan
+ default:
+ err = d.mkError(ErrSyntax, "after object key:value pair")
+ goto out
+ }
+ }
+
+out:
+ d.depth--
+ return obj, err
+}
+
+// returns the next char after white spaces
+func (d *Decoder) skipSpaces() byte {
+ for d.pos < atomic.LoadInt64(&d.end) {
+ switch c := d.next(); c {
+ case '\n':
+ d.lineStart = d.pos
+ d.lineNo++
+ continue
+ case ' ', '\t', '\r':
+ continue
+ default:
+ return c
+ }
+ }
+ return 0
+}
+
+// create syntax errors at current position, with optional context
+func (d *Decoder) mkError(err DecoderError, context ...string) error {
+ if len(context) > 0 {
+ err.context = context[0]
+ }
+ err.atChar = d.cur()
+ err.pos[0] = d.lineNo + 1
+ err.pos[1] = int(d.pos - d.lineStart)
+ err.readerErr = d.readerErr
+ return err
+}
diff --git a/internal/s3select/jstream/decoder_test.go b/internal/s3select/jstream/decoder_test.go
new file mode 100644
index 000000000..8c876fc9d
--- /dev/null
+++ b/internal/s3select/jstream/decoder_test.go
@@ -0,0 +1,276 @@
+package jstream
+
+import (
+ "bytes"
+ "testing"
+)
+
+func mkReader(s string) *bytes.Reader { return bytes.NewReader([]byte(s)) }
+
+func TestDecoderSimple(t *testing.T) {
+ var (
+ counter int
+ mv *MetaValue
+ body = `[{"bio":"bada bing bada boom","id":1,"name":"Charles","falseVal":false}]`
+ )
+
+ decoder := NewDecoder(mkReader(body), 1)
+
+ for mv = range decoder.Stream() {
+ counter++
+ t.Logf("depth=%d offset=%d len=%d (%v)", mv.Depth, mv.Offset, mv.Length, mv.Value)
+ }
+
+ if err := decoder.Err(); err != nil {
+ t.Fatalf("decoder error: %s", err)
+ }
+}
+
+func TestDecoderNested(t *testing.T) {
+ var (
+ counter int
+ mv *MetaValue
+ body = `{
+ "1": {
+ "bio": "bada bing bada boom",
+ "id": 0,
+ "name": "Roberto",
+ "nested1": {
+ "bio": "utf16 surrogate (\ud834\udcb2)\n\u201cutf 8\u201d",
+ "id": 1.5,
+ "name": "Roberto*Maestro",
+ "nested2": { "nested2arr": [0,1,2], "nested3": {
+ "nested4": { "depth": "recursion" }}
+ }
+ }
+ },
+ "2": {
+ "nullfield": null,
+ "id": -2
+ }
+}`
+ )
+
+ decoder := NewDecoder(mkReader(body), 2)
+
+ for mv = range decoder.Stream() {
+ counter++
+ t.Logf("depth=%d offset=%d len=%d (%v)", mv.Depth, mv.Offset, mv.Length, mv.Value)
+ }
+
+ if err := decoder.Err(); err != nil {
+ t.Fatalf("decoder error: %s", err)
+ }
+}
+
+func TestDecoderFlat(t *testing.T) {
+ var (
+ counter int
+ mv *MetaValue
+ body = `[
+ "1st test string",
+ "Roberto*Maestro", "Charles",
+ 0, null, false,
+ 1, 2.5
+]`
+ expected = []struct {
+ Value interface{}
+ ValueType ValueType
+ }{
+ {
+ "1st test string",
+ String,
+ },
+ {
+ "Roberto*Maestro",
+ String,
+ },
+ {
+ "Charles",
+ String,
+ },
+ {
+ 0.0,
+ Number,
+ },
+ {
+ nil,
+ Null,
+ },
+ {
+ false,
+ Boolean,
+ },
+ {
+ 1.0,
+ Number,
+ },
+ {
+ 2.5,
+ Number,
+ },
+ }
+ )
+
+ decoder := NewDecoder(mkReader(body), 1)
+
+ for mv = range decoder.Stream() {
+ if mv.Value != expected[counter].Value {
+ t.Fatalf("got %v, expected: %v", mv.Value, expected[counter])
+ }
+ if mv.ValueType != expected[counter].ValueType {
+ t.Fatalf("got %v value type, expected: %v value type", mv.ValueType, expected[counter].ValueType)
+ }
+ counter++
+ t.Logf("depth=%d offset=%d len=%d (%v)", mv.Depth, mv.Offset, mv.Length, mv.Value)
+ }
+
+ if err := decoder.Err(); err != nil {
+ t.Fatalf("decoder error: %s", err)
+ }
+}
+
+func TestDecoderMultiDoc(t *testing.T) {
+ var (
+ counter int
+ mv *MetaValue
+ body = `{ "bio": "bada bing bada boom", "id": 1, "name": "Charles" }
+{ "bio": "bada bing bada boom", "id": 2, "name": "Charles" }
+{ "bio": "bada bing bada boom", "id": 3, "name": "Charles" }
+{ "bio": "bada bing bada boom", "id": 4, "name": "Charles" }
+{ "bio": "bada bing bada boom", "id": 5, "name": "Charles" }
+`
+ )
+
+ decoder := NewDecoder(mkReader(body), 0)
+
+ for mv = range decoder.Stream() {
+ if mv.ValueType != Object {
+ t.Fatalf("got %v value type, expected: Object value type", mv.ValueType)
+ }
+ counter++
+ t.Logf("depth=%d offset=%d len=%d (%v)", mv.Depth, mv.Offset, mv.Length, mv.Value)
+ }
+ if err := decoder.Err(); err != nil {
+ t.Fatalf("decoder error: %s", err)
+ }
+ if counter != 5 {
+ t.Fatalf("expected 5 items, got %d", counter)
+ }
+
+ // test at depth level 1
+ counter = 0
+ kvcounter := 0
+ decoder = NewDecoder(mkReader(body), 1)
+
+ for mv = range decoder.Stream() {
+ switch mv.Value.(type) {
+ case KV:
+ kvcounter++
+ default:
+ counter++
+ }
+ t.Logf("depth=%d offset=%d len=%d (%v)", mv.Depth, mv.Offset, mv.Length, mv.Value)
+ }
+ if err := decoder.Err(); err != nil {
+ t.Fatalf("decoder error: %s", err)
+ }
+ if kvcounter != 0 {
+ t.Fatalf("expected 0 keyvalue items, got %d", kvcounter)
+ }
+ if counter != 15 {
+ t.Fatalf("expected 15 items, got %d", counter)
+ }
+
+ // test at depth level 1 w/ emitKV
+ counter = 0
+ kvcounter = 0
+ decoder = NewDecoder(mkReader(body), 1).EmitKV()
+
+ for mv = range decoder.Stream() {
+ switch mv.Value.(type) {
+ case KV:
+ kvcounter++
+ default:
+ counter++
+ }
+ t.Logf("depth=%d offset=%d len=%d (%v)", mv.Depth, mv.Offset, mv.Length, mv.Value)
+ }
+ if err := decoder.Err(); err != nil {
+ t.Fatalf("decoder error: %s", err)
+ }
+ if kvcounter != 15 {
+ t.Fatalf("expected 15 keyvalue items, got %d", kvcounter)
+ }
+ if counter != 0 {
+ t.Fatalf("expected 0 items, got %d", counter)
+ }
+}
+
+func TestDecoderReaderFailure(t *testing.T) {
+ var (
+ failAfter = 900
+ mockData = byte('[')
+ )
+
+ r := newMockReader(failAfter, mockData)
+ decoder := NewDecoder(r, -1)
+
+ for mv := range decoder.Stream() {
+ t.Logf("depth=%d offset=%d len=%d (%v)", mv.Depth, mv.Offset, mv.Length, mv.Value)
+ }
+
+ err := decoder.Err()
+ t.Logf("got error: %s", err)
+ if err == nil {
+ t.Fatalf("missing expected decoder error")
+ }
+
+ derr, ok := err.(DecoderError)
+ if !ok {
+ t.Fatalf("expected error of type DecoderError, got %T", err)
+ }
+
+ if derr.ReaderErr() == nil {
+ t.Fatalf("missing expected underlying reader error")
+ }
+}
+
+func TestDecoderMaxDepth(t *testing.T) {
+ tests := []struct {
+ input string
+ maxDepth int
+ mustFail bool
+ }{
+ // No limit
+ {input: `[{"bio":"bada bing bada boom","id":1,"name":"Charles","falseVal":false}]`, maxDepth: 0, mustFail: false},
+ // Array + object = depth 2 = false
+ {input: `[{"bio":"bada bing bada boom","id":1,"name":"Charles","falseVal":false}]`, maxDepth: 1, mustFail: true},
+ // Depth 2 = ok
+ {input: `[{"bio":"bada bing bada boom","id":1,"name":"Charles","falseVal":false}]`, maxDepth: 2, mustFail: false},
+ // Arrays:
+ {input: `[[[[[[[[[[[[[[[[[[[[[["ok"]]]]]]]]]]]]]]]]]]]]]]`, maxDepth: 2, mustFail: true},
+ {input: `[[[[[[[[[[[[[[[[[[[[[["ok"]]]]]]]]]]]]]]]]]]]]]]`, maxDepth: 10, mustFail: true},
+ {input: `[[[[[[[[[[[[[[[[[[[[[["ok"]]]]]]]]]]]]]]]]]]]]]]`, maxDepth: 100, mustFail: false},
+ // Objects:
+ {input: `{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"ok":false}}}}}}}}}}}}}}}}}}}}}}`, maxDepth: 2, mustFail: true},
+ {input: `{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"ok":false}}}}}}}}}}}}}}}}}}}}}}`, maxDepth: 10, mustFail: true},
+ {input: `{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"a":{"ok":false}}}}}}}}}}}}}}}}}}}}}}`, maxDepth: 100, mustFail: false},
+ }
+
+ for _, test := range tests {
+ decoder := NewDecoder(mkReader(test.input), 0).MaxDepth(test.maxDepth)
+ var mv *MetaValue
+ for mv = range decoder.Stream() {
+ t.Logf("depth=%d offset=%d len=%d (%v)", mv.Depth, mv.Offset, mv.Length, mv.Value)
+ }
+
+ err := decoder.Err()
+ if test.mustFail && err != ErrMaxDepth {
+ t.Fatalf("missing expected decoder error, got %q", err)
+ }
+ if !test.mustFail && err != nil {
+ t.Fatalf("unexpected error: %q", err)
+ }
+ }
+}
diff --git a/internal/s3select/jstream/errors.go b/internal/s3select/jstream/errors.go
new file mode 100644
index 000000000..52a0e5f62
--- /dev/null
+++ b/internal/s3select/jstream/errors.go
@@ -0,0 +1,52 @@
+package jstream
+
+import (
+ "fmt"
+ "strconv"
+)
+
+// Predefined errors
+var (
+ ErrSyntax = DecoderError{msg: "invalid character"}
+ ErrUnexpectedEOF = DecoderError{msg: "unexpected end of JSON input"}
+ ErrMaxDepth = DecoderError{msg: "maximum recursion depth exceeded"}
+)
+
+type errPos [2]int // line number, byte offset where error occurred
+
+// DecoderError contains a detailed decoding error.
+type DecoderError struct {
+ msg string // description of error
+ context string // additional error context
+ pos errPos
+ atChar byte
+ readerErr error // underlying reader error, if any
+}
+
+// ReaderErr returns the underlying error.
+func (e DecoderError) ReaderErr() error { return e.readerErr }
+
+// Error returns a string representation of the error.
+func (e DecoderError) Error() string {
+ loc := fmt.Sprintf("%s [%d,%d]", quoteChar(e.atChar), e.pos[0], e.pos[1])
+ s := fmt.Sprintf("%s %s: %s", e.msg, e.context, loc)
+ if e.readerErr != nil {
+ s += "\nreader error: " + e.readerErr.Error()
+ }
+ return s
+}
+
+// quoteChar formats c as a quoted character literal
+func quoteChar(c byte) string {
+ // special cases - different from quoted strings
+ if c == '\'' {
+ return `'\''`
+ }
+ if c == '"' {
+ return `'"'`
+ }
+
+ // use quoted string with different quotation marks
+ s := strconv.Quote(string(c))
+ return "'" + s[1:len(s)-1] + "'"
+}
diff --git a/internal/s3select/jstream/scanner.go b/internal/s3select/jstream/scanner.go
new file mode 100644
index 000000000..a8e5be7db
--- /dev/null
+++ b/internal/s3select/jstream/scanner.go
@@ -0,0 +1,114 @@
+package jstream
+
+import (
+ "io"
+ "sync/atomic"
+)
+
+const (
+	chunk    = 4095 // internal buffer size (~4k)
+	maxUint  = ^uint(0)
+	maxInt   = int64(maxUint >> 1) // sentinel meaning "EOF not yet observed"
+	nullByte = byte(0)             // returned by next() once input is exhausted
+)
+
+// scanner streams bytes from an io.Reader using a double buffer: while
+// the consumer walks buf, a background goroutine (see newScanner)
+// pre-fills nbuf with the next chunk.
+type scanner struct {
+	pos       int64           // position in reader
+	ipos      int64           // internal buffer position
+	ifill     int64           // internal buffer fill
+	end       int64           // total input length once EOF is seen; maxInt until then (accessed atomically)
+	buf       [chunk + 1]byte // internal buffer (with a lookback size of 1)
+	nbuf      [chunk]byte     // next internal buffer
+	fillReq   chan struct{}   // consumer -> filler: request another read
+	fillReady chan int64      // filler -> consumer: bytes now in nbuf; closed at EOF/error
+	readerErr error           // underlying reader error, if any
+}
+
+// newScanner returns a scanner reading from r. It starts a background
+// goroutine that reads one chunk ahead of the consumer; the goroutine
+// exits on io.EOF or a read error, at which point it publishes the
+// final input length in s.end and closes fillReady to unblock the
+// consumer.
+func newScanner(r io.Reader) *scanner {
+	sr := &scanner{
+		end:       maxInt,
+		fillReq:   make(chan struct{}),
+		fillReady: make(chan int64),
+	}
+
+	go func() {
+		var rpos int64 // total bytes read into buffer
+
+		defer func() {
+			// publish the true input length and wake any consumer
+			// blocked on fillReady
+			atomic.StoreInt64(&sr.end, rpos)
+			close(sr.fillReady)
+		}()
+
+		for range sr.fillReq {
+		scan:
+			n, err := r.Read(sr.nbuf[:])
+
+			if n == 0 {
+				switch err {
+				case io.EOF: // reader is exhausted
+					return
+				case nil: // no data and no error, retry fill
+					goto scan
+				default: // unexpected reader error
+					sr.readerErr = err
+					return
+				}
+			}
+
+			rpos += int64(n)
+			sr.fillReady <- int64(n)
+		}
+	}()
+
+	sr.fillReq <- struct{}{} // initial fill
+
+	return sr
+}
+
+// remaining returns the number of unread bytes. While EOF has not yet
+// been observed on the underlying reader, the maximum possible integer
+// value is returned.
+func (s *scanner) remaining() int64 {
+	end := atomic.LoadInt64(&s.end)
+	if end == maxInt {
+		return maxInt
+	}
+	return end - s.pos
+}
+
+// cur returns the byte at the current position without advancing.
+func (s *scanner) cur() byte { return s.buf[s.ipos] }
+
+// next returns the byte at the current read position and advances the
+// scanner by one byte. Once the input is exhausted (or the reader has
+// failed) it returns nullByte.
+func (s *scanner) next() byte {
+	if s.pos >= atomic.LoadInt64(&s.end) {
+		return nullByte
+	}
+	s.ipos++
+
+	if s.ipos > s.ifill { // internal buffer is exhausted
+		s.ifill = <-s.fillReady
+
+		s.buf[0] = s.buf[len(s.buf)-1] // copy current last item to guarantee lookback
+		copy(s.buf[1:], s.nbuf[:])     // copy contents of pre-filled next buffer
+		s.ipos = 1                     // move to beginning of internal buffer
+
+		// request next fill to be prepared. s.end is written with
+		// atomic.StoreInt64 by the fill goroutine, so it must be read
+		// atomically here too — a plain read is a data race.
+		if atomic.LoadInt64(&s.end) == maxInt {
+			s.fillReq <- struct{}{}
+		}
+	}
+
+	s.pos++
+	return s.buf[s.ipos]
+}
+
+// back undoes a previous call to next(), moving backward one byte in
+// the internal buffer. As only a lookback of one byte is guaranteed
+// (buf keeps a single lookback slot), a second back() before the next
+// next() may panic.
+func (s *scanner) back() {
+	if s.ipos <= 0 {
+		panic("back buffer exhausted")
+	}
+	s.ipos--
+	s.pos--
+}
diff --git a/internal/s3select/jstream/scanner_test.go b/internal/s3select/jstream/scanner_test.go
new file mode 100644
index 000000000..a3df2d8d6
--- /dev/null
+++ b/internal/s3select/jstream/scanner_test.go
@@ -0,0 +1,170 @@
+package jstream
+
+import (
+ "bufio"
+ "bytes"
+ "fmt"
+ "io"
+ "sync/atomic"
+ "testing"
+)
+
+// Shared benchmark inputs. NOTE(review): these are allocated at package
+// init (~140MB total), which is paid on every run of this test package,
+// not only when the benchmarks execute.
+var (
+	smallInput  = make([]byte, 1024*12)       // 12K
+	mediumInput = make([]byte, 1024*1024*12)  // 12MB
+	largeInput  = make([]byte, 1024*1024*128) // 128MB
+)
+
+// TestScanner verifies that the scanner yields exactly the bytes of its
+// input, in order, without reporting a reader error.
+func TestScanner(t *testing.T) {
+	data := []byte("abcdefghijklmnopqrstuvwxyz0123456789")
+
+	var i int
+	r := bytes.NewReader(data)
+	scanner := newScanner(r)
+	for scanner.pos < atomic.LoadInt64(&scanner.end) {
+		c := scanner.next()
+		if scanner.readerErr != nil {
+			t.Fatal(scanner.readerErr)
+		}
+		if c != data[i] {
+			t.Fatalf("expected %s, got %s", string(data[i]), string(c))
+		}
+		t.Logf("pos=%d remaining=%d (%s)", i, r.Len(), string(c))
+		i++
+	}
+}
+
+// mockReader serves the byte mockData one byte per Read call, then
+// fails with a non-EOF error after failAfter successful reads.
+type mockReader struct {
+	pos       int
+	mockData  byte
+	failAfter int
+}
+
+// newMockReader returns a mockReader that fails after failAfter reads,
+// serving data until then.
+func newMockReader(failAfter int, data byte) *mockReader {
+	return &mockReader{0, data, failAfter}
+}
+
+// Read implements io.Reader: it writes exactly one byte per call and
+// returns an intentional (non-EOF) error once failAfter bytes have been
+// served.
+func (r *mockReader) Read(p []byte) (n int, err error) {
+	if r.pos >= r.failAfter {
+		return 0, fmt.Errorf("intentionally unexpected reader error")
+	}
+	r.pos++
+	p[0] = r.mockData
+	return 1, nil
+}
+
+// TestScannerFailure verifies that a mid-stream reader failure is
+// surfaced via scanner.readerErr and that next() then degrades to
+// returning the null byte.
+func TestScannerFailure(t *testing.T) {
+	var (
+		i         int
+		failAfter = 900
+		mockData  = byte(32)
+	)
+
+	r := newMockReader(failAfter, mockData)
+	scanner := newScanner(r)
+
+	for i < 1000 {
+		c := scanner.next()
+		if c == byte(0) {
+			break
+		}
+		if c != mockData {
+			t.Fatalf("expected \"%s\", got \"%s\"", string(mockData), string(c))
+		}
+		i++
+	}
+	c := scanner.next()
+	if scanner.readerErr == nil {
+		t.Fatalf("failed to receive expected error after %d bytes", failAfter)
+	}
+	if c != byte(0) {
+		t.Fatalf("expected null byte, got %v", c)
+	}
+}
+
+// BenchmarkBufioScanner measures byte-at-a-time consumption through
+// bufio.Scanner at three input sizes, as a baseline for the custom
+// scanner benchmark below.
+func BenchmarkBufioScanner(b *testing.B) {
+	b.Run("small", func(b *testing.B) {
+		for i := 0; i < b.N; i++ {
+			benchmarkBufioScanner(smallInput)
+		}
+	})
+	b.Run("medium", func(b *testing.B) {
+		for i := 0; i < b.N; i++ {
+			benchmarkBufioScanner(mediumInput)
+		}
+	})
+	b.Run("large", func(b *testing.B) {
+		for i := 0; i < b.N; i++ {
+			benchmarkBufioScanner(largeInput)
+		}
+	})
+}
+
+// benchmarkBufioScanner consumes data one byte at a time via a
+// bufio.Scanner configured with the ScanBytes split function.
+func benchmarkBufioScanner(data []byte) {
+	sc := bufio.NewScanner(bytes.NewReader(data))
+	sc.Split(bufio.ScanBytes)
+	for sc.Scan() {
+		_ = sc.Bytes()
+	}
+}
+
+// BenchmarkBufioReader measures byte-at-a-time consumption through
+// bufio.Reader.ReadByte at three input sizes.
+func BenchmarkBufioReader(b *testing.B) {
+	b.Run("small", func(b *testing.B) {
+		for i := 0; i < b.N; i++ {
+			benchmarkBufioReader(smallInput)
+		}
+	})
+	b.Run("medium", func(b *testing.B) {
+		for i := 0; i < b.N; i++ {
+			benchmarkBufioReader(mediumInput)
+		}
+	})
+	b.Run("large", func(b *testing.B) {
+		for i := 0; i < b.N; i++ {
+			benchmarkBufioReader(largeInput)
+		}
+	})
+}
+
+// benchmarkBufioReader drains b one byte at a time via bufio.Reader,
+// stopping at io.EOF and panicking on any other error.
+func benchmarkBufioReader(b []byte) {
+	br := bufio.NewReader(bytes.NewReader(b))
+	for {
+		if _, err := br.ReadByte(); err != nil {
+			if err == io.EOF {
+				return
+			}
+			panic(err)
+		}
+	}
+}
+
+// BenchmarkScanner measures byte-at-a-time consumption through the
+// custom double-buffered scanner at three input sizes.
+func BenchmarkScanner(b *testing.B) {
+	b.Run("small", func(b *testing.B) {
+		for i := 0; i < b.N; i++ {
+			benchmarkScanner(smallInput)
+		}
+	})
+	b.Run("medium", func(b *testing.B) {
+		for i := 0; i < b.N; i++ {
+			benchmarkScanner(mediumInput)
+		}
+	})
+	b.Run("large", func(b *testing.B) {
+		for i := 0; i < b.N; i++ {
+			benchmarkScanner(largeInput)
+		}
+	})
+}
+
+// benchmarkScanner drains b one byte at a time through the custom
+// scanner until no bytes remain.
+func benchmarkScanner(b []byte) {
+	r := bytes.NewReader(b)
+
+	scanner := newScanner(r)
+	for scanner.remaining() > 0 {
+		scanner.next()
+	}
+}
diff --git a/internal/s3select/jstream/scratch.go b/internal/s3select/jstream/scratch.go
new file mode 100644
index 000000000..75bc6c435
--- /dev/null
+++ b/internal/s3select/jstream/scratch.go
@@ -0,0 +1,44 @@
+package jstream
+
+import (
+ "unicode/utf8"
+)
+
+// scratch is a reusable byte accumulator; fill tracks how many bytes of
+// data are currently in use, so resetting keeps the backing storage.
+type scratch struct {
+	data []byte
+	fill int
+}
+
+// reset scratch buffer (retains the backing array for reuse)
+func (s *scratch) reset() { s.fill = 0 }
+
+// bytes returns the written contents of scratch buffer
+func (s *scratch) bytes() []byte { return s.data[0:s.fill] }
+
+// grow enlarges the scratch buffer, preserving its contents. Capacity
+// is doubled; a zero-capacity buffer is seeded with a small default,
+// since cap(nil)*2 == 0 would otherwise make grow a no-op and leave
+// add() looping (or addRune encoding into a too-small slice).
+func (s *scratch) grow() {
+	ncap := cap(s.data) * 2
+	if ncap == 0 {
+		ncap = 64
+	}
+	ndata := make([]byte, ncap)
+	copy(ndata, s.data)
+	s.data = ndata
+}
+
+// add appends a single byte to the scratch buffer, growing the buffer
+// first if it is full.
+func (s *scratch) add(c byte) {
+	if s.fill+1 >= cap(s.data) {
+		s.grow()
+	}
+
+	s.data[s.fill] = c
+	s.fill++
+}
+
+// addRune appends the UTF-8 encoding of r to the scratch buffer and
+// returns the number of bytes written.
+// NOTE(review): a single grow() only doubles capacity once; this
+// assumes the buffer is created with cap >= utf8.UTFMax so one doubling
+// always suffices — confirm against the decoder's scratch construction.
+func (s *scratch) addRune(r rune) int {
+	if s.fill+utf8.UTFMax >= cap(s.data) {
+		s.grow()
+	}
+
+	n := utf8.EncodeRune(s.data[s.fill:], r)
+	s.fill += n
+	return n
+}
diff --git a/internal/s3select/parquet/reader.go b/internal/s3select/parquet/reader.go
index 7d27c3a35..f8dd311ee 100644
--- a/internal/s3select/parquet/reader.go
+++ b/internal/s3select/parquet/reader.go
@@ -22,10 +22,10 @@ import (
"io"
"time"
- "github.com/bcicen/jstream"
parquetgo "github.com/fraugster/parquet-go"
parquettypes "github.com/fraugster/parquet-go/parquet"
jsonfmt "github.com/minio/minio/internal/s3select/json"
+ "github.com/minio/minio/internal/s3select/jstream"
"github.com/minio/minio/internal/s3select/sql"
)
diff --git a/internal/s3select/simdj/record.go b/internal/s3select/simdj/record.go
index 3cf91de6f..9f66069d6 100644
--- a/internal/s3select/simdj/record.go
+++ b/internal/s3select/simdj/record.go
@@ -21,9 +21,9 @@ import (
"fmt"
"io"
- "github.com/bcicen/jstream"
csv "github.com/minio/csvparser"
"github.com/minio/minio/internal/s3select/json"
+ "github.com/minio/minio/internal/s3select/jstream"
"github.com/minio/minio/internal/s3select/sql"
"github.com/minio/simdjson-go"
)
diff --git a/internal/s3select/sql/evaluate.go b/internal/s3select/sql/evaluate.go
index b09be3b56..95dd716da 100644
--- a/internal/s3select/sql/evaluate.go
+++ b/internal/s3select/sql/evaluate.go
@@ -24,7 +24,7 @@ import (
"math"
"strings"
- "github.com/bcicen/jstream"
+ "github.com/minio/minio/internal/s3select/jstream"
"github.com/minio/simdjson-go"
)
diff --git a/internal/s3select/sql/jsonpath.go b/internal/s3select/sql/jsonpath.go
index 7e20c4584..9ac995e96 100644
--- a/internal/s3select/sql/jsonpath.go
+++ b/internal/s3select/sql/jsonpath.go
@@ -20,7 +20,7 @@ package sql
import (
"errors"
- "github.com/bcicen/jstream"
+ "github.com/minio/minio/internal/s3select/jstream"
"github.com/minio/simdjson-go"
)
diff --git a/internal/s3select/sql/jsonpath_test.go b/internal/s3select/sql/jsonpath_test.go
index b04361345..2825e9a9e 100644
--- a/internal/s3select/sql/jsonpath_test.go
+++ b/internal/s3select/sql/jsonpath_test.go
@@ -27,11 +27,11 @@ import (
"testing"
"github.com/alecthomas/participle"
- "github.com/bcicen/jstream"
+ "github.com/minio/minio/internal/s3select/jstream"
)
func getJSONStructs(b []byte) ([]interface{}, error) {
- dec := jstream.NewDecoder(bytes.NewBuffer(b), 0).ObjectAsKVS()
+ dec := jstream.NewDecoder(bytes.NewBuffer(b), 0).ObjectAsKVS().MaxDepth(100)
var result []interface{}
for parsedVal := range dec.Stream() {
result = append(result, parsedVal.Value)
diff --git a/internal/s3select/sql/statement.go b/internal/s3select/sql/statement.go
index ce8bfd8b8..14068b6d7 100644
--- a/internal/s3select/sql/statement.go
+++ b/internal/s3select/sql/statement.go
@@ -22,7 +22,7 @@ import (
"fmt"
"strings"
- "github.com/bcicen/jstream"
+ "github.com/minio/minio/internal/s3select/jstream"
"github.com/minio/simdjson-go"
)