概述
最近来使用 SpringAI 去开发一个高校教学系统,为了实现个性化的教学,特意引入了 Mermaid 思维导图和基于实训项目和学生画像个性化出题的功能。并且一个主 Agent 基于当前的教学场景来动态的使用这些工具,但是这样就有一个问题,生成个性化内容的时候,主 Agent 是阻塞的,十分影响用户的体验,这个过程就算是流式输出了 Mermaid 代码,在用户的阅读上也没有很大的提升。
因此,我们设计一套基于 tools 调用、Sinks 和 SSE 旁路输出的解决方案,其大致的流程如下:
- 主 Agent 触发工具:主 Agent 决定需要思维导图,调用
drawMindMap工具。 - 工具光速放行:Java 方法内部立刻返回一个占位提示(例如:“已通知后台绘制”),彻底解放主 Agent,让它继续给前端流式输出文字。
- 子 Agent 后台打工:Java 方法内部同时启动一个异步线程 (
@Async或响应式异步调度),去调用真正的 Mermaid Agent。 - Sinks 旁路注入 UI:几秒后,Mermaid Agent 生成完毕。它拿着生成的代码,顺着那个全局的
Sinks漏斗,打上event: mermaid的标签,直接空降推送到前端页面上!
实现一:单用户 demo 级别
在实现上,首先准备一个全局的 Sinks 漏斗:
import org.springframework.stereotype.Component;
import org.springframework.http.codec.ServerSentEvent;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Flux;
@Component
public class SseDispatcher {
// 多播漏斗,装载标准的 SSE 事件
private final Sinks.Many<ServerSentEvent<String>> sink = Sinks.many().multicast().onBackpressureBuffer();
public void dispatch(String eventName, String data) {
sink.tryEmitNext(ServerSentEvent.<String>builder()
.event(eventName)
.data(data)
.build());
}
public Flux<ServerSentEvent<String>> getStream() {
return sink.asFlux();
}
}然后在工具干的活中,先异步的调用Agent,然后直接返回一个固定的提示词即可:
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Description;
import org.springframework.scheduling.annotation.Async;
import java.util.function.Function;
@Configuration
public class MermaidToolConfig {
private final SseDispatcher sseDispatcher;
private final ChatClient mermaidAgentClient; // 专门负责写代码的子模型
public MermaidToolConfig(SseDispatcher sseDispatcher, @Qualifier("codingClient") ChatClient mermaidAgentClient) {
this.sseDispatcher = sseDispatcher;
this.mermaidAgentClient = mermaidAgentClient;
}
public record MindMapRequest(String topic, String structuralHints) {}
@Bean
@Description("当需要配合思维导图解释概念时调用此工具。工具会异步执行,执行完成后会自动显示在用户的前端页面")
public Function<MindMapRequest, String> asyncMindMapTool() {
return request -> {
// 1. 触发真正的耗时生成任务(甩给后台线程池,立刻放行当前线程)
generateMermaidCodeAsync(request.topic(), request.structuralHints());
// 2. 立刻返回一个安抚性文案给主 Agent,主 Agent 拿到这个就不卡了
return "SUCCESS: 绘图任务已提交至后台 Mermaid Agent。";
};
}
// 真正耗时的子 Agent 调用
@Async
public void generateMermaidCodeAsync(String topic, String hints) {
// 让子模型专心写代码
String mermaidCode = mermaidAgentClient.prompt()
.system("你是一个专业的 Mermaid 代码生成器。只输出合法的 graph TD 代码,不要多余的废话。")
.user("主题:" + topic + "\n结构提示:" + hints)
.call()
.content();
// 拿着写好的代码,通过旁路漏斗,直接空降给前端!
sseDispatcher.dispatch("mermaid_chart", mermaidCode);
}
}这样写还有一个好处,我们可以更加方便的对子 Agent 的生成结果进行校验,要知道,一旦生成的结果不能正确的被前端渲染成图表,那么在用户的页面的后果是灾难级别的。
然后在主干道中,需要对旁路的流进行合并然后一并发送给前端:
@RestController
public class ChatController {
// ... 注入依赖
@GetMapping(value = "/ai/chat", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> chat(String prompt) {
// 主路:主 Agent 实时文字输出 (加上 tool 配置)
Flux<ServerSentEvent<String>> mainAgentStream = mainChatClient.prompt()
.user(prompt)
.tools("asyncMindMapTool") // 给它配上我们写的异步工具
.stream().content()
.map(text -> ServerSentEvent.<String>builder()
.event("message")
.data(text)
.build());
// 旁路:获取全局漏斗里的独立推送(比如画好的 Mermaid 代码)
Flux<ServerSentEvent<String>> sideChannelStream = sseDispatcher.getStream();
// 合并两条流并返回。任意一条流有数据,前端都能立刻收到!
return Flux.merge(mainAgentStream, sideChannelStream);
}
}然后前端根据不同 EVENT 将流给分离出来,分别进行展示:
const sse = new EventSource('/ai/chat?prompt=讲解微服务架构并画图');
// 监听主 Agent 的文字解说,实时打字输出
sse.addEventListener('message', (e) => {
chatBox.innerHTML += e.data;
});
// 监听 Mermaid Agent 的异步突袭推送!
sse.addEventListener('mermaid_chart', (e) => {
const code = e.data;
// 收到代码后,调用前端的 mermaid.js 库进行渲染
renderMermaidToCanvas(code);
});实现二:多用户、多工具级别系统
在上面的实现,有二个问题:
- 全局 sink 环境是公共的,所有用户的异步流都会挤到一块。
- 主 Agent 每多一个工具调用,前端就要多写一个解析流。
两个问题其实都好解决。sink 是公共的,就为每个会话都创建一个 sink ,然后放入一个 Map 中。同时将所有的非主 Agent 的 EVENT 都收敛为 tool_output,然后添加一个标识字段,前端统一接收后只需要根据不同的表示选择不同的处理器即可。
首先就是改造过后的会话管理器 SseDispatcher :
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Component
public class SseSessionManager {
// 🌟 核心:一个线程安全的 Map,Key 是对唯一话 ID,Value 是该对话专属的 Sink
private final Map<String, Sinks.Many<ServerSentEvent<String>>> sessionSinks = new ConcurrentHashMap<>();
// 1. 建立连接:前端连进来时,为他动态创建一根专属水管
public Flux<ServerSentEvent<String>> subscribe(String sessionId) {
// 使用 unicast() 单播,因为一根管子只对应一个前端浏览器
Sinks.Many<ServerSentEvent<String>> sink = Sinks.many().unicast().onBackpressureBuffer();
sessionSinks.put(sessionId, sink);
System.out.println("🔗 会话 " + sessionId + " 已建立旁路连接");
return sink.asFlux()
// 🧹 极其重要:当浏览器断开连接时,自动清理 Map,防止内存泄漏!
.doFinally(signalType -> {
sessionSinks.remove(sessionId);
System.out.println("❌ 会话 " + sessionId + " 连接断开,释放资源");
});
}
// 2. 定向推送:后台工具干完活,根据 sessionId 找到那根管子倒水
public void dispatch(String sessionId, String eventName, String data) {
Sinks.Many<ServerSentEvent<String>> sink = sessionSinks.get(sessionId);
if (sink != null) {
sink.tryEmitNext(ServerSentEvent.<String>builder()
.event(eventName)
.data(data)
.build());
} else {
System.err.println("⚠️ 找不到会话 " + sessionId + " 的通道,可能已断线");
}
}
}可以使用 SessionId 作为 key,并为每个实现设置好 eventName ,即:sseSessionManager.dispatch(sessionId, "mermaid", code) 就可以实现安全的数据隔离了。
在上面的布局中,我们不仅考虑到的 Sink 被公用的问题,还添加一个 eventName 的参数,这个参数,并不是 SSE 中的 event 标签。所有此类事件的 event 都为 tool_output,前端从该时间监听到数据后,就可以通过刚才 eventName 参数将展示的任务交给不同的组件:
// 1. 维护一个组件映射表(把字符串映射到具体的渲染函数/组件)
const WidgetRegistry = {
'mermaid': renderMermaidChart,
'weather': renderWeatherCard,
'stock': renderStockKLine,
'default': renderUnknownWidget
};
const sse = new EventSource('/ai/chat?sessionId=user123');
// 2. 永远只需要这一个监听器!
sse.addEventListener('tool_output', (e) => {
// 解析后端传来的统一结构
const payload = JSON.parse(e.data);
// 从映射表中找到对应的渲染器
const renderFunction = WidgetRegistry[payload.widgetType] || WidgetRegistry['default'];
// 把数据交给具体的渲染器去画 UI
renderFunction(payload.content);
});
Comments NOTHING