Skip to content

Commit 85f5435

Browse files
kevin-dpclaudeautofix-ci[bot]samwillis
authored
fix: defer query refresh until pending offline transactions complete (#1390)
* test(query-db-collection): add e2e test for offline tx + query refresh race Adds an integration test that reproduces the race condition where a query-backed collection reverts to stale server state when coming back online with pending offline transactions. The queryFn refetch returns pre-mutation data before the offline transaction reaches the server, and when the transaction completes the optimistic state is cleaned up with nothing to replace it. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * ci: apply automated fixes * fix(query-db-collection): exclude e2e tests from default vitest run The e2e tests import @tanstack/offline-transactions which may not be built during the default `pnpm test` CI step. Adds a vitest.config.ts that excludes the e2e directory from the default run while preserving typecheck on the regular test files. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * ci: apply automated fixes Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * ci: apply automated fixes * fix(query-db-collection): fix type error in e2e test Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * ci: apply automated fixes Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * ci: build offline-transactions before query-db-collection e2e tests The new e2e test imports @tanstack/offline-transactions, which needs to be built before the query e2e tests run. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix(query-db-collection): fix type cast in e2e test Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * feat: defer query refresh until pending offline transactions complete When coming back online with pending offline transactions, query-backed collections would refetch stale server state before the mutations reached the server. After the transaction completed, the optimistic state was cleaned up but syncedData still had the stale data, causing items to temporarily disappear. The fix adds a `deferDataRefresh` property on Collection that the offline executor sets while replaying pending transactions. The query-db-collection checks this barrier in handleQueryResult: stale results are skipped, and a fresh refetch is triggered once the barrier resolves. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * ci: apply automated fixes * chore: add changeset for offline query refresh fix Made-with: Cursor * fix(query-db-collection): resolve e2e test typecheck setup Made-with: Cursor --------- Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com> Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> Co-authored-by: Sam Willis <sam.willis@gmail.com>
1 parent 7dc025c commit 85f5435

File tree

10 files changed

+385
-8
lines changed

10 files changed

+385
-8
lines changed

.changeset/olive-coins-sleep.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
---
2+
'@tanstack/db': patch
3+
'@tanstack/offline-transactions': patch
4+
'@tanstack/query-db-collection': patch
5+
---
6+
7+
fix: prevent stale query refreshes from overwriting optimistic offline changes on reconnect
8+
9+
When reconnecting with pending offline transactions, query-backed collections now defer processing query refreshes until queued writes finish replaying, avoiding temporary reverts to stale server data.

.github/workflows/e2e-tests.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ jobs:
4141
pnpm --filter @tanstack/db-ivm build
4242
pnpm --filter @tanstack/db build
4343
pnpm --filter @tanstack/electric-db-collection build
44+
pnpm --filter @tanstack/offline-transactions build
4445
pnpm --filter @tanstack/query-db-collection build
4546
4647
- name: Run Electric E2E tests

packages/db/src/collection/index.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,13 @@ export class CollectionImpl<
301301
// and for debugging
302302
public _state: CollectionStateManager<TOutput, TKey, TSchema, TInput>
303303

304+
/**
305+
* When set, collection consumers should defer processing incoming data
306+
* refreshes until this promise resolves. This prevents stale data from
307+
* overwriting optimistic state while pending writes are being applied.
308+
*/
309+
public deferDataRefresh: Promise<void> | null = null
310+
304311
private comparisonOpts: StringCollationConfig
305312

306313
/**

packages/offline-transactions/src/OfflineExecutor.ts

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -221,12 +221,36 @@ export class OfflineExecutor {
221221
this.unsubscribeOnline = this.onlineDetector.subscribe(() => {
222222
if (this.isOfflineEnabled && this.executor) {
223223
this.executor.resetRetryDelays()
224-
this.executor.executeAll().catch((error) => {
225-
console.warn(
226-
`Failed to execute transactions on connectivity change:`,
227-
error,
228-
)
229-
})
224+
225+
if (this.scheduler.getPendingCount() > 0) {
226+
const barrierPromise = this.executor.executeAll()
227+
228+
for (const collection of Object.values(this.config.collections)) {
229+
collection.deferDataRefresh = barrierPromise
230+
}
231+
232+
barrierPromise
233+
.catch((error) => {
234+
console.warn(
235+
`Failed to execute transactions on connectivity change:`,
236+
error,
237+
)
238+
})
239+
.finally(() => {
240+
for (const collection of Object.values(this.config.collections)) {
241+
if (collection.deferDataRefresh === barrierPromise) {
242+
collection.deferDataRefresh = null
243+
}
244+
}
245+
})
246+
} else {
247+
this.executor.executeAll().catch((error) => {
248+
console.warn(
249+
`Failed to execute transactions on connectivity change:`,
250+
error,
251+
)
252+
})
253+
}
230254
}
231255
})
232256
}
@@ -568,6 +592,10 @@ export class OfflineExecutor {
568592
}
569593

570594
dispose(): void {
595+
for (const collection of Object.values(this.config.collections)) {
596+
collection.deferDataRefresh = null
597+
}
598+
571599
if (this.unsubscribeOnline) {
572600
this.unsubscribeOnline()
573601
this.unsubscribeOnline = null
Lines changed: 293 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,293 @@
1+
/**
2+
* Integration test: offline transactions + query collection refresh
3+
*
4+
* Verifies that a query-backed collection does not revert to stale server
5+
* state when coming back online with pending offline transactions.
6+
*/
7+
8+
import { describe, expect, it, vi } from 'vitest'
9+
import { createCollection } from '@tanstack/db'
10+
import { QueryClient } from '@tanstack/query-core'
11+
import { startOfflineExecutor } from '@tanstack/offline-transactions'
12+
import { queryCollectionOptions } from '../src/query'
13+
import type { Collection } from '@tanstack/db'
14+
import type {
15+
LeaderElection,
16+
OfflineConfig,
17+
OnlineDetector,
18+
StorageAdapter,
19+
} from '@tanstack/offline-transactions'
20+
21+
// --- Browser API mocks needed by @tanstack/offline-transactions ---
22+
// jsdom doesn't provide navigator.locks, which the WebLocksLeader uses.
23+
// We pass custom implementations (FakeLeaderElection, ManualOnlineDetector,
24+
// FakeStorageAdapter) so these mocks just prevent initialization errors.
25+
26+
if (!(globalThis.navigator as any)?.locks) {
27+
Object.defineProperty(globalThis.navigator, `locks`, {
28+
value: { request: vi.fn().mockResolvedValue(false) },
29+
configurable: true,
30+
})
31+
}
32+
33+
// --- Test helpers ---
34+
35+
const flushMicrotasks = () => new Promise((resolve) => setTimeout(resolve, 0))
36+
37+
class ManualOnlineDetector implements OnlineDetector {
38+
private listeners = new Set<() => void>()
39+
private online: boolean
40+
41+
constructor(initialOnline: boolean) {
42+
this.online = initialOnline
43+
}
44+
45+
subscribe(callback: () => void): () => void {
46+
this.listeners.add(callback)
47+
return () => {
48+
this.listeners.delete(callback)
49+
}
50+
}
51+
52+
notifyOnline(): void {
53+
for (const listener of this.listeners) {
54+
listener()
55+
}
56+
}
57+
58+
isOnline(): boolean {
59+
return this.online
60+
}
61+
62+
setOnline(isOnline: boolean): void {
63+
this.online = isOnline
64+
if (isOnline) {
65+
this.notifyOnline()
66+
}
67+
}
68+
69+
dispose(): void {
70+
this.listeners.clear()
71+
}
72+
}
73+
74+
class FakeStorageAdapter implements StorageAdapter {
75+
private store = new Map<string, string>()
76+
77+
get(key: string): Promise<string | null> {
78+
return Promise.resolve(this.store.has(key) ? this.store.get(key)! : null)
79+
}
80+
81+
set(key: string, value: string): Promise<void> {
82+
this.store.set(key, value)
83+
return Promise.resolve()
84+
}
85+
86+
delete(key: string): Promise<void> {
87+
this.store.delete(key)
88+
return Promise.resolve()
89+
}
90+
91+
keys(): Promise<Array<string>> {
92+
return Promise.resolve(Array.from(this.store.keys()))
93+
}
94+
95+
clear(): Promise<void> {
96+
this.store.clear()
97+
return Promise.resolve()
98+
}
99+
}
100+
101+
class FakeLeaderElection implements LeaderElection {
102+
private listeners = new Set<(isLeader: boolean) => void>()
103+
private leader = true
104+
105+
requestLeadership(): Promise<boolean> {
106+
this.notify(this.leader)
107+
return Promise.resolve(this.leader)
108+
}
109+
110+
releaseLeadership(): void {
111+
this.leader = false
112+
this.notify(false)
113+
}
114+
115+
isLeader(): boolean {
116+
return this.leader
117+
}
118+
119+
onLeadershipChange(callback: (isLeader: boolean) => void): () => void {
120+
this.listeners.add(callback)
121+
return () => {
122+
this.listeners.delete(callback)
123+
}
124+
}
125+
126+
private notify(isLeader: boolean): void {
127+
for (const listener of this.listeners) {
128+
listener(isLeader)
129+
}
130+
}
131+
}
132+
133+
// --- Test item type ---
134+
135+
interface TestItem {
136+
id: string
137+
value: string
138+
}
139+
140+
type OfflineMutationParams = Parameters<OfflineConfig[`mutationFns`][string]>[0]
141+
142+
// --- Tests ---
143+
144+
describe(`offline transactions + query collection refresh`, () => {
145+
it(`should not revert optimistic state when query refetches before pending offline transactions complete`, async () => {
146+
// This test verifies that when a user goes offline, queues a mutation,
147+
// and comes back online, the collection does not temporarily lose the
148+
// optimistic insert. In a query-backed collection, data flows through
149+
// query refetches (queryFn), not directly from the mutation function.
150+
// When refetchOnReconnect fires before the offline transaction reaches
151+
// the server, the refetch returns stale data. The optimistic state
152+
// should remain visible until the transaction completes and a fresh
153+
// refetch confirms the data.
154+
155+
const onlineDetector = new ManualOnlineDetector(false) // Start offline
156+
const storage = new FakeStorageAdapter()
157+
158+
// --- Mock server state ---
159+
const serverItems: Array<TestItem> = [
160+
{ id: `item-1`, value: `server-data` },
161+
]
162+
163+
// Control when the mutation fn resolves
164+
let resolveMutation: (() => void) | null = null
165+
166+
const queryClient = new QueryClient({
167+
defaultOptions: {
168+
queries: {
169+
staleTime: 0,
170+
retry: false,
171+
},
172+
},
173+
})
174+
175+
// queryFn reads from serverItems (simulating a real API GET endpoint)
176+
const queryFn = vi.fn().mockImplementation(() => {
177+
return Promise.resolve([...serverItems])
178+
})
179+
180+
// Create the query-backed collection
181+
const collection = createCollection(
182+
queryCollectionOptions({
183+
id: `offline-refresh-test`,
184+
queryClient,
185+
queryKey: [`offline-refresh-test`],
186+
queryFn,
187+
getKey: (item: TestItem) => item.id,
188+
startSync: true,
189+
}),
190+
)
191+
192+
// Wait for initial query to populate the collection
193+
await vi.waitFor(() => {
194+
expect(queryFn).toHaveBeenCalledTimes(1)
195+
expect(collection.size).toBe(1)
196+
})
197+
expect(collection.get(`item-1`)?.value).toBe(`server-data`)
198+
199+
// --- Set up offline executor ---
200+
const mutationFnName = `syncData`
201+
const offlineConfig: OfflineConfig = {
202+
collections: { [`offline-refresh-test`]: collection as any },
203+
mutationFns: {
204+
[mutationFnName]: async (params: OfflineMutationParams) => {
205+
// Block until the test explicitly resolves (simulating slow API POST)
206+
await new Promise<void>((resolve) => {
207+
resolveMutation = resolve
208+
})
209+
210+
// Update server state (simulating the server processing the mutation)
211+
for (const mutation of params.transaction.mutations) {
212+
if (mutation.type === `insert`) {
213+
serverItems.push(mutation.modified as unknown as TestItem)
214+
}
215+
}
216+
217+
return { ok: true }
218+
},
219+
},
220+
storage,
221+
leaderElection: new FakeLeaderElection(),
222+
onlineDetector,
223+
}
224+
225+
const executor = startOfflineExecutor(offlineConfig)
226+
await executor.waitForInit()
227+
228+
// --- Go offline and create an offline mutation ---
229+
const offlineTx = executor.createOfflineTransaction({
230+
mutationFnName,
231+
autoCommit: false,
232+
})
233+
234+
offlineTx.mutate(() => {
235+
;(collection as Collection<TestItem, string, any>).insert({
236+
id: `item-2`,
237+
value: `offline-insert`,
238+
})
239+
})
240+
241+
// Commit while offline: persists to outbox, mutation fn NOT called yet
242+
const commitPromise = offlineTx.commit()
243+
await flushMicrotasks()
244+
245+
// Verify: item-2 is visible through optimistic state
246+
expect(collection.get(`item-2`)?.value).toBe(`offline-insert`)
247+
expect(collection.get(`item-1`)?.value).toBe(`server-data`)
248+
249+
// --- Come online ---
250+
// This triggers both:
251+
// 1. The offline executor replaying pending transactions (mutationFn called)
252+
// 2. TanStack Query potentially refetching (refetchOnReconnect default)
253+
onlineDetector.setOnline(true)
254+
await flushMicrotasks()
255+
256+
// Trigger a query refetch that returns stale server state.
257+
// The server doesn't have item-2 yet (the mutation is still in progress).
258+
// This simulates what refetchOnReconnect would do.
259+
await collection.utils.refetch()
260+
261+
// The refetch returned stale data (only item-1), but item-2 should
262+
// still be visible because the offline transaction is still pending
263+
// and the optimistic state should cover the gap.
264+
expect(collection.get(`item-2`)?.value).toBe(`offline-insert`)
265+
266+
// --- Complete the mutation (server processes it) ---
267+
expect(resolveMutation).not.toBeNull()
268+
resolveMutation!()
269+
270+
// Wait for the transaction to fully complete
271+
await commitPromise
272+
273+
// After the transaction completes, item-2 should remain visible.
274+
//
275+
// Without the fix: the stale refetch overwrote syncedData with only
276+
// item-1, the optimistic state was cleaned up, and item-2 is gone
277+
// permanently (no fresh refetch is triggered).
278+
//
279+
// With the fix: the stale refetch was skipped (barrier), and a fresh
280+
// refetch is triggered once the barrier resolves. The fresh refetch
281+
// includes item-2 because the server now has it. We use waitFor to
282+
// allow the barrier-triggered refetch to complete.
283+
await vi.waitFor(
284+
() => {
285+
expect(collection.get(`item-2`)?.value).toBe(`offline-insert`)
286+
},
287+
{ timeout: 1000 },
288+
)
289+
290+
executor.dispose()
291+
queryClient.clear()
292+
})
293+
})

packages/query-db-collection/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
"typescript": ">=4.7"
5555
},
5656
"devDependencies": {
57+
"@tanstack/offline-transactions": "workspace:*",
5758
"@tanstack/query-core": "^5.90.20",
5859
"@vitest/coverage-istanbul": "^3.2.4"
5960
}

0 commit comments

Comments
 (0)