abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
  ) extends Serializable with Logging {
  if (classOf[RDD[_]].isAssignableFrom(elementClassTag.runtimeClass)) {
    // This is a warning instead of an exception in order to avoid breaking user programs that
    // might have defined nested RDDs without running jobs with them.
    logWarning("Spark does not support nested RDDs (see SPARK-5063)")
  }

  /** Construct an RDD with just a one-to-one dependency on one parent */
  def this(@transient oneParent: RDD[_]) =
    this(oneParent.context, List(new OneToOneDependency(oneParent)))
}
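The check above is what emits the SPARK-5063 warning. As a minimal sketch (the variable names are illustrative, not from the Spark sources), nesting one RDD inside another is enough to trigger it, because elementClassTag.runtimeClass is then an RDD type:

// Illustrative only: an RDD whose elements are themselves RDDs.
// The constructor above merely logs the SPARK-5063 warning here;
// actually running a job on `nested` would still fail at runtime.
val inner  = sc.parallelize(1 to 3)
val nested = sc.parallelize(Seq(inner))   // RDD[RDD[Int]]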
  /**
   * Return a new RDD by applying a function to all elements of this RDD.
   */
  def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
  }
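A short usage sketch, assuming a live SparkContext named sc: every map call goes through withScope, so the resulting MapPartitionsRDD is tagged with the scope name "map".

val doubled = sc.parallelize(1 to 4).map(_ * 2)   // one MapPartitionsRDD, scope name "map"
doubled.collect()                                 // Array(2, 4, 6, 8)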
  private[spark] def withScope[U](body: => U): U = RDDOperationScope.withScope[U](sc)(body)
// RDDOperationScope.scala
/**
 * A general, named code block representing an operation that instantiates RDDs.
 *
 * All RDDs instantiated in the corresponding code block will store a pointer to this object.
 * Examples include, but will not be limited to, existing RDD operations, such as textFile,
 * reduceByKey, and treeAggregate.
 *
 * An operation scope may be nested in other scopes. For instance, a SQL query may enclose
 * scopes associated with the public RDD APIs it uses under the hood.
 *
 * There is no particular relationship between an operation scope and a stage or a job.
 * A scope may live inside one stage (e.g. map) or span across multiple jobs (e.g. take).
 */
@JsonInclude(Include.NON_NULL)
@JsonPropertyOrder(Array("id", "name", "parent"))
private[spark] class RDDOperationScope(
    val name: String,
    val parent: Option[RDDOperationScope] = None,
    val id: String = RDDOperationScope.nextScopeId().toString) {
  /**
   * Return a list of scopes that this scope is a part of, including this scope itself.
   * The result is ordered from the outermost scope (eldest ancestor) to this scope.
   */
  @JsonIgnore
  def getAllScopes: Seq[RDDOperationScope] = {
    parent.map(_.getAllScopes).getOrElse(Seq.empty) ++ Seq(this)
  }
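A small sketch of getAllScopes (only usable from inside the org.apache.spark package, since the class is private[spark]); the ancestor chain comes back outermost first:

val root  = new RDDOperationScope("treeAggregate")
val child = new RDDOperationScope("map", parent = Some(root))
child.getAllScopes.map(_.name)   // Seq("treeAggregate", "map")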
  override def equals(other: Any): Boolean = {
    other match {
      case s: RDDOperationScope =>
        id == s.id && name == s.name && parent == s.parent
      case _ => false
    }
  }
  override def toString: String = toJson
}
/**
 * A collection of utility methods to construct a hierarchical representation of RDD scopes.
 * An RDD scope tracks the series of operations that created a given RDD.
 */
private[spark] object RDDOperationScope extends Logging {
  private val jsonMapper = new ObjectMapper().registerModule(DefaultScalaModule)
  private val scopeCounter = new AtomicInteger(0)
  /** Return a globally unique operation scope ID. */
  def nextScopeId(): Int = scopeCounter.getAndIncrement
  /**
   * Execute the given body such that all RDDs created in this body will have the same scope.
   * The name of the scope will be the first method name in the stack trace that is not the
   * same as this method's.
   *
   * Note: Return statements are NOT allowed in body.
   */
  private[spark] def withScope[T](
      sc: SparkContext,
      allowNesting: Boolean = false)(body: => T): T = {
    val ourMethodName = "withScope"
    val callerMethodName = Thread.currentThread.getStackTrace()
      .dropWhile(_.getMethodName != ourMethodName) // drop every frame above the first withScope frame
      .find(_.getMethodName != ourMethodName)      // find the caller of withScope, e.g. RDD#withScope
      .map(_.getMethodName)
      .getOrElse {
        // Log a warning just in case, but this should almost certainly never happen
        logWarning("No valid method name for this RDD operation scope!")
        "N/A"
      }
    withScope[T](sc, callerMethodName, allowNesting, ignoreParent = false)(body)
  }
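The dropWhile/find pair simply scans the stack for the first frame that is not withScope itself. A toy illustration of the same logic on a plain list of method names (not Spark code):

// Frames from innermost to outermost: getStackTrace, two withScope frames, then the real caller.
val frames = Seq("getStackTrace", "withScope", "withScope", "map", "myApplication")
frames.dropWhile(_ != "withScope")   // Seq("withScope", "withScope", "map", "myApplication")
      .find(_ != "withScope")        // Some("map") -- used as the scope name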
  /**
   * Execute the given body such that all RDDs created in this body will have the same scope.
   *
   * If nesting is allowed, any subsequent calls to this method in the given body will instantiate
   * child scopes that are nested within our scope. Otherwise, these calls will take no effect.
   *
   * Additionally, the caller of this method may optionally ignore the configurations and scopes
   * set by the higher level caller. In this case, this method will ignore the parent caller's
   * intention to disallow nesting, and the new scope instantiated will not have a parent. This
   * is useful for scoping physical operations in Spark SQL, for instance.
   *
   * Note: Return statements are NOT allowed in body.
   */
  private[spark] def withScope[T](
      sc: SparkContext,
      name: String,
      allowNesting: Boolean,
      ignoreParent: Boolean)(body: => T): T = {
    // Save the old scope to restore it later
    val scopeKey = SparkContext.RDD_SCOPE_KEY
    val noOverrideKey = SparkContext.RDD_SCOPE_NO_OVERRIDE_KEY
    val oldScopeJson = sc.getLocalProperty(scopeKey)
    val oldScope = Option(oldScopeJson).map(RDDOperationScope.fromJson)
    val oldNoOverride = sc.getLocalProperty(noOverrideKey)
    try {
      if (ignoreParent) {
        // Ignore all parent settings and scopes and start afresh with our own root scope
        sc.setLocalProperty(scopeKey, new RDDOperationScope(name).toJson)
      } else if (sc.getLocalProperty(noOverrideKey) == null) {
        // Otherwise, set the scope only if the higher level caller allows us to do so
        sc.setLocalProperty(scopeKey, new RDDOperationScope(name, oldScope).toJson)
      }
      // Optionally disallow the child body to override our scope
      if (!allowNesting) {
        sc.setLocalProperty(noOverrideKey, "true")
      }
      // A new RDDOperationScope has been set on sc before the passed-in body runs,
      // so any RDD created inside body picks it up
      body
    } finally {
      // Restore the original values once the body has finished
      // Remember to restore any state that was modified before exiting
      sc.setLocalProperty(scopeKey, oldScopeJson)
      sc.setLocalProperty(noOverrideKey, oldNoOverride)
    }
  }
}
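To make the nesting rules concrete, here is a hedged sketch (callable only from inside Spark, since the method is private[spark]): with the default allowNesting = false, the outer call sets RDD_SCOPE_NO_OVERRIDE_KEY, so the inner call leaves the scope untouched.

RDDOperationScope.withScope(sc, "outer", allowNesting = false, ignoreParent = false) {
  RDDOperationScope.withScope(sc, "inner", allowNesting = false, ignoreParent = false) {
    // noOverrideKey is already set, so the current scope is still "outer" here;
    // an RDD created at this point is tagged with the "outer" scope.
    sc.parallelize(1 to 3)
  }
}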
  /**
   * Return a sampled subset of this RDD.
   *
   * @param withReplacement can elements be sampled multiple times (replaced when sampled out)
   * @param fraction expected size of the sample as a fraction of this RDD's size
   *   without replacement: probability that each element is chosen; fraction must be [0, 1]
   *   with replacement: expected number of times each element is chosen; fraction must be >= 0
   * @param seed seed for the random number generator
   */
  def sample(
      withReplacement: Boolean,
      fraction: Double,
      seed: Long = Utils.random.nextLong): RDD[T] = withScope {
    require(fraction >= 0.0, "Negative fraction value: " + fraction)
    if (withReplacement) {
      new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), true, seed)
    } else {
      new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](fraction), true, seed)
    }
  }
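A usage sketch of sample (the variable names are illustrative): without replacement, fraction is a per-element probability; with replacement, it is the expected number of copies of each element and may exceed 1.0.

val data = sc.parallelize(1 to 1000)
val roughlyTenPercent = data.sample(withReplacement = false, fraction = 0.1, seed = 42L)
val withDuplicates    = data.sample(withReplacement = true,  fraction = 2.0)  // ~2000 elements expected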
private[spark] class PartitionwiseSampledRDDPartition(val prev: Partition, val seed: Long)
  extends Partition with Serializable {
  override val index: Int = prev.index
}
/**
 * An RDD sampled from its parent RDD partition-wise. For each partition of the parent RDD,
 * a user-specified [[org.apache.spark.util.random.RandomSampler]] instance is used to obtain
 * a random sample of the records in the partition. The random seeds assigned to the samplers
 * are guaranteed to have different values.
 *
 * @param prev RDD to be sampled
 * @param sampler a random sampler
 * @param preservesPartitioning whether the sampler preserves the partitioner of the parent RDD
 * @param seed random seed
 * @tparam T input RDD item type
 * @tparam U sampled RDD item type
 */
private[spark] class PartitionwiseSampledRDD[T: ClassTag, U: ClassTag](
    prev: RDD[T],
    sampler: RandomSampler[T, U],
    @transient preservesPartitioning: Boolean,
    @transient seed: Long = Utils.random.nextLong)
  extends RDD[U](prev) {
  @transient override val partitioner = if (preservesPartitioning) prev.partitioner else None
  override def getPartitions: Array[Partition] = {
    val random = new Random(seed)
    firstParent[T].partitions.map(x => new PartitionwiseSampledRDDPartition(x, random.nextLong()))
  }
  /**
   * Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of
   * elements (a, b) where a is in `this` and b is in `other`.
   */
  def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope {
    new CartesianRDD(sc, this, other)
  }
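A usage sketch of cartesian: the result pairs every element of one RDD with every element of the other, so its size is the product of the two counts and grows quickly.

val letters = sc.parallelize(Seq("a", "b"))
val numbers = sc.parallelize(Seq(1, 2))
letters.cartesian(numbers).collect()
// Array((a,1), (a,2), (b,1), (b,2)) -- 2 * 2 pairs; ordering may differ across partitions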