Skip to content

Commit 3c0272b

Browse files
authored
Merge pull request #5985 from cloudflare/gv/set-egress-binding-pr
container: Implement setEgressHttp for containers
2 parents 02c8669 + 73210ab commit 3c0272b

File tree

18 files changed

+933
-56
lines changed

18 files changed

+933
-56
lines changed

build/deps/gen/deps.MODULE.bazel

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,10 @@ bazel_dep(name = "brotli", version = "1.2.0")
2727
# capnp-cpp
2828
http.archive(
2929
name = "capnp-cpp",
30-
sha256 = "58a883721d220a3d8d75531a7e7ede3fd87c3d6923caf645faff0c78f8807b23",
31-
strip_prefix = "capnproto-capnproto-79b3170/c++",
30+
sha256 = "2e8519d77eb453463b1f2b1e22f40959fe560c143d78e7a51c606ce3bca30c5b",
31+
strip_prefix = "capnproto-capnproto-ac7d90a/c++",
3232
type = "tgz",
33-
url = "https://github.com/capnproto/capnproto/tarball/79b317039adad92da1204929f4047f84dfd17350",
33+
url = "https://github.com/capnproto/capnproto/tarball/ac7d90ae2e171a714be4348718fed6f26b0b85f2",
3434
)
3535
use_repo(http, "capnp-cpp")
3636

build/deps/oci.MODULE.bazel

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,12 @@ oci.pull(
1212
"linux/arm64/v8",
1313
],
1414
)
15-
use_repo(oci, "node_25_slim", "node_25_slim_linux_amd64", "node_25_slim_linux_arm64_v8")
15+
oci.pull(
16+
name = "proxy_everything",
17+
image = "docker.io/cloudflare/proxy-everything:main",
18+
platforms = [
19+
"linux/amd64",
20+
"linux/arm64",
21+
],
22+
)
23+
use_repo(oci, "node_25_slim", "node_25_slim_linux_amd64", "node_25_slim_linux_arm64_v8", "proxy_everything", "proxy_everything_linux_amd64", "proxy_everything_linux_arm64")

images/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ load("@rules_multirun//:defs.bzl", "command", "multirun")
22

33
IMAGES = {
44
"container-client-test": "//images/container-client-test:load",
5+
"proxy-everything": "//images/container-client-test:load-proxy-everything",
56
}
67

