使用 SpringAI 来实现 SSE 旁路输出

gomkiri 发布于 13 天前 26 次阅读


AI 摘要

主 Agent 阻塞导致用户体验卡顿?本文教你用 SpringAI + SSE 旁路输出破解难题:主 Agent 秒回“活已派”,异步子 Agent 后台默默画图,完成后通过 Sinks 漏斗直推前端。告别等待,实现流式文字与图表同时展示,性能拉满!

概述

最近来使用 SpringAI 去开发一个高校教学系统,为了实现个性化的教学,特意引入了 Mermaid 思维导图和基于实训项目和学生画像个性化出题的功能。并且一个主 Agent 基于当前的教学场景来动态的使用这些工具,但是这样就有一个问题,生成个性化内容的时候,主 Agent 是阻塞的,十分影响用户的体验,这个过程就算是流式输出了 Mermaid 代码,在用户的阅读上也没有很大的提升。

因此,我们设计一套基于 tools 调用、Sinks 和 SSE 旁路输出的解决方案,其大致的流程如下:

  1. 主 Agent 触发工具:主 Agent 决定需要思维导图,调用 drawMindMap 工具。
  2. 工具光速放行:Java 方法内部立刻返回一个占位提示(例如:“已通知后台绘制”),彻底解放主 Agent,让它继续给前端流式输出文字。
  3. 子 Agent 后台打工:Java 方法内部同时启动一个异步线程 (@Async 或响应式异步调度),去调用真正的 Mermaid Agent。
  4. Sinks 旁路注入 UI:几秒后,Mermaid Agent 生成完毕。它拿着生成的代码,顺着那个全局的 Sinks 漏斗,打上 event: mermaid 的标签,直接空降推送到前端页面上!

实现一:单用户 demo 级别

在实现上,首先准备一个全局的 Sinks 漏斗:

import org.springframework.stereotype.Component;
import org.springframework.http.codec.ServerSentEvent;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Flux;

@Component
public class SseDispatcher {
    // 多播漏斗,装载标准的 SSE 事件
    private final Sinks.Many<ServerSentEvent<String>> sink = Sinks.many().multicast().onBackpressureBuffer();

    public void dispatch(String eventName, String data) {
        sink.tryEmitNext(ServerSentEvent.<String>builder()
                .event(eventName)
                .data(data)
                .build());
    }

    public Flux<ServerSentEvent<String>> getStream() {
        return sink.asFlux();
    }
}

然后在工具干的活中,先异步的调用Agent,然后直接返回一个固定的提示词即可:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Description;
import org.springframework.scheduling.annotation.Async;
import java.util.function.Function;

@Configuration
public class MermaidToolConfig {

    private final SseDispatcher sseDispatcher;
    private final ChatClient mermaidAgentClient; // 专门负责写代码的子模型

    public MermaidToolConfig(SseDispatcher sseDispatcher, @Qualifier("codingClient") ChatClient mermaidAgentClient) {
        this.sseDispatcher = sseDispatcher;
        this.mermaidAgentClient = mermaidAgentClient;
    }

    public record MindMapRequest(String topic, String structuralHints) {}
    
    @Bean
    @Description("当需要配合思维导图解释概念时调用此工具。工具会异步执行,执行完成后会自动显示在用户的前端页面")
    public Function<MindMapRequest, String> asyncMindMapTool() {
        return request -> {
            // 1. 触发真正的耗时生成任务(甩给后台线程池,立刻放行当前线程)
            generateMermaidCodeAsync(request.topic(), request.structuralHints());
            
            // 2. 立刻返回一个安抚性文案给主 Agent,主 Agent 拿到这个就不卡了
            return "SUCCESS: 绘图任务已提交至后台 Mermaid Agent。";
        };
    }

    // 真正耗时的子 Agent 调用
    @Async 
    public void generateMermaidCodeAsync(String topic, String hints) {
        // 让子模型专心写代码
        String mermaidCode = mermaidAgentClient.prompt()
                .system("你是一个专业的 Mermaid 代码生成器。只输出合法的 graph TD 代码,不要多余的废话。")
                .user("主题:" + topic + "\n结构提示:" + hints)
                .call()
                .content();

        // 拿着写好的代码,通过旁路漏斗,直接空降给前端!
        sseDispatcher.dispatch("mermaid_chart", mermaidCode);
    }
}

这样写还有一个好处,我们可以更加方便的对子 Agent 的生成结果进行校验,要知道,一旦生成的结果不能正确的被前端渲染成图表,那么在用户的页面的后果是灾难级别的。

然后在主干道中,需要对旁路的流进行合并然后一并发送给前端:

@RestController
public class ChatController {
    // ... 注入依赖

