Java OpenResty Spring Spring Boot MySQL Redis MongoDB PostgreSQL Linux Android Nginx 面试 小程序 Arthas JVM AQS juc Kubernetes Docker DevOps


Spring Boot 实现 SSE 服务端推送事件

Spring Boot HTTP 大约 6816 字

SSE

Sever Send Event,是HTTP协议中的一种,Content-Typetext/event-stream,能够保持长连接。

示例代码

Spring Boot

@CrossOrigin
@RestController
public class SSEController {

    private final Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();

    @PostConstruct
    private void init() {
        Executors.newScheduledThreadPool(10).scheduleWithFixedDelay(() -> {
            System.out.println("sseEmitterMap#" + sseEmitterMap);
            sseEmitterMap.forEach((s, sseEmitter) -> {
                try {
                    sseEmitter.send(SseEmitter.event()
                        .id(UUID.randomUUID().toString())
                        .data(MyEvent.builder().code(222).msg(s + "#" +
                            LocalDateTime.now() + "#" + Thread.currentThread().getName() + "#"
                            + Thread.currentThread().getState()).build())
                        .reconnectTime(3000L)
                        .comment("this is comment")
                    );
                    if (LocalDateTime.now().getSecond() % 2 == 0) {
                        sseEmitter.send(
                            SseEmitter.event()
                                .id("1")
                                .name("customEventName")
                                .data("customData")
                        );
                    }
                } catch (IOException e) {
                    System.out.println("Error#" + e);
//                    sseEmitter.completeWithError(e);
                }
            });
        }, 3, 3, TimeUnit.SECONDS);
    }

    @GetMapping("/test/sse")
    public SseEmitter sseEmitter(@RequestParam("uid") String uid) throws IOException {
        SseEmitter sseEmitter = new SseEmitter(-1L);
//        SseEmitter sseEmitter = new SseEmitter(5L);
        sseEmitter.send(SseEmitter.event().id("1").name("Connected").data(LocalDateTime.now()).reconnectTime(3000));
        sseEmitterMap.put(uid, sseEmitter);

        sseEmitter.onCompletion(() -> {
            System.out.println(LocalDateTime.now() + ", uid#" + uid + ", on completion");
            sseEmitterMap.remove(uid);
        });
        sseEmitter.onTimeout(() -> System.out.println(LocalDateTime.now() + ", uid#" + uid + ", on timeout#" + sseEmitter.getTimeout()));
        sseEmitter.onError(throwable -> System.out.println(LocalDateTime.now() + ", uid#" + uid + ", on error#" + throwable.toString()));
        return sseEmitter;
    }
}

JavaScript

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
    <script>
        window.onload = function () {
            let connectBtn = document.getElementById("connectSSE");
            let disconnectBtn = document.getElementById("disconnectSSE");
            let userIdElement = document.getElementById("userId");
            let userIdInfoElement = document.getElementById("userIdInfo");
            let sse;
            connectBtn.onclick = function () {
                if (!userIdElement.value) {
                    userIdInfoElement.innerText = "userId is empty";
                    console.log("userId is empty")
                    return;
                }
                userIdInfoElement.innerText = userIdElement.value;
                const eventSource = new EventSource('http://localhost:18080/test/sse?uid=' + userIdElement.value);
                eventSource.onopen = (event) => {
                    console.log("onopen", event.readyState, event.target);
                    sse = event.target;
                    let element = document.getElementById("onOpenInfo");
                    element.innerText = JSON.stringify(event.target);
                };
                eventSource.onmessage = (event) => {
                    let element = document.getElementById("onMessageInfo");
                    element.innerText = event.data;
                };
                eventSource.onerror = (event) => {
                    console.log("onerror", event);
                    if (event.readyState === EventSource.CLOSED) {
                        console.log('connection is closed');
                    } else {
                        console.log("Error occured", event);
                    }
                    event.target.close();
                    let element = document.getElementById("onErrorInfo");
                    element.innerText = JSON.stringify(event);
                };
                eventSource.addEventListener("customEventName", (event) => {
                    console.log("Message id is " + event.lastEventId);
                });
            };

            disconnectBtn.onclick = function () {
                if (sse) {
                    sse.close();
                }
            };

        };
    </script>
</head>
<body>

<div>
    <input id="userId" type="text">
    <button id="connectSSE">Connect</button>
    <button id="disconnectSSE">Disconnect</button>
</div>

<div>
    userId: <span id="userIdInfo"></span>
</div>

<div>
    onOpen: <span id="onOpenInfo"></span>
</div>

<div>
    onMessage: <span id="onMessageInfo"></span>
</div>

<div>
    onError: <span id="onErrorInfo"></span>
</div>

</body>
</html>

SseEmitter

SpringMVC封装的SSE实现,Controller中直接返回SseEmitter,不调用complete()方法,即可保持长链接。

超时时间

SseEmitter():无参构造,默认超时时间依赖于Web容器,容器为Tomcat则超时时间为30秒。

SseEmitter(Long timeout):有参构造,设置超时时间。传入-1L表示没有超时时间。

无参构造可通过配置mvc属性来设置超时时间,单位毫秒:

spring:
  mvc:
    async:
      request-timeout: 15000

注意

客户端关闭了连接,不管是调用了event.target.close()还是关闭了网页,服务端不会触发任何回调。直到服务端调用send后才会触发onErroronCompletion回调。

服务端触发了onCompletion回调后,连接就断开了。

重试

浏览器会保持连接一直打开。服务端可以通过调用completecompleteWithError方法关闭连接,这两个事件会触发客户端的error回调。

当服务端关闭连接或网络错误时,如果客户端不调用event.target.close()关闭连接的话,浏览器会发起重新连接。

浏览器默认会等待3秒再尝试重新建立连接,并且浏览器会保持重试知道获得HTTP请求返回的200状态码。

服务端可以通过发送retry标志位更改默认3秒的等待时间。服务端可以设置标志位为0,表示连接关闭则立即发起重试,没有等待时间。

限制

  • 只能发送文本(可通过base64等方法简单加密)。
  • 很多浏览器(包括Chrome)限制同一端口最多开启的SSE连接数,最多为6个,即每个端口最多可开启6个连接。超出6个连接后进入pending状态。
PS C:\> netstat -an | findstr 18080 | findstr ESTABLISHED
  TCP    [::1]:18080            [::1]:52001            ESTABLISHED
  TCP    [::1]:18080            [::1]:52987            ESTABLISHED
  TCP    [::1]:18080            [::1]:52997            ESTABLISHED
  TCP    [::1]:18080            [::1]:53006            ESTABLISHED
  TCP    [::1]:18080            [::1]:53007            ESTABLISHED
  TCP    [::1]:18080            [::1]:53029            ESTABLISHED

Spring Flux SSE

https://www.baeldung.com/spring-server-sent-events

参考

https://dzone.com/articles/server-sent-events-using-spring

https://www.baeldung.com/spring-server-sent-events

阅读 239 · 发布于 2022-09-28

————        END        ————

Give me a Star, Thanks:)

https://github.com/fendoudebb

扫描下方二维码关注公众号和小程序↓↓↓

扫描二维码关注我
昵称:
随便看看 换一批