Skip to content

Commit a6d175b

Browse files
authored
Merge pull request #5958 from cloudflare/milan/STOR-4832
Revert "Revert "Run alarm without syncing alarm manager if currentTim…
2 parents b98ed20 + 1ef908b commit a6d175b

File tree

7 files changed

+91
-28
lines changed

7 files changed

+91
-28
lines changed

src/workerd/api/global-scope.c++

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -438,8 +438,9 @@ kj::Promise<WorkerInterface::AlarmResult> ServiceWorkerGlobalScope::runAlarm(kj:
438438
}
439439
}
440440

441+
auto currentTime = context.now();
441442
KJ_SWITCH_ONEOF(persistent.armAlarmHandler(
442-
scheduledTime, context.getCurrentTraceSpan(), false, actorId)) {
443+
scheduledTime, context.getCurrentTraceSpan(), currentTime, false, actorId)) {
443444
KJ_CASE_ONEOF(armResult, ActorCacheInterface::RunAlarmHandler) {
444445
auto& handler = KJ_REQUIRE_NONNULL(exportedHandler);
445446
if (handler.alarm == kj::none) {

src/workerd/io/actor-cache-test.c++

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4938,6 +4938,8 @@ KJ_TEST("ActorCache alarm get/put") {
49384938

49394939
auto oneMs = 1 * kj::MILLISECONDS + kj::UNIX_EPOCH;
49404940
auto twoMs = 2 * kj::MILLISECONDS + kj::UNIX_EPOCH;
4941+
// Used as the "current time" parameter for armAlarmHandler in tests.
4942+
auto testCurrentTime = kj::UNIX_EPOCH;
49414943
{
49424944
// Test alarm writes happen transactionally with storage ops
49434945
test.setAlarm(oneMs);
@@ -4979,7 +4981,8 @@ KJ_TEST("ActorCache alarm get/put") {
49794981

49804982
{
49814983
// we have a cached time == nullptr, so we should not attempt to run an alarm
4982-
auto armResult = test.cache.armAlarmHandler(10 * kj::SECONDS + kj::UNIX_EPOCH, nullptr);
4984+
auto armResult =
4985+
test.cache.armAlarmHandler(10 * kj::SECONDS + kj::UNIX_EPOCH, nullptr, testCurrentTime);
49834986
KJ_ASSERT(armResult.is<ActorCache::CancelAlarmHandler>());
49844987
auto cancelResult = kj::mv(armResult.get<ActorCache::CancelAlarmHandler>());
49854988
KJ_ASSERT(cancelResult.waitBeforeCancel.poll(ws));
@@ -4997,7 +5000,7 @@ KJ_TEST("ActorCache alarm get/put") {
49975000
{
49985001
// Test that alarm handler handle clears alarm when dropped with no writes
49995002
{
5000-
auto armResult = test.cache.armAlarmHandler(oneMs, nullptr);
5003+
auto armResult = test.cache.armAlarmHandler(oneMs, nullptr, testCurrentTime);
50015004
KJ_ASSERT(armResult.is<ActorCache::RunAlarmHandler>());
50025005
}
50035006
mockStorage->expectCall("deleteAlarm", ws)
@@ -5010,7 +5013,7 @@ KJ_TEST("ActorCache alarm get/put") {
50105013

50115014
// Test that alarm handler handle does not clear alarm when dropped with writes
50125015
{
5013-
auto armResult = test.cache.armAlarmHandler(oneMs, nullptr);
5016+
auto armResult = test.cache.armAlarmHandler(oneMs, nullptr, testCurrentTime);
50145017
KJ_ASSERT(armResult.is<ActorCache::RunAlarmHandler>());
50155018
test.setAlarm(twoMs);
50165019
}
@@ -5024,7 +5027,7 @@ KJ_TEST("ActorCache alarm get/put") {
50245027

50255028
// Test that alarm handler handle does not cache delete when it fails
50265029
{
5027-
auto armResult = test.cache.armAlarmHandler(oneMs, nullptr);
5030+
auto armResult = test.cache.armAlarmHandler(oneMs, nullptr, testCurrentTime);
50285031
KJ_ASSERT(armResult.is<ActorCache::RunAlarmHandler>());
50295032
}
50305033
mockStorage->expectCall("deleteAlarm", ws)
@@ -5036,7 +5039,7 @@ KJ_TEST("ActorCache alarm get/put") {
50365039
{
50375040
// Test that alarm handler handle does not cache alarm delete when noCache == true
50385041
{
5039-
auto armResult = test.cache.armAlarmHandler(twoMs, nullptr, true);
5042+
auto armResult = test.cache.armAlarmHandler(twoMs, nullptr, testCurrentTime, true);
50405043
KJ_ASSERT(armResult.is<ActorCache::RunAlarmHandler>());
50415044
}
50425045
mockStorage->expectCall("deleteAlarm", ws)
@@ -5073,6 +5076,7 @@ KJ_TEST("ActorCache alarm delete when flush fails") {
50735076
auto& mockStorage = test.mockStorage;
50745077

50755078
auto oneMs = 1 * kj::MILLISECONDS + kj::UNIX_EPOCH;
5079+
auto testCurrentTime = kj::UNIX_EPOCH;
50765080

50775081
{
50785082
auto time = expectUncached(test.getAlarm());
@@ -5090,7 +5094,7 @@ KJ_TEST("ActorCache alarm delete when flush fails") {
50905094
// we want to test that even if a flush is retried
50915095
// that the post-delete actions for a checked delete happen.
50925096
{
5093-
auto handle = test.cache.armAlarmHandler(oneMs, nullptr);
5097+
auto handle = test.cache.armAlarmHandler(oneMs, nullptr, testCurrentTime);
50945098

50955099
auto time = expectCached(test.getAlarm());
50965100
KJ_ASSERT(time == kj::none);

src/workerd/io/actor-cache.c++

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,11 @@ kj::Maybe<kj::Promise<void>> ActorCache::evictStale(kj::Date now) {
164164
}
165165

166166
kj::OneOf<ActorCache::CancelAlarmHandler, ActorCache::RunAlarmHandler> ActorCache::armAlarmHandler(
167-
kj::Date scheduledTime, SpanParent parentSpan, bool noCache, kj::StringPtr actorId) {
167+
kj::Date scheduledTime,
168+
SpanParent parentSpan,
169+
kj::Date currentTime KJ_UNUSED,
170+
bool noCache,
171+
kj::StringPtr actorId) {
168172
noCache = noCache || lru.options.noCache;
169173

170174
KJ_ASSERT(!currentAlarmTime.is<DeferredAlarmDelete>());

src/workerd/io/actor-cache.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,8 +243,12 @@ class ActorCacheInterface: public ActorCacheOps {
243243
};
244244

245245
// Call when entering the alarm handler.
246+
//
247+
// `currentTime` is used to determine if an overdue alarm should run immediately even when
248+
// the local alarm state differs from the scheduled time (to avoid blocking on storage sync).
246249
virtual kj::OneOf<CancelAlarmHandler, RunAlarmHandler> armAlarmHandler(kj::Date scheduledTime,
247250
SpanParent parentSpan,
251+
kj::Date currentTime,
248252
bool noCache = false,
249253
kj::StringPtr actorId = "") = 0;
250254

@@ -363,6 +367,7 @@ class ActorCache final: public ActorCacheInterface {
363367

364368
kj::OneOf<CancelAlarmHandler, RunAlarmHandler> armAlarmHandler(kj::Date scheduledTime,
365369
SpanParent parentSpan,
370+
kj::Date currentTime,
366371
bool noCache = false,
367372
kj::StringPtr actorId = "") override;
368373
void cancelDeferredAlarmDeletion() override;

src/workerd/io/actor-sqlite-test.c++

Lines changed: 47 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@ static constexpr kj::Date twoMs = 2 * kj::MILLISECONDS + kj::UNIX_EPOCH;
2222
static constexpr kj::Date threeMs = 3 * kj::MILLISECONDS + kj::UNIX_EPOCH;
2323
static constexpr kj::Date fourMs = 4 * kj::MILLISECONDS + kj::UNIX_EPOCH;
2424
static constexpr kj::Date fiveMs = 5 * kj::MILLISECONDS + kj::UNIX_EPOCH;
25+
// Used as the "current time" parameter for armAlarmHandler in tests.
26+
// Set to epoch (before all test alarm times) so existing tests aren't affected by
27+
// the overdue alarm check.
28+
static constexpr kj::Date testCurrentTime = kj::UNIX_EPOCH;
2529

2630
template <typename T>
2731
kj::Promise<T> eagerlyReportExceptions(kj::Promise<T> promise, kj::SourceLocation location = {}) {
@@ -588,7 +592,7 @@ KJ_TEST("tells alarm handler to cancel when committed alarm is empty") {
588592
ActorSqliteTest test;
589593

590594
{
591-
auto armResult = test.actor.armAlarmHandler(oneMs, nullptr);
595+
auto armResult = test.actor.armAlarmHandler(oneMs, nullptr, testCurrentTime);
592596
// We expect armAlarmHandler() to tell us to cancel the alarm.
593597
KJ_ASSERT(armResult.is<ActorCache::CancelAlarmHandler>());
594598
auto waitPromise = kj::mv(armResult.get<ActorCache::CancelAlarmHandler>().waitBeforeCancel);
@@ -614,7 +618,7 @@ KJ_TEST("tells alarm handler to reschedule when handler alarm is later than comm
614618
KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);
615619

616620
// Request handler run at 2ms. Expect cancellation with rescheduling.
617-
auto armResult = test.actor.armAlarmHandler(twoMs, nullptr);
621+
auto armResult = test.actor.armAlarmHandler(twoMs, nullptr, testCurrentTime);
618622
KJ_ASSERT(armResult.is<ActorSqlite::CancelAlarmHandler>());
619623
auto cancelResult = kj::mv(armResult.get<ActorSqlite::CancelAlarmHandler>());
620624

@@ -638,7 +642,7 @@ KJ_TEST("tells alarm handler to reschedule when handler alarm is earlier than co
638642
KJ_ASSERT(expectSync(test.getAlarm()) == twoMs);
639643

640644
// Expect that armAlarmHandler() tells caller to cancel after rescheduling completes.
641-
auto armResult = test.actor.armAlarmHandler(oneMs, nullptr);
645+
auto armResult = test.actor.armAlarmHandler(oneMs, nullptr, testCurrentTime);
642646
KJ_ASSERT(armResult.is<ActorSqlite::CancelAlarmHandler>());
643647
auto cancelResult = kj::mv(armResult.get<ActorSqlite::CancelAlarmHandler>());
644648

@@ -651,6 +655,32 @@ KJ_TEST("tells alarm handler to reschedule when handler alarm is earlier than co
651655
waitBeforeCancel.wait(test.ws);
652656
}
653657

658+
KJ_TEST("runs overdue alarm immediately when local alarm time is in the past") {
659+
ActorSqliteTest test;
660+
661+
// Initialize alarm state to 2ms.
662+
test.setAlarm(twoMs);
663+
test.pollAndExpectCalls({"scheduleRun(2ms)"})[0]->fulfill();
664+
test.pollAndExpectCalls({"commit"})[0]->fulfill();
665+
test.pollAndExpectCalls({});
666+
KJ_ASSERT(expectSync(test.getAlarm()) == twoMs);
667+
668+
// The local state says the alarm is due to fire at 2ms, but we're saying the AlarmManager has 1ms,
669+
// usually this would result in a rescheduling of the alarm, but since our currentTime is 5ms, we
670+
// will just run the alarm now since it's already overdue.
671+
{
672+
auto overdueCurrentTime = fiveMs;
673+
auto armResult = test.actor.armAlarmHandler(oneMs, nullptr, overdueCurrentTime);
674+
675+
// Should run the handler immediately instead of canceling/rescheduling.
676+
KJ_ASSERT(armResult.is<ActorSqlite::RunAlarmHandler>());
677+
}
678+
679+
// commit and delete the alarm after we drop the alarm handler (this is a deferred delete).
680+
test.pollAndExpectCalls({"commit"})[0]->fulfill();
681+
test.pollAndExpectCalls({"scheduleRun(none)"})[0]->fulfill();
682+
}
683+
654684
KJ_TEST("does not cancel handler when local db alarm state is later than scheduled alarm") {
655685
ActorSqliteTest test;
656686

@@ -663,7 +693,7 @@ KJ_TEST("does not cancel handler when local db alarm state is later than schedul
663693

664694
test.setAlarm(twoMs);
665695
{
666-
auto armResult = test.actor.armAlarmHandler(oneMs, nullptr);
696+
auto armResult = test.actor.armAlarmHandler(oneMs, nullptr, testCurrentTime);
667697
KJ_ASSERT(armResult.is<ActorSqlite::RunAlarmHandler>());
668698
}
669699
test.pollAndExpectCalls({"commit"})[0]->fulfill();
@@ -682,7 +712,7 @@ KJ_TEST("does not cancel handler when local db alarm state is earlier than sched
682712

683713
test.setAlarm(oneMs);
684714
{
685-
auto armResult = test.actor.armAlarmHandler(twoMs, nullptr);
715+
auto armResult = test.actor.armAlarmHandler(twoMs, nullptr, testCurrentTime);
686716
KJ_ASSERT(armResult.is<ActorSqlite::RunAlarmHandler>());
687717
}
688718
test.pollAndExpectCalls({"scheduleRun(1ms)"})[0]->fulfill();
@@ -700,7 +730,7 @@ KJ_TEST("getAlarm() returns null during handler") {
700730
KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);
701731

702732
{
703-
auto armResult = test.actor.armAlarmHandler(oneMs, nullptr);
733+
auto armResult = test.actor.armAlarmHandler(oneMs, nullptr, testCurrentTime);
704734
KJ_ASSERT(armResult.is<ActorSqlite::RunAlarmHandler>());
705735
test.pollAndExpectCalls({});
706736

@@ -721,7 +751,7 @@ KJ_TEST("alarm handler handle clears alarm when dropped with no writes") {
721751
KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);
722752

723753
{
724-
auto armResult = test.actor.armAlarmHandler(oneMs, nullptr);
754+
auto armResult = test.actor.armAlarmHandler(oneMs, nullptr, testCurrentTime);
725755
KJ_ASSERT(armResult.is<ActorSqlite::RunAlarmHandler>());
726756
}
727757
test.pollAndExpectCalls({"commit"})[0]->fulfill();
@@ -740,7 +770,7 @@ KJ_TEST("alarm deleter does not clear alarm when dropped with writes") {
740770
KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);
741771

742772
{
743-
auto armResult = test.actor.armAlarmHandler(oneMs, nullptr);
773+
auto armResult = test.actor.armAlarmHandler(oneMs, nullptr, testCurrentTime);
744774
KJ_ASSERT(armResult.is<ActorSqlite::RunAlarmHandler>());
745775
test.setAlarm(twoMs);
746776
}
@@ -761,7 +791,7 @@ KJ_TEST("can cancel deferred alarm deletion during handler") {
761791
KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);
762792

763793
{
764-
auto armResult = test.actor.armAlarmHandler(oneMs, nullptr);
794+
auto armResult = test.actor.armAlarmHandler(oneMs, nullptr, testCurrentTime);
765795
KJ_ASSERT(armResult.is<ActorSqlite::RunAlarmHandler>());
766796
test.actor.cancelDeferredAlarmDeletion();
767797
}
@@ -780,7 +810,7 @@ KJ_TEST("canceling deferred alarm deletion outside handler has no effect") {
780810
KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);
781811

782812
{
783-
auto armResult = test.actor.armAlarmHandler(oneMs, nullptr);
813+
auto armResult = test.actor.armAlarmHandler(oneMs, nullptr, testCurrentTime);
784814
KJ_ASSERT(armResult.is<ActorSqlite::RunAlarmHandler>());
785815
}
786816
test.pollAndExpectCalls({"commit"})[0]->fulfill();
@@ -805,7 +835,7 @@ KJ_TEST("canceling deferred alarm deletion outside handler edge case") {
805835
KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);
806836

807837
{
808-
auto armResult = test.actor.armAlarmHandler(oneMs, nullptr);
838+
auto armResult = test.actor.armAlarmHandler(oneMs, nullptr, testCurrentTime);
809839
KJ_ASSERT(armResult.is<ActorSqlite::RunAlarmHandler>());
810840
}
811841
test.actor.cancelDeferredAlarmDeletion();
@@ -827,7 +857,7 @@ KJ_TEST("canceling deferred alarm deletion is idempotent") {
827857
KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);
828858

829859
{
830-
auto armResult = test.actor.armAlarmHandler(oneMs, nullptr);
860+
auto armResult = test.actor.armAlarmHandler(oneMs, nullptr, testCurrentTime);
831861
KJ_ASSERT(armResult.is<ActorSqlite::RunAlarmHandler>());
832862
test.actor.cancelDeferredAlarmDeletion();
833863
test.actor.cancelDeferredAlarmDeletion();
@@ -848,7 +878,7 @@ KJ_TEST("alarm handler cleanup succeeds when output gate is broken") {
848878
test.pollAndExpectCalls({});
849879
KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);
850880

851-
auto armResult = test.actor.armAlarmHandler(oneMs, nullptr);
881+
auto armResult = test.actor.armAlarmHandler(oneMs, nullptr, testCurrentTime);
852882
KJ_ASSERT(armResult.is<ActorSqlite::RunAlarmHandler>());
853883
auto deferredDelete = kj::mv(armResult.get<ActorSqlite::RunAlarmHandler>().deferredDelete);
854884

@@ -895,7 +925,7 @@ KJ_TEST("handler alarm is not deleted when commit fails") {
895925
KJ_ASSERT(expectSync(test.getAlarm()) == oneMs);
896926

897927
{
898-
auto armResult = test.actor.armAlarmHandler(oneMs, nullptr);
928+
auto armResult = test.actor.armAlarmHandler(oneMs, nullptr, testCurrentTime);
899929
KJ_ASSERT(armResult.is<ActorSqlite::RunAlarmHandler>());
900930

901931
KJ_ASSERT(expectSync(test.getAlarm()) == kj::none);
@@ -1342,7 +1372,7 @@ KJ_TEST("rolling back transaction leaves deferred alarm deletion in expected sta
13421372
KJ_ASSERT(expectSync(test.getAlarm()) == twoMs);
13431373

13441374
{
1345-
auto armResult = test.actor.armAlarmHandler(twoMs, nullptr);
1375+
auto armResult = test.actor.armAlarmHandler(twoMs, nullptr, testCurrentTime);
13461376
KJ_ASSERT(armResult.is<ActorSqlite::RunAlarmHandler>());
13471377

13481378
auto txn = test.actor.startTransaction();
@@ -1375,7 +1405,7 @@ KJ_TEST("committing transaction leaves deferred alarm deletion in expected state
13751405
KJ_ASSERT(expectSync(test.getAlarm()) == twoMs);
13761406

13771407
{
1378-
auto armResult = test.actor.armAlarmHandler(twoMs, nullptr);
1408+
auto armResult = test.actor.armAlarmHandler(twoMs, nullptr, testCurrentTime);
13791409
KJ_ASSERT(armResult.is<ActorSqlite::RunAlarmHandler>());
13801410

13811411
auto txn = test.actor.startTransaction();
@@ -1406,7 +1436,7 @@ KJ_TEST("rolling back nested transaction leaves deferred alarm deletion in expec
14061436
KJ_ASSERT(expectSync(test.getAlarm()) == twoMs);
14071437

14081438
{
1409-
auto armResult = test.actor.armAlarmHandler(twoMs, nullptr);
1439+
auto armResult = test.actor.armAlarmHandler(twoMs, nullptr, testCurrentTime);
14101440
KJ_ASSERT(armResult.is<ActorSqlite::RunAlarmHandler>());
14111441

14121442
auto txn1 = test.actor.startTransaction();

src/workerd/io/actor-sqlite.c++

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -883,8 +883,11 @@ void ActorSqlite::shutdown(kj::Maybe<const kj::Exception&> maybeException) {
883883
}
884884

885885
kj::OneOf<ActorSqlite::CancelAlarmHandler, ActorSqlite::RunAlarmHandler> ActorSqlite::
886-
armAlarmHandler(
887-
kj::Date scheduledTime, SpanParent parentSpan, bool noCache, kj::StringPtr actorId) {
886+
armAlarmHandler(kj::Date scheduledTime,
887+
SpanParent parentSpan,
888+
kj::Date currentTime,
889+
bool noCache,
890+
kj::StringPtr actorId) {
888891
KJ_ASSERT(!inAlarmHandler);
889892

890893
if (haveDeferredDelete) {
@@ -896,6 +899,20 @@ kj::OneOf<ActorSqlite::CancelAlarmHandler, ActorSqlite::RunAlarmHandler> ActorSq
896899
auto localAlarmState = metadata.getAlarm();
897900
if (localAlarmState != scheduledTime) {
898901
if (localAlarmState == lastConfirmedAlarmDbState) {
902+
// If the local alarm time is already in the past, just run the handler now. This avoids
903+
// blocking alarm execution on the AlarmManager sync when storage is overloaded. The alarm
904+
// will either delete itself on success or reschedule on failure.
905+
if ((willFireEarlier(localAlarmState, currentTime))) {
906+
LOG_WARNING_PERIODICALLY(
907+
"NOSENTRY SQLite alarm overdue, running despite AlarmManager mismatch", scheduledTime,
908+
KJ_ASSERT_NONNULL(localAlarmState), currentTime, actorId);
909+
haveDeferredDelete = true;
910+
inAlarmHandler = true;
911+
deferredAlarmSpan = kj::mv(parentSpan);
912+
static const DeferredAlarmDeleter disposer;
913+
return RunAlarmHandler{.deferredDelete = kj::Own<void>(this, disposer)};
914+
}
915+
899916
// If there's a clean db time that differs from the requested handler's scheduled time, this
900917
// run should be canceled.
901918
if (willFireEarlier(scheduledTime, localAlarmState)) {
@@ -929,10 +946,11 @@ kj::OneOf<ActorSqlite::CancelAlarmHandler, ActorSqlite::RunAlarmHandler> ActorSq
929946
// which suggests that either the alarm manager is working with stale data or that local
930947
// alarm time has somehow gotten out of sync with the scheduled alarm time.
931948

932-
// Only log if the alarm manager is significantly late (>10 seconds behind SQLite)
933949
// We know localAlarmState has a value here because we're in the branch where it's earlier
934950
// than scheduledTime (not equal, and not later).
935951
auto localTime = KJ_ASSERT_NONNULL(localAlarmState);
952+
953+
// Only log if the alarm manager is significantly late (>10 seconds behind SQLite)
936954
if (scheduledTime - localTime > 10 * kj::SECONDS) {
937955
LOG_WARNING_PERIODICALLY(
938956
"NOSENTRY SQLite alarm handler canceled.", scheduledTime, actorId, localTime);

src/workerd/io/actor-sqlite.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ class ActorSqlite final: public ActorCacheInterface, private kj::TaskSet::ErrorH
9696
void shutdown(kj::Maybe<const kj::Exception&> maybeException) override;
9797
kj::OneOf<CancelAlarmHandler, RunAlarmHandler> armAlarmHandler(kj::Date scheduledTime,
9898
SpanParent parentSpan,
99+
kj::Date currentTime,
99100
bool noCache = false,
100101
kj::StringPtr actorId = "") override;
101102
void cancelDeferredAlarmDeletion() override;

0 commit comments

Comments
 (0)