Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/workerd/api/tests/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,12 @@ wd_test(
data = ["url-test.js"],
)

wd_test(
src = "websocket-allow-half-open-test.wd-test",
args = ["--experimental"],
data = ["websocket-allow-half-open-test.js"],
)

wd_test(
src = "websocket-constructor-test.wd-test",
args = ["--experimental"],
Expand Down
4 changes: 2 additions & 2 deletions src/workerd/api/tests/http-test.wd-test
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ const unitTests :Workerd.Config = (
( name = "SERVICE", service = "http-test" ),
( name = "CACHE_ENABLED", json = "false" ),
],
compatibilityFlags = ["nodejs_compat", "service_binding_extra_handlers", "cache_option_disabled", "url_standard", "workers_api_getters_setters_on_prototype", "fetch_legacy_url"],
compatibilityFlags = ["nodejs_compat", "service_binding_extra_handlers", "cache_option_disabled", "url_standard", "workers_api_getters_setters_on_prototype", "fetch_legacy_url", "web_socket_auto_reply_to_close"],
)
),
( name = "http-test-cache-option-enabled",
Expand All @@ -23,7 +23,7 @@ const unitTests :Workerd.Config = (
( name = "SERVICE", service = "http-test-cache-option-enabled" ),
( name = "CACHE_ENABLED", json = "true" ),
],
compatibilityFlags = ["nodejs_compat", "service_binding_extra_handlers", "cache_option_enabled", "url_standard", "workers_api_getters_setters_on_prototype", "fetch_legacy_url"],
compatibilityFlags = ["nodejs_compat", "service_binding_extra_handlers", "cache_option_enabled", "url_standard", "workers_api_getters_setters_on_prototype", "fetch_legacy_url", "web_socket_auto_reply_to_close"],
))
],
);
2 changes: 1 addition & 1 deletion src/workerd/api/tests/tail-worker-test.js

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/workerd/api/tests/tail-worker-test.wd-test
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ const unitTests :Workerd.Config = (
( name = "SERVICE", service = "http-test" ),
( name = "CACHE_ENABLED", json = "false" ),
],
compatibilityFlags = ["nodejs_compat", "service_binding_extra_handlers", "cache_option_disabled", "queues_json_messages", "url_standard", "workers_api_getters_setters_on_prototype", "fetch_legacy_url"],
compatibilityFlags = ["nodejs_compat", "service_binding_extra_handlers", "cache_option_disabled", "queues_json_messages", "url_standard", "workers_api_getters_setters_on_prototype", "fetch_legacy_url", "web_socket_auto_reply_to_close"],
streamingTails = ["log", "log-invalid"],
),
),
Expand Down
152 changes: 152 additions & 0 deletions src/workerd/api/tests/websocket-allow-half-open-test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
// Copyright (c) 2026 Cloudflare, Inc.
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
// https://opensource.org/licenses/Apache-2.0

import { strictEqual, doesNotThrow } from 'node:assert';

// Test that when allowHalfOpen is false (default with compat flag), a server-initiated
// close sets readyState to CLOSED.
export const autoCloseReplyWhenNotHalfOpen = {
async test() {
const pair = new WebSocketPair();
const [client, server] = Object.values(pair);

// accept() without options — allowHalfOpen defaults to false with the compat flag.
client.accept();
server.accept();

const closePromise = new Promise((resolve) => {
client.addEventListener('close', (event) => {
// When allowHalfOpen is false, the runtime auto-sends a close reply,
// so both closedIncoming and closedOutgoing are true — readyState should be CLOSED.
resolve({
readyState: client.readyState,
code: event.code,
wasClean: event.wasClean,
});
});
});

// Server initiates close.
server.close(1000, 'server closing');

const result = await closePromise;
strictEqual(result.readyState, WebSocket.CLOSED);
strictEqual(result.code, 1000);
strictEqual(result.wasClean, true);
},
};

// Test that calling close() inside the close event handler after the automatic close
// handshake is silently ignored (doesn't throw). This is the realistic pattern for
// users who are already manually replying to close frames today — when they update
// their compat date and get web_socket_auto_reply_to_close, their existing close()
// call must not break.
export const closeInsideHandlerAfterAutoCloseIsIgnored = {
async test() {
const pair = new WebSocketPair();
const [client, server] = Object.values(pair);

client.accept();
server.accept();

const closePromise = new Promise((resolve) => {
client.addEventListener('close', (event) => {
// This is what existing user code typically looks like: replying to close
// inside the close handler. With the auto-reply already sent, closedOutgoing
// is true but the native state is still Accepted (tryReleaseNative hasn't
// run yet), so this exercises the closedOutgoing early-return in close().
doesNotThrow(() => {
client.close(1000, 'manual reply inside handler');
});

resolve({
readyState: client.readyState,
code: event.code,
wasClean: event.wasClean,
});
});
});

// Server initiates close; the runtime auto-replies because allowHalfOpen is false.
server.close(1000, 'server closing');

const result = await closePromise;
strictEqual(result.readyState, WebSocket.CLOSED);
strictEqual(result.code, 1000);
strictEqual(result.wasClean, true);
},
};