    @GetMapping(value = "/ai/chat", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<String>> chat(String prompt) {
        
        // 主路:主 Agent 实时文字输出 (加上 tool 配置)
        Flux<ServerSentEvent<String>> mainAgentStream = mainChatClient.prompt()
                .user(prompt)
                .tools("asyncMindMapTool") // 给它配上我们写的异步工具
                .stream().content()
                .map(text -> ServerSentEvent.<String>builder()
                        .event("message")
                        .data(text)
                        .build());

        // 旁路:获取全局漏斗里的独立推送(比如画好的 Mermaid 代码)
        Flux<ServerSentEvent<String>> sideChannelStream = sseDispatcher.getStream();

        // 合并两条流并返回。任意一条流有数据,前端都能立刻收到!
        return Flux.merge(mainAgentStream, sideChannelStream);
    }
}

然后前端根据不同 EVENT 将流给分离出来,分别进行展示:

const sse = new EventSource('/ai/chat?prompt=讲解微服务架构并画图');

// 监听主 Agent 的文字解说,实时打字输出
sse.addEventListener('message', (e) => {
    chatBox.innerHTML += e.data;
});

// 监听 Mermaid Agent 的异步突袭推送!
sse.addEventListener('mermaid_chart', (e) => {
    const code = e.data;
    // 收到代码后,调用前端的 mermaid.js 库进行渲染
    renderMermaidToCanvas(code); 
});

实现二:多用户、多工具级别系统

在上面的实现,有二个问题:

  1. 全局 sink 环境是公共的,所有用户的异步流都会挤到一块。
  2. 主 Agent 每多一个工具调用,前端就要多写一个解析流。

两个问题其实都好解决。sink 是公共的,就为每个会话都创建一个 sink ,然后放入一个 Map 中。同时将所有的非主 Agent 的 EVENT 都收敛为 tool_output,然后添加一个标识字段,前端统一接收后只需要根据不同的表示选择不同的处理器即可。

首先就是改造过后的会话管理器 SseDispatcher :

import org.springframework.http.codec.ServerSentEvent;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Component
public class SseSessionManager {

    // 🌟 核心:一个线程安全的 Map,Key 是对唯一话 ID,Value 是该对话专属的 Sink
    private final Map<String, Sinks.Many<ServerSentEvent<String>>> sessionSinks = new ConcurrentHashMap<>();

    // 1. 建立连接:前端连进来时,为他动态创建一根专属水管
    public Flux<ServerSentEvent<String>> subscribe(String sessionId) {
        // 使用 unicast() 单播,因为一根管子只对应一个前端浏览器
        Sinks.Many<ServerSentEvent<String>> sink = Sinks.many().unicast().onBackpressureBuffer();
        sessionSinks.put(sessionId, sink);
        
        System.out.println("🔗 会话 " + sessionId + " 已建立旁路连接");

        return sink.asFlux()
                // 🧹 极其重要:当浏览器断开连接时,自动清理 Map,防止内存泄漏!
                .doFinally(signalType -> {
                    sessionSinks.remove(sessionId);
                    System.out.println("❌ 会话 " + sessionId + " 连接断开,释放资源");
                });
    }

    // 2. 定向推送:后台工具干完活,根据 sessionId 找到那根管子倒水
    public void dispatch(String sessionId, String eventName, String data) {
        Sinks.Many<ServerSentEvent<String>> sink = sessionSinks.get(sessionId);
        if (sink != null) {
            sink.tryEmitNext(ServerSentEvent.<String>builder()
                    .event(eventName)
                    .data(data)
                    .build());
        } else {
            System.err.println("⚠️ 找不到会话 " + sessionId + " 的通道,可能已断线");
        }
    }
}

可以使用 SessionId 作为 key,并为每个实现设置好 eventName ,即:sseSessionManager.dispatch(sessionId, "mermaid", code) 就可以实现安全的数据隔离了。

在上面的布局中,我们不仅考虑到的 Sink 被公用的问题,还添加一个 eventName 的参数,这个参数,并不是 SSE 中的 event 标签。所有此类事件的 event 都为 tool_output,前端从该时间监听到数据后,就可以通过刚才 eventName 参数将展示的任务交给不同的组件:

// 1. 维护一个组件映射表(把字符串映射到具体的渲染函数/组件)
const WidgetRegistry = {
    'mermaid': renderMermaidChart,
    'weather': renderWeatherCard,
    'stock':   renderStockKLine,
    'default': renderUnknownWidget
};

const sse = new EventSource('/ai/chat?sessionId=user123');

// 2. 永远只需要这一个监听器!
sse.addEventListener('tool_output', (e) => {
    // 解析后端传来的统一结构
    const payload = JSON.parse(e.data);
    
    // 从映射表中找到对应的渲染器
    const renderFunction = WidgetRegistry[payload.widgetType] || WidgetRegistry['default'];
    
    // 把数据交给具体的渲染器去画 UI
    renderFunction(payload.content);
});

小码农 & GPT调教糕手
最后更新于 2026-06-06