您的位置:首页 > 游戏 > 手游 > 武汉外贸型网站建设_自己搭建云游戏服务器_友情链接出售_2024年重启核酸

武汉外贸型网站建设_自己搭建云游戏服务器_友情链接出售_2024年重启核酸

2025/7/4 15:16:02 来源:https://blog.csdn.net/2301_82217925/article/details/147565623  浏览:    关键词:武汉外贸型网站建设_自己搭建云游戏服务器_友情链接出售_2024年重启核酸
武汉外贸型网站建设_自己搭建云游戏服务器_友情链接出售_2024年重启核酸

 

输入输出

代码

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

object SparkStreaming {
  def main(args: Array[String]): Unit = {
    try {
      // 定义更新函数
      val updateFunc = (values: Seq[Int], state: Option[Int]) => {
        val currentCount = values.foldLeft(0)(_ + _)
        val previousCount = state.getOrElse(6)
        Some(currentCount + previousCount)
      }

      // 创建 SparkConf 对象
      val sparkConf = new SparkConf().setMaster("local[*]").setAppName("update")

      // 创建 StreamingContext 对象
      val ssc = new StreamingContext(sparkConf, Seconds(5))

      // 设置检查点目录
      ssc.checkpoint("./ck")

      // 创建 DStream,从 socket 接收数据
      val lines = ssc.socketTextStream("node01", 9999)

      // 将每行数据拆分为单词
      val words = lines.flatMap(_.split(" "))

      // 将每个单词映射为 (单词, 1) 的键值对
      val pairs = words.map((_, 1))

      // 使用 updateStateByKey 函数更新状态
      val stateDStream: DStream[(String, Int)] = pairs.updateStateByKey[Int](updateFunc)

      // 打印更新后的状态
      stateDStream.print()

      // 启动流计算
      ssc.start()

      // 等待计算终止
      ssc.awaitTermination()
    } catch {
      case e: Exception =>
        println(s"An error occurred: ${e.getMessage}")
        e.printStackTrace()
    }
  }
}

第二个

输入输出

代码

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreaming {
  def main(args: Array[String]): Unit = {
    try {
      // 创建 SparkConf 对象,设置运行模式为本地多线程,应用名为 window
      val sparkConf = new SparkConf().setMaster("local[*]").setAppName("window")
      // 创建 StreamingContext 对象,批处理间隔为 3 秒
      val ssc = new StreamingContext(sparkConf, Seconds(6))
      // 设置检查点目录,用于保存状态信息
      ssc.checkpoint("./ck")

      // 从指定的主机和端口接收数据,创建 DStream
      val lines = ssc.socketTextStream("node01", 9999)
      // 将每行数据拆分为单词
      val words = lines.flatMap(_.split(" "))
      // 将每个单词映射为 (单词, 1) 的键值对
      val pairs = words.map((_, 1))
      // 使用 reduceByKeyAndWindow 进行窗口操作,窗口大小为 12 秒,滑动间隔为 6 秒
      val wordCounts = pairs.reduceByKeyAndWindow((a: Int, b: Int) => (a + b), Seconds(12), Seconds(6))
      // 打印每个窗口内的单词计数结果
      wordCounts.print()

      // 启动流计算
      ssc.start()
      // 等待计算终止
      ssc.awaitTermination()
    } catch {
      case e: Exception =>
        println(s"An error occurred: ${e.getMessage}")
        e.printStackTrace()
    }
  }
}

 

 

 

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com