// Same as above, but calling close() after the handler has returned and the native
// WebSocket has been released. This exercises the state.is<Released>() early-return
// in close(), which is a different code path from the closedOutgoing check above.
export const closeAfterAutoCloseAndReleaseIsIgnored = {
async test() {
const pair = new WebSocketPair();
const [client, server] = Object.values(pair);

client.accept();
server.accept();

const closePromise = new Promise((resolve) => {
client.addEventListener('close', (event) => {
resolve({
readyState: client.readyState,
code: event.code,
wasClean: event.wasClean,
});
});
});

// Server initiates close; the runtime auto-replies because allowHalfOpen is false.
server.close(1000, 'server closing');

const result = await closePromise;
strictEqual(result.readyState, WebSocket.CLOSED);

// By now tryReleaseNative has run and the state is Released.
// close() should still be silently ignored.
doesNotThrow(() => {
client.close(1000, 'manual reply after release');
});

strictEqual(client.readyState, WebSocket.CLOSED);
},
};

// Test that when allowHalfOpen is true via accept(), a server-initiated close sets
// readyState to CLOSING.
export const halfOpenCloseKeepsClosingState = {
async test() {
const pair = new WebSocketPair();
const [client, server] = Object.values(pair);

// Opt into half-open mode via accept() options.
client.accept({ allowHalfOpen: true });
server.accept();

const closePromise = new Promise((resolve) => {
client.addEventListener('close', (event) => {
// When allowHalfOpen is true, no auto-reply is sent, so only closedIncoming
// is true — readyState should be CLOSING.
resolve({
readyState: client.readyState,
code: event.code,
wasClean: event.wasClean,
});
});
});

// Server initiates close.
server.close(1000, 'server closing');

const result = await closePromise;
strictEqual(result.readyState, WebSocket.CLOSING);
strictEqual(result.code, 1000);
strictEqual(result.wasClean, true);

// The client must manually close to complete the handshake.
client.close(1000, 'client reply');
},
};
14 changes: 14 additions & 0 deletions src/workerd/api/tests/websocket-allow-half-open-test.wd-test
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using Workerd = import "/workerd/workerd.capnp";

