1
2
3
4
5 package poll
6
7 import "sync/atomic"
8
9
10
11
// fdMutex keeps the whole lock state of a file descriptor in one word that
// is only ever read and written through sync/atomic, plus one semaphore per
// lock kind on which blocked goroutines wait.
//
// The zero value is an open, unlocked mutex with no references.
type fdMutex struct {
	state uint64 // packed bit field; layout is given by the mutex* constants
	rsema uint32 // semaphore used by goroutines waiting for the read lock
	wsema uint32 // semaphore used by goroutines waiting for the write lock
}
17
18
19
20
21
22
23
24
// Layout of fdMutex.state, from the least significant bit upward:
//
//	bit  0      closed flag
//	bit  1      read lock held
//	bit  2      write lock held
//	bits 3-22   reference count        (20 bits, unit mutexRef)
//	bits 23-42  waiting reader count   (20 bits, unit mutexRWait)
//	bits 43-62  waiting writer count   (20 bits, unit mutexWWait)
//
// Each counter is incremented by adding its unit constant; a counter whose
// masked value becomes zero after an increment has overflowed.
const (
	mutexClosed  = 1 << 0
	mutexRLock   = 1 << 1
	mutexWLock   = 1 << 2
	mutexRef     = 1 << 3
	mutexRefMask = (1<<20 - 1) << 3
	mutexRWait   = 1 << 23
	mutexRMask   = (1<<20 - 1) << 23
	mutexWWait   = 1 << 43
	mutexWMask   = (1<<20 - 1) << 43
)
36
// Named values for the boolean arguments of fdMutex.rwlock and
// fdMutex.rwunlock, so that call sites read as words instead of bare
// true/false.
const (
	readlock  = true  // "read" argument: operate on the read lock
	writeLock = false // "read" argument: operate on the write lock
	waitLock  = true  // "wait" argument: block until the lock is free
	tryLock   = false // "wait" argument: fail at once if the lock is held
)
43
// overflowMsg is the panic value used when one of the 20-bit counters in
// fdMutex.state (references, read waiters, write waiters) would wrap;
// 1048575 is 1<<20 - 1, the largest value such a counter can hold.
const overflowMsg = "too many concurrent operations on a single file or socket (max 1048575)"
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60 func (mu *fdMutex) incref() bool {
61 for {
62 old := atomic.LoadUint64(&mu.state)
63 if old&mutexClosed != 0 {
64 return false
65 }
66 new := old + mutexRef
67 if new&mutexRefMask == 0 {
68 panic(overflowMsg)
69 }
70 if atomic.CompareAndSwapUint64(&mu.state, old, new) {
71 return true
72 }
73 }
74 }
75
76
77
78 func (mu *fdMutex) increfAndClose() bool {
79 for {
80 old := atomic.LoadUint64(&mu.state)
81 if old&mutexClosed != 0 {
82 return false
83 }
84
85 new := (old | mutexClosed) + mutexRef
86 if new&mutexRefMask == 0 {
87 panic(overflowMsg)
88 }
89
90 new &^= mutexRMask | mutexWMask
91 if atomic.CompareAndSwapUint64(&mu.state, old, new) {
92
93
94 for old&mutexRMask != 0 {
95 old -= mutexRWait
96 runtime_Semrelease(&mu.rsema)
97 }
98 for old&mutexWMask != 0 {
99 old -= mutexWWait
100 runtime_Semrelease(&mu.wsema)
101 }
102 return true
103 }
104 }
105 }
106
107
108
109 func (mu *fdMutex) decref() bool {
110 for {
111 old := atomic.LoadUint64(&mu.state)
112 if old&mutexRefMask == 0 {
113 panic("inconsistent poll.fdMutex")
114 }
115 new := old - mutexRef
116 if atomic.CompareAndSwapUint64(&mu.state, old, new) {
117 return new&(mutexClosed|mutexRefMask) == mutexClosed
118 }
119 }
120 }
121
122
123
124
125 func (mu *fdMutex) rwlock(read bool, wait bool) bool {
126 var mutexBit, mutexWait, mutexMask uint64
127 var mutexSema *uint32
128 if read {
129 mutexBit = mutexRLock
130 mutexWait = mutexRWait
131 mutexMask = mutexRMask
132 mutexSema = &mu.rsema
133 } else {
134 mutexBit = mutexWLock
135 mutexWait = mutexWWait
136 mutexMask = mutexWMask
137 mutexSema = &mu.wsema
138 }
139 for {
140 old := atomic.LoadUint64(&mu.state)
141 if old&mutexClosed != 0 {
142 return false
143 }
144 var new uint64
145 if old&mutexBit == 0 {
146
147 new = (old | mutexBit) + mutexRef
148 if new&mutexRefMask == 0 {
149 panic(overflowMsg)
150 }
151 } else {
152
153 if !wait {
154 return false
155 }
156 new = old + mutexWait
157 if new&mutexMask == 0 {
158 panic(overflowMsg)
159 }
160 }
161 if atomic.CompareAndSwapUint64(&mu.state, old, new) {
162 if old&mutexBit == 0 {
163 return true
164 }
165 runtime_Semacquire(mutexSema)
166
167 }
168 }
169 }
170
171
172
173 func (mu *fdMutex) rwunlock(read bool) bool {
174 var mutexBit, mutexWait, mutexMask uint64
175 var mutexSema *uint32
176 if read {
177 mutexBit = mutexRLock
178 mutexWait = mutexRWait
179 mutexMask = mutexRMask
180 mutexSema = &mu.rsema
181 } else {
182 mutexBit = mutexWLock
183 mutexWait = mutexWWait
184 mutexMask = mutexWMask
185 mutexSema = &mu.wsema
186 }
187 for {
188 old := atomic.LoadUint64(&mu.state)
189 if old&mutexBit == 0 || old&mutexRefMask == 0 {
190 panic("inconsistent poll.fdMutex")
191 }
192
193 new := (old &^ mutexBit) - mutexRef
194 if old&mutexMask != 0 {
195 new -= mutexWait
196 }
197 if atomic.CompareAndSwapUint64(&mu.state, old, new) {
198 if old&mutexMask != 0 {
199 runtime_Semrelease(mutexSema)
200 }
201 return new&(mutexClosed|mutexRefMask) == mutexClosed
202 }
203 }
204 }
205
206
// Semaphore primitives used by fdMutex: rwlock calls runtime_Semacquire to
// wait for a lock, and rwunlock and increfAndClose call runtime_Semrelease
// once per waiter they take off the count.
//
// Both are declared without a body, so their implementation has to be
// supplied from outside this file.
// NOTE(review): presumably provided by the Go runtime via linkname — confirm.
func runtime_Semacquire(sema *uint32)
func runtime_Semrelease(sema *uint32)
209
210
211
212 func (fd *FD) incref() error {
213 if !fd.fdmu.incref() {
214 return errClosing(fd.isFile)
215 }
216 return nil
217 }
218
219
220
221
222 func (fd *FD) decref() error {
223 if fd.fdmu.decref() {
224 return fd.destroy()
225 }
226 return nil
227 }
228
229
230
231 func (fd *FD) readLock() error {
232 if !fd.fdmu.rwlock(readlock, waitLock) {
233 return errClosing(fd.isFile)
234 }
235 return nil
236 }
237
238
239
240
241 func (fd *FD) readUnlock() {
242 if fd.fdmu.rwunlock(readlock) {
243 fd.destroy()
244 }
245 }
246
247
248
249 func (fd *FD) writeLock() error {
250 if !fd.fdmu.rwlock(writeLock, waitLock) {
251 return errClosing(fd.isFile)
252 }
253 return nil
254 }
255
256
257
258
259 func (fd *FD) writeUnlock() {
260 if fd.fdmu.rwunlock(writeLock) {
261 fd.destroy()
262 }
263 }
264
265
266
267 func (fd *FD) readWriteLock() error {
268 if !fd.fdmu.rwlock(readlock, waitLock) {
269 return errClosing(fd.isFile)
270 }
271 if !fd.fdmu.rwlock(writeLock, waitLock) {
272 if fd.fdmu.rwunlock(readlock) {
273 fd.destroy()
274 }
275 return errClosing(fd.isFile)
276 }
277 return nil
278 }
279
280
281
282
283 func (fd *FD) tryReadWriteLock() (bool, error) {
284 if !fd.fdmu.rwlock(readlock, tryLock) {
285 if fd.closing() {
286 return false, errClosing(fd.isFile)
287 }
288 return false, nil
289 }
290 if !fd.fdmu.rwlock(writeLock, tryLock) {
291 if fd.fdmu.rwunlock(readlock) {
292 fd.destroy()
293 }
294 if fd.closing() {
295 return false, errClosing(fd.isFile)
296 }
297 return false, nil
298 }
299 return true, nil
300 }
301
302
303
304
305 func (fd *FD) readWriteUnlock() {
306 fd.fdmu.rwunlock(readlock)
307 if fd.fdmu.rwunlock(writeLock) {
308 fd.destroy()
309 }
310 }
311
312
313 func (fd *FD) closing() bool {
314 return atomic.LoadUint64(&fd.fdmu.state)&mutexClosed != 0
315 }
316
View as plain text