78
[

images/container-client-test/BUILD.bazel

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,3 +40,10 @@ oci_load(
4040
repo_tags = ["cloudflare/workerd/container-client-test:latest"],
4141
visibility = ["//visibility:public"],
4242
)
43+
44+
oci_load(
45+
name = "load-proxy-everything",
46+
image = "@proxy_everything",
47+
repo_tags = ["cloudflare/proxy-everything:main"],
48+
visibility = ["//visibility:public"],
49+
)

images/container-client-test/app.js

Lines changed: 39 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
11
const { createServer } = require('http');
22

33
const webSocketEnabled = process.env.WS_ENABLED === 'true';
4+
const wsProxyTarget = process.env.WS_PROXY_TARGET || null;
45

5-
// Create HTTP server
66
const server = createServer(function (req, res) {
77
if (req.url === '/ws') {
8-
// WebSocket upgrade will be handled by the WebSocket server
98
return;
109
}
1110

@@ -34,6 +33,25 @@ const server = createServer(function (req, res) {
3433
res.write(`Error reading /proc/1/cmdline: ${err.message}`);
3534
res.end();
3635
}
36+
37+
return;
38+
}
39+
40+
if (req.url === '/intercept') {
41+
const targetHost = req.headers['x-host'] || '11.0.0.1';
42+
fetch(`http://${targetHost}`)
43+
.then((result) => result.text())
44+
.then((body) => {
45+
res.writeHead(200);
46+
res.write(body);
47+
res.end();
48+
})
49+
.catch((err) => {
50+
res.writeHead(500);
51+
res.write(`${targetHost} ${err.message}`);
52+
res.end();
53+
});
54+
3755
return;
3856
}
3957

@@ -42,30 +60,30 @@ const server = createServer(function (req, res) {
4260
res.end();
4361
});
4462

45-
// Check if WebSocket functionality is enabled
4663
if (webSocketEnabled) {
4764
const WebSocket = require('ws');
65+
const wss = new WebSocket.Server({ server, path: '/ws' });
4866

49-
// Create WebSocket server
50-
const wss = new WebSocket.Server({
51-
server: server,
52-
path: '/ws',
53-
});
54-
55-
wss.on('connection', function connection(ws) {
56-
console.log('WebSocket connection established');
67+
wss.on('connection', function (clientWs) {
68+
if (wsProxyTarget) {
69+
const targetWs = new WebSocket(`ws://${wsProxyTarget}/ws`);
70+
const ready = new Promise(function (resolve) {
71+
targetWs.on('open', resolve);
72+
});
5773

58-
ws.on('message', function message(data) {
59-
console.log('Received:', data.toString());
60-
// Echo the message back with prefix
61-
ws.send('Echo: ' + data.toString());
62-
});
74+
targetWs.on('message', (data) => clientWs.send(data));
75+
clientWs.on('message', async function (data) {
76+
await ready;
77+
targetWs.send(data);
78+
});
6379

64-
ws.on('close', function close() {
65-
console.log('WebSocket connection closed');
66-
});
67-
68-
ws.on('error', console.error);
80+
clientWs.on('close', targetWs.close);
81+
targetWs.on('close', clientWs.close);
82+
} else {
83+
clientWs.on('message', function (data) {
84+
clientWs.send('Echo: ' + data.toString());
85+
});
86+
}
6987
});
7088
}
7189

src/workerd/api/container.c++

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,39 @@ jsg::Promise<void> Container::setInactivityTimeout(jsg::Lock& js, int64_t durati
7373
return IoContext::current().awaitIo(js, req.sendIgnoringResult());
7474
}
7575

76+
jsg::Promise<void> Container::interceptOutboundHttp(
77+
jsg::Lock& js, kj::String addr, jsg::Ref<Fetcher> binding) {
78+
auto& ioctx = IoContext::current();
79+
auto channel = binding->getSubrequestChannel(ioctx);
80+
81+
// Get a channel token for RPC usage, the container runtime can use this
82+
// token later to redeem a Fetcher.
83+
auto token = channel->getToken(IoChannelFactory::ChannelTokenUsage::RPC);
84+
85+
auto req = rpcClient->setEgressHttpRequest();
86+
req.setHostPort(addr);
87+
req.setChannelToken(token);
88+
return ioctx.awaitIo(js, req.sendIgnoringResult());
89+
}
90+
91+
jsg::Promise<void> Container::interceptAllOutboundHttp(jsg::Lock& js, jsg::Ref<Fetcher> binding) {
92+
auto& ioctx = IoContext::current();
93+
auto channel = binding->getSubrequestChannel(ioctx);
94+
auto token = channel->getToken(IoChannelFactory::ChannelTokenUsage::RPC);
95+
96+
// Register for all IPv4 and IPv6 addresses (on port 80)
97+
auto reqV4 = rpcClient->setEgressHttpRequest();
98+
reqV4.setHostPort("0.0.0.0/0"_kj);
99+
reqV4.setChannelToken(token);
100+
101+
auto reqV6 = rpcClient->setEgressHttpRequest();
102+
reqV6.setHostPort("::/0"_kj);
103+
reqV6.setChannelToken(token);
104+
105+
return ioctx.awaitIo(js,
106+
kj::joinPromisesFailFast(kj::arr(reqV4.sendIgnoringResult(), reqV6.sendIgnoringResult())));
107+
}
108+
76109
jsg::Promise<void> Container::monitor(jsg::Lock& js) {
77110
JSG_REQUIRE(running, Error, "monitor() cannot be called on a container that is not running.");
78111

src/workerd/api/container.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,9 @@ class Container: public jsg::Object {
6262
void signal(jsg::Lock& js, int signo);
6363
jsg::Ref<Fetcher> getTcpPort(jsg::Lock& js, int port);
6464
jsg::Promise<void> setInactivityTimeout(jsg::Lock& js, int64_t durationMs);
65+
jsg::Promise<void> interceptOutboundHttp(
66+
jsg::Lock& js, kj::String addr, jsg::Ref<Fetcher> binding);
67+
jsg::Promise<void> interceptAllOutboundHttp(jsg::Lock& js, jsg::Ref<Fetcher> binding);
6568

6669
// TODO(containers): listenTcp()
6770

@@ -73,6 +76,11 @@ class Container: public jsg::Object {
7376
JSG_METHOD(signal);
7477
JSG_METHOD(getTcpPort);
7578
JSG_METHOD(setInactivityTimeout);
79+
80+
if (flags.getWorkerdExperimental()) {
81+
JSG_METHOD(interceptOutboundHttp);
82+
JSG_METHOD(interceptAllOutboundHttp);
83+
}
7684
}
7785

7886
void visitForMemoryInfo(jsg::MemoryTracker& tracker) const {

src/workerd/io/container.capnp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,4 +114,13 @@ interface Container @0x9aaceefc06523bca {
114114
# Note that if there is an open connection to the container, the runtime must not shutdown the container.
115115
# If there is no activity timeout duration configured and no container connection, it's up to the runtime
116116
# to decide when to signal the container to exit.
117+
118+
setEgressHttp @8 (hostPort :Text, channelToken :Data);
119+
# Configures egress HTTP routing for the container. When the container attempts to connect to the
120+
# specified host:port, the connection should be routed back to the Workers runtime using the channel token.
121+
# The format of hostPort can be '<ip|cidr>[':'<port>]'. If port is omitted, it's assumed to only cover port 80.
122+
# This method does not support HTTPs yet.
123+
124+
125+
# TODO: setEgressTcp
117126
}

src/workerd/server/BUILD.bazel

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -204,19 +204,29 @@ wd_cc_library(
204204
}),
205205
)
206206

207+
wd_cc_library(
208+
name = "channel-token",
209+
srcs = ["channel-token.c++"],
210+
hdrs = ["channel-token.h"],
211+
deps = [
212+
":channel-token_capnp",
213+
"//src/workerd/io",
214+
"//src/workerd/util:entropy",
215+
],
216+
)
217+
207218
wd_cc_library(
208219
name = "server",
209220
srcs = [
210-
"channel-token.c++",
211221
"server.c++",
212222
],
213223
hdrs = [
214-
"channel-token.h",
215224
"server.h",
216225
],
217226
deps = [
218227
":actor-id-impl",
219228
":alarm-scheduler",
229+
":channel-token",
220230
":channel-token_capnp",
221231
":container-client",
222232
":facet-tree-index",
@@ -268,7 +278,9 @@ wd_cc_library(
268278
hdrs = ["container-client.h"],
269279
visibility = ["//visibility:public"],
270280
deps = [
281+
":channel-token",
271282
":docker-api_capnp",
283+
"//src/workerd/io",
272284
"//src/workerd/io:container_capnp",
273285
"//src/workerd/jsg",
274286
"@capnp-cpp//src/capnp/compat:http-over-capnp",

0 commit comments

Comments
 (0)