From f7312ad70600a8062d90cb4ae99edaaa0ddc254c Mon Sep 17 00:00:00 2001 From: PA Savalle Date: Mon, 15 Jun 2026 13:48:22 +0200 Subject: [PATCH 1/2] [SPARK-57467][SCHEDULER] Reuse identical resource profiles when available --- .../main/scala/org/apache/spark/rdd/RDD.scala | 5 ++- .../resource/ResourceProfileManager.scala | 38 +++++++++++++++++-- .../apache/spark/scheduler/DAGScheduler.scala | 15 ++------ .../ResourceProfileManagerSuite.scala | 31 +++++++++++++++ .../connect/planner/SparkConnectPlanner.scala | 8 ++-- 5 files changed, 77 insertions(+), 20 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 5500f085de1e3..75a418d11f8de 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -1867,8 +1867,9 @@ abstract class RDD[T: ClassTag]( @Experimental @Since("3.1.0") def withResources(rp: ResourceProfile): this.type = { - resourceProfile = Option(rp) - sc.resourceProfileManager.addResourceProfile(resourceProfile.get) + // Reuse an already-registered profile with equal resources if one exists so that equivalent + // profiles share a single id, allowing executors to be reused instead of newly allocated. + resourceProfile = Option(sc.resourceProfileManager.getOrAddEquivalentProfile(rp)) this } diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala b/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala index a3d76a92ddd8b..671ec5208a7e4 100644 --- a/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala +++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala @@ -139,13 +139,43 @@ private[spark] class ResourceProfileManager(sparkConf: SparkConf, } // do this outside the write lock only when we add a new profile if (putNewProfile) { - // force the computation of maxTasks and limitingResource now so we don't have cost later - rp.limitingResource(sparkConf) - logInfo(log"Added ResourceProfile id: ${MDC(LogKeys.RESOURCE_PROFILE_ID, rp.id)}") - listenerBus.post(SparkListenerResourceProfileAdded(rp)) + onProfileAdded(rp) } } + /** + * Get the registered ResourceProfile whose resources are equal to the given one, registering + * the given profile first if no equivalent one exists yet. + */ + def getOrAddEquivalentProfile(rp: ResourceProfile): ResourceProfile = { + isSupported(rp) + var addedProfile: Option[ResourceProfile] = None + val resolvedProfile = { + writeLock.lock() + try { + resourceProfileIdToResourceProfile.collectFirst { + case (_, existing) if existing.resourcesEqual(rp) => existing + }.getOrElse { + resourceProfileIdToResourceProfile.put(rp.id, rp) + addedProfile = Some(rp) + rp + } + } finally { + writeLock.unlock() + } + } + // do this outside the write lock only when we add a new profile + addedProfile.foreach(onProfileAdded) + resolvedProfile + } + + private def onProfileAdded(rp: ResourceProfile): Unit = { + // force the computation of maxTasks and limitingResource now so we don't have cost later + rp.limitingResource(sparkConf) + logInfo(log"Added ResourceProfile id: ${MDC(LogKeys.RESOURCE_PROFILE_ID, rp.id)}") + listenerBus.post(SparkListenerResourceProfileAdded(rp)) + } + /* * Gets the ResourceProfile associated with the id, if a profile doesn't exist * it returns the default ResourceProfile created from the application level configs. diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 22720b98aafde..369dd71261990 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -645,17 +645,10 @@ private[spark] class DAGScheduler( val startResourceProfile = stageResourceProfiles.head val mergedProfile = stageResourceProfiles.drop(1) .foldLeft(startResourceProfile)((a, b) => mergeResourceProfiles(a, b)) - // compared merged profile with existing ones so we don't add it over and over again - // if the user runs the same operation multiple times - val resProfile = sc.resourceProfileManager.getEquivalentProfile(mergedProfile) - resProfile match { - case Some(existingRp) => existingRp - case None => - // this ResourceProfile could be different if it was merged so we have to add it to - // our ResourceProfileManager - sc.resourceProfileManager.addResourceProfile(mergedProfile) - mergedProfile - } + // compare the merged profile with existing ones so we don't add it over and over again + // if the user runs the same operation multiple times. The merged ResourceProfile could + // be different from any existing one, in which case it is registered here. + sc.resourceProfileManager.getOrAddEquivalentProfile(mergedProfile) } else { throw new IllegalArgumentException("Multiple ResourceProfiles specified in the RDDs for " + "this stage, either resolve the conflicting ResourceProfiles yourself or enable " + diff --git a/core/src/test/scala/org/apache/spark/resource/ResourceProfileManagerSuite.scala b/core/src/test/scala/org/apache/spark/resource/ResourceProfileManagerSuite.scala index ab57f1c801b0e..f2a367e62ada9 100644 --- a/core/src/test/scala/org/apache/spark/resource/ResourceProfileManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/resource/ResourceProfileManagerSuite.scala @@ -220,4 +220,35 @@ class ResourceProfileManagerSuite extends SparkFunSuite { assert(equivProf.nonEmpty) assert(equivProf.get.id == rpAlreadyExist.get.id, s"resourceProfile should have existed") } + + test("getOrAddEquivalentProfile reuses an equivalent profile") { + val conf = new SparkConf().set(EXECUTOR_CORES, 4) + val rpmanager = new ResourceProfileManager(conf, listenerBus) + + def buildProfile(cores: Int): ResourceProfile = { + val rprofBuilder = new ResourceProfileBuilder() + val ereqs = new ExecutorResourceRequests() + ereqs.cores(cores).memory("4g").memoryOverhead("2000m") + val treqs = new TaskResourceRequests() + treqs.cpus(1) + rprofBuilder.require(ereqs).require(treqs).build() + } + + val first = buildProfile(8) + val registered = rpmanager.getOrAddEquivalentProfile(first) + // A brand-new profile is registered and returned as-is. + assert(registered.id == first.id) + + // A distinct profile object with equal resources resolves to the already-registered one, + // so they share a single id and can therefore reuse the same executors. + val equivalent = buildProfile(8) + assert(equivalent.id != first.id, "the new profile object should have a different id") + val resolved = rpmanager.getOrAddEquivalentProfile(equivalent) + assert(resolved.id == first.id, "equivalent profile should resolve to the existing id") + + // A profile with different resources is registered under its own id. + val different = buildProfile(16) + val resolvedDifferent = rpmanager.getOrAddEquivalentProfile(different) + assert(resolvedDifferent.id == different.id) + } } diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index b067efa0579a9..79c47ddaaeb23 100644 --- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -3948,13 +3948,15 @@ class SparkConnectPlanner( name -> new TaskResourceRequest(res.getResourceName, res.getAmount) }.toMap - // Create ResourceProfile add add it to ResourceProfileManager - val profile = if (ereqs.isEmpty) { + // Create the ResourceProfile and register it, reusing an already-registered profile with + // equal resources if one exists so that equivalent profiles share a single id (and thus + // can reuse the same executors instead of triggering new allocations). + val newProfile = if (ereqs.isEmpty) { new TaskResourceProfile(treqs) } else { new ResourceProfile(ereqs, treqs) } - session.sparkContext.resourceProfileManager.addResourceProfile(profile) + val profile = session.sparkContext.resourceProfileManager.getOrAddEquivalentProfile(newProfile) executeHolder.eventsManager.postFinished() responseObserver.onNext( From 0253787a47e44a12f76d5040e94b531b210d37d2 Mon Sep 17 00:00:00 2001 From: PA Savalle Date: Wed, 17 Jun 2026 21:25:50 +0200 Subject: [PATCH 2/2] remove unused getEquivalentProfile --- .../resource/ResourceProfileManager.scala | 15 --------- .../ResourceProfileManagerSuite.scala | 33 ------------------- 2 files changed, 48 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala b/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala index 671ec5208a7e4..497b1f2214110 100644 --- a/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala +++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala @@ -190,19 +190,4 @@ private[spark] class ResourceProfileManager(sparkConf: SparkConf, readLock.unlock() } } - - /* - * If the ResourceProfile passed in is equivalent to an existing one, return the - * existing one, other return None - */ - def getEquivalentProfile(rp: ResourceProfile): Option[ResourceProfile] = { - readLock.lock() - try { - resourceProfileIdToResourceProfile.find { case (_, rpEntry) => - rpEntry.resourcesEqual(rp) - }.map(_._2) - } finally { - readLock.unlock() - } - } } diff --git a/core/src/test/scala/org/apache/spark/resource/ResourceProfileManagerSuite.scala b/core/src/test/scala/org/apache/spark/resource/ResourceProfileManagerSuite.scala index f2a367e62ada9..822c727f9acb7 100644 --- a/core/src/test/scala/org/apache/spark/resource/ResourceProfileManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/resource/ResourceProfileManagerSuite.scala @@ -188,39 +188,6 @@ class ResourceProfileManagerSuite extends SparkFunSuite { " with dynamic allocation")) } - test("ResourceProfileManager has equivalent profile") { - val conf = new SparkConf().set(EXECUTOR_CORES, 4) - val rpmanager = new ResourceProfileManager(conf, listenerBus) - var rpAlreadyExist: Option[ResourceProfile] = None - val checkId = 500 - for (i <- 1 to 1000) { - val rprofBuilder = new ResourceProfileBuilder() - val ereqs = new ExecutorResourceRequests() - ereqs.cores(i).memory("4g").memoryOverhead("2000m") - val treqs = new TaskResourceRequests() - treqs.cpus(i) - rprofBuilder.require(ereqs).require(treqs) - val rprof = rprofBuilder.build() - rpmanager.addResourceProfile(rprof) - if (i == checkId) rpAlreadyExist = Some(rprof) - } - val rpNotMatch = new ResourceProfileBuilder().build() - assert(rpmanager.getEquivalentProfile(rpNotMatch).isEmpty, - s"resourceProfile should not have existed") - - val rprofBuilder = new ResourceProfileBuilder() - val ereqs = new ExecutorResourceRequests() - ereqs.cores(checkId).memory("4g").memoryOverhead("2000m") - val treqs = new TaskResourceRequests() - treqs.cpus(checkId) - rprofBuilder.require(ereqs).require(treqs) - val rpShouldMatch = rprofBuilder.build() - - val equivProf = rpmanager.getEquivalentProfile(rpShouldMatch) - assert(equivProf.nonEmpty) - assert(equivProf.get.id == rpAlreadyExist.get.id, s"resourceProfile should have existed") - } - test("getOrAddEquivalentProfile reuses an equivalent profile") { val conf = new SparkConf().set(EXECUTOR_CORES, 4) val rpmanager = new ResourceProfileManager(conf, listenerBus)