Skip to content

Commit 1442997

Browse files
authored
Add a MultiGet API to reduce lock contention (#145)
* Add a MultiGet API to reduce lock contention * update comment
1 parent f89a867 commit 1442997

File tree

2 files changed

+135
-0
lines changed

2 files changed

+135
-0
lines changed

cache.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,47 @@ func (cache *Cache) Get(key []byte) (value []byte, err error) {
8686
return
8787
}
8888

89+
// MultiGet returns values and errors for the given keys. The returned slices
90+
// have the same length as keys; values[i] and errs[i] correspond to keys[i].
91+
// A miss is represented by values[i] == nil and errs[i] == ErrNotFound.
92+
// MultiGet reduces lock contention by grouping keys by segment and acquiring
93+
// each segment lock at most once.
94+
// Note that MultiGet holds each segment lock longer than a single Get (for
95+
// the duration of all keys in that segment), which can increase Get tail
96+
// latency when MultiGet and Get run concurrently.
97+
func (cache *Cache) MultiGet(keys [][]byte) (values [][]byte, errs []error) {
98+
n := len(keys)
99+
if n == 0 {
100+
return nil, nil
101+
}
102+
values = make([][]byte, n)
103+
errs = make([]error, n)
104+
type keyLoc struct {
105+
idx int
106+
hashVal uint64
107+
}
108+
var groups [segmentCount][]keyLoc
109+
for i, key := range keys {
110+
hashVal := hashFunc(key)
111+
segID := hashVal & segmentAndOpVal
112+
groups[segID] = append(groups[segID], keyLoc{idx: i, hashVal: hashVal})
113+
}
114+
for segID := 0; segID < segmentCount; segID++ {
115+
batch := groups[segID]
116+
if len(batch) == 0 {
117+
continue
118+
}
119+
cache.locks[segID].Lock()
120+
for _, loc := range batch {
121+
value, _, err := cache.segments[segID].get(keys[loc.idx], nil, loc.hashVal, false)
122+
values[loc.idx] = value
123+
errs[loc.idx] = err
124+
}
125+
cache.locks[segID].Unlock()
126+
}
127+
return values, errs
128+
}
129+
89130
// GetFn is equivalent to Get or GetWithBuf, but it attempts to be zero-copy,
90131
// calling the provided function with slice view over the current underlying
91132
// value of the key in memory. The slice is constrained in length and capacity.

cache_test.go

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -315,6 +315,41 @@ func TestPeekWithExpiration(t *testing.T) {
315315
})
316316
}
317317

318+
func TestMultiGet(t *testing.T) {
319+
cache := NewCache(1024)
320+
cache.Set([]byte("k1"), []byte("v1"), 0)
321+
cache.Set([]byte("k2"), []byte("v2"), 0)
322+
cache.Set([]byte("k3"), []byte("v3"), 0)
323+
324+
keys := [][]byte{[]byte("k1"), []byte("k2"), []byte("missing"), []byte("k3")}
325+
values, errs := cache.MultiGet(keys)
326+
if len(values) != len(keys) || len(errs) != len(keys) {
327+
t.Fatalf("len(values)=%d, len(errs)=%d, want %d", len(values), len(errs), len(keys))
328+
}
329+
if errs[0] != nil || !bytes.Equal(values[0], []byte("v1")) {
330+
t.Errorf("keys[0]: value=%q, err=%v", values[0], errs[0])
331+
}
332+
if errs[1] != nil || !bytes.Equal(values[1], []byte("v2")) {
333+
t.Errorf("keys[1]: value=%q, err=%v", values[1], errs[1])
334+
}
335+
if errs[2] != ErrNotFound || values[2] != nil {
336+
t.Errorf("keys[2] (missing): value=%v, err=%v", values[2], errs[2])
337+
}
338+
if errs[3] != nil || !bytes.Equal(values[3], []byte("v3")) {
339+
t.Errorf("keys[3]: value=%q, err=%v", values[3], errs[3])
340+
}
341+
342+
// Empty keys
343+
values, errs = cache.MultiGet(nil)
344+
if values != nil || errs != nil {
345+
t.Errorf("MultiGet(nil): got values=%v, errs=%v", values, errs)
346+
}
347+
values, errs = cache.MultiGet([][]byte{})
348+
if values != nil || errs != nil {
349+
t.Errorf("MultiGet(empty): got values=%v, errs=%v", values, errs)
350+
}
351+
}
352+
318353
func TestGetWithExpirationAndBuf(t *testing.T) {
319354
cache := NewCache(1024)
320355
key := []byte("abcd")
@@ -1070,6 +1105,65 @@ func BenchmarkCacheGetWithExpiration(b *testing.B) {
10701105
}
10711106
}
10721107

1108+
const (
	// benchDataSize is the number of distinct keys preloaded into the cache.
	// Same value as before (10000); the previous literal 100_00 grouped the
	// digits misleadingly and read like a typo for 100_000.
	benchDataSize = 10_000
	// benchBatchSize is the number of keys looked up per batch iteration.
	benchBatchSize = 100
)
1112+
1113+
func BenchmarkParallelCacheGetBatched(b *testing.B) {
1114+
b.ReportAllocs()
1115+
b.StopTimer()
1116+
cache := NewCache(256 * 1024 * 1024)
1117+
buf := make([]byte, 64)
1118+
var key [8]byte
1119+
for i := 0; i < benchDataSize; i++ {
1120+
binary.LittleEndian.PutUint64(key[:], uint64(i))
1121+
cache.Set(key[:], buf, 0)
1122+
}
1123+
b.StartTimer()
1124+
b.RunParallel(func(pb *testing.PB) {
1125+
keys := make([][]byte, benchBatchSize)
1126+
for i := range keys {
1127+
keys[i] = make([]byte, 8)
1128+
}
1129+
i := 0
1130+
for pb.Next() {
1131+
for j := 0; j < benchBatchSize; j++ {
1132+
binary.LittleEndian.PutUint64(key[:], uint64((i+j)%benchDataSize))
1133+
cache.Get(key[:])
1134+
}
1135+
i++
1136+
}
1137+
})
1138+
}
1139+
1140+
func BenchmarkParallelCacheMultiGetBatched(b *testing.B) {
1141+
b.ReportAllocs()
1142+
b.StopTimer()
1143+
cache := NewCache(256 * 1024 * 1024)
1144+
buf := make([]byte, 64)
1145+
var key [8]byte
1146+
for i := 0; i < benchDataSize; i++ {
1147+
binary.LittleEndian.PutUint64(key[:], uint64(i))
1148+
cache.Set(key[:], buf, 0)
1149+
}
1150+
b.StartTimer()
1151+
b.RunParallel(func(pb *testing.PB) {
1152+
keys := make([][]byte, benchBatchSize)
1153+
for i := range keys {
1154+
keys[i] = make([]byte, 8)
1155+
}
1156+
i := 0
1157+
for pb.Next() {
1158+
for j := 0; j < benchBatchSize; j++ {
1159+
binary.LittleEndian.PutUint64(keys[j], uint64((i+j)%benchDataSize))
1160+
}
1161+
cache.MultiGet(keys)
1162+
i++
1163+
}
1164+
})
1165+
}
1166+
10731167
func BenchmarkMapGet(b *testing.B) {
10741168
b.StopTimer()
10751169
m := make(map[string][]byte)

0 commit comments

Comments
 (0)