diff --git a/packages/connect-node/src/node-universal-handler.ts b/packages/connect-node/src/node-universal-handler.ts index 1c650b9c8..72ddfd636 100644 --- a/packages/connect-node/src/node-universal-handler.ts +++ b/packages/connect-node/src/node-universal-handler.ts @@ -138,19 +138,23 @@ export async function universalResponseToNodeResponse( universalResponse: UniversalServerResponse, nodeResponse: NodeServerResponse, ): Promise { + const it = universalResponse.body?.[Symbol.asyncIterator](); + let isWriteError = false; try { - if (universalResponse.body !== undefined) { - for await (const chunk of universalResponse.body) { - // we deliberately send headers *in* this loop, not before, - // because we have to give the implementation a chance to - // set response headers - if (!nodeResponse.headersSent) { - nodeResponse.writeHead( - universalResponse.status, - webHeaderToNodeHeaders(universalResponse.header), - ); - } - await write(nodeResponse, chunk); + if (it !== undefined) { + let chunk = await it.next(); + isWriteError = true; + // we deliberately send headers after first read, not before, + // because we have to give the implementation a chance to + // set response headers + nodeResponse.writeHead( + universalResponse.status, + webHeaderToNodeHeaders(universalResponse.header), + ); + isWriteError = false; + for (; chunk.done !== true; chunk = await it.next()) { + isWriteError = true; + await write(nodeResponse, chunk.value); if ( "flush" in nodeResponse && typeof nodeResponse.flush == "function" @@ -166,6 +170,7 @@ export async function universalResponseToNodeResponse( // https://github.com/expressjs/compression/blob/ad5113b98cafe1382a0ece30bb4673707ac59ce7/index.js#L70 nodeResponse.flush(); } + isWriteError = false; } } if (!nodeResponse.headersSent) { @@ -186,7 +191,13 @@ export async function universalResponseToNodeResponse( nodeResponse.end(); }); } catch (e) { + // Report write errors to the handler. + if (isWriteError) { + it?.throw?.(e).catch(() => {}); + } throw connectErrorFromNodeReason(e); + } finally { + it?.return?.().catch(() => {}); } }