Skip to content

Commit 11df51a

Browse files
committed
Fire close event for server WebSocket close, with allowHalfOpen opt-out.
1 parent 7373d52 commit 11df51a

17 files changed

+246
-29
lines changed

src/workerd/api/tests/BUILD.bazel

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -514,6 +514,12 @@ wd_test(
514514
data = ["url-test.js"],
515515
)
516516

517+
wd_test(
518+
src = "websocket-allow-half-open-test.wd-test",
519+
args = ["--experimental"],
520+
data = ["websocket-allow-half-open-test.js"],
521+
)
522+
517523
wd_test(
518524
src = "websocket-constructor-test.wd-test",
519525
args = ["--experimental"],

src/workerd/api/tests/http-test.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,8 @@ export const test = {
224224
assert.notStrictEqual(webSocket, null);
225225
// The server-side WebSocketPair socket's binaryType depends on the compat flag.
226226
const bt = new WebSocketPair()[0].binaryType;
227-
const wsStr = `WebSocket {\n readyState: 1,\n url: null,\n protocol: '',\n extensions: '',\n binaryType: '${bt}'\n }`;
227+
const aho = new WebSocketPair()[0].allowHalfOpen;
228+
const wsStr = `WebSocket {\n readyState: 1,\n url: null,\n protocol: '',\n extensions: '',\n binaryType: '${bt}',\n allowHalfOpen: ${aho}\n }`;
228229
const messagePromise = new Promise((resolve) => {
229230
webSocket.addEventListener('message', (event) => {
230231
assert.strictEqual(

src/workerd/api/tests/http-test.wd-test

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ const unitTests :Workerd.Config = (
1111
( name = "SERVICE", service = "http-test" ),
1212
( name = "CACHE_ENABLED", json = "false" ),
1313
],
14-
compatibilityFlags = ["nodejs_compat", "service_binding_extra_handlers", "cache_option_disabled", "url_standard", "workers_api_getters_setters_on_prototype", "fetch_legacy_url"],
14+
compatibilityFlags = ["nodejs_compat", "service_binding_extra_handlers", "cache_option_disabled", "url_standard", "workers_api_getters_setters_on_prototype", "fetch_legacy_url", "no_web_socket_half_open_by_default"],
1515
)
1616
),
1717
( name = "http-test-cache-option-enabled",
@@ -23,7 +23,7 @@ const unitTests :Workerd.Config = (
2323
( name = "SERVICE", service = "http-test-cache-option-enabled" ),
2424
( name = "CACHE_ENABLED", json = "true" ),
2525
],
26-
compatibilityFlags = ["nodejs_compat", "service_binding_extra_handlers", "cache_option_enabled", "url_standard", "workers_api_getters_setters_on_prototype", "fetch_legacy_url"],
26+
compatibilityFlags = ["nodejs_compat", "service_binding_extra_handlers", "cache_option_enabled", "url_standard", "workers_api_getters_setters_on_prototype", "fetch_legacy_url", "no_web_socket_half_open_by_default"],
2727
))
2828
],
2929
);

src/workerd/api/tests/tail-worker-test.js

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/workerd/api/tests/tail-worker-test.wd-test

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ const unitTests :Workerd.Config = (
1212
( name = "SERVICE", service = "http-test" ),
1313
( name = "CACHE_ENABLED", json = "false" ),
1414
],
15-
compatibilityFlags = ["nodejs_compat", "service_binding_extra_handlers", "cache_option_disabled", "queues_json_messages", "url_standard", "workers_api_getters_setters_on_prototype", "fetch_legacy_url"],
15+
compatibilityFlags = ["nodejs_compat", "service_binding_extra_handlers", "cache_option_disabled", "queues_json_messages", "url_standard", "workers_api_getters_setters_on_prototype", "fetch_legacy_url", "no_web_socket_half_open_by_default"],
1616
streamingTails = ["log", "log-invalid"],
1717
),
1818
),
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
// Copyright (c) 2017-2022 Cloudflare, Inc.
2+
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
3+
// https://opensource.org/licenses/Apache-2.0
4+
5+
import { strictEqual } from 'node:assert';
6+
7+
// Test that allowHalfOpen defaults to false when the compat flag is enabled.
8+
export const defaultAllowHalfOpenWithCompatFlag = {
9+
async test() {
10+
const pair = new WebSocketPair();
11+
const [client, server] = Object.values(pair);
12+
strictEqual(client.allowHalfOpen, false);
13+
strictEqual(server.allowHalfOpen, false);
14+
},
15+
};
16+
17+
// Test that the setter works.
18+
export const setterOverridesDefault = {
19+
async test() {
20+
const pair = new WebSocketPair();
21+
const [client, server] = Object.values(pair);
22+
strictEqual(client.allowHalfOpen, false);
23+
client.allowHalfOpen = true;
24+
strictEqual(client.allowHalfOpen, true);
25+
// Setting on one end doesn't affect the other.
26+
strictEqual(server.allowHalfOpen, false);
27+
},
28+
};
29+
30+
// Test that when allowHalfOpen is false, a server-initiated close sets readyState to CLOSED.
31+
export const autoCloseReplyWhenNotHalfOpen = {
32+
async test() {
33+
const pair = new WebSocketPair();
34+
const [client, server] = Object.values(pair);
35+
36+
client.accept();
37+
server.accept();
38+
39+
// Ensure allowHalfOpen is false (spec-compliant behavior).
40+
strictEqual(client.allowHalfOpen, false);
41+
42+
const closePromise = new Promise((resolve) => {
43+
client.addEventListener('close', (event) => {
44+
// When allowHalfOpen is false, the runtime auto-sends a close reply,
45+
// so both closedIncoming and closedOutgoing are true — readyState should be CLOSED.
46+
resolve({
47+
readyState: client.readyState,
48+
code: event.code,
49+
wasClean: event.wasClean,
50+
});
51+
});
52+
});
53+
54+
// Server initiates close.
55+
server.close(1000, 'server closing');
56+
57+
const result = await closePromise;
58+
strictEqual(result.readyState, WebSocket.CLOSED);
59+
strictEqual(result.code, 1000);
60+
strictEqual(result.wasClean, true);
61+
},
62+
};
63+
64+
// Test that when allowHalfOpen is true, a server-initiated close sets readyState to CLOSING.
65+
export const halfOpenCloseKeepsClosingState = {
66+
async test() {
67+
const pair = new WebSocketPair();
68+
const [client, server] = Object.values(pair);
69+
70+
client.accept();
71+
server.accept();
72+
73+
// Override to half-open mode via the setter.
74+
client.allowHalfOpen = true;
75+
76+
const closePromise = new Promise((resolve) => {
77+
client.addEventListener('close', (event) => {
78+
// When allowHalfOpen is true, no auto-reply is sent, so only closedIncoming
79+
// is true — readyState should be CLOSING.
80+
resolve({
81+
readyState: client.readyState,
82+
code: event.code,
83+
wasClean: event.wasClean,
84+
});
85+
});
86+
});
87+
88+
// Server initiates close.
89+
server.close(1000, 'server closing');
90+
91+
const result = await closePromise;
92+
strictEqual(result.readyState, WebSocket.CLOSING);
93+
strictEqual(result.code, 1000);
94+
strictEqual(result.wasClean, true);
95+
96+
// The client must manually close to complete the handshake.
97+
client.close(1000, 'client reply');
98+
},
99+
};
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
using Workerd = import "/workerd/workerd.capnp";
2+
3+
const unitTests :Workerd.Config = (
4+
services = [
5+
( name = "websocket-allow-half-open-test",
6+
worker = (
7+
modules = [
8+
(name = "worker", esModule = embed "websocket-allow-half-open-test.js")
9+
],
10+
compatibilityFlags = ["nodejs_compat", "no_web_socket_half_open_by_default"]
11+
)
12+
),
13+
],
14+
);

