Hibernate-able transport#128
Conversation
🦋 Changeset detectedLatest commit: 1cbd675 The changes in this PR will be included in the next version bump. This PR includes changesets to release 2 packages
Not sure what this means? Click here to learn what changesets are. Click here if you're a maintainer who wants to add another changeset to this PR |
|
After some discussion I'm going to try to move the server definition into Agent's |
geelen
left a comment
There was a problem hiding this comment.
This is actually way better than I thought it was gonna look like. So we have no need for SSEEdgeTransport at all? That's pretty great tbqh.
If you make init an alias for onStart this should start working, right?
| ws.addEventListener("message", async (event) => { | ||
| try { | ||
| // Send the message as an SSE event | ||
| const messageText = `event: message\ndata: ${event.data}\n\n`; |
There was a problem hiding this comment.
I'd maybe consider first decoding the event data here, making sure it's JSON and has the format {ok: true, data: <mcp-response-here> }, rather than passing it through unchecked? I suppose it's not like a Response so you wouldn't get error messages or status codes here so maybe it's not necessary, but it always feels weird to sit in the middle of things and not inspect them?
|
A lot of this boilerplate is already handled by the PartyServer class, can we just use that? |
That's part of what's required. We also need to persist
I think @geelen had some concerns about directly subclassing Agent / PartyServer, but I don't remember exactly what they were. I've started trying to sketch out what that would look like in |
| Props extends Record<string, unknown> = Record<string, unknown>, | ||
| > extends DurableObject<Env> { | ||
| #status: "zero" | "starting" | "started" = "zero"; | ||
| #transport?: McpTransport; |
There was a problem hiding this comment.
This assumes there is only one connection / McpAgent. That might work for now, but we should likely make this more flexible in the future.
| async #initialize(): Promise<void> { | ||
| await this.ctx.blockConcurrencyWhile(async () => { | ||
| this.#status = "starting"; | ||
| await this.onStart(); | ||
| this.#status = "started"; | ||
| }); | ||
| } |
There was a problem hiding this comment.
Shamelessly stolen from PartyServer: https://github.com/threepointone/partyserver/blob/main/packages/partyserver/src/index.ts#L397-L403
a7f13e3 to
d826c4f
Compare
|
Marking as ready for review. Testing locally this seems to work well! @threepointone I'd love your eyes here. I suspect that there's stuff we can further simplify, but guidance on where and how would be very welcome 🙏 I think we want to keep from inheriting Agent / PartyServer directly for now just to keep the scope on this manageable. cc @geelen for verification on that |
geelen
left a comment
There was a problem hiding this comment.
A few comments still left outstanding but otherwise lgtm, keen to get this out there.
|
ugh, I merged #125 that moved some files around, and now I'm struggling to do a rebase or merge to fix the conflicts. Can you help please? Sorry! Happy to pair. |
Mostly working, but fails if the DO hibernates because the websocket server is then undefined Ran into bigger issues once hibernation is in play. Going to open a draft for feedback on how to approach this. Make it work with hibernation These changes are not pretty, but they do seem to allow it to work with hibernation. We have to: - Make sure to serialize props to state or they are gone when we wake up from hibernation - Rerun our init method so that the server has all the tools, etc defined. This likely introduces issues depending on what the user does in this method. Just defining the server seems to work. - Re-connect the server to the transport (the DO instance itself for now, though I think we can simplify this a bit) - Make sure that when we wake up we re-run our onStart. This is copied shamelessly from PartyServer Overall I'm not super happy with the complexity here. I think Sunil's suggestion to just use PartyServer directly is probably the right move. Start sketching out what this would look like if we directly subclassed Agent Factor out the transport into a separate class Put props into DO storage instead of agent state Cleanup Remove SSEEdge and mcp-next. Clean up to prep for merge let -> const Bring back sseEdge for rebase purposes
c6eaa15 to
1b9d611
Compare
|
@threepointone I brought back the transport file and squished all my commits and that made the rebase a lot easier. This should be ready to review now |
|
awesome I'll review asap |
threepointone
left a comment
There was a problem hiding this comment.
just a couple of comments, but tentative approval anyway. happy to land whenever you say.
|
I'm going to land and ship this today so we can play with it! as soon as CI does its thing |
| "extends": "../../tsconfig.base.json" | ||
| "extends": "../../tsconfig.base.json", | ||
| "compilerOptions": { | ||
| "target": "ES2021" |
There was a problem hiding this comment.
Note that I didn't change the target in the base config, it was already ES2022, so this should be a change in target for the playground.
I changed the lib though I'm not 100% on the implications of that
Current problem: When a user adds an McpAgent to a session with an LLM, the DO is always active during the whole session, even if they never call it.
This modifies the MCP Transport to allow the Durable Object to hibernate. In order to facilitate that, we need a much simpler Transport class that is not based around HTTP+SSEs. https://developers.cloudflare.com/durable-objects/best-practices/websockets/#websocket-hibernation-api
When a request comes in on
/ssethe worker initializes a websocket connection to the appropriate DO for the session, and returns an SSE stream back to the browser. All responses flow over this stream.Incoming messages on
/sse/messagepass the message to the DO, which allows the MCP server to handle it, sending the response back of WS -> SSE -> Client.Despite the relative complexity, this seems to work pretty well!