Skip to content

Commit 15db6d5

Browse files
committed
Add test for asgi streaming response
1 parent e17eef5 commit 15db6d5

File tree

7 files changed

+274
-200
lines changed

7 files changed

+274
-200
lines changed

src/pyodide/internal/snapshot.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -584,6 +584,7 @@ async function importJsModulesFromSnapshot(
584584
type CustomSerialized =
585585
| { pyodide_entrypoint_helper: true }
586586
| { cloudflare_compat_flags: true }
587+
| { waitUntil: true }
587588
| SerializedJsModule;
588589
/**
589590
* Global objects that need a custom serializer
@@ -602,6 +603,11 @@ function getHiwireSerializer(
602603
return { pyodide_entrypoint_helper: true };
603604
} else if (obj === globalObj.cloudflare_compat_flags) {
604605
return { cloudflare_compat_flags: true };
606+
} else if (
607+
obj ===
608+
globalObj.pyodide_entrypoint_helper.cloudflareWorkersModule.waitUntil
609+
) {
610+
return { waitUntil: true };
605611
}
606612
const serializedModule = maybeSerializeJsModule(obj, modules);
607613
if (serializedModule) {
@@ -619,6 +625,9 @@ function getHiwireDeserializer(
619625
return globalObj.pyodide_entrypoint_helper;
620626
} else if ('cloudflare_compat_flags' in obj) {
621627
return globalObj.cloudflare_compat_flags;
628+
} else if ('waitUntil' in obj) {
629+
return globalObj.pyodide_entrypoint_helper.cloudflareWorkersModule
630+
.waitUntil;
622631
}
623632
if ('jsModule' in obj) {
624633
return deserializeJsModule(obj, JS_MODULES);

src/pyodide/internal/workers-api/src/asgi.py

Lines changed: 38 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from typing import Any
77

88
import js
9-
from workers import Context, Request
9+
from workers import Context, Request, waitUntil
1010

1111
ASGI = {"spec_version": "2.0", "version": "3.0"}
1212
logger = logging.getLogger("asgi")
@@ -124,9 +124,10 @@ async def process_request(
124124
status = None
125125
headers = None
126126
result = Future()
127-
is_sse = False
128127
finished_response = Event()
129-
use_streaming = ctx is not None
128+
129+
# Streaming state — initialized lazily on first body chunk with more_body=True.
130+
writer = None
130131

131132
receive_queue = Queue()
132133
if req.body:
@@ -149,73 +150,52 @@ async def receive():
149150
message = {"type": "http.disconnect"}
150151
return message
151152

152-
# When ctx is available we can stream via a TransformStream: the Response
153-
# wraps the readable side and ctx.waitUntil keeps the worker alive while
154-
# chunks are written. Without ctx we buffer all chunks in Python and
155-
# build the Response from the complete body after the app finishes.
156-
if use_streaming:
157-
transform_stream = TransformStream.new()
158-
readable = transform_stream.readable
159-
writer = transform_stream.writable.getWriter()
160-
161-
# Buffered body chunks when not streaming
162-
body_chunks: list[bytes] = []
163-
164153
async def send(got):
165154
nonlocal status
166155
nonlocal headers
167-
nonlocal is_sse
156+
nonlocal writer
168157

169158
if got["type"] == "http.response.start":
170159
status = got["status"]
171160
# Like above, we need to convert byte-pairs into string explicitly.
172161
headers = [(k.decode(), v.decode()) for k, v in got["headers"]]
173-
# Track SSE for backwards-compatible error when ctx is missing
174-
for k, v in headers:
175-
if k.lower() == "content-type" and v.lower().startswith(
176-
"text/event-stream"
177-
):
178-
is_sse = True
179-
break
180-
if use_streaming:
181-
# Return the response immediately so the runtime can start
182-
# consuming body chunks as they are written.
183-
resp = Response.new(
184-
readable, headers=Object.fromEntries(headers), status=status
185-
)
186-
result.set_result(resp)
187162

188163
elif got["type"] == "http.response.body":
189164
body = got["body"]
190165
more_body = got.get("more_body", False)
191166

192-
if use_streaming:
193-
# Convert body to JS buffer and write to the stream
194-
px = create_proxy(body)
195-
buf = px.getBuffer()
196-
px.destroy()
197-
198-
await writer.write(buf.data)
167+
if writer is not None:
168+
# Already in streaming mode — write chunk to the stream.
169+
with acquire_js_buffer(body) as jsbytes:
170+
await writer.write(jsbytes)
199171
if not more_body:
200172
await writer.close()
201173
finished_response.set()
174+
175+
elif more_body:
176+
# First body chunk with more data coming — switch to streaming.
177+
# Create a TransformStream so the runtime can start consuming
178+
# body chunks as they are written.
179+
transform_stream = TransformStream.new()
180+
readable = transform_stream.readable
181+
writer = transform_stream.writable.getWriter()
182+
resp = Response.new(
183+
readable, headers=Object.fromEntries(headers), status=status
184+
)
185+
result.set_result(resp)
186+
with acquire_js_buffer(body) as jsbytes:
187+
await writer.write(jsbytes)
188+
202189
else:
203-
# Buffer chunks in Python to avoid TransformStream deadlock
204-
# TODO(soon): This is inefficident, make `ctx` mandatory and let
205-
# the runtime handle the streaming.
206-
body_chunks.append(body)
207-
if not more_body:
208-
full_body = b"".join(body_chunks)
209-
px = create_proxy(full_body)
210-
buf = px.getBuffer()
211-
px.destroy()
212-
resp = Response.new(
213-
buf.data,
214-
headers=Object.fromEntries(headers),
215-
status=status,
216-
)
217-
result.set_result(resp)
218-
finished_response.set()
190+
# Complete body in a single chunk
191+
px = create_proxy(body)
192+
buf = px.getBuffer()
193+
px.destroy()
194+
resp = Response.new(
195+
buf.data, headers=Object.fromEntries(headers), status=status
196+
)
197+
result.set_result(resp)
198+
finished_response.set()
219199

220200
# Run the application in the background
221201
async def run_app():
@@ -228,7 +208,7 @@ async def run_app():
228208
except Exception as e:
229209
if not result.done():
230210
result.set_exception(e)
231-
if use_streaming:
211+
if writer is not None:
232212
await writer.close()
233213
finished_response.set()
234214
else:
@@ -237,24 +217,10 @@ async def run_app():
237217
logger.exception("Exception in ASGI application after response started")
238218

239219
# Create task to run the application in the background
240-
app_task = create_task(run_app())
241-
242-
# Wait for the result (the response)
243-
response = await result
244-
245-
if use_streaming:
246-
# Let the application continue running in the background to stream
247-
# the response body via the TransformStream.
248-
if is_sse and ctx is None:
249-
raise RuntimeError(
250-
"Server-Side-Events require ctx to be passed to asgi.fetch"
251-
)
252-
ctx.waitUntil(create_proxy(app_task))
253-
else:
254-
# Without ctx the response was built from buffered bytes, so the
255-
# app task has already completed or will complete momentarily.
256-
await app_task
257-
return response
220+
app_task = create_proxy(create_task(run_app()))
221+
waitUntil(app_task)
222+
app_task.destroy()
223+
return await result
258224

259225

260226
async def process_websocket(app: Any, req: "Request | js.Request") -> js.Response:

src/pyodide/python-entrypoint-helper.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,10 @@ function patchWaitUntil(ctx: {
7272

7373
export type PyodideEntrypointHelper = {
7474
doAnImport: (mod: string) => Promise<any>;
75-
cloudflareWorkersModule: { env: any };
75+
cloudflareWorkersModule: {
76+
env: any;
77+
waitUntil: (p: Promise<void> | PyFuture<void>) => void;
78+
};
7679
cloudflareSocketsModule: any;
7780
workerEntrypoint: any;
7881
patchWaitUntil: typeof patchWaitUntil;

src/workerd/server/tests/python/BUILD.bazel

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,6 @@ py_wd_test("env-param")
2525

2626
py_wd_test("asgi")
2727

28-
py_wd_test("asgi-sse")
29-
3028
py_wd_test("random")
3129

3230
py_wd_test("subdirectory")

src/workerd/server/tests/python/asgi-sse/asgi-sse.wd-test

Lines changed: 0 additions & 17 deletions
This file was deleted.

src/workerd/server/tests/python/asgi-sse/worker.py

Lines changed: 0 additions & 102 deletions
This file was deleted.

0 commit comments

Comments
 (0)