在 Apache Spark 中,RDD(弹性分布式数据集)是核心的数据抽象,RDD 算子可分为转换算子(Transformation)和行动算子(Action)。下面使用 Scala 语言为你详细介绍这两类算子。
转换算子(Transformation)
转换算子会基于现有的 RDD 创建一个新的 RDD,这些操作是惰性的,即只有在遇到行动算子时才会真正触发计算。
常见转换算子示例
- map(func):对 RDD 中的每个元素应用指定的函数,生成一个新的 RDD。
scala
import org.apache.spark.{SparkConf, SparkContext}object MapExample {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("MapExample").setMaster("local")val sc = new SparkContext(conf)val rdd = sc.parallelize(Seq(1, 2, 3))val newRDD = rdd.map(x => x * 2)newRDD.collect().foreach(println)sc.stop()}
}
- filter(func):筛选出满足指定条件的元素,生成新的 RDD。
scala
import org.apache.spark.{SparkConf, SparkContext}object FilterExample {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("FilterExample").setMaster("local")val sc = new SparkContext(conf)val rdd = sc.parallelize(Seq(1, 2, 3, 4))val newRDD = rdd.filter(x => x % 2 == 0)newRDD.collect().foreach(println)sc.stop()}
}
- flatMap(func):对 RDD 中的每个元素应用函数,然后将结果扁平化,生成一个新的 RDD。
scala
import org.apache.spark.{SparkConf, SparkContext}object FlatMapExample {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("FlatMapExample").setMaster("local")val sc = new SparkContext(conf)val rdd = sc.parallelize(Seq("hello world", "spark rdd"))val newRDD = rdd.flatMap(x => x.split(" "))newRDD.collect().foreach(println)sc.stop()}
}
- union(other):将两个 RDD 合并成一个新的 RDD。
scala
import org.apache.spark.{SparkConf, SparkContext}object UnionExample {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("UnionExample").setMaster("local")val sc = new SparkContext(conf)val rdd1 = sc.parallelize(Seq(1, 2))val rdd2 = sc.parallelize(Seq(3, 4))val newRDD = rdd1.union(rdd2)newRDD.collect().foreach(println)sc.stop()}
}
- groupByKey():对键值对 RDD 按照键进行分组。
scala
import org.apache.spark.{SparkConf, SparkContext}object GroupByKeyExample {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("GroupByKeyExample").setMaster("local")val sc = new SparkContext(conf)val rdd = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))val newRDD = rdd.groupByKey()newRDD.collect().foreach(println)sc.stop()}
}
- reduceByKey(func):对键值对 RDD 按照键进行聚合操作。
scala
import org.apache.spark.{SparkConf, SparkContext}object ReduceByKeyExample {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("ReduceByKeyExample").setMaster("local")val sc = new SparkContext(conf)val rdd = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))val newRDD = rdd.reduceByKey(_ + _)newRDD.collect().foreach(println)sc.stop()}
}
行动算子(Action)
行动算子会触发对 RDD 的计算,并返回一个具体的结果或者将结果保存到外部存储。
常见行动算子示例
- collect():将 RDD 中的所有元素收集到驱动程序中。
scala
import org.apache.spark.{SparkConf, SparkContext}object CollectExample {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("CollectExample").setMaster("local")val sc = new SparkContext(conf)val rdd = sc.parallelize(Seq(1, 2, 3))val result = rdd.collect()result.foreach(println)sc.stop()}
}
- count():返回 RDD 中元素的数量。
scala
import org.apache.spark.{SparkConf, SparkContext}object CountExample {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("CountExample").setMaster("local")val sc = new SparkContext(conf)val rdd = sc.parallelize(Seq(1, 2, 3, 4))val count = rdd.count()println(count)sc.stop()}
}
- first():返回 RDD 中的第一个元素。
scala
import org.apache.spark.{SparkConf, SparkContext}object FirstExample {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("FirstExample").setMaster("local")val sc = new SparkContext(conf)val rdd = sc.parallelize(Seq(1, 2, 3))val firstElement = rdd.first()println(firstElement)sc.stop()}
}
- take(n):返回 RDD 中的前
n
个元素。
scala
import org.apache.spark.{SparkConf, SparkContext}object TakeExample {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("TakeExample").setMaster("local")val sc = new SparkContext(conf)val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5))val result = rdd.take(3)result.foreach(println)sc.stop()}
}
- reduce(func):使用指定的函数对 RDD 中的元素进行聚合操作。
scala
import org.apache.spark.{SparkConf, SparkContext}object ReduceExample {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("ReduceExample").setMaster("local")val sc = new SparkContext(conf)val rdd = sc.parallelize(Seq(1, 2, 3, 4))val result = rdd.reduce(_ + _)println(result)sc.stop()}
}
- saveAsTextFile(path):将 RDD 中的元素以文本文件的形式保存到指定的路径。
scala
import org.apache.spark.{SparkConf, SparkContext}object SaveAsTextFileExample {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("SaveAsTextFileExample").setMaster("local")val sc = new SparkContext(conf)val rdd = sc.parallelize(Seq(1, 2, 3))rdd.saveAsTextFile("output_path")sc.stop()}
}