1
2
3
4
5 package sql
6
7 import (
8 "sync"
9 "sync/atomic"
10 )
11
12
13
14
15 type closingMutex struct {
16
17
18
19
20
21 state atomic.Int64
22 mu sync.Mutex
23 read *sync.Cond
24 write *sync.Cond
25 }
26
27 func (m *closingMutex) RLock() {
28 if m.TryRLock() {
29 return
30 }
31
32
33 m.mu.Lock()
34 defer m.mu.Unlock()
35 for {
36 if m.TryRLock() {
37 return
38 }
39 m.init()
40 m.read.Wait()
41 }
42 }
43
44 func (m *closingMutex) RUnlock() {
45 for {
46 x := m.state.Load()
47 if x < 2 {
48 panic("runlock of un-rlocked mutex")
49 }
50 if m.state.CompareAndSwap(x, x-2) {
51 if x-2 == 1 {
52
53
54 m.mu.Lock()
55 defer m.mu.Unlock()
56 m.write.Broadcast()
57 }
58 return
59 }
60 }
61 }
62
63 func (m *closingMutex) Lock() {
64 m.mu.Lock()
65 defer m.mu.Unlock()
66 for {
67 x := m.state.Load()
68 if (x == 0 || x == 1) && m.state.CompareAndSwap(x, -1) {
69 return
70 }
71
72 if x&1 == 0 && !m.state.CompareAndSwap(x, x|1) {
73 continue
74 }
75 m.init()
76 m.write.Wait()
77 }
78 }
79
80 func (m *closingMutex) Unlock() {
81 m.mu.Lock()
82 defer m.mu.Unlock()
83 if !m.state.CompareAndSwap(-1, 0) {
84 panic("unlock of unlocked mutex")
85 }
86 if m.read != nil {
87 m.read.Broadcast()
88 m.write.Broadcast()
89 }
90 }
91
92 func (m *closingMutex) TryRLock() bool {
93 for {
94 x := m.state.Load()
95 if x < 0 {
96 return false
97 }
98 if m.state.CompareAndSwap(x, x+2) {
99 return true
100 }
101 }
102 }
103
104 func (m *closingMutex) init() {
105
106
107 if m.read == nil {
108 m.read = sync.NewCond(&m.mu)
109 m.write = sync.NewCond(&m.mu)
110 }
111 }
112
View as plain text