您的位置:首页 > 科技 > 能源 > scf900色带_宁波十大建设集团_怎么在百度发布信息_各大网站提交入口网址

scf900色带_宁波十大建设集团_怎么在百度发布信息_各大网站提交入口网址

2025/10/1 15:32:09 来源:https://blog.csdn.net/m0_63322122/article/details/146287696  浏览:    关键词:scf900色带_宁波十大建设集团_怎么在百度发布信息_各大网站提交入口网址
scf900色带_宁波十大建设集团_怎么在百度发布信息_各大网站提交入口网址

在 Spark 中,窗口函数(Window Functions) 是一种强大的工具,用于在分组数据上执行复杂的聚合操作,同时保留原始数据的行。窗口函数允许你在数据的某个“窗口”内进行计算,例如计算排名、累积和、移动平均等。

窗口函数的核心思想是定义一个“窗口”(通过 Window 类),然后在这个窗口上应用聚合函数(如 row_numberranksumavg 等)。


1. 窗口函数的基本概念

(1)窗口的定义

窗口函数通过 Window 类定义,主要包括以下两个部分:

  • 分区(Partitioning):将数据分为多个组(类似于 GROUP BY)。

  • 排序(Ordering):在每个分区内对数据进行排序。

  • 窗口范围(Frame):定义窗口的大小(如当前行及其前后若干行)。

(2)常见的窗口函数
  • 排名函数row_numberrankdense_rankpercent_rank

  • 聚合函数sumavgminmaxcount

  • 分析函数leadlagfirst_valuelast_value


2. 窗口函数的语法

(1)定义窗口
import org.apache.spark.sql.expressions.Windowval windowSpec = Window.partitionBy("column1", "column2") // 按列分区.orderBy("column3")                // 按列排序.rowsBetween(start, end)           // 定义窗口范围(可选)
  • partitionBy:指定分区的列。

  • orderBy:指定排序的列。

  • rowsBetween:定义窗口的范围(如 Window.unboundedPreceding 表示从分区的第一行开始)。

(2)应用窗口函数
import org.apache.spark.sql.functions._val resultDF = df.withColumn("new_column", F.row_number().over(windowSpec))
  • withColumn:添加新列。

  • row_number().over(windowSpec):在定义的窗口上应用 row_number 函数。


3. 窗口函数的示例

示例 1:计算每个部门的工资排名

假设有一个 DataFrame,包含用户的姓名、部门和工资:

import org.apache.spark.sql.{SparkSession, functions => F}
import org.apache.spark.sql.expressions.Windowval spark = SparkSession.builder().appName("Window Function Example").master("local[*]").getOrCreate()// 示例数据
val data = Seq(("Alice", "HR", 3000),("Bob", "IT", 4000),("Charlie", "HR", 3500),("David", "IT", 4500),("Eva", "Finance", 5000)
)// 创建 DataFrame
val df = spark.createDataFrame(data).toDF("name", "department", "salary")// 定义窗口:按部门分区,按工资降序排序
val windowSpec = Window.partitionBy("department").orderBy(F.desc("salary"))// 计算每个部门的工资排名
val rankedDF = df.withColumn("rank", F.row_number().over(windowSpec))// 显示结果
rankedDF.show()

输出:

+-------+----------+------+----+
|   name|department|salary|rank|
+-------+----------+------+----+
|    Eva|   Finance|  5000|   1|
|  Alice|        HR|  3000|   2|
|Charlie|        HR|  3500|   1|
|   David|        IT|  4500|   1|
|    Bob|        IT|  4000|   2|
+-------+----------+------+----+
  • partitionBy("department"):按部门分区。

  • orderBy(F.desc("salary")):按工资降序排序。

  • row_number():计算每行的排名。


示例 2:计算每个部门的累积工资

使用 sum 函数计算每个部门的累积工资:

// 定义窗口:按部门分区,按工资升序排序
val windowSpec = Window.partitionBy("department").orderBy("salary").rowsBetween(Window.unboundedPreceding, Window.currentRow)// 计算累积工资
val cumulativeDF = df.withColumn("cumulative_salary", F.sum("salary").over(windowSpec))// 显示结果
cumulativeDF.show()

输出:

+-------+----------+------+----------------+
|   name|department|salary|cumulative_salary|
+-------+----------+------+----------------+
|    Eva|   Finance|  5000|            5000|
|  Alice|        HR|  3000|            3000|
|Charlie|        HR|  3500|            6500|
|    Bob|        IT|  4000|            4000|
|   David|        IT|  4500|            8500|
+-------+----------+------+----------------+
  • rowsBetween(Window.unboundedPreceding, Window.currentRow):定义窗口范围为从分区的第一行到当前行。

  • sum("salary").over(windowSpec):计算累积工资。


示例 3:计算每个部门的工资移动平均
使用 avg 函数计算每个部门的工资移动平均(当前行及其前一行):// 定义窗口:按部门分区,按工资升序排序,窗口范围为当前行及其前一行
val windowSpec = Window.partitionBy("department").orderBy("salary").rowsBetween(-1, Window.currentRow)// 计算移动平均
val movingAvgDF = df.withColumn("moving_avg", F.avg("salary").over(windowSpec))// 显示结果
movingAvgDF.show()

输出:

+-------+----------+------+----------+
|   name|department|salary| moving_avg|
+-------+----------+------+----------+
|    Eva|   Finance|  5000|    5000.0|
|  Alice|        HR|  3000|    3000.0|
|Charlie|        HR|  3500|    3250.0|
|    Bob|        IT|  4000|    4000.0|
|   David|        IT|  4500|    4250.0|
+-------+----------+------+----------+
  • rowsBetween(-1, Window.currentRow):定义窗口范围为当前行及其前一行。

  • avg("salary").over(windowSpec):计算移动平均。


4. 常见的窗口函数

(1)排名函数
  • row_number():为每行分配一个唯一的序号(从 1 开始)。

  • rank():计算排名,相同值会有相同的排名,后续排名会跳过。

  • dense_rank():计算排名,相同值会有相同的排名,后续排名不会跳过。

  • percent_rank():计算百分比排名。

(2)聚合函数
  • sum():计算窗口内的总和。

  • avg():计算窗口内的平均值。

  • min():计算窗口内的最小值。

  • max():计算窗口内的最大值。

  • count():计算窗口内的行数。

(3)分析函数
  • lead():获取当前行之后的某一行。

  • lag():获取当前行之前的某一行。

  • first_value():获取窗口内的第一个值。

  • last_value():获取窗口内的最后一个值。


5. 窗口范围的定义

窗口范围通过 rowsBetween 或 rangeBetween 定义:

  • rowsBetween(start, end):基于行的偏移量定义窗口范围。

    • Window.unboundedPreceding:从分区的第一行开始。

    • Window.unboundedFollowing:到分区的最后一行结束。

    • Window.currentRow:当前行。

  • rangeBetween(start, end):基于值的范围定义窗口范围(适用于数值或日期类型)。


6. 总结

  • 窗口函数 用于在分组数据上执行复杂的聚合操作,同时保留原始数据的行。

  • 通过 Window 类定义窗口,包括分区、排序和窗口范围。

  • 常见的窗口函数包括排名函数、聚合函数和分析函数。

  • 窗口范围可以通过 rowsBetween 或 rangeBetween 定义。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com