如何在 WebSocketHandler 中控制连接的断开
1. WebSocket 连接的生命周期
在 Spring WebFlux 中,WebSocket 连接的生命周期由 Mono<Void> 来控制。当 WebSocket 连接通过 ReactorNettyWebSocketClient 被发起时,Mono<Void> 表示 WebSocket 连接的生命周期,只有在连接被关闭时,Mono<Void> 才会完成。
client.execute() 返回的 Mono<Void> 与 WebSocketHandler 返回的 Mono<Void> 是相同的,它们都是用于表示 WebSocket 连接的生命周期,表示 WebSocket 连接的打开与关闭。这两个 Mono 都是完成时,连接会关闭。
例如,client.execute() 返回的 Mono<Void> 会在连接关闭时完成:
public Mono<Void> connectToWebSocketServer() {return client.execute(URI.create("ws://example.com/socket"), session -> {// 发送消息Mono<Void> send = session.send(Mono.just(session.textMessage("Hello Server!")));// 处理接收到的消息session.receive().map(WebSocketMessage::getPayloadAsText).doOnNext(System.out::println) // 打印收到的消息.subscribe();return send;});
}
2. client.execute() 和 WebSocketHandler 返回的 Mono
client.execute() 和 WebSocketHandler 返回的 Mono<Void> 是紧密关联的。client.execute() 方法会调用 WebSocketHandler 来处理与服务器的 WebSocket 连接,在执行 WebSocket 连接的过程中,Mono<Void> 会表示这个连接的生命周期。
-
Mono<Void>表示连接的生命周期:Mono<Void>在 WebSocket 连接成功建立时开始,直到连接关闭时完成。在 WebSocketHandler 中,我们通过session.send()来发送消息,通过session.receive()来接收消息。在这个过程中,Mono<Void>会一直保持开启,直到连接关闭。 -
WebSocketHandler 的作用:
WebSocketHandler是用来处理 WebSocket 消息的核心组件,它是通过client.execute()与 WebSocket 服务器建立连接的。当连接成功建立时,Mono<Void>开始执行,直到连接被主动关闭或由于某些异常关闭。
例如,在 WebSocketHandler 中,你可能会返回一个 Mono<Void> 来表示消息的发送和接收,最终决定什么时候关闭连接:
public Mono<Void> handle(WebSocketSession session) {// 发送消息到服务器Mono<Void> send = session.send(Mono.just(session.textMessage("Hello Server!")));// 处理接收到的消息session.receive().map(WebSocketMessage::getPayloadAsText).doOnNext(System.out::println) // 打印收到的消息.subscribe();// 返回 Mono<Void>,连接将持续,直到 Mono 完成return send.then(session.close()); // 发送完消息后关闭连接
}
3. WebSocket 何时会关闭?
WebSocket 连接的关闭由以下几种情况引起:
-
Mono 结束: 如果
Mono<Void>结束,WebSocket 连接会关闭。例如,当session.send()里的Mono<Void>完成后,WebSocket 连接就会关闭。Mono<Void> send = session.send(Mono.just(session.textMessage("Hello Server!"))); return send; // 发送完 Hello! 后,Mono<Void> 会结束,WebSocket 关闭 -
服务器或客户端主动关闭: 服务器或客户端可以通过
session.close()主动关闭 WebSocket 连接。return session.send(Flux.just(session.textMessage("Hello"))).then(session.close()); -
Flux 终止: 如果
session.send()使用的是有限的Flux(例如Flux.just()),当消息发送完毕后,WebSocket 连接也会关闭。Flux<String> messages = Flux.just("Msg1", "Msg2", "Msg3"); // 发送3条消息 return session.send(messages.map(session::textMessage)); // 发送完毕后,WebSocket 会关闭 -
无限流不会自动关闭: 如果
Flux.interval()被用于消息流,它不会自动终止,所以 WebSocket 连接不会自动关闭。为了让连接在一定时间后断开,可以使用.take(n)限制消息数量。Flux<String> messages = Flux.interval(Duration.ofSeconds(1)).map(i -> "Msg " + i).take(10); // 只发送10条消息 return session.send(messages.map(session::textMessage)).then(session.close()); // 发送完10条后关闭连接
4. 如何主动控制 WebSocket 连接的断开?
如果你希望 WebSocket 在特定条件下主动断开,可以通过以下方式控制:
-
使用
Mono<Void>来控制连接关闭: 当Mono<Void>完成时,连接会自动关闭。Mono<Void> send = session.send(Mono.just(session.textMessage("Hello Server!"))); return send.then(session.close()); // 发送完消息后手动关闭连接 -
限制消息流: 你可以使用
take(n)来限制发送的消息数量,达到一定数量后 WebSocket 连接会自动关闭。Flux<String> output = Flux.interval(Duration.ofSeconds(1)).map(time -> "Server time: " + time).take(10); // 只发送10条消息 return session.send(output.map(session::textMessage)).then(session.close()); // 发送完毕后手动关闭连接 -
监听连接关闭: 可以通过
session.isOpen()监听 WebSocket 是否已关闭,或者使用doOnTerminate()监听关闭事件。session.receive().doOnTerminate(() -> System.out.println("WebSocket connection closed")).subscribe(); -
异常处理与重连: 使用
retryWhen()或Flux.never()来确保连接在断开后能够重连,或保持连接一直处于开放状态,直到明确关闭。client.execute(URI.create("ws://example.com/socket"), session -> {return session.send(Flux.interval(Duration.ofSeconds(1)).map(i -> session.textMessage("Ping " + i))).then(); }).retryWhen(Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(5))) // 断开后自动重连 .subscribe();return session.receive().then(Flux.never()); // 永不结束,保持 WebSocket 连接
5. 总结
client.execute() 返回的 Mono<Void> 和 WebSocketHandler 返回的 Mono<Void> 是同一个概念,它们表示 WebSocket 连接的生命周期。只有在连接关闭时,这些 Mono 才会完成。
通过 Mono<Void> 和 WebSocketHandler,你可以灵活地控制 WebSocket 连接的打开、消息的发送与接收,以及连接的关闭。
