您的位置:首页 > 财经 > 金融 > 公司网站建设价格贵吗_引擎搜索网站_十大网络舆情案例_产品软文撰写

公司网站建设价格贵吗_引擎搜索网站_十大网络舆情案例_产品软文撰写

2025/5/12 1:20:08 来源:https://blog.csdn.net/2401_83374628/article/details/147752192  浏览:    关键词:公司网站建设价格贵吗_引擎搜索网站_十大网络舆情案例_产品软文撰写
公司网站建设价格贵吗_引擎搜索网站_十大网络舆情案例_产品软文撰写

在 Apache Spark 中,RDD(弹性分布式数据集)是核心的数据抽象,RDD 算子可分为转换算子(Transformation)和行动算子(Action)。下面使用 Scala 语言为你详细介绍这两类算子。

转换算子(Transformation)

转换算子会基于现有的 RDD 创建一个新的 RDD,这些操作是惰性的,即只有在遇到行动算子时才会真正触发计算。

常见转换算子示例
  1. map(func):对 RDD 中的每个元素应用指定的函数,生成一个新的 RDD。

scala

import org.apache.spark.{SparkConf, SparkContext}object MapExample {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("MapExample").setMaster("local")val sc = new SparkContext(conf)val rdd = sc.parallelize(Seq(1, 2, 3))val newRDD = rdd.map(x => x * 2)newRDD.collect().foreach(println)sc.stop()}
}

  1. filter(func):筛选出满足指定条件的元素,生成新的 RDD。

scala

import org.apache.spark.{SparkConf, SparkContext}object FilterExample {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("FilterExample").setMaster("local")val sc = new SparkContext(conf)val rdd = sc.parallelize(Seq(1, 2, 3, 4))val newRDD = rdd.filter(x => x % 2 == 0)newRDD.collect().foreach(println)sc.stop()}
}

  1. flatMap(func):对 RDD 中的每个元素应用函数,然后将结果扁平化,生成一个新的 RDD。

scala

import org.apache.spark.{SparkConf, SparkContext}object FlatMapExample {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("FlatMapExample").setMaster("local")val sc = new SparkContext(conf)val rdd = sc.parallelize(Seq("hello world", "spark rdd"))val newRDD = rdd.flatMap(x => x.split(" "))newRDD.collect().foreach(println)sc.stop()}
}

  1. union(other):将两个 RDD 合并成一个新的 RDD。

scala

import org.apache.spark.{SparkConf, SparkContext}object UnionExample {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("UnionExample").setMaster("local")val sc = new SparkContext(conf)val rdd1 = sc.parallelize(Seq(1, 2))val rdd2 = sc.parallelize(Seq(3, 4))val newRDD = rdd1.union(rdd2)newRDD.collect().foreach(println)sc.stop()}
}

  1. groupByKey():对键值对 RDD 按照键进行分组。

scala

import org.apache.spark.{SparkConf, SparkContext}object GroupByKeyExample {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("GroupByKeyExample").setMaster("local")val sc = new SparkContext(conf)val rdd = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))val newRDD = rdd.groupByKey()newRDD.collect().foreach(println)sc.stop()}
}

  1. reduceByKey(func):对键值对 RDD 按照键进行聚合操作。

scala

import org.apache.spark.{SparkConf, SparkContext}object ReduceByKeyExample {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("ReduceByKeyExample").setMaster("local")val sc = new SparkContext(conf)val rdd = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))val newRDD = rdd.reduceByKey(_ + _)newRDD.collect().foreach(println)sc.stop()}
}

行动算子(Action)

行动算子会触发对 RDD 的计算,并返回一个具体的结果或者将结果保存到外部存储。

常见行动算子示例
  1. collect():将 RDD 中的所有元素收集到驱动程序中。

scala

import org.apache.spark.{SparkConf, SparkContext}object CollectExample {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("CollectExample").setMaster("local")val sc = new SparkContext(conf)val rdd = sc.parallelize(Seq(1, 2, 3))val result = rdd.collect()result.foreach(println)sc.stop()}
}

  1. count():返回 RDD 中元素的数量。

scala

import org.apache.spark.{SparkConf, SparkContext}object CountExample {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("CountExample").setMaster("local")val sc = new SparkContext(conf)val rdd = sc.parallelize(Seq(1, 2, 3, 4))val count = rdd.count()println(count)sc.stop()}
}

  1. first():返回 RDD 中的第一个元素。

scala

import org.apache.spark.{SparkConf, SparkContext}object FirstExample {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("FirstExample").setMaster("local")val sc = new SparkContext(conf)val rdd = sc.parallelize(Seq(1, 2, 3))val firstElement = rdd.first()println(firstElement)sc.stop()}
}

  1. take(n):返回 RDD 中的前 n 个元素。

scala

import org.apache.spark.{SparkConf, SparkContext}object TakeExample {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("TakeExample").setMaster("local")val sc = new SparkContext(conf)val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5))val result = rdd.take(3)result.foreach(println)sc.stop()}
}

  1. reduce(func):使用指定的函数对 RDD 中的元素进行聚合操作。

scala

import org.apache.spark.{SparkConf, SparkContext}object ReduceExample {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("ReduceExample").setMaster("local")val sc = new SparkContext(conf)val rdd = sc.parallelize(Seq(1, 2, 3, 4))val result = rdd.reduce(_ + _)println(result)sc.stop()}
}

  1. saveAsTextFile(path):将 RDD 中的元素以文本文件的形式保存到指定的路径。

scala

import org.apache.spark.{SparkConf, SparkContext}object SaveAsTextFileExample {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("SaveAsTextFileExample").setMaster("local")val sc = new SparkContext(conf)val rdd = sc.parallelize(Seq(1, 2, 3))rdd.saveAsTextFile("output_path")sc.stop()}
}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com