输入输出
代码
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
object SparkStreaming {
def main(args: Array[String]): Unit = {
try {
// 定义更新函数
val updateFunc = (values: Seq[Int], state: Option[Int]) => {
val currentCount = values.foldLeft(0)(_ + _)
val previousCount = state.getOrElse(6)
Some(currentCount + previousCount)
}
// 创建 SparkConf 对象
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("update")
// 创建 StreamingContext 对象
val ssc = new StreamingContext(sparkConf, Seconds(5))
// 设置检查点目录
ssc.checkpoint("./ck")
// 创建 DStream,从 socket 接收数据
val lines = ssc.socketTextStream("node01", 9999)
// 将每行数据拆分为单词
val words = lines.flatMap(_.split(" "))
// 将每个单词映射为 (单词, 1) 的键值对
val pairs = words.map((_, 1))
// 使用 updateStateByKey 函数更新状态
val stateDStream: DStream[(String, Int)] = pairs.updateStateByKey[Int](updateFunc)
// 打印更新后的状态
stateDStream.print()
// 启动流计算
ssc.start()
// 等待计算终止
ssc.awaitTermination()
} catch {
case e: Exception =>
println(s"An error occurred: ${e.getMessage}")
e.printStackTrace()
}
}
}
第二个
输入输出
代码
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object SparkStreaming {
def main(args: Array[String]): Unit = {
try {
// 创建 SparkConf 对象,设置运行模式为本地多线程,应用名为 window
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("window")
// 创建 StreamingContext 对象,批处理间隔为 3 秒
val ssc = new StreamingContext(sparkConf, Seconds(6))
// 设置检查点目录,用于保存状态信息
ssc.checkpoint("./ck")
// 从指定的主机和端口接收数据,创建 DStream
val lines = ssc.socketTextStream("node01", 9999)
// 将每行数据拆分为单词
val words = lines.flatMap(_.split(" "))
// 将每个单词映射为 (单词, 1) 的键值对
val pairs = words.map((_, 1))
// 使用 reduceByKeyAndWindow 进行窗口操作,窗口大小为 12 秒,滑动间隔为 6 秒
val wordCounts = pairs.reduceByKeyAndWindow((a: Int, b: Int) => (a + b), Seconds(12), Seconds(6))
// 打印每个窗口内的单词计数结果
wordCounts.print()
// 启动流计算
ssc.start()
// 等待计算终止
ssc.awaitTermination()
} catch {
case e: Exception =>
println(s"An error occurred: ${e.getMessage}")
e.printStackTrace()
}
}
}