src/workerd/api/web-socket.c++

Lines changed: 38 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ WebSocket::WebSocket(
5757
binaryType_(FeatureFlags::get(js).getWebsocketBinaryTypeDefault() ? BinaryType::BLOB
5858
: BinaryType::ARRAYBUFFER),
5959
serializedAttachment(kj::mv(package.serializedAttachment)),
60+
allowHalfOpen(package.allowHalfOpen),
6061
farNative(initNative(ioContext,
6162
ws,
6263
kj::mv(KJ_REQUIRE_NONNULL(package.maybeTags)),
@@ -76,6 +77,7 @@ WebSocket::WebSocket(jsg::Lock& js, kj::Own<kj::WebSocket> native)
7677
url(kj::none),
7778
binaryType_(FeatureFlags::get(js).getWebsocketBinaryTypeDefault() ? BinaryType::BLOB
7879
: BinaryType::ARRAYBUFFER),
80+
allowHalfOpen(!FeatureFlags::get(js).getWebSocketCloseReadyStateClosed()),
7981
farNative(nullptr),
8082
outgoingMessages(IoContext::current().addObject(kj::heap<OutgoingMessagesMap>())) {
8183
auto nativeObj = kj::heap<Native>();
@@ -88,6 +90,7 @@ WebSocket::WebSocket(jsg::Lock& js, kj::String url)
8890
url(kj::mv(url)),
8991
binaryType_(FeatureFlags::get(js).getWebsocketBinaryTypeDefault() ? BinaryType::BLOB
9092
: BinaryType::ARRAYBUFFER),
93+
allowHalfOpen(!FeatureFlags::get(js).getWebSocketCloseReadyStateClosed()),
9194
farNative(nullptr),
9295
outgoingMessages(IoContext::current().addObject(kj::heap<OutgoingMessagesMap>())) {
9396
auto nativeObj = kj::heap<Native>();
@@ -605,11 +608,8 @@ void WebSocket::send(jsg::Lock& js, kj::OneOf<kj::Array<byte>, kj::String> messa
605608
KJ_UNREACHABLE;
606609
}();
607610

608-
auto pendingAutoResponses =
609-
autoResponseStatus.pendingAutoResponseDeque.size() - autoResponseStatus.queuedAutoResponses;
610-
autoResponseStatus.queuedAutoResponses = autoResponseStatus.pendingAutoResponseDeque.size();
611611
outgoingMessages->insert(
612-
GatedMessage{kj::mv(maybeOutputLock), kj::mv(msg), pendingAutoResponses});
612+
GatedMessage{kj::mv(maybeOutputLock), kj::mv(msg), getPendingAutoResponseCount()});
613613

614614
ensurePumping(js);
615615
}
@@ -680,22 +680,13 @@ void WebSocket::close(
680680

681681
assertNoError(js);
682682

683-
// pendingAutoResponses stores the number of queuedAutoResponses that will be pumped before sending
684-
// the current GatedMessage, guaranteeing order.
685-
// queuedAutoResponses stores the total number of auto-response messages that are already in accounted
686-
// for in previous GatedMessages. This is useful to easily calculate the number of pendingAutoResponses
687-
// for each new GateMessage.
688-
auto pendingAutoResponses =
689-
autoResponseStatus.pendingAutoResponseDeque.size() - autoResponseStatus.queuedAutoResponses;
690-
autoResponseStatus.queuedAutoResponses = autoResponseStatus.pendingAutoResponseDeque.size();
691-
692683
outgoingMessages->insert(GatedMessage{IoContext::current().waitForOutputLocksIfNecessary(),
693684
kj::WebSocket::Close{
694685
// Code 1005 actually translates to sending a close message with no body on the wire.
695686
static_cast<uint16_t>(code.orDefault(1005)),
696687
kj::mv(reason).orDefault(jsg::USVString(kj::str())),
697688
},
698-
pendingAutoResponses});
689+
getPendingAutoResponseCount()});
699690

700691
native.closedOutgoing = true;
701692
closedOutgoingForHib = true;
@@ -716,6 +707,14 @@ int WebSocket::getReadyState() {
716707
return READY_STATE_OPEN;
717708
}
718709

710+
bool WebSocket::getAllowHalfOpen() {
711+
return allowHalfOpen.toBool();
712+
}
713+
714+
void WebSocket::setAllowHalfOpen(bool value) {
715+
allowHalfOpen = AllowHalfOpen(value);
716+
}
717+
719718
bool WebSocket::isAccepted() {
720719
return farNative->state.is<Accepted>();
721720
}
@@ -990,6 +989,13 @@ kj::Promise<void> WebSocket::pump(IoContext& context,
990989
completed = true;
991990
}
992991

992+
size_t WebSocket::getPendingAutoResponseCount() {
993+
auto count =
994+
autoResponseStatus.pendingAutoResponseDeque.size() - autoResponseStatus.queuedAutoResponses;
995+
autoResponseStatus.queuedAutoResponses = autoResponseStatus.pendingAutoResponseDeque.size();
996+
return count;
997+
}
998+
993999
void WebSocket::tryReleaseNative(jsg::Lock& js) {
9941000
// If the native WebSocket is no longer needed (the connection closed) and there are no more
9951001
// messages to send, we can discard the underlying connection.
@@ -1057,6 +1063,23 @@ kj::Promise<kj::Maybe<kj::Exception>> WebSocket::readLoop(
10571063
}
10581064
KJ_CASE_ONEOF(close, kj::WebSocket::Close) {
10591065
native.closedIncoming = true;
1066+
if (!allowHalfOpen.toBool() && !native.closedOutgoing && !native.outgoingAborted &&
1067+
!native.state.is<Released>()) {
1068+
// When allowHalfOpen is false (the spec-compliant default with the
1069+
// no_web_socket_half_open_by_default compat flag), automatically send a reciprocal
1070+
// Close frame through the outgoing message pump so that readyState is CLOSED (3)
1071+
// when the close event fires. Skip if a close frame was already sent (e.g. the
1072+
// application called close() before the server sent its Close), or if the outgoing
1073+
// side is otherwise unusable.
1074+
outgoingMessages->insert(
1075+
GatedMessage{IoContext::current().waitForOutputLocksIfNecessary(),
1076+
kj::WebSocket::Close{close.code, kj::str(close.reason)},
1077+
getPendingAutoResponseCount()});
1078+
1079+
native.closedOutgoing = true;
1080+
closedOutgoingForHib = true;
1081+
ensurePumping(js);
1082+
}
10601083
dispatchEventImpl(js, js.alloc<CloseEvent>(close.code, kj::mv(close.reason), true));
10611084
// Native WebSocket no longer needed; release.
10621085
tryReleaseNative(js);
@@ -1202,6 +1225,7 @@ WebSocket::HibernationPackage WebSocket::buildPackageForHibernation() {
12021225
.serializedAttachment = kj::mv(serializedAttachment),
12031226
.maybeTags = kj::none,
12041227
.closedOutgoingConnection = closedOutgoingForHib,
1228+
.allowHalfOpen = allowHalfOpen,
12051229
};
12061230
}
12071231

src/workerd/api/web-socket.h

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
#include <workerd/io/observer.h>
1212
#include <workerd/jsg/jsg.h>
1313
#include <workerd/util/checked-queue.h>
14+
#include <workerd/util/strong-bool.h>
1415
#include <workerd/util/weak-refs.h>
1516

1617
#include <kj/compat/http.h>
@@ -88,6 +89,8 @@ class CloseEvent: public Event {
8889
bool clean;
8990
};
9091

92+
WD_STRONG_BOOL(AllowHalfOpen);
93+
9194
// The forward declaration is necessary so we can make some
9295
// WebSocket methods accessible to WebSocketPair via friend declaration.
9396
class WebSocket;
@@ -207,6 +210,9 @@ class WebSocket: public EventTarget {
207210

208211
// True forever once the JS WebSocket calls `close()`.
209212
bool closedOutgoingConnection = false;
213+
214+
// Whether the WebSocket allows half-open close state.
215+
AllowHalfOpen allowHalfOpen = AllowHalfOpen::YES;
210216
};
211217

212218
~WebSocket() noexcept(false) {
@@ -288,6 +294,11 @@ class WebSocket: public EventTarget {
288294
// JS API.
289295

290296
// Creates a new outbound WebSocket.
297+
// Note: we intentionally do not accept an options parameter here. The WPT
298+
// constructor.any.js test verifies that extra arguments beyond (url, protocols)
299+
// are silently ignored. Adding a third parameter would cause JSG to attempt
300+
// deserialization of the stray argument, breaking that test. Instead, callers
301+
// can set allowHalfOpen via the property after construction.
291302
static jsg::Ref<WebSocket> constructor(jsg::Lock& js,
292303
kj::String url,
293304
jsg::Optional<kj::OneOf<kj::Array<kj::String>, kj::String>> protocols);
@@ -328,6 +339,9 @@ class WebSocket: public EventTarget {
328339

329340
int getReadyState();
330341

342+
bool getAllowHalfOpen();
343+
void setAllowHalfOpen(bool value);
344+
331345
bool isAccepted();
332346
bool isReleased();
333347

@@ -373,12 +387,14 @@ class WebSocket: public EventTarget {
373387
JSG_READONLY_PROTOTYPE_PROPERTY(protocol, getProtocol);
374388
JSG_READONLY_PROTOTYPE_PROPERTY(extensions, getExtensions);
375389
JSG_PROTOTYPE_PROPERTY(binaryType, getBinaryType, setBinaryType);
390+
JSG_PROTOTYPE_PROPERTY(allowHalfOpen, getAllowHalfOpen, setAllowHalfOpen);
376391
} else {
377392
JSG_READONLY_INSTANCE_PROPERTY(readyState, getReadyState);
378393
JSG_READONLY_INSTANCE_PROPERTY(url, getUrl);
379394
JSG_READONLY_INSTANCE_PROPERTY(protocol, getProtocol);
380395
JSG_READONLY_INSTANCE_PROPERTY(extensions, getExtensions);
381396
JSG_INSTANCE_PROPERTY(binaryType, getBinaryType, setBinaryType);
397+
JSG_INSTANCE_PROPERTY(allowHalfOpen, getAllowHalfOpen, setAllowHalfOpen);
382398
}
383399

384400
JSG_TS_DEFINE(type WebSocketEventMap = {
@@ -390,6 +406,8 @@ class WebSocket: public EventTarget {
390406
JSG_TS_OVERRIDE(extends EventTarget<WebSocketEventMap> {
391407
get binaryType(): "blob" | "arraybuffer";
392408
set binaryType(value: "blob" | "arraybuffer");
409+
constructor(url: string, protocols?: string[] | string);
410+
allowHalfOpen: boolean;
393411
});
394412
}
395413

@@ -421,6 +439,14 @@ class WebSocket: public EventTarget {
421439
// `close()`, thereby preventing calls to `send()` even after we wake from hibernation.
422440
bool closedOutgoingForHib = false;
423441

442+
// When YES, a server-initiated close does NOT automatically send a reciprocal close frame,
443+
// leaving readyState as CLOSING (2) when the close event fires. The application is then
444+
// responsible for calling close() explicitly. When NO (spec-compliant default with the
445+
// no_web_socket_half_open_by_default compat flag), a close reply is sent automatically and
446+
// readyState is CLOSED (3) when the close event fires.
447+
// Default is YES (legacy behavior); overridden from the compat flag at construction time.
448+
AllowHalfOpen allowHalfOpen = AllowHalfOpen::YES;
449+
424450
// Maximum allowed size for WebSocket messages
425451
inline static const size_t SUGGESTED_MAX_MESSAGE_SIZE = 1u << 20;
426452

@@ -615,6 +641,11 @@ class WebSocket: public EventTarget {
615641

616642
void ensurePumping(jsg::Lock& js);
617643

644+
// Returns the number of pending auto-responses that should be sent before the next outgoing
645+
// message, and advances the queuedAutoResponses counter. Called each time a GatedMessage is
646+
// inserted into outgoingMessages to guarantee auto-response ordering.
647+
size_t getPendingAutoResponseCount();
648+
618649
// Write messages from `outgoingMessages` into `ws`.
619650
//
620651
// These are not necessarily called under isolate lock, but they are called on the given

0 commit comments

Comments
 (0)