Skip to content

Commit 4f89b3f

Browse files
MoonBoi9001claude
andcommitted
perf(index-node): batch block hash lookups for POI queries
Pre-fetch all block hashes in a single batch query before parallel POI processing, reducing database round-trips from 10+ to 1-2 per batch. - Add block_hashes_by_block_numbers batch method to ChainStore trait - Add get_public_proof_of_indexing_with_block_hash to StatusStore trait - Modify resolver to group requests by network and batch-fetch hashes - Pass pre-fetched hashes to avoid redundant lookups during parallel POI Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 888a12e commit 4f89b3f

File tree

6 files changed

+307
-25
lines changed

6 files changed

+307
-25
lines changed

graph/src/blockchain/mock.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -550,6 +550,12 @@ impl ChainStore for MockChainStore {
550550
) -> Result<Vec<BlockHash>, Error> {
551551
unimplemented!()
552552
}
553+
async fn block_hashes_by_block_numbers(
554+
&self,
555+
_numbers: &[BlockNumber],
556+
) -> Result<HashMap<BlockNumber, Vec<BlockHash>>, Error> {
557+
unimplemented!()
558+
}
553559
async fn confirm_block_hash(
554560
&self,
555561
_number: BlockNumber,

graph/src/components/store/traits.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -602,6 +602,12 @@ pub trait ChainStore: ChainHeadStore {
602602
number: BlockNumber,
603603
) -> Result<Vec<BlockHash>, Error>;
604604

605+
/// Return the hashes of all blocks with the given numbers (batch version)
606+
async fn block_hashes_by_block_numbers(
607+
&self,
608+
numbers: &[BlockNumber],
609+
) -> Result<HashMap<BlockNumber, Vec<BlockHash>>, Error>;
610+
605611
/// Confirm that block number `number` has hash `hash` and that the store
606612
/// may purge any other blocks with that number
607613
async fn confirm_block_hash(
@@ -790,6 +796,19 @@ pub trait StatusStore: Send + Sync + 'static {
790796
block_number: BlockNumber,
791797
fetch_block_ptr: &dyn BlockPtrForNumber,
792798
) -> Result<Option<(PartialBlockPtr, [u8; 32])>, StoreError>;
799+
800+
/// Like `get_public_proof_of_indexing` but accepts optional pre-fetched block hashes
801+
/// to avoid redundant database lookups when processing batches of POI requests.
802+
async fn get_public_proof_of_indexing_with_block_hash(
803+
&self,
804+
subgraph_id: &DeploymentHash,
805+
block_number: BlockNumber,
806+
prefetched_hashes: Option<&Vec<BlockHash>>,
807+
fetch_block_ptr: &dyn BlockPtrForNumber,
808+
) -> Result<Option<(PartialBlockPtr, [u8; 32])>, StoreError>;
809+
810+
/// Get the network for a deployment
811+
async fn network_for_deployment(&self, id: &DeploymentHash) -> Result<String, StoreError>;
793812
}
794813

795814
#[async_trait]

server/index-node/src/resolver.rs

Lines changed: 90 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::collections::BTreeMap;
1+
use std::collections::{BTreeMap, HashMap};
22

33
use async_trait::async_trait;
44
use graph::data::query::Trace;
@@ -417,38 +417,103 @@ where
417417
return Err(QueryExecutionError::TooExpensive);
418418
}
419419

