Skip to content

Commit b6cde3b

Browse files
committed
Add test for asgi streaming response
1 parent 2352651 commit b6cde3b

File tree

8 files changed

+272
-203
lines changed

8 files changed

+272
-203
lines changed

src/pyodide/internal/serializeJsModule.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ export function createImportProxy(
6363
if (!IS_CREATING_SNAPSHOT) {
6464
return mod;
6565
}
66-
if (!mod || typeof mod !== 'object') {
66+
if (!mod || (typeof mod !== 'object' && typeof mod !== 'function')) {
6767
return mod;
6868
}
6969
return new Proxy(mod, {

src/pyodide/internal/workers-api/src/asgi.py

Lines changed: 42 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from typing import Any
77

88
import js
9-
from workers import Context, Request
9+
from workers import Context, Request, wait_until
1010

1111
ASGI = {"spec_version": "2.0", "version": "3.0"}
1212
logger = logging.getLogger("asgi")
@@ -115,7 +115,12 @@ async def send(got):
115115

116116

117117
async def process_request(
118-
app: Any, req: "Request | js.Request", env: Any, ctx: Context | None
118+
app: Any,
119+
req: "Request | js.Request",
120+
env: Any,
121+
# added for waitUntil, but not used anymore
122+
# TODO(later): remove this parameter after unvendoring Python SDK from workerd
123+
ctx: Context | None,
119124
) -> js.Response:
120125
from js import Object, Response, TransformStream
121126

@@ -124,9 +129,10 @@ async def process_request(
124129
status = None
125130
headers = None
126131
result = Future()
127-
is_sse = False
128132
finished_response = Event()
129-
use_streaming = ctx is not None
133+
134+
# Streaming state — initialized lazily on first body chunk with more_body=True.
135+
writer = None
130136

131137
receive_queue = Queue()
132138
if req.body:
@@ -149,73 +155,50 @@ async def receive():
149155
message = {"type": "http.disconnect"}
150156
return message
151157

152-
# When ctx is available we can stream via a TransformStream: the Response
153-
# wraps the readable side and ctx.waitUntil keeps the worker alive while
154-
# chunks are written. Without ctx we buffer all chunks in Python and
155-
# build the Response from the complete body after the app finishes.
156-
if use_streaming:
157-
transform_stream = TransformStream.new()
158-
readable = transform_stream.readable
159-
writer = transform_stream.writable.getWriter()
160-
161-
# Buffered body chunks when not streaming
162-
body_chunks: list[bytes] = []
163-
164158
async def send(got):
165159
nonlocal status
166160
nonlocal headers
167-
nonlocal is_sse
161+
nonlocal writer
168162

169163
if got["type"] == "http.response.start":
170164
status = got["status"]
171165
# Like above, we need to convert byte-pairs into string explicitly.
172166
headers = [(k.decode(), v.decode()) for k, v in got["headers"]]
173-
# Track SSE for backwards-compatible error when ctx is missing
174-
for k, v in headers:
175-
if k.lower() == "content-type" and v.lower().startswith(
176-
"text/event-stream"
177-
):
178-
is_sse = True
179-
break
180-
if use_streaming:
181-
# Return the response immediately so the runtime can start
182-
# consuming body chunks as they are written.
183-
resp = Response.new(
184-
readable, headers=Object.fromEntries(headers), status=status
185-
)
186-
result.set_result(resp)
187167

188168
elif got["type"] == "http.response.body":
189169
body = got["body"]
190170
more_body = got.get("more_body", False)
191171

192-
if use_streaming:
193-
# Convert body to JS buffer and write to the stream
194-
px = create_proxy(body)
195-
buf = px.getBuffer()
196-
px.destroy()
197-
198-
await writer.write(buf.data)
172+
if writer is not None:
173+
# Already in streaming mode — write chunk to the stream.
174+
with acquire_js_buffer(body) as jsbytes:
175+
await writer.write(jsbytes)
199176
if not more_body:
200177
await writer.close()
201178
finished_response.set()
179+
elif more_body:
180+
# First body chunk with more data coming — switch to streaming.
181+
# Create a TransformStream so the runtime can start consuming
182+
# body chunks as they are written.
183+
transform_stream = TransformStream.new()
184+
readable = transform_stream.readable
185+
writer = transform_stream.writable.getWriter()
186+
resp = Response.new(
187+
readable, headers=Object.fromEntries(headers), status=status
188+
)
189+
result.set_result(resp)
190+
with acquire_js_buffer(body) as jsbytes:
191+
await writer.write(jsbytes)
202192
else:
203-
# Buffer chunks in Python to avoid TransformStream deadlock
204-
# TODO(soon): This is inefficident, make `ctx` mandatory and let
205-
# the runtime handle the streaming.
206-
body_chunks.append(body)
207-
if not more_body:
208-
full_body = b"".join(body_chunks)
209-
px = create_proxy(full_body)
210-
buf = px.getBuffer()
211-
px.destroy()
212-
resp = Response.new(
213-
buf.data,
214-
headers=Object.fromEntries(headers),
215-
status=status,
216-
)
217-
result.set_result(resp)
218-
finished_response.set()
193+
# Complete body in a single chunk
194+
px = create_proxy(body)
195+
buf = px.getBuffer()
196+
px.destroy()
197+
resp = Response.new(
198+
buf.data, headers=Object.fromEntries(headers), status=status
199+
)
200+
result.set_result(resp)
201+
finished_response.set()
219202

220203
# Run the application in the background
221204
async def run_app():
@@ -228,7 +211,7 @@ async def run_app():
228211
except Exception as e:
229212
if not result.done():
230213
result.set_exception(e)
231-
if use_streaming:
214+
if writer is not None:
232215
await writer.close()
233216
finished_response.set()
234217
else:
@@ -237,24 +220,10 @@ async def run_app():
237220
logger.exception("Exception in ASGI application after response started")
238221

239222
# Create task to run the application in the background
240-
app_task = create_task(run_app())
241-
242-
# Wait for the result (the response)
243-
response = await result
244-
245-
if use_streaming:
246-
# Let the application continue running in the background to stream
247-
# the response body via the TransformStream.
248-
if is_sse and ctx is None:
249-
raise RuntimeError(
250-
"Server-Side-Events require ctx to be passed to asgi.fetch"
251-
)
252-
ctx.waitUntil(create_proxy(app_task))
253-
else:
254-
# Without ctx the response was built from buffered bytes, so the
255-
# app task has already completed or will complete momentarily.
256-
await app_task
257-
return response
223+
app_task = create_proxy(create_task(run_app()))
224+
wait_until(app_task)
225+
app_task.destroy()
226+
return await result
258227

259228

260229
async def process_websocket(app: Any, req: "Request | js.Request") -> js.Response:

src/pyodide/internal/workers-api/src/workers/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,14 +52,15 @@
5252
"python_from_rpc",
5353
"python_to_rpc",
5454
"waitUntil",
55+
"wait_until",
5556
]
5657

5758

5859
def __getattr__(key):
5960
if key == "env":
6061
cloudflare_workers = import_from_javascript("cloudflare:workers")
6162
return cloudflare_workers.env
62-
if key == "waitUntil":
63+
if key in ("wait_until", "waitUntil"):
6364
cloudflare_workers = import_from_javascript("cloudflare:workers")
6465
return cloudflare_workers.waitUntil
6566
raise AttributeError(f"module {__name__!r} has no attribute {key!r}")

src/pyodide/python-entrypoint-helper.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,10 @@ function patchWaitUntil(ctx: {
7272

7373
export type PyodideEntrypointHelper = {
7474
doAnImport: (mod: string) => Promise<any>;
75-
cloudflareWorkersModule: { env: any };
75+
cloudflareWorkersModule: {
76+
env: any;
77+
waitUntil: (p: Promise<void> | PyFuture<void>) => void;
78+
};
7679
cloudflareSocketsModule: any;
7780
workerEntrypoint: any;
7881
patchWaitUntil: typeof patchWaitUntil;

src/workerd/server/tests/python/BUILD.bazel

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,6 @@ py_wd_test("env-param")
2525

2626
py_wd_test("asgi")
2727

28-
py_wd_test("asgi-sse")
29-
3028
py_wd_test("random")
3129

3230
py_wd_test("subdirectory")

src/workerd/server/tests/python/asgi-sse/asgi-sse.wd-test

Lines changed: 0 additions & 17 deletions
This file was deleted.

src/workerd/server/tests/python/asgi-sse/worker.py

Lines changed: 0 additions & 102 deletions
This file was deleted.

0 commit comments

Comments
 (0)