Skip to content

Commit bd2d60a

Browse files
committed
Add test for asgi streaming response
1 parent 2352651 commit bd2d60a

File tree

8 files changed

+266
-202
lines changed

8 files changed

+266
-202
lines changed

src/pyodide/internal/serializeJsModule.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ export function createImportProxy(
6363
if (!IS_CREATING_SNAPSHOT) {
6464
return mod;
6565
}
66-
if (!mod || typeof mod !== 'object') {
66+
if (!mod || (typeof mod !== 'object' && typeof mod !== 'function')) {
6767
return mod;
6868
}
6969
return new Proxy(mod, {

src/pyodide/internal/workers-api/src/asgi.py

Lines changed: 36 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from typing import Any
77

88
import js
9-
from workers import Context, Request
9+
from workers import Context, Request, wait_until
1010

1111
ASGI = {"spec_version": "2.0", "version": "3.0"}
1212
logger = logging.getLogger("asgi")
@@ -124,9 +124,10 @@ async def process_request(
124124
status = None
125125
headers = None
126126
result = Future()
127-
is_sse = False
128127
finished_response = Event()
129-
use_streaming = ctx is not None
128+
129+
# Streaming state — initialized lazily on first body chunk with more_body=True.
130+
writer = None
130131

131132
receive_queue = Queue()
132133
if req.body:
@@ -149,73 +150,50 @@ async def receive():
149150
message = {"type": "http.disconnect"}
150151
return message
151152

152-
# When ctx is available we can stream via a TransformStream: the Response
153-
# wraps the readable side and ctx.waitUntil keeps the worker alive while
154-
# chunks are written. Without ctx we buffer all chunks in Python and
155-
# build the Response from the complete body after the app finishes.
156-
if use_streaming:
157-
transform_stream = TransformStream.new()
158-
readable = transform_stream.readable
159-
writer = transform_stream.writable.getWriter()
160-
161-
# Buffered body chunks when not streaming
162-
body_chunks: list[bytes] = []
163-
164153
async def send(got):
165154
nonlocal status
166155
nonlocal headers
167-
nonlocal is_sse
156+
nonlocal writer
168157

169158
if got["type"] == "http.response.start":
170159
status = got["status"]
171160
# Like above, we need to convert byte-pairs into string explicitly.
172161
headers = [(k.decode(), v.decode()) for k, v in got["headers"]]
173-
# Track SSE for backwards-compatible error when ctx is missing
174-
for k, v in headers:
175-
if k.lower() == "content-type" and v.lower().startswith(
176-
"text/event-stream"
177-
):
178-
is_sse = True
179-
break
180-
if use_streaming:
181-
# Return the response immediately so the runtime can start
182-
# consuming body chunks as they are written.
183-
resp = Response.new(
184-
readable, headers=Object.fromEntries(headers), status=status
185-
)
186-
result.set_result(resp)
187162

188163
elif got["type"] == "http.response.body":
189164
body = got["body"]
190165
more_body = got.get("more_body", False)
191166

192-
if use_streaming:
193-
# Convert body to JS buffer and write to the stream
194-
px = create_proxy(body)
195-
buf = px.getBuffer()
196-
px.destroy()
197-
198-
await writer.write(buf.data)
167+
if writer is not None:
168+
# Already in streaming mode — write chunk to the stream.
169+
with acquire_js_buffer(body) as jsbytes:
170+
await writer.write(jsbytes)
199171
if not more_body:
200172
await writer.close()
201173
finished_response.set()
174+
elif more_body:
175+
# First body chunk with more data coming — switch to streaming.
176+
# Create a TransformStream so the runtime can start consuming
177+
# body chunks as they are written.
178+
transform_stream = TransformStream.new()
179+
readable = transform_stream.readable
180+
writer = transform_stream.writable.getWriter()
181+
resp = Response.new(
182+
readable, headers=Object.fromEntries(headers), status=status
183+
)
184+
result.set_result(resp)
185+
with acquire_js_buffer(body) as jsbytes:
186+
await writer.write(jsbytes)
202187
else:
203-
# Buffer chunks in Python to avoid TransformStream deadlock
204-
# TODO(soon): This is inefficident, make `ctx` mandatory and let
205-
# the runtime handle the streaming.
206-
body_chunks.append(body)
207-
if not more_body:
208-
full_body = b"".join(body_chunks)
209-
px = create_proxy(full_body)
210-
buf = px.getBuffer()
211-
px.destroy()
212-
resp = Response.new(
213-
buf.data,
214-
headers=Object.fromEntries(headers),
215-
status=status,
216-
)
217-
result.set_result(resp)
218-
finished_response.set()
188+
# Complete body in a single chunk
189+
px = create_proxy(body)
190+
buf = px.getBuffer()
191+
px.destroy()
192+
resp = Response.new(
193+
buf.data, headers=Object.fromEntries(headers), status=status
194+
)
195+
result.set_result(resp)
196+
finished_response.set()
219197

220198
# Run the application in the background
221199
async def run_app():
@@ -228,7 +206,7 @@ async def run_app():
228206
except Exception as e:
229207
if not result.done():
230208
result.set_exception(e)
231-
if use_streaming:
209+
if writer is not None:
232210
await writer.close()
233211
finished_response.set()
234212
else:
@@ -237,24 +215,10 @@ async def run_app():
237215
logger.exception("Exception in ASGI application after response started")
238216

239217
# Create task to run the application in the background
240-
app_task = create_task(run_app())
241-
242-
# Wait for the result (the response)
243-
response = await result
244-
245-
if use_streaming:
246-
# Let the application continue running in the background to stream
247-
# the response body via the TransformStream.
248-
if is_sse and ctx is None:
249-
raise RuntimeError(
250-
"Server-Side-Events require ctx to be passed to asgi.fetch"
251-
)
252-
ctx.waitUntil(create_proxy(app_task))
253-
else:
254-
# Without ctx the response was built from buffered bytes, so the
255-
# app task has already completed or will complete momentarily.
256-
await app_task
257-
return response
218+
app_task = create_proxy(create_task(run_app()))
219+
wait_until(app_task)
220+
app_task.destroy()
221+
return await result
258222

259223

260224
async def process_websocket(app: Any, req: "Request | js.Request") -> js.Response:

src/pyodide/internal/workers-api/src/workers/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
"patch_env",
5252
"python_from_rpc",
5353
"python_to_rpc",
54+
"wait_until",
5455
"waitUntil",
5556
]
5657

@@ -59,7 +60,7 @@ def __getattr__(key):
5960
if key == "env":
6061
cloudflare_workers = import_from_javascript("cloudflare:workers")
6162
return cloudflare_workers.env
62-
if key == "waitUntil":
63+
if key in ("wait_until", "waitUntil"):
6364
cloudflare_workers = import_from_javascript("cloudflare:workers")
6465
return cloudflare_workers.waitUntil
6566
raise AttributeError(f"module {__name__!r} has no attribute {key!r}")

src/pyodide/python-entrypoint-helper.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,10 @@ function patchWaitUntil(ctx: {
7272

7373
export type PyodideEntrypointHelper = {
7474
doAnImport: (mod: string) => Promise<any>;
75-
cloudflareWorkersModule: { env: any };
75+
cloudflareWorkersModule: {
76+
env: any;
77+
waitUntil: (p: Promise<void> | PyFuture<void>) => void;
78+
};
7679
cloudflareSocketsModule: any;
7780
workerEntrypoint: any;
7881
patchWaitUntil: typeof patchWaitUntil;

src/workerd/server/tests/python/BUILD.bazel

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,6 @@ py_wd_test("env-param")
2525

2626
py_wd_test("asgi")
2727

28-
py_wd_test("asgi-sse")
29-
3028
py_wd_test("random")
3129

3230
py_wd_test("subdirectory")

src/workerd/server/tests/python/asgi-sse/asgi-sse.wd-test

Lines changed: 0 additions & 17 deletions
This file was deleted.

src/workerd/server/tests/python/asgi-sse/worker.py

Lines changed: 0 additions & 102 deletions
This file was deleted.

0 commit comments

Comments (0)