Skip to content

Commit 7967ac1

Browse files
authored
Merge pull request #864 from cloudflare/jsnell/coroutines-conversion-blobinputstream
2 parents 66e857b + 1160fb2 commit 7967ac1

File tree

2 files changed

+34
-13
lines changed

2 files changed

+34
-13
lines changed

src/workerd/api/blob.c++

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -144,13 +144,23 @@ public:
144144
blob(kj::mv(blob)) {}
145145

146146
kj::Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
147+
// Attempt to read a maximum of maxBytes from the remaining unread content of the blob
148+
// into the given buffer. It is the caller's responsibility to ensure that buffer has
149+
// enough capacity for at least maxBytes.
150+
// The minBytes argument is ignored in this implementation of tryRead.
151+
// The buffer must be kept alive by the caller until the returned promise is fulfilled.
152+
// The returned promise is fulfilled with the actual number of bytes read.
147153
size_t amount = kj::min(maxBytes, unread.size());
148-
memcpy(buffer, unread.begin(), amount);
149-
unread = unread.slice(amount, unread.size());
154+
if (amount > 0) {
155+
memcpy(buffer, unread.begin(), amount);
156+
unread = unread.slice(amount, unread.size());
157+
}
150158
return amount;
151159
}
152160

153161
kj::Maybe<uint64_t> tryGetLength(StreamEncoding encoding) override {
162+
// Returns the number of bytes remaining to be read for the given encoding if that
163+
// encoding is supported. This implementation only supports StreamEncoding::IDENTITY.
154164
if (encoding == StreamEncoding::IDENTITY) {
155165
return unread.size();
156166
} else {
@@ -159,20 +169,22 @@ public:
159169
}
160170

161171
kj::Promise<DeferredProxy<void>> pumpTo(WritableStreamSink& output, bool end) override {
162-
if (unread.size() == 0) {
163-
return addNoopDeferredProxy(kj::READY_NOW);
164-
}
172+
// Write all of the remaining unread content of the blob to output.
173+
// If end is true, output.end() will be called once the write has been completed.
174+
// Importantly, the WritableStreamSink must be kept alive by the caller until the
175+
// returned promise is fulfilled.
176+
if (unread.size() != 0) {
177+
auto promise = output.write(unread.begin(), unread.size());
178+
unread = nullptr;
165179

166-
auto promise = output.write(unread.begin(), unread.size());
167-
unread = nullptr;
180+
co_await promise;
168181

169-
if (end) {
170-
promise = promise.then([&output]() { return output.end(); });
182+
if (end) co_await output.end();
171183
}
172184

173185
// We can't defer the write to the proxy stage since it depends on `blob` which lives in the
174186
// isolate.
175-
return addNoopDeferredProxy(kj::mv(promise));
187+
co_return newNoopDeferredProxy();
176188
}
177189

178190
private:

src/workerd/api/util.h

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -132,15 +132,24 @@ struct DeferredProxy {
132132
kj::Promise<T> proxyTask;
133133
};
134134

135+
inline DeferredProxy<void> newNoopDeferredProxy() {
136+
return DeferredProxy<void> { kj::READY_NOW };
137+
}
138+
139+
template <typename T>
140+
inline DeferredProxy<T> newNoopDeferredProxy(T&& value) {
141+
return DeferredProxy<T> { kj::mv(value) };
142+
}
143+
135144
template <typename T>
136145
inline kj::Promise<DeferredProxy<T>> addNoopDeferredProxy(kj::Promise<T> promise) {
137146
// Helper method to use when you need to return `Promise<DeferredProxy<T>>` but no part of the
138147
// operation you are returning is eligible to be deferred past the IoContext lifetime.
139-
140-
return promise.then([](T&& value) { return DeferredProxy<T> { kj::mv(value) }; });
148+
co_return newNoopDeferredProxy(co_await promise);
141149
}
142150
inline kj::Promise<DeferredProxy<void>> addNoopDeferredProxy(kj::Promise<void> promise) {
143-
return promise.then([]() { return DeferredProxy<void> { kj::READY_NOW }; });
151+
co_await promise;
152+
co_return newNoopDeferredProxy();
144153
}
145154

146155
kj::Maybe<jsg::V8Ref<v8::Object>> cloneRequestCf(

0 commit comments

Comments
 (0)