Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 21 additions & 70 deletions src/workerd/io/actor-cache-test.c++
Original file line number Diff line number Diff line change
Expand Up @@ -299,12 +299,9 @@ KJ_TEST("ActorCache single-key basics") {
{
KJ_ASSERT(expectCached(test.delete_("foo")));

auto mockTxn = mockStorage->expectCall("txn", ws).returnMock("transaction");
mockTxn->expectCall("delete", ws)
mockStorage->expectCall("delete", ws)
.withParams(CAPNP(keys = ["foo"]))
.thenReturn(CAPNP(numDeleted = 1));
mockTxn->expectCall("commit", ws).thenReturn(CAPNP());
mockTxn->expectDropped(ws);
}

{
Expand Down Expand Up @@ -362,12 +359,9 @@ KJ_TEST("ActorCache multi-key basics") {
{
KJ_ASSERT(expectCached(test.delete_({"foo"_kj, "bar"_kj, "baz"_kj, "qux"_kj})) == 2);

auto mockTxn = mockStorage->expectCall("txn", ws).returnMock("transaction");
mockTxn->expectCall("delete", ws)
mockStorage->expectCall("delete", ws)
.withParams(CAPNP(keys = ["foo", "bar"]))
.thenReturn(CAPNP(numDeleted = 2));
mockTxn->expectCall("commit", ws).thenReturn(CAPNP());
mockTxn->expectDropped(ws);
}

{
Expand Down Expand Up @@ -430,18 +424,14 @@ KJ_TEST("ActorCache more deletes") {
// Value is immediately in cache.
KJ_ASSERT(expectCached(test.get("foo")) == nullptr);

auto mockTxn = mockStorage->expectCall("txn", ws).returnMock("transaction");

KJ_ASSERT(!promise.poll(ws));
mockTxn->expectCall("delete", ws)
.withParams(CAPNP(keys = ["foo"]))
.thenReturn(CAPNP(numDeleted = 1));
auto mockDelete = mockStorage->expectCall("delete", ws)
.withParams(CAPNP(keys = ["foo"]));

// Still in cache during flush.
KJ_ASSERT(!promise.poll(ws));
KJ_ASSERT(expectCached(test.get("foo")) == nullptr);

mockTxn->expectCall("commit", ws).thenReturn(CAPNP());
mockTxn->expectDropped(ws);
kj::mv(mockDelete).thenReturn(CAPNP(numDeleted = 1));

// Delete call returned true due to numDeleted = 1.
KJ_ASSERT(promise.wait(ws));
Expand All @@ -454,12 +444,9 @@ KJ_TEST("ActorCache more deletes") {
{
auto promise = expectUncached(test.delete_("bar"));

auto mockTxn = mockStorage->expectCall("txn", ws).returnMock("transaction");
mockTxn->expectCall("delete", ws)
mockStorage->expectCall("delete", ws)
.withParams(CAPNP(keys = ["bar"]))
.thenReturn(CAPNP(numDeleted = 0));
mockTxn->expectCall("commit", ws).thenReturn(CAPNP());
mockTxn->expectDropped(ws);

// Delete call returned false due to numDeleted = 0.
KJ_ASSERT(!promise.wait(ws));
Expand Down Expand Up @@ -488,12 +475,9 @@ KJ_TEST("ActorCache more deletes") {
{
KJ_ASSERT(expectCached(test.delete_("foo")));

auto mockTxn = mockStorage->expectCall("txn", ws).returnMock("transaction");
mockTxn->expectCall("delete", ws)
mockStorage->expectCall("delete", ws)
.withParams(CAPNP(keys = ["foo"]))
.thenReturn(CAPNP(numDeleted = 1));
mockTxn->expectCall("commit", ws).thenReturn(CAPNP());
mockTxn->expectDropped(ws);
}

KJ_ASSERT(expectCached(test.get("foo")) == nullptr);
Expand Down Expand Up @@ -2375,12 +2359,9 @@ KJ_TEST("ActorCache list() delete endpoint") {

// Acknowledge the delete transaction.
{
auto mockTxn = mockStorage->expectCall("txn", ws).returnMock("transaction");
mockTxn->expectCall("delete", ws)
mockStorage->expectCall("delete", ws)
.withParams(CAPNP(keys = ["qux"]))
.thenReturn(CAPNP(numDeleted = 1));
mockTxn->expectCall("commit", ws).thenReturn(CAPNP());
mockTxn->expectDropped(ws);
}

KJ_ASSERT(deletePromise.wait(ws) == 1);
Expand Down Expand Up @@ -2427,12 +2408,9 @@ KJ_TEST("ActorCache list() delete endpoint empty range") {

// Acknowledge the delete transaction.
{
auto mockTxn = mockStorage->expectCall("txn", ws).returnMock("transaction");
mockTxn->expectCall("delete", ws)
mockStorage->expectCall("delete", ws)
.withParams(CAPNP(keys = ["qux"]))
.thenReturn(CAPNP(numDeleted = 1));
mockTxn->expectCall("commit", ws).thenReturn(CAPNP());
mockTxn->expectDropped(ws);
}

KJ_ASSERT(deletePromise.wait(ws) == 1);
Expand Down Expand Up @@ -2502,12 +2480,9 @@ KJ_TEST("ActorCache list() interleave streaming with other ops") {
.thenReturn(CAPNP());
}
{
auto mockTxn = mockStorage->expectCall("txn", ws).returnMock("transaction");
mockTxn->expectCall("delete", ws)
mockStorage->expectCall("delete", ws)
.withParams(CAPNP(keys = ["garply"]))
.thenReturn(CAPNP());
mockTxn->expectCall("commit", ws).thenReturn(CAPNP());
mockTxn->expectDropped(ws);
}
}

Expand All @@ -2519,11 +2494,8 @@ KJ_TEST("ActorCache list() end of first block deleted at inopportune time") {
// Do a delete, wait for the commit... and then hold it open.
auto deletePromise = expectUncached(test.delete_("corge"));

auto mockTxn = mockStorage->expectCall("txn", ws).returnMock("transaction");
mockTxn->expectCall("delete", ws)
.withParams(CAPNP(keys = ["corge"]))
.thenReturn(CAPNP());
auto commitCall = mockTxn->expectCall("commit", ws);
auto mockDelete = mockStorage->expectCall("delete", ws)
.withParams(CAPNP(keys = ["corge"]));

// Now do a list.
auto promise = expectUncached(test.list("bar", "qux"));
Expand All @@ -2538,8 +2510,7 @@ KJ_TEST("ActorCache list() end of first block deleted at inopportune time") {

// Let the delete finish. So now the last key in the first block is cached as a negative
// clean entry.
kj::mv(commitCall).thenReturn(CAPNP());
mockTxn->expectDropped(ws);
kj::mv(mockDelete).thenReturn(CAPNP());

// Continue on.
stream.call("end", CAPNP()).expectReturns(CAPNP(), ws);
Expand Down Expand Up @@ -3380,12 +3351,9 @@ KJ_TEST("ActorCache listReverse() delete endpoint") {

// Acknowledge the delete transaction.
{
auto mockTxn = mockStorage->expectCall("txn", ws).returnMock("transaction");
mockTxn->expectCall("delete", ws)
mockStorage->expectCall("delete", ws)
.withParams(CAPNP(keys = ["corge"]))
.thenReturn(CAPNP(numDeleted = 1));
mockTxn->expectCall("commit", ws).thenReturn(CAPNP());
mockTxn->expectDropped(ws);
}

{
Expand Down Expand Up @@ -3451,12 +3419,9 @@ KJ_TEST("ActorCache listReverse() interleave streaming with other ops") {
.thenReturn(CAPNP());
}
{
auto mockTxn = mockStorage->expectCall("txn", ws).returnMock("transaction");
mockTxn->expectCall("delete", ws)
mockStorage->expectCall("delete", ws)
.withParams(CAPNP(keys = ["bar"]))
.thenReturn(CAPNP());
mockTxn->expectCall("commit", ws).thenReturn(CAPNP());
mockTxn->expectDropped(ws);
}
}

Expand All @@ -3468,11 +3433,8 @@ KJ_TEST("ActorCache listReverse() end of first block deleted at inopportune time
// Do a delete, wait for the commit... and then hold it open.
auto deletePromise = expectUncached(test.delete_("corge"));

auto mockTxn = mockStorage->expectCall("txn", ws).returnMock("transaction");
mockTxn->expectCall("delete", ws)
.withParams(CAPNP(keys = ["corge"]))
.thenReturn(CAPNP());
auto commitCall = mockTxn->expectCall("commit", ws);
auto mockDelete = mockStorage->expectCall("delete", ws)
.withParams(CAPNP(keys = ["corge"]));

// Now do a list.
auto promise = expectUncached(test.listReverse("bar", "qux"));
Expand All @@ -3487,8 +3449,7 @@ KJ_TEST("ActorCache listReverse() end of first block deleted at inopportune time

// Let the delete finish. So now the last key in the first block is cached as a negative
// clean entry.
kj::mv(commitCall).thenReturn(CAPNP());
mockTxn->expectDropped(ws);
kj::mv(mockDelete).thenReturn(CAPNP());

// Continue on.
stream.call("end", CAPNP()).expectReturns(CAPNP(), ws);
Expand Down Expand Up @@ -4969,16 +4930,11 @@ KJ_TEST("ActorCache can wait for flush") {
test.delete_("foo");

verify([&](){
auto mockTxn = mockStorage->expectCall("txn", ws).returnMock("transaction");
return InFlightRequest {
.op = mockTxn->expectCall("delete", ws).withParams(CAPNP(keys = ["foo"])),
.maybeTxn = kj::mv(mockTxn),
.op = mockStorage->expectCall("delete", ws).withParams(CAPNP(keys = ["foo"])),
};
}, [&](auto req) {
auto& mockTxn = KJ_ASSERT_NONNULL(req.maybeTxn);
kj::mv(req.op).thenReturn(CAPNP(numDeleted = 1));
mockTxn->expectCall("commit", ws).thenReturn(CAPNP());
mockTxn->expectDropped(ws);
}, {
.skipSecondOperation = false,
});
Expand All @@ -5005,16 +4961,11 @@ KJ_TEST("ActorCache can wait for flush") {
test.delete_("foo", ActorCacheWriteOptions{.allowUnconfirmed = true});

verify([&](){
auto mockTxn = mockStorage->expectCall("txn", ws).returnMock("transaction");
return InFlightRequest {
.op = mockTxn->expectCall("delete", ws).withParams(CAPNP(keys = ["foo"])),
.maybeTxn = kj::mv(mockTxn),
.op = mockStorage->expectCall("delete", ws).withParams(CAPNP(keys = ["foo"])),
};
}, [&](auto req) {
auto& mockTxn = KJ_ASSERT_NONNULL(req.maybeTxn);
kj::mv(req.op).thenReturn(CAPNP(numDeleted = 1));
mockTxn->expectCall("commit", ws).thenReturn(CAPNP());
mockTxn->expectDropped(ws);
}, {
.skipSecondOperation = false,
});
Expand Down
109 changes: 90 additions & 19 deletions src/workerd/io/actor-cache.c++
Original file line number Diff line number Diff line change
Expand Up @@ -2401,33 +2401,42 @@ kj::Promise<void> ActorCache::flushImpl(uint retryCount) {
countedDeleteFlushes.releaseAsArray(), kj::mv(maybeAlarmChange));
};

if (mutedDeleteFlush.batches.size() > 0 || countedDeleteFlushes.size() > 0) {
// We have deletes, we have to use a transaction.
useTransactionToFlush();
} else if (putFlush.batches.size() > 1) {
// We have more than a single batch of puts, we have to use a transaction.
useTransactionToFlush();
} else if (putFlush.batches.size() == 1) {
// We have a single batch of puts, can we use an optimization?
uint typesOfDataToFlush = 0;
if (putFlush.batches.size() > 0) { ++typesOfDataToFlush; }
if (mutedDeleteFlush.batches.size() > 0) { ++typesOfDataToFlush; }
if (countedDeleteFlushes.size() > 0) { ++typesOfDataToFlush; }
if (maybeAlarmChange.is<DirtyAlarm>()) { ++typesOfDataToFlush; }

if (maybeAlarmChange.is<CleanAlarm>()) {
// As an optimization for the common case where there are only puts and they all fit in a
// single batch, just send a simple put rather than complicating things with a transaction.
flushProm = flushImplUsingSinglePut(kj::mv(putFlush));
} else {
// We have an alarm to go along with our puts, we have to use a transaction.
useTransactionToFlush();
}
} else if (deleteAllUpcoming) {
if (deleteAllUpcoming && KJ_ASSERT_NONNULL(requestedDeleteAll).deletedDirty.empty()) {
// There were no dirty entries before deleteAll() was called, so we can move on to invoking
// deleteAll() itself.
// NOTE: We put this as the first check to maintain legacy behavior even if there isn't a
// particularly compelling reason to do this before checking whether the alarm is dirty.
return flushImplDeleteAll();
} else if (typesOfDataToFlush == 0) {
// Oh, nothing to do.
return kj::READY_NOW;
} else if (typesOfDataToFlush > 1) {
// We have multiple types of operations, so we have to use a transaction.
useTransactionToFlush();
} else if (maybeAlarmChange.is<DirtyAlarm>()) {
// We only had an alarm, we can skip the transaction.
flushProm = flushImplAlarmOnly(maybeAlarmChange.get<DirtyAlarm>());
} else if (putFlush.batches.size() == 1) {
// As an optimization for the common case where there are only puts and they all fit in a
// single batch, just send a simple put rather than complicating things with a transaction.
flushProm = flushImplUsingSinglePut(kj::mv(putFlush));
} else if (mutedDeleteFlush.batches.size() == 1) {
// Same as for puts, but for muted deletes.
flushProm = flushImplUsingSingleMutedDelete(kj::mv(mutedDeleteFlush));
} else if (countedDeleteFlushes.size() == 1 && countedDeleteFlushes[0].batches.size() == 1) {
// Same as for puts, but for muted deletes.
flushProm = flushImplUsingSingleCountedDelete(kj::mv(countedDeleteFlushes[0]));
countedDeleteFlushes.clear();
} else {
// Oh, nothing to do.
return kj::READY_NOW;
// None of the special cases above triggered. Default to using a transaction in all other cases,
// such as when there are so many keys to be flushed that they don't fit into a single batch.
useTransactionToFlush();
}

return oomCanceler.wrap(kj::mv(flushProm)).then([this, deleteAllUpcoming]() -> kj::Promise<void> {
Expand Down Expand Up @@ -2582,6 +2591,68 @@ kj::Promise<void> ActorCache::flushImplUsingSinglePut(PutFlush putFlush) {
co_await request.send().ignoreResult();
}

kj::Promise<void> ActorCache::flushImplUsingSingleMutedDelete(MutedDeleteFlush mutedFlush) {
KJ_ASSERT(mutedFlush.batches.size() == 1);
auto& batch = mutedFlush.batches[0];

KJ_ASSERT(batch.wordCount < MAX_ACTOR_STORAGE_RPC_WORDS);
KJ_ASSERT(batch.pairCount == mutedFlush.entries.size());

auto request = storage.deleteRequest(capnp::MessageSize { 4 + batch.wordCount, 0 });
auto listBuilder = request.initKeys(batch.pairCount);
auto entryIt = mutedFlush.entries.begin();
for (size_t i = 0; i < batch.pairCount; ++i) {
auto& entry = **(entryIt++);
listBuilder.set(i, entry.key.asBytes());
}

// We're done with the batching instructions, free them before we go async.
mutedFlush.entries.clear();
mutedFlush.batches.clear();

// See the comment in flushImplUsingTxn for why we need to construct our RPC and then wait on
// reads before actually sending the write. The same exact logic applies here.
co_await waitForPastReads();
co_await request.send().ignoreResult();
}

kj::Promise<void> ActorCache::flushImplUsingSingleCountedDelete(CountedDeleteFlush countedFlush) {
KJ_ASSERT(countedFlush.batches.size() == 1);
auto& batch = countedFlush.batches[0];

KJ_ASSERT(batch.wordCount < MAX_ACTOR_STORAGE_RPC_WORDS);
KJ_ASSERT(batch.pairCount == countedFlush.entries.size());

auto request = storage.deleteRequest(capnp::MessageSize { 4 + batch.wordCount, 0 });
auto listBuilder = request.initKeys(batch.pairCount);
auto entryIt = countedFlush.entries.begin();
for (size_t i = 0; i < batch.pairCount; ++i) {
auto& entry = **(entryIt++);
listBuilder.set(i, entry.key.asBytes());
}

// We're done with the batching instructions, free them before we go async.
countedFlush.entries.clear();
countedFlush.batches.clear();

auto countedDelete = kj::mv(countedFlush.countedDelete);

// See the comment in flushImplUsingTxn for why we need to construct our RPC and then wait on
// reads before actually sending the write. The same exact logic applies here.
co_await waitForPastReads();
try {
auto response = co_await request.send();
countedDelete->resultFulfiller->fulfill(response.getNumDeleted());
} catch (kj::Exception& e) {
if (e.getType() == kj::Exception::Type::DISCONNECTED) {
// This deletion will be retried, so don't touch the fulfiller.
} else {
countedDelete->resultFulfiller->reject(kj::cp(e));
}
throw kj::mv(e);
}
}

kj::Promise<void> ActorCache::flushImplAlarmOnly(DirtyAlarm dirty) {
co_await waitForPastReads();

Expand Down
2 changes: 2 additions & 0 deletions src/workerd/io/actor-cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -617,6 +617,8 @@ class ActorCache final: public ActorCacheInterface {
};
using CountedDeleteFlushes = kj::Array<CountedDeleteFlush>;
kj::Promise<void> flushImplUsingSinglePut(PutFlush putFlush);
kj::Promise<void> flushImplUsingSingleMutedDelete(MutedDeleteFlush mutedFlush);
kj::Promise<void> flushImplUsingSingleCountedDelete(CountedDeleteFlush countedFlush);
kj::Promise<void> flushImplAlarmOnly(DirtyAlarm dirty);
kj::Promise<void> flushImplUsingTxn(
PutFlush putFlush, MutedDeleteFlush mutedDeleteFlush,
Expand Down