diff --git a/go.mod b/go.mod index 48a319bfc..9288e427f 100644 --- a/go.mod +++ b/go.mod @@ -50,8 +50,8 @@ require ( github.com/inconshreveable/go-update v0.0.0-20160112193335-8152e7eb6ccf github.com/jonboulle/clockwork v0.1.0 // indirect github.com/json-iterator/go v1.1.7 - github.com/klauspost/compress v1.9.7 - github.com/klauspost/cpuid v1.2.2 // indirect + github.com/klauspost/compress v1.9.8 + github.com/klauspost/cpuid v1.2.2 github.com/klauspost/pgzip v1.2.1 github.com/klauspost/readahead v1.3.1 github.com/klauspost/reedsolomon v1.9.3 @@ -70,6 +70,7 @@ require ( github.com/minio/minio-go/v6 v6.0.45 github.com/minio/parquet-go v0.0.0-20200125064549-a1e49702e174 github.com/minio/sha256-simd v0.1.1 + github.com/minio/simdjson-go v0.1.2 github.com/minio/sio v0.2.0 github.com/mitchellh/go-homedir v1.1.0 github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect diff --git a/go.sum b/go.sum index af6fd239c..f02b090c9 100644 --- a/go.sum +++ b/go.sum @@ -224,6 +224,8 @@ github.com/klauspost/compress v1.8.2/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0 github.com/klauspost/compress v1.9.4/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= github.com/klauspost/compress v1.9.7 h1:hYW1gP94JUmAhBtJ+LNz5My+gBobDxPR1iVuKug26aA= github.com/klauspost/compress v1.9.7/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= +github.com/klauspost/compress v1.9.8 h1:VMAMUUOh+gaxKTMk+zqbjsSjsIcUcL/LF4o63i82QyA= +github.com/klauspost/compress v1.9.8/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= github.com/klauspost/cpuid v1.2.2 h1:1xAgYebNnsb9LKCdLOvFWtAxGU/33mjJtyOVbmUa0Us= github.com/klauspost/cpuid v1.2.2/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek= github.com/klauspost/pgzip v1.2.1 h1:oIPZROsWuPHpOdMVWLuJZXwgjhrW8r1yEX8UqMyeNHM= @@ -278,6 +280,12 @@ github.com/minio/parquet-go v0.0.0-20200125064549-a1e49702e174 h1:WYFHZIJ5LTWd4C github.com/minio/parquet-go v0.0.0-20200125064549-a1e49702e174/go.mod 
h1:PXYM9yI2l0YPmxHUXe6mFTmkQcyaVasDshAPTbGpDoo= github.com/minio/sha256-simd v0.1.1 h1:5QHSlgo3nt5yKOJrC7W8w7X+NFl8cMPZm96iu8kKUJU= github.com/minio/sha256-simd v0.1.1/go.mod h1:B5e1o+1/KgNmWrSQK08Y6Z1Vb5pwIktudl0J58iy0KM= +github.com/minio/simdjson-go v0.1.0 h1:4QbJlXkE6lIzwzFP63e1aExJxDI6certBM0YKNPVm3g= +github.com/minio/simdjson-go v0.1.0/go.mod h1:ZAVJ8HB/LLrnk+nt4+nYd9kTAdRKGZdvW/x5Jelgevk= +github.com/minio/simdjson-go v0.1.1 h1:m1e/lgS21pUtACcNn6j6juiZetOBaM/FjrwW/pehg14= +github.com/minio/simdjson-go v0.1.1/go.mod h1:ZAVJ8HB/LLrnk+nt4+nYd9kTAdRKGZdvW/x5Jelgevk= +github.com/minio/simdjson-go v0.1.2 h1:ZKMpcX+qscSGcy3ZG9uBeJoNbYNZkwCpVyj7pWJLXQQ= +github.com/minio/simdjson-go v0.1.2/go.mod h1:ZAVJ8HB/LLrnk+nt4+nYd9kTAdRKGZdvW/x5Jelgevk= github.com/minio/sio v0.2.0 h1:NCRCFLx0r5pRbXf65LVNjxbCGZgNQvNFQkgX3XF4BoA= github.com/minio/sio v0.2.0/go.mod h1:nKM5GIWSrqbOZp0uhyj6M1iA0X6xQzSGtYSaTKSCut0= github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc= diff --git a/pkg/s3select/csv/record.go b/pkg/s3select/csv/record.go index 026601e82..5ad3c3374 100644 --- a/pkg/s3select/csv/record.go +++ b/pkg/s3select/csv/record.go @@ -55,10 +55,10 @@ func (r *Record) Get(name string) (*sql.Value, error) { } // Set - sets the value for a column name. -func (r *Record) Set(name string, value *sql.Value) error { +func (r *Record) Set(name string, value *sql.Value) (sql.Record, error) { r.columnNames = append(r.columnNames, name) r.csvRecord = append(r.csvRecord, value.CSVString()) - return nil + return r, nil } // Reset data in record. 
@@ -121,7 +121,7 @@ func (r *Record) Raw() (sql.SelectObjectFormat, interface{}) { } // Replace - is not supported for CSV -func (r *Record) Replace(_ jstream.KVS) error { +func (r *Record) Replace(_ interface{}) error { return errors.New("Replace is not supported for CSV") } diff --git a/pkg/s3select/json/record.go b/pkg/s3select/json/record.go index 37db6fd30..9d211ec22 100644 --- a/pkg/s3select/json/record.go +++ b/pkg/s3select/json/record.go @@ -22,6 +22,8 @@ import ( "errors" "fmt" "io" + "math" + "strconv" "strings" "github.com/bcicen/jstream" @@ -72,7 +74,7 @@ func (r *Record) Clone(dst sql.Record) sql.Record { } // Set - sets the value for a column name. -func (r *Record) Set(name string, value *sql.Value) error { +func (r *Record) Set(name string, value *sql.Value) (sql.Record, error) { var v interface{} if b, ok := value.ToBool(); ok { v = b @@ -97,12 +99,12 @@ func (r *Record) Set(name string, value *sql.Value) error { } else if arr, ok := value.ToArray(); ok { v = arr } else { - return fmt.Errorf("unsupported sql value %v and type %v", value, value.GetTypeString()) + return nil, fmt.Errorf("unsupported sql value %v and type %v", value, value.GetTypeString()) } name = strings.Replace(name, "*", "__ALL__", -1) r.KVS = append(r.KVS, jstream.KV{Key: name, Value: v}) - return nil + return r, nil } // WriteCSV - encodes to CSV data. @@ -111,7 +113,11 @@ func (r *Record) WriteCSV(writer io.Writer, fieldDelimiter rune) error { for _, kv := range r.KVS { var columnValue string switch val := kv.Value.(type) { - case bool, float64, int64, string: + case float64: + columnValue = jsonFloat(val) + case string: + columnValue = val + case bool, int64: columnValue = fmt.Sprintf("%v", val) case nil: columnValue = "" @@ -153,8 +159,12 @@ func (r *Record) WriteJSON(writer io.Writer) error { } // Replace the underlying buffer of json data. 
-func (r *Record) Replace(k jstream.KVS) error { - r.KVS = k +func (r *Record) Replace(k interface{}) error { + v, ok := k.(jstream.KVS) + if !ok { + return fmt.Errorf("cannot replace internal data in json record with type %T", k) + } + r.KVS = v return nil } @@ -165,3 +175,32 @@ func NewRecord(f sql.SelectObjectFormat) *Record { SelectFormat: f, } } + +// jsonFloat converts a float to string similar to Go stdlib formats json floats. +func jsonFloat(f float64) string { + var tmp [32]byte + dst := tmp[:0] + + // Convert as if by ES6 number to string conversion. + // This matches most other JSON generators. + // See golang.org/issue/6384 and golang.org/issue/14135. + // Like fmt %g, but the exponent cutoffs are different + // and exponents themselves are not padded to two digits. + abs := math.Abs(f) + fmt := byte('f') + if abs != 0 { + if abs < 1e-6 || abs >= 1e21 { + fmt = 'e' + } + } + dst = strconv.AppendFloat(dst, f, fmt, -1, 64) + if fmt == 'e' { + // clean up e-09 to e-9 + n := len(dst) + if n >= 4 && dst[n-4] == 'e' && dst[n-3] == '-' && dst[n-2] == '0' { + dst[n-2] = dst[n-1] + dst = dst[:n-1] + } + } + return string(dst) +} diff --git a/pkg/s3select/progress.go b/pkg/s3select/progress.go index fa0ace5c3..2e926063e 100644 --- a/pkg/s3select/progress.go +++ b/pkg/s3select/progress.go @@ -39,6 +39,9 @@ func (r *countUpReader) Read(p []byte) (n int, err error) { } func (r *countUpReader) BytesRead() int64 { + if r == nil { + return 0 + } return atomic.LoadInt64(&r.bytesRead) } @@ -69,6 +72,9 @@ func (pr *progressReader) Read(p []byte) (n int, err error) { } func (pr *progressReader) Close() error { + if pr.rc == nil { + return nil + } pr.closedMu.Lock() defer pr.closedMu.Unlock() if pr.closed { @@ -79,6 +85,9 @@ func (pr *progressReader) Close() error { } func (pr *progressReader) Stats() (bytesScanned, bytesProcessed int64) { + if pr == nil { + return 0, 0 + } return pr.scannedReader.BytesRead(), pr.processedReader.BytesRead() } diff --git 
a/pkg/s3select/select.go b/pkg/s3select/select.go index 5ace10707..a158394ae 100644 --- a/pkg/s3select/select.go +++ b/pkg/s3select/select.go @@ -30,7 +30,9 @@ import ( "github.com/minio/minio/pkg/s3select/csv" "github.com/minio/minio/pkg/s3select/json" "github.com/minio/minio/pkg/s3select/parquet" + "github.com/minio/minio/pkg/s3select/simdj" "github.com/minio/minio/pkg/s3select/sql" + "github.com/minio/simdjson-go" ) type recordReader interface { @@ -317,7 +319,11 @@ func (s3Select *S3Select) Open(getReader func(offset, length int64) (io.ReadClos } if strings.EqualFold(s3Select.Input.JSONArgs.ContentType, "lines") { - s3Select.recordReader = json.NewPReader(s3Select.progressReader, &s3Select.Input.JSONArgs) + if simdjson.SupportedCPU() { + s3Select.recordReader = simdj.NewReader(s3Select.progressReader, &s3Select.Input.JSONArgs) + } else { + s3Select.recordReader = json.NewPReader(s3Select.progressReader, &s3Select.Input.JSONArgs) + } } else { s3Select.recordReader = json.NewReader(s3Select.progressReader, &s3Select.Input.JSONArgs) } @@ -350,7 +356,9 @@ func (s3Select *S3Select) marshal(buf *bytes.Buffer, record sql.Record) error { if err != nil { return err } - buf.Truncate(buf.Len() - 1) + if buf.Bytes()[buf.Len()-1] == '\n' { + buf.Truncate(buf.Len() - 1) + } buf.WriteString(s3Select.Output.CSVArgs.RecordDelimiter) return nil @@ -359,8 +367,10 @@ func (s3Select *S3Select) marshal(buf *bytes.Buffer, record sql.Record) error { if err != nil { return err } - - buf.Truncate(buf.Len() - 1) + // Trim trailing newline from non-simd output + if buf.Bytes()[buf.Len()-1] == '\n' { + buf.Truncate(buf.Len() - 1) + } buf.WriteString(s3Select.Output.JSONArgs.RecordDelimiter) return nil diff --git a/pkg/s3select/select_test.go b/pkg/s3select/select_test.go index e1e758378..bbf81e171 100644 --- a/pkg/s3select/select_test.go +++ b/pkg/s3select/select_test.go @@ -21,13 +21,16 @@ import ( "fmt" "io" "io/ioutil" + "math" "net/http" "os" "reflect" "strings" "testing" + 
"github.com/klauspost/cpuid" "github.com/minio/minio-go/v6" + "github.com/minio/simdjson-go" ) type testResponseWriter struct { @@ -56,6 +59,7 @@ func TestJSONQueries(t *testing.T) { {"id": 1,"title": "Second Record","desc": "another text","synonyms": ["some", "synonym", "value"]} {"id": 2,"title": "Second Record","desc": "another text","numbers": [2, 3.0, 4]} {"id": 3,"title": "Second Record","desc": "another text","nested": [[2, 3.0, 4], [7, 8.5, 9]]}` + var testTable = []struct { name string query string @@ -229,6 +233,11 @@ func TestJSONQueries(t *testing.T) { {"id":3, "value": "true"} `, }, + { + name: "index-wildcard-in", + query: `SELECT * from s3object s WHERE title = 'Test Record'`, + wantResult: `{"id":0,"title":"Test Record","desc":"Some text","synonyms":["foo","bar","whatever"]}`, + }, { name: "select-output-field-as-csv", requestXML: []byte(` @@ -260,7 +269,7 @@ func TestJSONQueries(t *testing.T) { NONE - DOCUMENT + LINES @@ -274,6 +283,59 @@ func TestJSONQueries(t *testing.T) { for _, testCase := range testTable { t.Run(testCase.name, func(t *testing.T) { + // Hack cpuid so the CPU doesn't appear to support AVX2. + // Restore whatever happens. 
+ defer func(f cpuid.Flags) { + cpuid.CPU.Features = f + }(cpuid.CPU.Features) + cpuid.CPU.Features &= math.MaxUint64 - cpuid.AVX2 + + testReq := testCase.requestXML + if len(testReq) == 0 { + testReq = []byte(fmt.Sprintf(defRequest, testCase.query)) + } + s3Select, err := NewS3Select(bytes.NewReader(testReq)) + if err != nil { + t.Fatal(err) + } + + if err = s3Select.Open(func(offset, length int64) (io.ReadCloser, error) { + in := input + if len(testCase.withJSON) > 0 { + in = testCase.withJSON + } + return ioutil.NopCloser(bytes.NewBufferString(in)), nil + }); err != nil { + t.Fatal(err) + } + + w := &testResponseWriter{} + s3Select.Evaluate(w) + s3Select.Close() + resp := http.Response{ + StatusCode: http.StatusOK, + Body: ioutil.NopCloser(bytes.NewReader(w.response)), + ContentLength: int64(len(w.response)), + } + res, err := minio.NewSelectResults(&resp, "testbucket") + if err != nil { + t.Error(err) + return + } + got, err := ioutil.ReadAll(res) + if err != nil { + t.Error(err) + return + } + gotS := strings.TrimSpace(string(got)) + if !reflect.DeepEqual(gotS, testCase.wantResult) { + t.Errorf("received response does not match with expected reply. 
Query: %s\ngot: %s\nwant:%s", testCase.query, gotS, testCase.wantResult) + } + }) + t.Run("simd-"+testCase.name, func(t *testing.T) { + if !simdjson.SupportedCPU() { + t.Skip("No CPU support") + } testReq := testCase.requestXML if len(testReq) == 0 { testReq = []byte(fmt.Sprintf(defRequest, testCase.query)) @@ -320,6 +382,89 @@ func TestJSONQueries(t *testing.T) { } func TestCSVQueries(t *testing.T) { + input := `index,ID,CaseNumber,Date,Day,Month,Year,Block,IUCR,PrimaryType,Description,LocationDescription,Arrest,Domestic,Beat,District,Ward,CommunityArea,FBI Code,XCoordinate,YCoordinate,UpdatedOn,Latitude,Longitude,Location +2700763,7732229,,2010-05-26 00:00:00,26,May,2010,113XX S HALSTED ST,1150,,CREDIT CARD FRAUD,,False,False,2233,22.0,34.0,,11,,,,41.688043288,-87.6422444,"(41.688043288, -87.6422444)"` + + var testTable = []struct { + name string + query string + requestXML []byte + wantResult string + }{ + { + name: "select-in-text-simple", + query: `SELECT index FROM s3Object s WHERE "Month"='May'`, + wantResult: `2700763`, + }, + } + + defRequest := ` + + %s + SQL + + NONE + + , + USE + " + " + \n + + + + + + + + FALSE + +` + + for _, testCase := range testTable { + t.Run(testCase.name, func(t *testing.T) { + testReq := testCase.requestXML + if len(testReq) == 0 { + testReq = []byte(fmt.Sprintf(defRequest, testCase.query)) + } + s3Select, err := NewS3Select(bytes.NewReader(testReq)) + if err != nil { + t.Fatal(err) + } + + if err = s3Select.Open(func(offset, length int64) (io.ReadCloser, error) { + return ioutil.NopCloser(bytes.NewBufferString(input)), nil + }); err != nil { + t.Fatal(err) + } + + w := &testResponseWriter{} + s3Select.Evaluate(w) + s3Select.Close() + resp := http.Response{ + StatusCode: http.StatusOK, + Body: ioutil.NopCloser(bytes.NewReader(w.response)), + ContentLength: int64(len(w.response)), + } + res, err := minio.NewSelectResults(&resp, "testbucket") + if err != nil { + t.Error(err) + return + } + got, err := ioutil.ReadAll(res) + if err 
!= nil { + t.Error(err) + return + } + gotS := strings.TrimSpace(string(got)) + if !reflect.DeepEqual(gotS, testCase.wantResult) { + t.Errorf("received response does not match with expected reply. Query: %s\ngot: %s\nwant:%s", testCase.query, gotS, testCase.wantResult) + } + }) + } +} + +func TestCSVQueries2(t *testing.T) { input := `id,time,num,num2,text 1,2010-01-01T,7867786,4565.908123,"a text, with comma" 2,2017-01-02T03:04Z,-5, 0.765111, @@ -559,7 +704,23 @@ func TestCSVInput(t *testing.T) { s3Select.Close() if !reflect.DeepEqual(w.response, testCase.expectedResult) { - t.Errorf("received response does not match with expected reply\ngot: %#v\nwant:%#v", w.response, testCase.expectedResult) + resp := http.Response{ + StatusCode: http.StatusOK, + Body: ioutil.NopCloser(bytes.NewReader(w.response)), + ContentLength: int64(len(w.response)), + } + res, err := minio.NewSelectResults(&resp, "testbucket") + if err != nil { + t.Error(err) + return + } + got, err := ioutil.ReadAll(res) + if err != nil { + t.Error(err) + return + } + + t.Errorf("received response does not match with expected reply\ngot: %#v\nwant:%#v\ndecoded:%s", w.response, testCase.expectedResult, string(got)) } }) } @@ -667,7 +828,23 @@ func TestJSONInput(t *testing.T) { s3Select.Close() if !reflect.DeepEqual(w.response, testCase.expectedResult) { - t.Errorf("received response does not match with expected reply\ngot: %#v\nwant:%#v", w.response, testCase.expectedResult) + resp := http.Response{ + StatusCode: http.StatusOK, + Body: ioutil.NopCloser(bytes.NewReader(w.response)), + ContentLength: int64(len(w.response)), + } + res, err := minio.NewSelectResults(&resp, "testbucket") + if err != nil { + t.Error(err) + return + } + got, err := ioutil.ReadAll(res) + if err != nil { + t.Error(err) + return + } + + t.Errorf("received response does not match with expected reply\ngot: %#v\nwant:%#v\ndecoded:%s", w.response, testCase.expectedResult, string(got)) } }) } @@ -766,7 +943,23 @@ func TestParquetInput(t 
*testing.T) { s3Select.Close() if !reflect.DeepEqual(w.response, testCase.expectedResult) { - t.Errorf("received response does not match with expected reply\ngot: %#v\nwant:%#v", w.response, testCase.expectedResult) + resp := http.Response{ + StatusCode: http.StatusOK, + Body: ioutil.NopCloser(bytes.NewReader(w.response)), + ContentLength: int64(len(w.response)), + } + res, err := minio.NewSelectResults(&resp, "testbucket") + if err != nil { + t.Error(err) + return + } + got, err := ioutil.ReadAll(res) + if err != nil { + t.Error(err) + return + } + + t.Errorf("received response does not match with expected reply\ngot: %#v\nwant:%#v\ndecoded:%s", w.response, testCase.expectedResult, string(got)) } }) } diff --git a/pkg/s3select/simdj/errors.go b/pkg/s3select/simdj/errors.go new file mode 100644 index 000000000..5d07fc6b6 --- /dev/null +++ b/pkg/s3select/simdj/errors.go @@ -0,0 +1,64 @@ +/* + * MinIO Cloud Storage, (C) 2019 MinIO, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package simdj + +import "fmt" + +type s3Error struct { + code string + message string + statusCode int + cause error +} + +func (err *s3Error) Cause() error { + return err.cause +} + +func (err *s3Error) ErrorCode() string { + return err.code +} + +func (err *s3Error) ErrorMessage() string { + return err.message +} + +func (err *s3Error) HTTPStatusCode() int { + return err.statusCode +} + +func (err *s3Error) Error() string { + return err.message +} + +func errInvalidJSONType(err error) *s3Error { + return &s3Error{ + code: "InvalidJsonType", + message: "The JsonType is invalid. Only DOCUMENT and LINES are supported.", + statusCode: 400, + cause: err, + } +} + +func errJSONParsingError(err error) *s3Error { + return &s3Error{ + code: "JSONParsingError", + message: fmt.Sprintf("Encountered an error parsing the JSON file: %v. Check the file and try again.", err), + statusCode: 400, + cause: err, + } +} diff --git a/pkg/s3select/simdj/reader.go b/pkg/s3select/simdj/reader.go new file mode 100644 index 000000000..8870ab952 --- /dev/null +++ b/pkg/s3select/simdj/reader.go @@ -0,0 +1,187 @@ +/* + * MinIO Cloud Storage, (C) 2019 MinIO, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package simdj + +import ( + "fmt" + "io" + "sync" + + "github.com/minio/minio/pkg/s3select/json" + "github.com/minio/minio/pkg/s3select/sql" + "github.com/minio/simdjson-go" +) + +// Reader - JSON record reader for S3Select. 
+type Reader struct { + args *json.ReaderArgs + input chan simdjson.Stream + decoded chan simdjson.Object + + // err will only be returned after decoded has been closed. + err *error + readCloser io.ReadCloser + + exitReader chan struct{} + readerWg sync.WaitGroup +} + +// Read - reads single record. +func (r *Reader) Read(dst sql.Record) (sql.Record, error) { + v, ok := <-r.decoded + if !ok { + if r.err != nil && *r.err != nil { + return nil, errJSONParsingError(*r.err) + } + return nil, io.EOF + } + dstRec, ok := dst.(*Record) + if !ok { + dstRec = &Record{} + } + dstRec.object = v + return dstRec, nil +} + +// Close - closes underlying reader. +func (r *Reader) Close() error { + // Close the input. + // Potentially racy if the stream decoder is still reading. + if r.readCloser != nil { + r.readCloser.Close() + } + if r.exitReader != nil { + close(r.exitReader) + r.readerWg.Wait() + r.exitReader = nil + r.input = nil + } + return nil +} + +// startReader will start a reader that accepts input from r.input. +// Input should be root -> object input. Each root indicates a record. +// If r.input is closed, it is assumed that no more input will come. +// When this function returns r.readerWg will be decremented and r.decoded will be closed. +// On errors, r.err will be set. This should only be accessed after r.decoded has been closed. 
+func (r *Reader) startReader() { + defer r.readerWg.Done() + defer close(r.decoded) + var tmpObj simdjson.Object + for { + var in simdjson.Stream + select { + case in = <-r.input: + case <-r.exitReader: + return + } + if in.Error != nil && in.Error != io.EOF { + r.err = &in.Error + return + } + if in.Value == nil { + if in.Error == io.EOF { + return + } + continue + } + i := in.Value.Iter() + readloop: + for { + var next simdjson.Iter + typ, err := i.AdvanceIter(&next) + if err != nil { + r.err = &err + return + } + switch typ { + case simdjson.TypeNone: + break readloop + case simdjson.TypeRoot: + typ, obj, err := next.Root(nil) + if err != nil { + r.err = &err + return + } + if typ != simdjson.TypeObject { + if typ == simdjson.TypeNone { + continue + } + err = fmt.Errorf("unexpected json type below root :%v", typ) + r.err = &err + return + } + + o, err := obj.Object(&tmpObj) + if err != nil { + r.err = &err + return + } + select { + case <-r.exitReader: + return + case r.decoded <- *o: + } + default: + err = fmt.Errorf("unexpected root json type:%v", typ) + r.err = &err + return + } + } + if in.Error == io.EOF { + return + } + } +} + +// NewReader - creates new JSON reader using readCloser. +func NewReader(readCloser io.ReadCloser, args *json.ReaderArgs) *Reader { + r := Reader{ + args: args, + readCloser: readCloser, + decoded: make(chan simdjson.Object, 1000), + input: make(chan simdjson.Stream, 2), + exitReader: make(chan struct{}), + } + simdjson.ParseNDStream(readCloser, r.input, nil) + r.readerWg.Add(1) + go r.startReader() + return &r +} + +// NewElementReader - creates new JSON reader using readCloser. +func NewElementReader(ch chan simdjson.Object, err *error, args *json.ReaderArgs) *Reader { + return &Reader{ + args: args, + decoded: ch, + err: err, + readCloser: nil, + } +} + +// NewTapeReaderChan will start a reader that will read input from the provided channel. 
+func NewTapeReaderChan(pj chan simdjson.Stream, args *json.ReaderArgs) *Reader { + r := Reader{ + args: args, + decoded: make(chan simdjson.Object, 1000), + input: pj, + exitReader: make(chan struct{}), + } + r.readerWg.Add(1) + go r.startReader() + return &r +} diff --git a/pkg/s3select/simdj/reader_test.go b/pkg/s3select/simdj/reader_test.go new file mode 100644 index 000000000..36a296570 --- /dev/null +++ b/pkg/s3select/simdj/reader_test.go @@ -0,0 +1,165 @@ +/* + * MinIO Cloud Storage, (C) 2019 MinIO, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package simdj + +import ( + "bytes" + "io" + "io/ioutil" + "path/filepath" + "testing" + + "github.com/klauspost/compress/zstd" + "github.com/minio/minio/pkg/s3select/json" + "github.com/minio/simdjson-go" +) + +type tester interface { + Fatal(args ...interface{}) +} + +func loadCompressed(t tester, file string) (js []byte) { + dec, err := zstd.NewReader(nil) + if err != nil { + t.Fatal(err) + } + defer dec.Close() + js, err = ioutil.ReadFile(filepath.Join("testdata", file+".json.zst")) + if err != nil { + t.Fatal(err) + } + js, err = dec.DecodeAll(js, nil) + if err != nil { + t.Fatal(err) + } + + return js +} + +var testCases = []struct { + name string + array bool +}{ + { + name: "parking-citations-10", + }, +} + +func TestNDJSON(t *testing.T) { + for _, tt := range testCases { + + t.Run(tt.name, func(t *testing.T) { + ref := loadCompressed(t, tt.name) + + var err error + dst := make(chan simdjson.Object, 100) + dec := NewElementReader(dst, &err, &json.ReaderArgs{ContentType: "json"}) + pj, err := simdjson.ParseND(ref, nil) + if err != nil { + t.Fatal(err) + } + i := pj.Iter() + cpy := i + b, err := cpy.MarshalJSON() + if err != nil { + t.Fatal(err) + } + if false { + t.Log(string(b)) + } + //_ = ioutil.WriteFile(filepath.Join("testdata", tt.name+".json"), b, os.ModePerm) + + parser: + for { + var next simdjson.Iter + typ, err := i.AdvanceIter(&next) + if err != nil { + t.Fatal(err) + } + switch typ { + case simdjson.TypeNone: + close(dst) + break parser + case simdjson.TypeRoot: + typ, obj, err := next.Root(nil) + if err != nil { + t.Fatal(err) + } + if typ != simdjson.TypeObject { + if typ == simdjson.TypeNone { + close(dst) + break parser + } + t.Fatal("Unexpected type:", typ.String()) + } + + o, err := obj.Object(nil) + if err != nil { + t.Fatal(err) + } + dst <- *o + default: + t.Fatal("unexpected type:", typ.String()) + } + } + refDec := json.NewReader(ioutil.NopCloser(bytes.NewBuffer(ref)), &json.ReaderArgs{ContentType: "json"}) + + for { + rec, 
err := dec.Read(nil) + if err == io.EOF { + break + } + if err != nil { + t.Error(err) + } + want, err := refDec.Read(nil) + if err != nil { + t.Error(err) + } + var gotB, wantB bytes.Buffer + err = rec.WriteCSV(&gotB, ',') + if err != nil { + t.Error(err) + } + err = want.WriteCSV(&wantB, ',') + if err != nil { + t.Error(err) + } + + if !bytes.Equal(gotB.Bytes(), wantB.Bytes()) { + t.Errorf("CSV output mismatch.\nwant: %s(%x)\ngot: %s(%x)", wantB.String(), wantB.Bytes(), gotB.String(), gotB.Bytes()) + } + gotB.Reset() + wantB.Reset() + + err = rec.WriteJSON(&gotB) + if err != nil { + t.Error(err) + } + err = want.WriteJSON(&wantB) + if err != nil { + t.Error(err) + } + // truncate newline from 'want' + wantB.Truncate(wantB.Len() - 1) + if !bytes.Equal(gotB.Bytes(), wantB.Bytes()) { + t.Errorf("JSON output mismatch.\nwant: %s\ngot: %s", wantB.String(), gotB.String()) + } + } + }) + } +} diff --git a/pkg/s3select/simdj/record.go b/pkg/s3select/simdj/record.go new file mode 100644 index 000000000..d67c56d37 --- /dev/null +++ b/pkg/s3select/simdj/record.go @@ -0,0 +1,228 @@ +/* + * MinIO Cloud Storage, (C) 2019 MinIO, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package simdj + +import ( + "encoding/csv" + "fmt" + "io" + + "github.com/bcicen/jstream" + "github.com/minio/minio/pkg/s3select/json" + "github.com/minio/minio/pkg/s3select/sql" + "github.com/minio/simdjson-go" +) + +// Record - is JSON record. 
+type Record struct { + // object + object simdjson.Object +} + +// Get - gets the value for a column name. +func (r *Record) Get(name string) (*sql.Value, error) { + elem := r.object.FindKey(name, nil) + if elem == nil { + return nil, nil + } + return iterToValue(elem.Iter) +} + +func iterToValue(iter simdjson.Iter) (*sql.Value, error) { + switch iter.Type() { + case simdjson.TypeString: + v, err := iter.String() + if err != nil { + return nil, err + } + return sql.FromString(v), nil + case simdjson.TypeFloat: + v, err := iter.Float() + if err != nil { + return nil, err + } + return sql.FromFloat(v), nil + case simdjson.TypeInt: + v, err := iter.Int() + if err != nil { + return nil, err + } + return sql.FromInt(v), nil + case simdjson.TypeUint: + v, err := iter.Int() + if err != nil { + // Can't fit into int, convert to float. + v, err := iter.Float() + return sql.FromFloat(v), err + } + return sql.FromInt(v), nil + case simdjson.TypeBool: + v, err := iter.Bool() + if err != nil { + return nil, err + } + return sql.FromBool(v), nil + case simdjson.TypeNull: + return sql.FromNull(), nil + case simdjson.TypeObject, simdjson.TypeArray: + b, err := iter.MarshalJSON() + return sql.FromBytes(b), err + } + return nil, fmt.Errorf("iterToValue: unknown JSON type: %s", iter.Type().String()) +} + +// Reset the record. +func (r *Record) Reset() { + r.object = simdjson.Object{} +} + +// Clone the record and if possible use the destination provided. +func (r *Record) Clone(dst sql.Record) sql.Record { + other, ok := dst.(*Record) + if !ok { + other = &Record{} + } + other.object = r.object + return other +} + +// CloneTo clones the record to a json Record. +// Values are only unmarshaled on object level. 
+func (r *Record) CloneTo(dst *json.Record) (sql.Record, error) { + if dst == nil { + dst = &json.Record{SelectFormat: sql.SelectFmtJSON} + } + dst.Reset() + elems, err := r.object.Parse(nil) + if err != nil { + return nil, err + } + if cap(dst.KVS) < len(elems.Elements) { + dst.KVS = make(jstream.KVS, 0, len(elems.Elements)) + } + for _, elem := range elems.Elements { + v, err := sql.IterToValue(elem.Iter) + if err != nil { + v, err = elem.Iter.Interface() + if err != nil { + panic(err) + } + } + dst.KVS = append(dst.KVS, jstream.KV{ + Key: elem.Name, + Value: v, + }) + } + return dst, nil +} + +// Set - sets the value for a column name. +func (r *Record) Set(name string, value *sql.Value) (sql.Record, error) { + dst, err := r.CloneTo(nil) + if err != nil { + return nil, err + } + return dst.Set(name, value) +} + +// WriteCSV - encodes to CSV data. +func (r *Record) WriteCSV(writer io.Writer, fieldDelimiter rune) error { + csvRecord := make([]string, 0, 10) + var tmp simdjson.Iter + obj := r.object +allElems: + for { + _, typ, err := obj.NextElement(&tmp) + if err != nil { + return err + } + var columnValue string + switch typ { + case simdjson.TypeNull, simdjson.TypeFloat, simdjson.TypeUint, simdjson.TypeInt, simdjson.TypeBool, simdjson.TypeString: + val, err := tmp.StringCvt() + if err != nil { + return err + } + columnValue = val + case simdjson.TypeObject, simdjson.TypeArray: + b, err := tmp.MarshalJSON() + if err != nil { + return err + } + columnValue = string(b) + case simdjson.TypeNone: + break allElems + default: + return fmt.Errorf("cannot marshal unhandled type: %s", typ.String()) + } + csvRecord = append(csvRecord, columnValue) + } + w := csv.NewWriter(writer) + w.Comma = fieldDelimiter + if err := w.Write(csvRecord); err != nil { + return err + } + w.Flush() + if err := w.Error(); err != nil { + return err + } + + return nil +} + +// Raw - returns the underlying representation. 
+func (r *Record) Raw() (sql.SelectObjectFormat, interface{}) { + return sql.SelectFmtSIMDJSON, r.object +} + +// WriteJSON - encodes to JSON data. +func (r *Record) WriteJSON(writer io.Writer) error { + o := r.object + elems, err := o.Parse(nil) + if err != nil { + return err + } + b, err := elems.MarshalJSON() + if err != nil { + return err + } + n, err := writer.Write(b) + if err != nil { + return err + } + if n != len(b) { + return io.ErrShortWrite + } + return nil +} + +// Replace the underlying buffer of json data. +func (r *Record) Replace(k interface{}) error { + v, ok := k.(simdjson.Object) + if !ok { + return fmt.Errorf("cannot replace internal data in simd json record with type %T", k) + } + r.object = v + return nil +} + +// NewRecord - creates new empty JSON record. +func NewRecord(f sql.SelectObjectFormat, obj simdjson.Object) *Record { + return &Record{ + object: obj, + } +} diff --git a/pkg/s3select/simdj/testdata/parking-citations-10.json.zst b/pkg/s3select/simdj/testdata/parking-citations-10.json.zst new file mode 100644 index 000000000..c25da3449 Binary files /dev/null and b/pkg/s3select/simdj/testdata/parking-citations-10.json.zst differ diff --git a/pkg/s3select/sql/evaluate.go b/pkg/s3select/sql/evaluate.go index 1a846aa38..a2c9d3b52 100644 --- a/pkg/s3select/sql/evaluate.go +++ b/pkg/s3select/sql/evaluate.go @@ -20,9 +20,11 @@ import ( "encoding/json" "errors" "fmt" + "math" "strings" "github.com/bcicen/jstream" + "github.com/minio/simdjson-go" ) var ( @@ -370,11 +372,9 @@ func (e *JSONPath) evalNode(r Record) (*Value, error) { keypath = ps[1] } } - objFmt, rawVal := r.Raw() - switch objFmt { - case SelectFmtJSON, SelectFmtParquet: - rowVal := rawVal.(jstream.KVS) - + _, rawVal := r.Raw() + switch rowVal := rawVal.(type) { + case jstream.KVS, simdjson.Object: pathExpr := e.PathExpr if len(pathExpr) == 0 { pathExpr = []*JSONPathElement{{Key: &ObjectKey{ID: e.BaseKey}}} @@ -400,6 +400,11 @@ func jsonToValue(result interface{}) (*Value, error) { 
return FromFloat(rval), nil case int64: return FromInt(rval), nil + case uint64: + if rval <= math.MaxInt64 { + return FromInt(int64(rval)), nil + } + return FromFloat(float64(rval)), nil case bool: return FromBool(rval), nil case jstream.KVS: @@ -418,6 +423,17 @@ func jsonToValue(result interface{}) (*Value, error) { dst[i] = *v } return FromArray(dst), nil + case simdjson.Object: + o := rval + elems, err := o.Parse(nil) + if err != nil { + return nil, err + } + bs, err := elems.MarshalJSON() + if err != nil { + return nil, err + } + return FromBytes(bs), nil case []Value: return FromArray(rval), nil case nil: diff --git a/pkg/s3select/sql/jsonpath.go b/pkg/s3select/sql/jsonpath.go index a2570f432..200b2d204 100644 --- a/pkg/s3select/sql/jsonpath.go +++ b/pkg/s3select/sql/jsonpath.go @@ -20,6 +20,7 @@ import ( "errors" "github.com/bcicen/jstream" + "github.com/minio/simdjson-go" ) var ( @@ -42,17 +43,29 @@ func jsonpathEval(p []*JSONPathElement, v interface{}) (r interface{}, flat bool case p[0].Key != nil: key := p[0].Key.keyString() - kvs, ok := v.(jstream.KVS) - if !ok { + switch kvs := v.(type) { + case jstream.KVS: + for _, kv := range kvs { + if kv.Key == key { + return jsonpathEval(p[1:], kv.Value) + } + } + // Key not found - return nil result + return nil, false, nil + case simdjson.Object: + elem := kvs.FindKey(key, nil) + if elem == nil { + // Key not found - return nil result + return nil, false, nil + } + val, err := IterToValue(elem.Iter) + if err != nil { + return nil, false, err + } + return jsonpathEval(p[1:], val) + default: return nil, false, errKeyLookup } - for _, kv := range kvs { - if kv.Key == key { - return jsonpathEval(p[1:], kv.Value) - } - } - // Key not found - return nil result - return nil, false, nil case p[0].Index != nil: idx := *p[0].Index @@ -68,17 +81,23 @@ func jsonpathEval(p []*JSONPathElement, v interface{}) (r interface{}, flat bool return jsonpathEval(p[1:], arr[idx]) case p[0].ObjectWildcard: - kvs, ok := v.(jstream.KVS) 
- if !ok { + switch kvs := v.(type) { + case jstream.KVS: + if len(p[1:]) > 0 { + return nil, false, errWilcardObjectUsageInvalid + } + + return kvs, false, nil + case simdjson.Object: + if len(p[1:]) > 0 { + return nil, false, errWilcardObjectUsageInvalid + } + + return kvs, false, nil + default: return nil, false, errWildcardObjectLookup } - if len(p[1:]) > 0 { - return nil, false, errWilcardObjectUsageInvalid - } - - return kvs, false, nil - case p[0].ArrayWildcard: arr, ok := v.([]interface{}) if !ok { diff --git a/pkg/s3select/sql/record.go b/pkg/s3select/sql/record.go index 2f3c8d215..4f34d73ee 100644 --- a/pkg/s3select/sql/record.go +++ b/pkg/s3select/sql/record.go @@ -17,9 +17,10 @@ package sql import ( + "fmt" "io" - "github.com/bcicen/jstream" + "github.com/minio/simdjson-go" ) // SelectObjectFormat specifies the format of the underlying data @@ -32,6 +33,8 @@ const ( SelectFmtCSV // SelectFmtJSON - JSON format SelectFmtJSON + // SelectFmtSIMDJSON - SIMD JSON format + SelectFmtSIMDJSON // SelectFmtParquet - Parquet format SelectFmtParquet ) @@ -39,7 +42,10 @@ const ( // Record - is a type containing columns and their values. type Record interface { Get(name string) (*Value, error) - Set(name string, value *Value) error + + // Set a value. + // Can return a different record type. + Set(name string, value *Value) (Record, error) WriteCSV(writer io.Writer, fieldDelimiter rune) error WriteJSON(writer io.Writer) error @@ -51,5 +57,77 @@ type Record interface { Raw() (SelectObjectFormat, interface{}) // Replaces the underlying data - Replace(k jstream.KVS) error + Replace(k interface{}) error +} + +// IterToValue converts a simdjson Iter to its underlying value. +// Objects are returned as simdjson.Object +// Arrays are returned as []interface{} with parsed values. 
+func IterToValue(iter simdjson.Iter) (interface{}, error) { + switch iter.Type() { + case simdjson.TypeString: + v, err := iter.String() + if err != nil { + return nil, err + } + return v, nil + case simdjson.TypeFloat: + v, err := iter.Float() + if err != nil { + return nil, err + } + return v, nil + case simdjson.TypeInt: + v, err := iter.Int() + if err != nil { + return nil, err + } + return v, nil + case simdjson.TypeUint: + v, err := iter.Int() + if err != nil { + // Can't fit into int, convert to float. + v, err := iter.Float() + return v, err + } + return v, nil + case simdjson.TypeBool: + v, err := iter.Bool() + if err != nil { + return nil, err + } + return v, nil + case simdjson.TypeObject: + obj, err := iter.Object(nil) + if err != nil { + return nil, err + } + return *obj, err + case simdjson.TypeArray: + arr, err := iter.Array(nil) + if err != nil { + return nil, err + } + iter := arr.Iter() + var dst []interface{} + var next simdjson.Iter + for { + typ, err := iter.AdvanceIter(&next) + if err != nil { + return nil, err + } + if typ == simdjson.TypeNone { + break + } + v, err := IterToValue(next) + if err != nil { + return nil, err + } + dst = append(dst, v) + } + return dst, err + case simdjson.TypeNull: + return nil, nil + } + return nil, fmt.Errorf("IterToValue: unknown JSON type: %s", iter.Type().String()) } diff --git a/pkg/s3select/sql/statement.go b/pkg/s3select/sql/statement.go index 8db06244f..e418410b2 100644 --- a/pkg/s3select/sql/statement.go +++ b/pkg/s3select/sql/statement.go @@ -22,6 +22,7 @@ import ( "strings" "github.com/bcicen/jstream" + "github.com/minio/simdjson-go" ) var ( @@ -140,36 +141,56 @@ func parseLimit(v *LitValue) (int64, error) { // EvalFrom evaluates the From clause on the input record. It only // applies to JSON input data format (currently). 
func (e *SelectStatement) EvalFrom(format string, input Record) (Record, error) { - if e.selectAST.From.HasKeypath() { - if format == "json" { - objFmt, rawVal := input.Raw() - if objFmt != SelectFmtJSON { - return nil, errDataSource(errors.New("unexpected non JSON input")) - } + if !e.selectAST.From.HasKeypath() { + return input, nil + } + _, rawVal := input.Raw() - jsonRec := rawVal.(jstream.KVS) - txedRec, _, err := jsonpathEval(e.selectAST.From.Table.PathExpr[1:], jsonRec) + if format != "json" { + return nil, errDataSource(errors.New("path not supported")) + } + switch rec := rawVal.(type) { + case jstream.KVS: + txedRec, _, err := jsonpathEval(e.selectAST.From.Table.PathExpr[1:], rec) + if err != nil { + return nil, err + } + + var kvs jstream.KVS + switch v := txedRec.(type) { + case jstream.KVS: + kvs = v + default: + kvs = jstream.KVS{jstream.KV{Key: "_1", Value: v}} + } + + if err = input.Replace(kvs); err != nil { + return nil, err + } + + return input, nil + case simdjson.Object: + txedRec, _, err := jsonpathEval(e.selectAST.From.Table.PathExpr[1:], rec) + if err != nil { + return nil, err + } + + switch v := txedRec.(type) { + case simdjson.Object: + err := input.Replace(v) if err != nil { return nil, err } - - var kvs jstream.KVS - switch v := txedRec.(type) { - case jstream.KVS: - kvs = v - default: - kvs = jstream.KVS{jstream.KV{Key: "_1", Value: v}} - } - - if err = input.Replace(kvs); err != nil { + default: + input.Reset() + input, err = input.Set("_1", &Value{value: v}) + if err != nil { return nil, err } - - return input, nil } - return nil, errDataSource(errors.New("path not supported")) + return input, nil } - return input, nil + return nil, errDataSource(errors.New("unexpected non JSON input")) } // IsAggregated returns if the statement involves SQL aggregation @@ -186,9 +207,12 @@ func (e *SelectStatement) AggregateResult(output Record) error { return err } if expr.As != "" { - output.Set(expr.As, v) + output, err = output.Set(expr.As, v) } 
else { - output.Set(fmt.Sprintf("_%d", i+1), v) + output, err = output.Set(fmt.Sprintf("_%d", i+1), v) + } + if err != nil { + return err } } return nil @@ -250,8 +274,7 @@ func (e *SelectStatement) Eval(input, output Record) (Record, error) { if e.limitValue > -1 { e.outputCount++ } - output = input.Clone(output) - return output, nil + return input.Clone(output), nil } for i, expr := range e.selectAST.Expression.Expressions { @@ -262,11 +285,14 @@ func (e *SelectStatement) Eval(input, output Record) (Record, error) { // Pick output column names if expr.As != "" { - output.Set(expr.As, v) + output, err = output.Set(expr.As, v) } else if comp, ok := getLastKeypathComponent(expr.Expression); ok { - output.Set(comp, v) + output, err = output.Set(comp, v) } else { - output.Set(fmt.Sprintf("_%d", i+1), v) + output, err = output.Set(fmt.Sprintf("_%d", i+1), v) + } + if err != nil { + return nil, err } }