Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1867,8 +1867,9 @@ abstract class RDD[T: ClassTag](
@Experimental
@Since("3.1.0")
def withResources(rp: ResourceProfile): this.type = {
resourceProfile = Option(rp)
sc.resourceProfileManager.addResourceProfile(resourceProfile.get)
// Reuse an already-registered profile with equal resources if one exists so that equivalent
// profiles share a single id, allowing executors to be reused instead of newly allocated.
resourceProfile = Option(sc.resourceProfileManager.getOrAddEquivalentProfile(rp))
this
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,13 +139,43 @@ private[spark] class ResourceProfileManager(sparkConf: SparkConf,
}
// do this outside the write lock only when we add a new profile
if (putNewProfile) {
// force the computation of maxTasks and limitingResource now so we don't have cost later
rp.limitingResource(sparkConf)
logInfo(log"Added ResourceProfile id: ${MDC(LogKeys.RESOURCE_PROFILE_ID, rp.id)}")
listenerBus.post(SparkListenerResourceProfileAdded(rp))
onProfileAdded(rp)
}
}

/**
* Get the registered ResourceProfile whose resources are equal to the given one, registering
* the given profile first if no equivalent one exists yet.
*/
def getOrAddEquivalentProfile(rp: ResourceProfile): ResourceProfile = {
isSupported(rp)
var addedProfile: Option[ResourceProfile] = None
val resolvedProfile = {
writeLock.lock()
try {
resourceProfileIdToResourceProfile.collectFirst {
case (_, existing) if existing.resourcesEqual(rp) => existing
}.getOrElse {
resourceProfileIdToResourceProfile.put(rp.id, rp)
addedProfile = Some(rp)
rp
}
} finally {
writeLock.unlock()
}
}
// do this outside the write lock only when we add a new profile
addedProfile.foreach(onProfileAdded)
resolvedProfile
}

private def onProfileAdded(rp: ResourceProfile): Unit = {
// force the computation of maxTasks and limitingResource now so we don't have cost later
rp.limitingResource(sparkConf)
logInfo(log"Added ResourceProfile id: ${MDC(LogKeys.RESOURCE_PROFILE_ID, rp.id)}")
listenerBus.post(SparkListenerResourceProfileAdded(rp))
}

/*
* Gets the ResourceProfile associated with the id, if a profile doesn't exist
* it returns the default ResourceProfile created from the application level configs.
Expand All @@ -160,19 +190,4 @@ private[spark] class ResourceProfileManager(sparkConf: SparkConf,
readLock.unlock()
}
}

/*
* If the ResourceProfile passed in is equivalent to an existing one, return the
* existing one, other return None
*/
def getEquivalentProfile(rp: ResourceProfile): Option[ResourceProfile] = {
readLock.lock()
try {
resourceProfileIdToResourceProfile.find { case (_, rpEntry) =>
rpEntry.resourcesEqual(rp)
}.map(_._2)
} finally {
readLock.unlock()
}
}
}
15 changes: 4 additions & 11 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -645,17 +645,10 @@ private[spark] class DAGScheduler(
val startResourceProfile = stageResourceProfiles.head
val mergedProfile = stageResourceProfiles.drop(1)
.foldLeft(startResourceProfile)((a, b) => mergeResourceProfiles(a, b))
// compared merged profile with existing ones so we don't add it over and over again
// if the user runs the same operation multiple times
val resProfile = sc.resourceProfileManager.getEquivalentProfile(mergedProfile)
Comment thread
uros-b marked this conversation as resolved.
resProfile match {
case Some(existingRp) => existingRp
case None =>
// this ResourceProfile could be different if it was merged so we have to add it to
// our ResourceProfileManager
sc.resourceProfileManager.addResourceProfile(mergedProfile)
mergedProfile
}
// compare the merged profile with existing ones so we don't add it over and over again
// if the user runs the same operation multiple times. The merged ResourceProfile could
// be different from any existing one, in which case it is registered here.
sc.resourceProfileManager.getOrAddEquivalentProfile(mergedProfile)
} else {
throw new IllegalArgumentException("Multiple ResourceProfiles specified in the RDDs for " +
"this stage, either resolve the conflicting ResourceProfiles yourself or enable " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,36 +188,34 @@ class ResourceProfileManagerSuite extends SparkFunSuite {
" with dynamic allocation"))
}

