Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ rules_foreign_cc_dependencies()

http_archive(
name = "capnp-cpp",
sha256 = "4a642173569caf4869150d6ec08e40644158b5148f485979bbf15244c4f09df2",
strip_prefix = "capnproto-capnproto-6a1dcb8/c++",
sha256 = "70129ae82560062a09b21646e29800d399ff607af694dcc4efd1a48174853f8e",
strip_prefix = "capnproto-capnproto-035e223/c++",
type = "tgz",
urls = ["https://github.com/capnproto/capnproto/tarball/6a1dcb8e4b2864b95e4be43ed6314f5334d457fa"],
urls = ["https://github.com/capnproto/capnproto/tarball/035e2233c986526de862c27c567ba2d92c8159a3"],
)

http_archive(
Expand Down
101 changes: 93 additions & 8 deletions src/workerd/server/server-test.c++
Original file line number Diff line number Diff line change
Expand Up @@ -119,12 +119,45 @@ public:
recvHttp200(expectedResponse, loc);
}

bool isEof() {
// Return true if the stream is at EOF.

if (premature != nullptr) {
// We still have unread data so we're definitely not at EOF.
return false;
}

char c;
auto promise = stream->tryRead(&c, 1, 1);
if (!promise.poll(ws)) {
// Read didn't complete immediately. We have no data available, but we're not at EOF.
return false;
}

size_t n = promise.wait(ws);
if (n == 0) {
return true;
} else {
// Oops, the stream had data available and we accidentally read a byte of it. Store that off
// to the side.
KJ_ASSERT(n == 1);
premature = c;
return false;
}
}

private:
kj::WaitScope& ws;
kj::Own<kj::AsyncIoStream> stream;

kj::Maybe<char> premature;
// isEof() may prematurely read a character. Keep it off to the side for the next actual read.

kj::String readAllAvailable() {
kj::Vector<char> buffer(256);
KJ_IF_MAYBE(p, premature) {
buffer.add(*p);
}

// Continuously try to read until there's nothing to read (or we've gone way past the size
// expected).
Expand Down Expand Up @@ -190,11 +223,11 @@ public:
}
}