420-
// Process all POI requests in parallel for better throughput
421-
let poi_futures: Vec<_> = requests
422-
.into_iter()
423-
.map(|request| async move {
424-
let poi_result = match self
425-
.store
426-
.get_public_proof_of_indexing(&request.deployment, request.block_number, self)
420+
// Step 1: Group requests by network and collect block numbers for batch lookup
421+
let mut requests_by_network: HashMap<String, Vec<(usize, BlockNumber)>> = HashMap::new();
422+
let mut request_networks: Vec<Option<String>> = Vec::with_capacity(requests.len());
423+
424+
for (idx, request) in requests.iter().enumerate() {
425+
match self.store.network_for_deployment(&request.deployment).await {
426+
Ok(network) => {
427+
requests_by_network
428+
.entry(network.clone())
429+
.or_default()
430+
.push((idx, request.block_number));
431+
request_networks.push(Some(network));
432+
}
433+
Err(_) => {
434+
request_networks.push(None);
435+
}
436+
}
437+
}
438+
439+
// Step 2: Pre-fetch all block hashes per network in batch
440+
let mut block_hash_cache: HashMap<
441+
(String, BlockNumber),
442+
Vec<graph::blockchain::BlockHash>,
443+
> = HashMap::new();
444+
445+
for (network, network_requests) in &requests_by_network {
446+
let block_numbers: Vec<BlockNumber> =
447+
network_requests.iter().map(|(_, num)| *num).collect();
448+
449+
if let Some(chain_store) = self.store.block_store().chain_store(network).await {
450+
match chain_store
451+
.block_hashes_by_block_numbers(&block_numbers)
427452
.await
428453
{
429-
Ok(Some(poi)) => Some(poi),
430-
Ok(None) => None,
454+
Ok(hashes) => {
455+
for (num, hash_vec) in hashes {
456+
block_hash_cache.insert((network.clone(), num), hash_vec);
457+
}
458+
}
431459
Err(e) => {
432-
error!(
460+
debug!(
433461
self.logger,
434-
"Failed to query public proof of indexing";
435-
"subgraph" => &request.deployment,
436-
"block" => format!("{}", request.block_number),
462+
"Failed to batch fetch block hashes for network";
463+
"network" => network,
437464
"error" => format!("{:?}", e)
438465
);
439-
None
466+
// Continue without pre-fetched hashes - will fall back to individual lookups
467+
}
468+
}
469+
}
470+
}
471+
472+
// Step 3: Process all POI requests in parallel, using cached block hashes
473+
let poi_futures: Vec<_> = requests
474+
.into_iter()
475+
.zip(request_networks.into_iter())
476+
.map(|(request, network_opt)| {
477+
let cache = &block_hash_cache;
478+
async move {
479+
let prefetched_hashes = network_opt
480+
.as_ref()
481+
.and_then(|network| cache.get(&(network.clone(), request.block_number)));
482+
483+
let poi_result = match self
484+
.store
485+
.get_public_proof_of_indexing_with_block_hash(
486+
&request.deployment,
487+
request.block_number,
488+
prefetched_hashes,
489+
self,
490+
)
491+
.await
492+
{
493+
Ok(Some(poi)) => Some(poi),
494+
Ok(None) => None,
495+
Err(e) => {
496+
error!(
497+
self.logger,
498+
"Failed to query public proof of indexing";
499+
"subgraph" => &request.deployment,
500+
"block" => format!("{}", request.block_number),
501+
"error" => format!("{:?}", e)
502+
);
503+
None
504+
}
505+
};
506+
507+
PublicProofOfIndexingResult {
508+
deployment: request.deployment,
509+
block: match poi_result {
510+
Some((ref block, _)) => block.clone(),
511+
None => PartialBlockPtr::from(request.block_number),
512+
},
513+
proof_of_indexing: poi_result.map(|(_, poi)| poi),
440514
}
441-
};
442-
443-
PublicProofOfIndexingResult {
444-
deployment: request.deployment,
445-
block: match poi_result {
446-
Some((ref block, _)) => block.clone(),
447-
None => PartialBlockPtr::from(request.block_number),
448-
},
449-
proof_of_indexing: poi_result.map(|(_, poi)| poi),
515+
.into_value()
450516
}
451-
.into_value()
452517
})
453518
.collect();
454519

store/postgres/src/chain_store.rs

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -739,6 +739,58 @@ mod data {
739739
}
740740
}
741741

742+
/// Return the hashes of all blocks with the given block numbers (batch version)
743+
pub(super) async fn block_hashes_by_block_numbers(
744+
&self,
745+
conn: &mut AsyncPgConnection,
746+
chain: &str,
747+
numbers: &[BlockNumber],
748+
) -> Result<HashMap<BlockNumber, Vec<BlockHash>>, Error> {
749+
if numbers.is_empty() {
750+
return Ok(HashMap::new());
751+
}
752+
753+
match self {
754+
Storage::Shared => {
755+
use public::ethereum_blocks as b;
756+
757+
let results = b::table
758+
.select((b::number, b::hash))
759+
.filter(b::network_name.eq(chain))
760+
.filter(b::number.eq_any(Vec::from_iter(numbers.iter().map(|&n| n as i64))))
761+
.load::<(i64, String)>(conn)
762+
.await?;
763+
764+
let mut map: HashMap<BlockNumber, Vec<BlockHash>> = HashMap::new();
765+
for (num, hash) in results {
766+
let block_hash = hash.parse()?;
767+
map.entry(num as BlockNumber).or_default().push(block_hash);
768+
}
769+
Ok(map)
770+
}
771+
Storage::Private(Schema { blocks, .. }) => {
772+
let results = blocks
773+
.table()
774+
.select((blocks.number(), blocks.hash()))
775+
.filter(
776+
blocks
777+
.number()
778+
.eq_any(Vec::from_iter(numbers.iter().map(|&n| n as i64))),
779+
)
780+
.load::<(i64, Vec<u8>)>(conn)
781+
.await?;
782+
783+
let mut map: HashMap<BlockNumber, Vec<BlockHash>> = HashMap::new();
784+
for (num, hash) in results {
785+
map.entry(num as BlockNumber)
786+
.or_default()
787+
.push(BlockHash::from(hash));
788+
}
789+
Ok(map)
790+
}
791+
}
792+
}
793+
742794
pub(super) async fn confirm_block_hash(
743795
&self,
744796
conn: &mut AsyncPgConnection,
@@ -2971,6 +3023,16 @@ impl ChainStoreTrait for ChainStore {
29713023
.await
29723024
}
29733025

3026+
async fn block_hashes_by_block_numbers(
3027+
&self,
3028+
numbers: &[BlockNumber],
3029+
) -> Result<HashMap<BlockNumber, Vec<BlockHash>>, Error> {
3030+
let mut conn = self.pool.get_permitted().await?;
3031+
self.storage
3032+
.block_hashes_by_block_numbers(&mut conn, &self.chain, numbers)
3033+
.await
3034+
}
3035+
29743036
async fn confirm_block_hash(
29753037
&self,
29763038
number: BlockNumber,

store/postgres/src/store.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,28 @@ impl StatusStore for Store {
171171
.await
172172
}
173173

174+
async fn get_public_proof_of_indexing_with_block_hash(
175+
&self,
176+
subgraph_id: &DeploymentHash,
177+
block_number: BlockNumber,
178+
prefetched_hashes: Option<&Vec<graph::blockchain::BlockHash>>,
179+
fetch_block_ptr: &dyn BlockPtrForNumber,
180+
) -> Result<Option<(PartialBlockPtr, [u8; 32])>, StoreError> {
181+
self.subgraph_store
182+
.get_public_proof_of_indexing_with_block_hash(
183+
subgraph_id,
184+
block_number,
185+
prefetched_hashes,
186+
self.block_store().clone(),
187+
fetch_block_ptr,
188+
)
189+
.await
190+
}
191+
192+
async fn network_for_deployment(&self, id: &DeploymentHash) -> Result<String, StoreError> {
193+
self.subgraph_store.network_for_deployment(id).await
194+
}
195+
174196
async fn query_permit(&self) -> QueryPermit {
175197
// Status queries go to the primary shard.
176198
self.block_store.query_permit_primary().await

0 commit comments

Comments
 (0)