const unitTests :Workerd.Config = (
services = [
( name = "websocket-allow-half-open-test",
worker = (
modules = [
(name = "worker", esModule = embed "websocket-allow-half-open-test.js")
],
compatibilityFlags = ["nodejs_compat", "web_socket_auto_reply_to_close"]
)
),
],
);
52 changes: 37 additions & 15 deletions src/workerd/api/web-socket.c++
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ WebSocket::WebSocket(
binaryType_(FeatureFlags::get(js).getWebsocketBinaryTypeDefault() ? BinaryType::BLOB
: BinaryType::ARRAYBUFFER),
serializedAttachment(kj::mv(package.serializedAttachment)),
allowHalfOpen(package.allowHalfOpen),
farNative(initNative(ioContext,
ws,
kj::mv(KJ_REQUIRE_NONNULL(package.maybeTags)),
Expand All @@ -76,6 +77,7 @@ WebSocket::WebSocket(jsg::Lock& js, kj::Own<kj::WebSocket> native)
url(kj::none),
binaryType_(FeatureFlags::get(js).getWebsocketBinaryTypeDefault() ? BinaryType::BLOB
: BinaryType::ARRAYBUFFER),
allowHalfOpen(!FeatureFlags::get(js).getWebSocketAutoReplyToClose()),
farNative(nullptr),
outgoingMessages(IoContext::current().addObject(kj::heap<OutgoingMessagesMap>())) {
auto nativeObj = kj::heap<Native>();
Expand All @@ -88,6 +90,7 @@ WebSocket::WebSocket(jsg::Lock& js, kj::String url)
url(kj::mv(url)),
binaryType_(FeatureFlags::get(js).getWebsocketBinaryTypeDefault() ? BinaryType::BLOB
: BinaryType::ARRAYBUFFER),
allowHalfOpen(!FeatureFlags::get(js).getWebSocketAutoReplyToClose()),
farNative(nullptr),
outgoingMessages(IoContext::current().addObject(kj::heap<OutgoingMessagesMap>())) {
auto nativeObj = kj::heap<Native>();
Expand Down Expand Up @@ -398,7 +401,7 @@ kj::Promise<DeferredProxy<void>> WebSocket::couple(
co_return co_await promise;
}

void WebSocket::accept(jsg::Lock& js) {
void WebSocket::accept(jsg::Lock& js, jsg::Optional<AcceptOptions> options) {
auto& native = *farNative;
JSG_REQUIRE(!native.state.is<AwaitingConnection>(), TypeError,
"Websockets obtained from the 'new WebSocket()' constructor cannot call accept");
Expand All @@ -414,6 +417,12 @@ void WebSocket::accept(jsg::Lock& js) {
return;
}

KJ_IF_SOME(opts, options) {
KJ_IF_SOME(value, opts.allowHalfOpen) {
allowHalfOpen = AllowHalfOpen(value);
}
}

internalAccept(js, IoContext::current().getCriticalSection());
}

Expand Down Expand Up @@ -605,11 +614,8 @@ void WebSocket::send(jsg::Lock& js, kj::OneOf<kj::Array<byte>, kj::String> messa
KJ_UNREACHABLE;
}();

auto pendingAutoResponses =
autoResponseStatus.pendingAutoResponseDeque.size() - autoResponseStatus.queuedAutoResponses;
autoResponseStatus.queuedAutoResponses = autoResponseStatus.pendingAutoResponseDeque.size();
outgoingMessages->insert(
GatedMessage{kj::mv(maybeOutputLock), kj::mv(msg), pendingAutoResponses});
GatedMessage{kj::mv(maybeOutputLock), kj::mv(msg), getPendingAutoResponseCount()});

ensurePumping(js);
}
Expand Down Expand Up @@ -680,22 +686,13 @@ void WebSocket::close(

assertNoError(js);

// pendingAutoResponses stores the number of queuedAutoResponses that will be pumped before sending
// the current GatedMessage, guaranteeing order.
// queuedAutoResponses stores the total number of auto-response messages that are already in accounted
// for in previous GatedMessages. This is useful to easily calculate the number of pendingAutoResponses
// for each new GateMessage.
auto pendingAutoResponses =
autoResponseStatus.pendingAutoResponseDeque.size() - autoResponseStatus.queuedAutoResponses;
autoResponseStatus.queuedAutoResponses = autoResponseStatus.pendingAutoResponseDeque.size();

outgoingMessages->insert(GatedMessage{IoContext::current().waitForOutputLocksIfNecessary(),
kj::WebSocket::Close{
// Code 1005 actually translates to sending a close message with no body on the wire.
static_cast<uint16_t>(code.orDefault(1005)),
kj::mv(reason).orDefault(jsg::USVString(kj::str())),
},
pendingAutoResponses});
getPendingAutoResponseCount()});

native.closedOutgoing = true;
closedOutgoingForHib = true;
Expand Down Expand Up @@ -990,6 +987,13 @@ kj::Promise<void> WebSocket::pump(IoContext& context,
completed = true;
}

size_t WebSocket::getPendingAutoResponseCount() {
auto count =
autoResponseStatus.pendingAutoResponseDeque.size() - autoResponseStatus.queuedAutoResponses;
autoResponseStatus.queuedAutoResponses = autoResponseStatus.pendingAutoResponseDeque.size();
return count;
}

void WebSocket::tryReleaseNative(jsg::Lock& js) {
// If the native WebSocket is no longer needed (the connection closed) and there are no more
// messages to send, we can discard the underlying connection.
Expand Down Expand Up @@ -1057,6 +1061,23 @@ kj::Promise<kj::Maybe<kj::Exception>> WebSocket::readLoop(
}
KJ_CASE_ONEOF(close, kj::WebSocket::Close) {
native.closedIncoming = true;
if (!allowHalfOpen.toBool() && !native.closedOutgoing && !native.outgoingAborted &&
!native.state.is<Released>()) {
// When allowHalfOpen is false (the spec-compliant default with the
// web_socket_auto_reply_to_close compat flag), automatically send a reciprocal
// Close frame through the outgoing message pump so that readyState is CLOSED (3)
// when the close event fires. Skip if a close frame was already sent (e.g. the
// application called close() before the server sent its Close), or if the outgoing
// side is otherwise unusable.
outgoingMessages->insert(
GatedMessage{IoContext::current().waitForOutputLocksIfNecessary(),
kj::WebSocket::Close{close.code, kj::str(close.reason)},
getPendingAutoResponseCount()});

native.closedOutgoing = true;
closedOutgoingForHib = true;
ensurePumping(js);
}
dispatchEventImpl(js, js.alloc<CloseEvent>(close.code, kj::mv(close.reason), true));
// Native WebSocket no longer needed; release.
tryReleaseNative(js);
Expand Down Expand Up @@ -1202,6 +1223,7 @@ WebSocket::HibernationPackage WebSocket::buildPackageForHibernation() {
.serializedAttachment = kj::mv(serializedAttachment),
.maybeTags = kj::none,
.closedOutgoingConnection = closedOutgoingForHib,
.allowHalfOpen = allowHalfOpen,
};
}

Expand Down
Loading