void start() {
void start(kj::Promise<void> drainWhen = kj::NEVER_DONE) {
// Start the server. Call before connect().

KJ_REQUIRE(runTask == nullptr);
auto task = server.run(v8System, *config)
auto task = server.run(v8System, *config, kj::mv(drainWhen))
.eagerlyEvaluate([](kj::Exception&& e) {
KJ_FAIL_EXPECT(e);
});
Expand Down Expand Up @@ -1524,6 +1557,58 @@ KJ_TEST("Server: inject headers on incoming request/response") {
)"_blockquote);
}

KJ_TEST("Server: drain incoming HTTP connections") {
TestServer test(singleWorker(R"((
compatibilityDate = "2022-08-17",
serviceWorkerScript =
`addEventListener("fetch", event => {
` event.respondWith(new Response("hello"));
`})
))"_kj));

auto paf = kj::newPromiseAndFulfiller<void>();

test.start(kj::mv(paf.promise));

auto conn = test.connect("test-addr");
auto conn2 = test.connect("test-addr");

// Send a request on each connection, get a response.
conn.httpGet200("/", "hello");
conn2.httpGet200("/", "hello");

// Send a partial request on conn2.
conn2.send("GET");

// No EOF yet.
KJ_EXPECT(!conn.isEof());
KJ_EXPECT(!conn2.isEof());

// Drain the server.
paf.fulfiller->fulfill();

// Now we get EOF on conn.
KJ_EXPECT(conn.isEof());

// But conn2 is still open.
KJ_EXPECT(!conn2.isEof());

// Finish the request on conn2.
conn2.send(" / HTTP/1.1\nHost: foo\n\n");

// We receive a response with Connection: close
conn2.recv(R"(
HTTP/1.1 200 OK
Connection: close
Content-Length: 5
Content-Type: text/plain;charset=UTF-8

hello)"_blockquote);

// And then the connection is, in fact, closed.
KJ_EXPECT(conn2.isEof());
}

// =======================================================================================
// Test alternate service types
//
Expand Down Expand Up @@ -2103,8 +2188,10 @@ KJ_TEST("Server: disk service allow dotfiles") {
KJ_EXPECT(test.root->openFile(kj::Path({"secret"}))->readAllText() == "this is super-secret");
}

// =======================================================================================
// Test Cache API

KJ_TEST("Cache: If no cache service is defined, access to the cache API should error") {
KJ_TEST("Server: If no cache service is defined, access to the cache API should error") {
TestServer test(singleWorker(R"((
compatibilityDate = "2022-08-17",
modules = [
Expand All @@ -2130,8 +2217,7 @@ KJ_TEST("Cache: If no cache service is defined, access to the cache API should e

}


KJ_TEST("Cache: cached response") {
KJ_TEST("Server: cached response") {
TestServer test(R"((
services = [
( name = "hello",
Expand Down Expand Up @@ -2191,8 +2277,7 @@ KJ_TEST("Cache: cached response") {

}


KJ_TEST("Cache: cache name is passed through to service") {
KJ_TEST("Server: cache name is passed through to service") {
TestServer test(R"((
services = [
( name = "hello",
Expand Down Expand Up @@ -2250,8 +2335,8 @@ KJ_TEST("Cache: cache name is passed through to service") {
CF-Cache-Status: HIT

cached)"_blockquote);

}

// =======================================================================================

// TODO(beta): Test TLS (send and receive)
Expand Down
76 changes: 53 additions & 23 deletions src/workerd/server/server.c++
Original file line number Diff line number Diff line change
Expand Up @@ -932,6 +932,7 @@ kj::Own<Server::Service> Server::makeDiskDirectoryService(
}

// =======================================================================================

class Server::InspectorService final: public kj::HttpService, public kj::HttpServerErrorHandler {
// Implements the interface for the devtools inspector protocol.
//
Expand Down Expand Up @@ -997,7 +998,7 @@ public:
return kj::READY_NOW;
} else {
// If it's any other kind of error, propagate it!
throw ex;
kj::throwFatalException(kj::mv(ex));
}
});
} else {
Expand Down Expand Up @@ -1081,6 +1082,18 @@ public:
}

kj::Promise<void> listen(kj::Own<kj::ConnectionReceiver> listener) {
// Note that we intentionally do not make inspector connections be part of the usual drain()
// procedure. Inspector connections are always long-lived WebSockets, and we do not want the
// existence of such a connection to hold the server open. We do, however, want the connection
// to stay open until all other requests are drained, for debugging purposes.
//
// Thus:
// * We let connection loop tasks live on `HttpServer`'s own `TaskSet`, rather than our
// server's main `TaskSet` which we wait to become empty on drain.
// * We do not add this `HttpServer` to the server's `httpServers` list, so it will not receive
// drain() requests. (However, our caller does cancel listening on the server port as soon
// as we begin draining, since we may want new connections to go to a new instance of the
// server.)
return server.listenHttp(*listener).attach(kj::mv(listener));
}

Expand Down Expand Up @@ -1967,16 +1980,15 @@ Server::Service& Server::lookupService(

// =======================================================================================

class Server::HttpListener final: private kj::TaskSet::ErrorHandler {
class Server::HttpListener final: public kj::Refcounted {
public:
HttpListener(kj::Own<kj::ConnectionReceiver> listener, Service& service,
kj::StringPtr physicalProtocol, kj::Own<HttpRewriter> rewriter,
HttpListener(Server& owner, kj::Own<kj::ConnectionReceiver> listener, Service& service,
kj::StringPtr physicalProtocol, kj::Own<HttpRewriter> rewriter,
kj::HttpHeaderTable& headerTable, kj::Timer& timer)
: listener(kj::mv(listener)), service(service),
: owner(owner), listener(kj::mv(listener)), service(service),
headerTable(headerTable), timer(timer),
physicalProtocol(physicalProtocol),
rewriter(kj::mv(rewriter)),
tasks(*this) {}
rewriter(kj::mv(rewriter)) {}

kj::Promise<void> run() {
return listener->acceptAuthenticated()
Expand Down Expand Up @@ -2018,36 +2030,44 @@ public:
}

auto conn = kj::heap<Connection>(*this, kj::mv(cfBlobJson));
tasks.add(conn->http.listenHttp(kj::mv(stream.stream)).attach(kj::mv(conn)));

auto promise = kj::evalNow([&]() {
return conn->listedHttp.httpServer.listenHttp(kj::mv(stream.stream))
.attach(kj::mv(conn))
.attach(kj::addRef(*this)); // two attach()es because `this` must outlive `conn`
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought .attach(a, b) is always equivalent to .attach(a).attach(b). But, from looking at the code it looks like that's only true for kj::Own<T>::attach(), and with kj::Promise<T>::attach() the destruction order is reversed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, unfortunately, I didn't think hard enough when creating kj::Promise::attach() and it's probably too late to safely reverse the ordering. To avoid any confusion I use multiple attaches when the ordering matters.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I see. I suppose we could deprecate both attach()es in favor of a fixed attach2() for a couple capnproto releases, then deprecate-and-rename back to attach().

});

// Run the connection handler loop in the global task set, so that run() waits for open
// connections to finish before returning, even if the listener loop is canceled. However,
// do not consider exceptions from a specific connection to be fatal.
owner.tasks.add(promise.catch_([](kj::Exception&& exception) {
KJ_LOG(ERROR, exception);
}));

return run();
});
}

private:
Server& owner;
kj::Own<kj::ConnectionReceiver> listener;
Service& service;
kj::HttpHeaderTable& headerTable;
kj::Timer& timer;
kj::StringPtr physicalProtocol;
kj::Own<HttpRewriter> rewriter;
kj::TaskSet tasks;

void taskFailed(kj::Exception&& exception) override {
KJ_LOG(ERROR, exception);
}

struct Connection final: public kj::HttpService, public kj::HttpServerErrorHandler {
Connection(HttpListener& parent, kj::Maybe<kj::String> cfBlobJson)
: parent(parent), cfBlobJson(kj::mv(cfBlobJson)),
http(parent.timer, parent.headerTable, *this, kj::HttpServerSettings {
listedHttp(parent.owner, parent.timer, parent.headerTable, *this, kj::HttpServerSettings {
.errorHandler = *this,
.webSocketCompressionMode = kj::HttpServerSettings::MANUAL_COMPRESSION
}) {}

HttpListener& parent;
kj::Maybe<kj::String> cfBlobJson;
kj::HttpServer http;
ListedHttpServer listedHttp;

class ResponseWrapper final: public kj::HttpService::Response {
public:
Expand Down Expand Up @@ -2122,29 +2142,39 @@ private:
kj::Promise<void> Server::listenHttp(
kj::Own<kj::ConnectionReceiver> listener, Service& service,
kj::StringPtr physicalProtocol, kj::Own<HttpRewriter> rewriter) {
auto obj = kj::heap<HttpListener>(kj::mv(listener), service,
physicalProtocol, kj::mv(rewriter),
globalContext->headerTable, timer);
auto obj = kj::refcounted<HttpListener>(*this, kj::mv(listener), service,
physicalProtocol, kj::mv(rewriter),
globalContext->headerTable, timer);
return obj->run().attach(kj::mv(obj));
}

// =======================================================================================

kj::Promise<void> Server::run(jsg::V8System& v8System, config::Config::Reader config) {
kj::Promise<void> Server::run(jsg::V8System& v8System, config::Config::Reader config,
kj::Promise<void> drainWhen) {
kj::HttpHeaderTable::Builder headerTableBuilder;
globalContext = kj::heap<GlobalContext>(*this, v8System, headerTableBuilder);
invalidConfigServiceSingleton = kj::heap<InvalidConfigService>();

auto [ fatalPromise, fatalFulfiller ] = kj::newPromiseAndFulfiller<void>();
this->fatalFulfiller = kj::mv(fatalFulfiller);

auto forkedDrainWhen = drainWhen
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the behavior correct if drainWhen is rejected for some reason? I guess that would just let the server keep running.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, yeah I suppose you could argue that drain() should be called on HttpServers in the failure case too.

However, in practice, if drainWhen fails, then by virtue of it being exclusiveJoin()ed with the listen tasks, those tasks will fail, causing Server::taskFailed() to be invoked, which fulfills the "fatal fulfiller", which in turn causes Server::run() itself to fail with the given error. So the effect is, the drain is skipped and the server shuts down immediately.

I think that works out just fine so I think I'm OK with the code as it is.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Works for me, too.

.then([this]() mutable {
// Tell all HttpServers to drain. This causes them to disconnect any connections that don't
// have a request in-flight.
for (auto& httpServer: httpServers) {
httpServer.httpServer.drain();
}
}).fork();

// ---------------------------------------------------------------------------
// Configure inspector.

KJ_IF_MAYBE(inspector, inspectorOverride) {
// Create the special inspector service.
maybeInspectorService = makeInspectorService(headerTableBuilder);
auto& inspectorService = *KJ_ASSERT_NONNULL(maybeInspectorService);
auto& inspectorService = *maybeInspectorService.emplace(
makeInspectorService(headerTableBuilder));

// Configure and start the inspector socket.
static constexpr uint DEFAULT_PORT = 9229;
Expand All @@ -2158,7 +2188,7 @@ kj::Promise<void> Server::run(jsg::V8System& v8System, config::Config::Reader co
[&inspectorService](kj::Own<kj::ConnectionReceiver> listener) mutable {
KJ_LOG(INFO, "Inspector is listening");
return inspectorService.listen(kj::mv(listener));
}));
}).exclusiveJoin(forkedDrainWhen.addBranch()));
}

// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -2339,7 +2369,7 @@ kj::Promise<void> Server::run(jsg::V8System& v8System, config::Config::Reader co
.then([this, &service, rewriter = kj::mv(rewriter), physicalProtocol]
(kj::Own<kj::ConnectionReceiver> listener) mutable {
return listenHttp(kj::mv(listener), service, physicalProtocol, kj::mv(rewriter));
}));
}).exclusiveJoin(forkedDrainWhen.addBranch()));
}

for (auto& unmatched: socketOverrides) {
Expand Down
23 changes: 22 additions & 1 deletion src/workerd/server/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ class Server: private kj::TaskSet::ErrorHandler {
inspectorOverride = kj::mv(addr);
}

kj::Promise<void> run(jsg::V8System& v8System, config::Config::Reader conf);
kj::Promise<void> run(jsg::V8System& v8System, config::Config::Reader conf,
kj::Promise<void> drainWhen = kj::NEVER_DONE);
// Runs the server using the given config.

private:
Expand Down Expand Up @@ -96,6 +97,26 @@ class Server: private kj::TaskSet::ErrorHandler {

kj::Own<kj::PromiseFulfiller<void>> fatalFulfiller;

struct ListedHttpServer {
// An HttpServer object maintained in a linked list.

Server& owner;
kj::HttpServer httpServer;
kj::ListLink<ListedHttpServer> link;

template <typename... Params>
ListedHttpServer(Server& owner, Params&&... params)
: owner(owner), httpServer(kj::fwd<Params>(params)...) {
owner.httpServers.add(*this);
};
~ListedHttpServer() noexcept(false) {
owner.httpServers.remove(*this);
}
};

kj::List<ListedHttpServer, &ListedHttpServer::link> httpServers;
// All active HttpServer objects -- used to implement drain().

kj::TaskSet tasks;
// Especially includes server loop tasks to listen on socokets. Any error is considered fatal.

Expand Down
Loading