From dca7cf7200b86424231cb2a5101f28690fa68d7a Mon Sep 17 00:00:00 2001 From: Klaus Post Date: Sat, 3 Apr 2021 17:25:19 +0200 Subject: [PATCH] select: Support Parquet dates (#11928) Pass schema to parser to support dates. Fixes #11926 --- pkg/s3select/internal/parquet-go/column.go | 15 +- pkg/s3select/internal/parquet-go/reader.go | 10 +- .../internal/parquet-go/reader_test.go | 10 +- pkg/s3select/parquet/reader.go | 16 ++ pkg/s3select/select_test.go | 242 +++++++++++++++++- .../testdata/lineitem_shipdate.parquet | Bin 0 -> 213 bytes pkg/s3select/{ => testdata}/testdata.parquet | Bin 7 files changed, 280 insertions(+), 13 deletions(-) create mode 100644 pkg/s3select/testdata/lineitem_shipdate.parquet rename pkg/s3select/{ => testdata}/testdata.parquet (100%) diff --git a/pkg/s3select/internal/parquet-go/column.go b/pkg/s3select/internal/parquet-go/column.go index cb2f0b7fc..3ed3c1e8d 100644 --- a/pkg/s3select/internal/parquet-go/column.go +++ b/pkg/s3select/internal/parquet-go/column.go @@ -67,10 +67,18 @@ func getColumns( if nameColumnMap == nil { nameColumnMap = make(map[string]*column) } + var se *parquet.SchemaElement + for _, schema := range schemaElements { + if schema != nil && schema.Name == columnName { + se = schema + break + } + } nameColumnMap[columnName] = &column{ name: columnName, metadata: meta, + schema: se, schemaElements: schemaElements, rc: rc, thriftReader: thriftReader, @@ -95,6 +103,7 @@ type column struct { valueIndex int valueType parquet.Type metadata *parquet.ColumnMetaData + schema *parquet.SchemaElement schemaElements []*parquet.SchemaElement nameIndexMap map[string]int dictPage *page @@ -140,14 +149,14 @@ func (column *column) readPage() { column.dataTable.Merge(page.DataTable) } -func (column *column) read() (value interface{}, valueType parquet.Type) { +func (column *column) read() (value interface{}, valueType parquet.Type, cnv *parquet.SchemaElement) { if column.dataTable == nil { column.readPage() column.valueIndex = 0 } if column.endOfValues { - return nil, column.metadata.GetType() + return nil, column.metadata.GetType(), column.schema } value = column.dataTable.Values[column.valueIndex] @@ -156,5 +165,5 @@ func (column *column) read() (value interface{}, valueType parquet.Type) { column.dataTable = nil } - return value, column.metadata.GetType() + return value, column.metadata.GetType(), column.schema } diff --git a/pkg/s3select/internal/parquet-go/reader.go b/pkg/s3select/internal/parquet-go/reader.go index ad38f8036..c757503d3 100644 --- a/pkg/s3select/internal/parquet-go/reader.go +++ b/pkg/s3select/internal/parquet-go/reader.go @@ -72,8 +72,9 @@ func fileMetadata(getReaderFunc GetReaderFunc) (*parquet.FileMetaData, error) { // Value - denotes column value type Value struct { - Value interface{} - Type parquet.Type + Value interface{} + Type parquet.Type + Schema *parquet.SchemaElement } // MarshalJSON - encodes to JSON data @@ -144,8 +145,9 @@ func (reader *Reader) Read() (record *Record, err error) { record = newRecord(reader.nameList) for name := range reader.columns { - value, valueType := reader.columns[name].read() - record.set(name, Value{value, valueType}) + col := reader.columns[name] + value, valueType, schema := col.read() + record.set(name, Value{Value: value, Type: valueType, Schema: schema}) } reader.rowIndex++ diff --git a/pkg/s3select/internal/parquet-go/reader_test.go b/pkg/s3select/internal/parquet-go/reader_test.go index 5f0221d92..c3886ade8 100644 --- a/pkg/s3select/internal/parquet-go/reader_test.go +++ b/pkg/s3select/internal/parquet-go/reader_test.go @@ -59,9 +59,9 @@ func TestReader(t *testing.T) { } expectedRecords := []string{ - `map[one:{-1 DOUBLE} three:{true BOOLEAN} two:{[102 111 111] BYTE_ARRAY}]`, - `map[one:{ DOUBLE} three:{false BOOLEAN} two:{[98 97 114] BYTE_ARRAY}]`, - `map[one:{2.5 DOUBLE} three:{true BOOLEAN} two:{[98 97 122] BYTE_ARRAY}]`, + `map[one:{-1 DOUBLE SchemaElement({Type:DOUBLE TypeLength: RepetitionType:OPTIONAL Name:one NumChildren: ConvertedType: Scale: Precision: FieldID: LogicalType:})} three:{true BOOLEAN SchemaElement({Type:BOOLEAN TypeLength: RepetitionType:OPTIONAL Name:three NumChildren: ConvertedType: Scale: Precision: FieldID: LogicalType:})} two:{[102 111 111] BYTE_ARRAY SchemaElement({Type:BYTE_ARRAY TypeLength: RepetitionType:OPTIONAL Name:two NumChildren: ConvertedType: Scale: Precision: FieldID: LogicalType:})}]`, + `map[one:{ DOUBLE SchemaElement({Type:DOUBLE TypeLength: RepetitionType:OPTIONAL Name:one NumChildren: ConvertedType: Scale: Precision: FieldID: LogicalType:})} three:{false BOOLEAN SchemaElement({Type:BOOLEAN TypeLength: RepetitionType:OPTIONAL Name:three NumChildren: ConvertedType: Scale: Precision: FieldID: LogicalType:})} two:{[98 97 114] BYTE_ARRAY SchemaElement({Type:BYTE_ARRAY TypeLength: RepetitionType:OPTIONAL Name:two NumChildren: ConvertedType: Scale: Precision: FieldID: LogicalType:})}]`, + `map[one:{2.5 DOUBLE SchemaElement({Type:DOUBLE TypeLength: RepetitionType:OPTIONAL Name:one NumChildren: ConvertedType: Scale: Precision: FieldID: LogicalType:})} three:{true BOOLEAN SchemaElement({Type:BOOLEAN TypeLength: RepetitionType:OPTIONAL Name:three NumChildren: ConvertedType: Scale: Precision: FieldID: LogicalType:})} two:{[98 97 122] BYTE_ARRAY SchemaElement({Type:BYTE_ARRAY TypeLength: RepetitionType:OPTIONAL Name:two NumChildren: ConvertedType: Scale: Precision: FieldID: LogicalType:})}]`, } i := 0 @@ -76,11 +76,11 @@ func TestReader(t *testing.T) { } if i == len(expectedRecords) { - t.Fatalf("read more than expected record count %v", len(expectedRecords)) + t.Errorf("read more than expected record count %v", len(expectedRecords)) } if record.String() != expectedRecords[i] { - t.Fatalf("record%v: expected: %v, got: %v", i+1, expectedRecords[i], record.String()) + t.Errorf("record%v: expected: %v, got: %v", i+1, expectedRecords[i], record.String()) } i++ diff --git a/pkg/s3select/parquet/reader.go b/pkg/s3select/parquet/reader.go index 3476faef7..bc9557273 100644 --- a/pkg/s3select/parquet/reader.go +++ b/pkg/s3select/parquet/reader.go @@ -19,6 +19,7 @@ package parquet import ( "fmt" "io" + "time" "github.com/bcicen/jstream" parquetgo "github.com/minio/minio/pkg/s3select/internal/parquet-go" @@ -63,8 +64,23 @@ func (r *Reader) Read(dst sql.Record) (rec sql.Record, rerr error) { value = v.Value.(bool) case parquetgen.Type_INT32: value = int64(v.Value.(int32)) + if v.Schema != nil && v.Schema.ConvertedType != nil { + switch *v.Schema.ConvertedType { + case parquetgen.ConvertedType_DATE: + value = sql.FormatSQLTimestamp(time.Unix(60*60*24*int64(v.Value.(int32)), 0).UTC()) + } + } case parquetgen.Type_INT64: value = v.Value.(int64) + if v.Schema != nil && v.Schema.ConvertedType != nil { + switch *v.Schema.ConvertedType { + // Only UTC supported, add one NS to never be exactly midnight. + case parquetgen.ConvertedType_TIMESTAMP_MILLIS: + value = sql.FormatSQLTimestamp(time.Unix(0, 0).Add(time.Duration(v.Value.(int64)) * time.Millisecond).UTC()) + case parquetgen.ConvertedType_TIMESTAMP_MICROS: + value = sql.FormatSQLTimestamp(time.Unix(0, 0).Add(time.Duration(v.Value.(int64)) * time.Microsecond).UTC()) + } + } case parquetgen.Type_FLOAT: value = float64(v.Value.(float32)) case parquetgen.Type_DOUBLE: diff --git a/pkg/s3select/select_test.go b/pkg/s3select/select_test.go index 2ef6be2e0..407300f4c 100644 --- a/pkg/s3select/select_test.go +++ b/pkg/s3select/select_test.go @@ -1069,7 +1069,7 @@ func TestParquetInput(t *testing.T) { for i, testCase := range testTable { t.Run(fmt.Sprint(i), func(t *testing.T) { getReader := func(offset int64, length int64) (io.ReadCloser, error) { - testdataFile := "testdata.parquet" + testdataFile := "testdata/testdata.parquet" file, err := os.Open(testdataFile) if err != nil { return nil, err @@ -1126,3 +1126,243 @@ func TestParquetInput(t *testing.T) { }) } } + +func TestParquetInputSchema(t *testing.T) { + os.Setenv("MINIO_API_SELECT_PARQUET", "on") + defer os.Setenv("MINIO_API_SELECT_PARQUET", "off") + + var testTable = []struct { + requestXML []byte + wantResult string + }{ + { + requestXML: []byte(` + + + SELECT * FROM S3Object LIMIT 5 + SQL + + NONE + + + + + + + + + FALSE + + +`), wantResult: `{"shipdate":"1996-03-13T"} +{"shipdate":"1996-04-12T"} +{"shipdate":"1996-01-29T"} +{"shipdate":"1996-04-21T"} +{"shipdate":"1996-03-30T"}`, + }, + { + requestXML: []byte(` + + + SELECT DATE_ADD(day, 2, shipdate) as shipdate FROM S3Object LIMIT 5 + SQL + + NONE + + + + + + + + + FALSE + + +`), wantResult: `{"shipdate":"1996-03-15T"} +{"shipdate":"1996-04-14T"} +{"shipdate":"1996-01-31T"} +{"shipdate":"1996-04-23T"} +{"shipdate":"1996-04T"}`, + }, + } + + for i, testCase := range testTable { + t.Run(fmt.Sprint(i), func(t *testing.T) { + getReader := func(offset int64, length int64) (io.ReadCloser, error) { + testdataFile := "testdata/lineitem_shipdate.parquet" + file, err := os.Open(testdataFile) + if err != nil { + return nil, err + } + + fi, err := file.Stat() + if err != nil { + return nil, err + } + + if offset < 0 { + offset = fi.Size() + offset + } + + if _, err = file.Seek(offset, io.SeekStart); err != nil { + return nil, err + } + + return file, nil + } + + s3Select, err := NewS3Select(bytes.NewReader(testCase.requestXML)) + if err != nil { + t.Fatal(err) + } + + if err = s3Select.Open(getReader); err != nil { + t.Fatal(err) + } + + w := &testResponseWriter{} + s3Select.Evaluate(w) + s3Select.Close() + resp := http.Response{ + StatusCode: http.StatusOK, + Body: ioutil.NopCloser(bytes.NewReader(w.response)), + ContentLength: int64(len(w.response)), + } + res, err := minio.NewSelectResults(&resp, "testbucket") + if err != nil { + t.Error(err) + return + } + got, err := ioutil.ReadAll(res) + if err != nil { + t.Error(err) + return + } + gotS := strings.TrimSpace(string(got)) + if !reflect.DeepEqual(gotS, testCase.wantResult) { + t.Errorf("received response does not match with expected reply. Query: %s\ngot: %s\nwant:%s", testCase.requestXML, gotS, testCase.wantResult) + } + + }) + } +} + +func TestParquetInputSchemaCSV(t *testing.T) { + os.Setenv("MINIO_API_SELECT_PARQUET", "on") + defer os.Setenv("MINIO_API_SELECT_PARQUET", "off") + + var testTable = []struct { + requestXML []byte + wantResult string + }{ + { + requestXML: []byte(` + + + SELECT * FROM S3Object LIMIT 5 + SQL + + NONE + + + + + + + + FALSE + + +`), wantResult: `1996-03-13T +1996-04-12T +1996-01-29T +1996-04-21T +1996-03-30T`, + }, + { + requestXML: []byte(` + + + SELECT DATE_ADD(day, 2, shipdate) as shipdate FROM S3Object LIMIT 5 + SQL + + NONE + + + + + + + + FALSE + + +`), wantResult: `1996-03-15T +1996-04-14T +1996-01-31T +1996-04-23T +1996-04T`, + }, + } + + for i, testCase := range testTable { + t.Run(fmt.Sprint(i), func(t *testing.T) { + getReader := func(offset int64, length int64) (io.ReadCloser, error) { + testdataFile := "testdata/lineitem_shipdate.parquet" + file, err := os.Open(testdataFile) + if err != nil { + return nil, err + } + + fi, err := file.Stat() + if err != nil { + return nil, err + } + + if offset < 0 { + offset = fi.Size() + offset + } + + if _, err = file.Seek(offset, io.SeekStart); err != nil { + return nil, err + } + + return file, nil + } + + s3Select, err := NewS3Select(bytes.NewReader(testCase.requestXML)) + if err != nil { + t.Fatal(err) + } + + if err = s3Select.Open(getReader); err != nil { + t.Fatal(err) + } + + w := &testResponseWriter{} + s3Select.Evaluate(w) + s3Select.Close() + resp := http.Response{ + StatusCode: http.StatusOK, + Body: ioutil.NopCloser(bytes.NewReader(w.response)), + ContentLength: int64(len(w.response)), + } + res, err := minio.NewSelectResults(&resp, "testbucket") + if err != nil { + t.Error(err) + return + } + got, err := ioutil.ReadAll(res) + if err != nil { + t.Error(err) + return + } + gotS := strings.TrimSpace(string(got)) + if !reflect.DeepEqual(gotS, testCase.wantResult) { + t.Errorf("received response does not match with expected reply. Query: %s\ngot: %s\nwant:%s", testCase.requestXML, gotS, testCase.wantResult) + } + + }) + } +} diff --git a/pkg/s3select/testdata/lineitem_shipdate.parquet b/pkg/s3select/testdata/lineitem_shipdate.parquet new file mode 100644 index 0000000000000000000000000000000000000000..17e8518b3b7b76b22b2010d906dff25eebcf60d4 GIT binary patch literal 213 zcmWG=3^EjD5RDPd(-9Q`GT1~pWF%M?sxdG~uy86dFo-b#fqXXyh+<%1PEJZl5J>b1 zNKIJ5oFuG}$NtDk!Ho4$YZebrp%)L&9EUgDM;NpM`o8J`Rf{r7>UeNxWR|7I7bj<= z<|c|V0mW3ABshvQG7C}?OHx&N7{o**Wh7