diff --git a/cmd/erasure-server-pool.go b/cmd/erasure-server-pool.go
index 35778fbbc..e70b95c7c 100644
--- a/cmd/erasure-server-pool.go
+++ b/cmd/erasure-server-pool.go
@@ -25,6 +25,7 @@ import (
 	"io"
 	"math/rand"
 	"net/http"
+	"path"
 	"sort"
 	"strconv"
 	"strings"
@@ -1323,6 +1324,85 @@ func (z *erasureServerPools) ListObjects(ctx context.Context, bucket, prefix, ma
 	}
 	opts.setBucketMeta(ctx)
 
+	ri := logger.GetReqInfo(ctx)
+	hadoop := ri != nil && strings.Contains(ri.UserAgent, `Hadoop `) && strings.Contains(ri.UserAgent, "scala/")
+	matches := func() bool {
+		if prefix == "" {
+			return false
+		}
+		// List of standard files used by s3a
+		// that involve a List() on a directory
+		// where the directory is actually an
+		// object in the namespace.
+		for _, k := range []string{
+			"_SUCCESS/",
+			".parquet/",
+			".csv/",
+			".json/",
+			".avro/",
+			".orc/",
+			".txt/",
+			// Add any other files in the future
+		} {
+			if strings.HasSuffix(prefix, k) {
+				return true
+			}
+		}
+		return false
+	}
+	if hadoop && matches() && delimiter == SlashSeparator && maxKeys == 2 && marker == "" {
+		// Optimization for Spark/Hadoop workloads where Spark sends a garbage
+		// request of this kind:
+		//
+		// GET /testbucket/?list-type=2&delimiter=%2F&max-keys=2&prefix=parquet%2F_SUCCESS%2F&fetch-owner=false
+		//
+		// Here Spark expects the List() to return empty, so from MinIO's point
+		// of view we can simply do a GetObjectInfo() on this prefix, treating it
+		// as an object, and save a lot of calls over the network.
+		//
+		// This happens repeatedly for all objects that are created concurrently;
+		// avoiding the List() call here is an important performance improvement.
+		//
+		// Spark-based s3a committers are a big enough use case to have this optimization.
+		//
+		// Sample code to see the improvement is as follows; it simply reads
+		// JSON from MinIO and writes it back as parquet:
+		//
+		// import org.apache.spark.sql.SparkSession
+		// import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
+		// object SparkJSONRead {
+		//   def main(args: Array[String]): Unit = {
+		//     val spark:SparkSession = SparkSession.builder()
+		//       .appName("SparkByExample")
+		//       .master("local[1]").getOrCreate()
+		//
+		//     spark.sparkContext.setLogLevel("ERROR")
+		//     spark.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", "http://minio-lb:9000")
+		//     spark.sparkContext.hadoopConfiguration.set("fs.s3a.path.style.access", "true")
+		//     spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", "minioadmin")
+		//     spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", "minioadmin")
+		//
+		//     val df = spark.read.json("s3a://testbucket/s3.json")
+		//
+		//     df.write.parquet("s3a://testbucket/parquet/")
+		//   }
+		// }
+		objInfo, err := z.GetObjectInfo(ctx, bucket, path.Dir(prefix), ObjectOptions{NoLock: true})
+		if err == nil {
+			if opts.Lifecycle != nil {
+				evt := evalActionFromLifecycle(ctx, *opts.Lifecycle, opts.Retention, objInfo)
+				if evt.Action.Delete() {
+					globalExpiryState.enqueueByDays(objInfo, evt, lcEventSrc_s3ListObjects)
+					if !evt.Action.DeleteRestored() {
+						// Skip entry if ILM action was DeleteVersionAction or DeleteAction
+						return loi, nil
+					}
+				}
+			}
+			return loi, nil
+		}
+	}
+
 	if len(prefix) > 0 && maxKeys == 1 && marker == "" {
 		// Optimization for certain applications like
 		// - Cohesity
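For anyone who wants to exercise the gating condition without wiring up a full Spark job, below is a minimal standalone sketch of the heuristic the patch adds, pulled out as a pure function. The names isSparkListProbe and slashSeparator are illustrative only, not identifiers from the MinIO codebase, and the sample User-Agent string is a made-up value that merely satisfies the two strings.Contains checks from the patch.

package main

import (
	"fmt"
	"strings"
)

const slashSeparator = "/"

// isSparkListProbe reports whether a ListObjects request looks like the
// s3a probe this patch short-circuits: a Hadoop/Spark client listing a
// file-like key (e.g. prefix "parquet/_SUCCESS/") with delimiter "/",
// max-keys 2 and no marker.
func isSparkListProbe(userAgent, prefix, delimiter, marker string, maxKeys int) bool {
	hadoop := strings.Contains(userAgent, "Hadoop ") && strings.Contains(userAgent, "scala/")
	if !hadoop || delimiter != slashSeparator || maxKeys != 2 || marker != "" || prefix == "" {
		return false
	}
	// Suffixes of the standard s3a files whose List() is really a stat
	// on an object; mirrors the suffix list in the patch.
	for _, k := range []string{
		"_SUCCESS/", ".parquet/", ".csv/", ".json/",
		".avro/", ".orc/", ".txt/",
	} {
		if strings.HasSuffix(prefix, k) {
			return true
		}
	}
	return false
}

func main() {
	ua := "Hadoop 3.3.4, aws-sdk-java/1.12.262 scala/2.12.15" // illustrative UA
	// The probe Spark issues after committing a parquet write: matched.
	fmt.Println(isSparkListProbe(ua, "parquet/_SUCCESS/", "/", "", 2)) // true
	// An ordinary paginated listing from the same client: not matched.
	fmt.Println(isSparkListProbe(ua, "parquet/", "/", "", 1000)) // false
}

When the function returns true, the patch answers the request with a single GetObjectInfo() on path.Dir(prefix) instead of running a full listing, which is where the network savings come from.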