Spring Boot 实现 SSE 服务端推送事件

Spring Boot HTTP SSE About 6,816 words

SSE

Sever Send Event,是HTTP协议中的一种,Content-Typetext/event-stream,能够保持长连接。

示例代码

Spring Boot

@CrossOrigin
@RestController
public class SSEController {

    private final Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();

    @PostConstruct
    private void init() {
        Executors.newScheduledThreadPool(10).scheduleWithFixedDelay(() -> {
            System.out.println("sseEmitterMap#" + sseEmitterMap);
            sseEmitterMap.forEach((s, sseEmitter) -> {
                try {
                    sseEmitter.send(SseEmitter.event()
                        .id(UUID.randomUUID().toString())
                        .data(MyEvent.builder().code(222).msg(s + "#" +
                            LocalDateTime.now() + "#" + Thread.currentThread().getName() + "#"
                            + Thread.currentThread().getState()).build())
                        .reconnectTime(3000L)
                        .comment("this is comment")
                    );
                    if (LocalDateTime.now().getSecond() % 2 == 0) {
                        sseEmitter.send(
                            SseEmitter.event()
                                .id("1")
                                .name("customEventName")
                                .data("customData")
                        );
                    }
                } catch (IOException e) {
                    System.out.println("Error#" + e);
//                    sseEmitter.completeWithError(e);
                }
            });
        }, 3, 3, TimeUnit.SECONDS);
    }

    @GetMapping("/test/sse")
    public SseEmitter sseEmitter(@RequestParam("uid") String uid) throws IOException {
        SseEmitter sseEmitter = new SseEmitter(-1L);
//        SseEmitter sseEmitter = new SseEmitter(5L);
        sseEmitter.send(SseEmitter.event().id("1").name("Connected").data(LocalDateTime.now()).reconnectTime(3000));
        sseEmitterMap.put(uid, sseEmitter);

        sseEmitter.onCompletion(() -> {
            System.out.println(LocalDateTime.now() + ", uid#" + uid + ", on completion");
            sseEmitterMap.remove(uid);
        });
        sseEmitter.onTimeout(() -> System.out.println(LocalDateTime.now() + ", uid#" + uid + ", on timeout#" + sseEmitter.getTimeout()));
        sseEmitter.onError(throwable -> System.out.println(LocalDateTime.now() + ", uid#" + uid + ", on error#" + throwable.toString()));
        return sseEmitter;
    }
}

JavaScript

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
    <script>
        window.onload = function () {
            let connectBtn = document.getElementById("connectSSE");
            let disconnectBtn = document.getElementById("disconnectSSE");
            let userIdElement = document.getElementById("userId");
            let userIdInfoElement = document.getElementById("userIdInfo");
            let sse;
            connectBtn.onclick = function () {
                if (!userIdElement.value) {
                    userIdInfoElement.innerText = "userId is empty";
                    console.log("userId is empty")
                    return;
                }
                userIdInfoElement.innerText = userIdElement.value;
                const eventSource = new EventSource('http://localhost:18080/test/sse?uid=' + userIdElement.value);
                eventSource.onopen = (event) => {
                    console.log("onopen", event.readyState, event.target);
                    sse = event.target;
                    let element = document.getElementById("onOpenInfo");
                    element.innerText = JSON.stringify(event.target);
                };
                eventSource.onmessage = (event) => {
                    let element = document.getElementById("onMessageInfo");
                    element.innerText = event.data;
                };
                eventSource.onerror = (event) => {
                    console.log("onerror", event);
                    if (event.readyState === EventSource.CLOSED) {
                        console.log('connection is closed');
                    } else {
                        console.log("Error occured", event);
                    }
                    event.target.close();
                    let element = document.getElementById("onErrorInfo");
                    element.innerText = JSON.stringify(event);
                };
                eventSource.addEventListener("customEventName", (event) => {
                    console.log("Message id is " + event.lastEventId);
                });
            };

            disconnectBtn.onclick = function () {
                if (sse) {
                    sse.close();
                }
            };

        };
    </script>
</head>
<body>

<div>
    <input id="userId" type="text">
    <button id="connectSSE">Connect</button>
    <button id="disconnectSSE">Disconnect</button>
</div>

<div>
    userId: <span id="userIdInfo"></span>
</div>

<div>
    onOpen: <span id="onOpenInfo"></span>
</div>

<div>
    onMessage: <span id="onMessageInfo"></span>
</div>

<div>
    onError: <span id="onErrorInfo"></span>
</div>

</body>
</html>

SseEmitter

SpringMVC封装的SSE实现,Controller中直接返回SseEmitter,不调用complete()方法,即可保持长链接。

超时时间

SseEmitter():无参构造,默认超时时间依赖于Web容器,容器为Tomcat则超时时间为30秒。

SseEmitter(Long timeout):有参构造,设置超时时间。传入-1L表示没有超时时间。

无参构造可通过配置mvc属性来设置超时时间,单位毫秒:

spring:
  mvc:
    async:
      request-timeout: 15000

注意

客户端关闭了连接,不管是调用了event.target.close()还是关闭了网页,服务端不会触发任何回调。直到服务端调用send后才会触发onErroronCompletion回调。

服务端触发了onCompletion回调后,连接就断开了。

重试

浏览器会保持连接一直打开。服务端可以通过调用completecompleteWithError方法关闭连接,这两个事件会触发客户端的error回调。

当服务端关闭连接或网络错误时,如果客户端不调用event.target.close()关闭连接的话,浏览器会发起重新连接。

浏览器默认会等待3秒再尝试重新建立连接,并且浏览器会保持重试知道获得HTTP请求返回的200状态码。

服务端可以通过发送retry标志位更改默认3秒的等待时间。服务端可以设置标志位为0,表示连接关闭则立即发起重试,没有等待时间。

限制

  • 只能发送文本(可通过base64等方法简单加密)。
  • 很多浏览器(包括Chrome)限制同一端口最多开启的SSE连接数,最多为6个,即每个端口最多可开启6个连接。超出6个连接后进入pending状态。
PS C:\> netstat -an | findstr 18080 | findstr ESTABLISHED
  TCP    [::1]:18080            [::1]:52001            ESTABLISHED
  TCP    [::1]:18080            [::1]:52987            ESTABLISHED
  TCP    [::1]:18080            [::1]:52997            ESTABLISHED
  TCP    [::1]:18080            [::1]:53006            ESTABLISHED
  TCP    [::1]:18080            [::1]:53007            ESTABLISHED
  TCP    [::1]:18080            [::1]:53029            ESTABLISHED

Spring Flux SSE

https://www.baeldung.com/spring-server-sent-events

参考

https://dzone.com/articles/server-sent-events-using-spring

https://www.baeldung.com/spring-server-sent-events

Views: 6,376 · Posted: 2022-09-28

————        END        ————

Give me a Star, Thanks:)

https://github.com/fendoudebb/LiteNote

扫描下方二维码关注公众号和小程序↓↓↓

扫描下方二维码关注公众号和小程序↓↓↓


Today On History
Browsing Refresh