test("ResourceProfileManager has equivalent profile") {
test("getOrAddEquivalentProfile reuses an equivalent profile") {
val conf = new SparkConf().set(EXECUTOR_CORES, 4)
val rpmanager = new ResourceProfileManager(conf, listenerBus)
var rpAlreadyExist: Option[ResourceProfile] = None
val checkId = 500
for (i <- 1 to 1000) {

def buildProfile(cores: Int): ResourceProfile = {
val rprofBuilder = new ResourceProfileBuilder()
val ereqs = new ExecutorResourceRequests()
ereqs.cores(i).memory("4g").memoryOverhead("2000m")
ereqs.cores(cores).memory("4g").memoryOverhead("2000m")
val treqs = new TaskResourceRequests()
treqs.cpus(i)
rprofBuilder.require(ereqs).require(treqs)
val rprof = rprofBuilder.build()
rpmanager.addResourceProfile(rprof)
if (i == checkId) rpAlreadyExist = Some(rprof)
treqs.cpus(1)
rprofBuilder.require(ereqs).require(treqs).build()
}
val rpNotMatch = new ResourceProfileBuilder().build()
assert(rpmanager.getEquivalentProfile(rpNotMatch).isEmpty,
s"resourceProfile should not have existed")

val rprofBuilder = new ResourceProfileBuilder()
val ereqs = new ExecutorResourceRequests()
ereqs.cores(checkId).memory("4g").memoryOverhead("2000m")
val treqs = new TaskResourceRequests()
treqs.cpus(checkId)
rprofBuilder.require(ereqs).require(treqs)
val rpShouldMatch = rprofBuilder.build()

val equivProf = rpmanager.getEquivalentProfile(rpShouldMatch)
assert(equivProf.nonEmpty)
assert(equivProf.get.id == rpAlreadyExist.get.id, s"resourceProfile should have existed")

val first = buildProfile(8)
val registered = rpmanager.getOrAddEquivalentProfile(first)
// A brand-new profile is registered and returned as-is.
assert(registered.id == first.id)

// A distinct profile object with equal resources resolves to the already-registered one,
// so they share a single id and can therefore reuse the same executors.
val equivalent = buildProfile(8)
assert(equivalent.id != first.id, "the new profile object should have a different id")
val resolved = rpmanager.getOrAddEquivalentProfile(equivalent)
assert(resolved.id == first.id, "equivalent profile should resolve to the existing id")

// A profile with different resources is registered under its own id.
val different = buildProfile(16)
val resolvedDifferent = rpmanager.getOrAddEquivalentProfile(different)
assert(resolvedDifferent.id == different.id)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3948,13 +3948,15 @@ class SparkConnectPlanner(
name -> new TaskResourceRequest(res.getResourceName, res.getAmount)
}.toMap

// Create ResourceProfile add add it to ResourceProfileManager
val profile = if (ereqs.isEmpty) {
// Create the ResourceProfile and register it, reusing an already-registered profile with
// equal resources if one exists so that equivalent profiles share a single id (and thus
// can reuse the same executors instead of triggering new allocations).
val newProfile = if (ereqs.isEmpty) {
new TaskResourceProfile(treqs)
} else {
new ResourceProfile(ereqs, treqs)
}
session.sparkContext.resourceProfileManager.addResourceProfile(profile)
val profile = session.sparkContext.resourceProfileManager.getOrAddEquivalentProfile(newProfile)

executeHolder.eventsManager.postFinished()
responseObserver.onNext(
Expand Down