Source file src/internal/poll/fd_mutex.go

     1  // Copyright 2013 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  package poll
     6  
     7  import "sync/atomic"
     8  
     9  // fdMutex is a specialized synchronization primitive that manages
    10  // lifetime of an fd and serializes access to Read, Write and Close
    11  // methods on FD.
    12  type fdMutex struct {
    13  	state uint64
    14  	rsema uint32
    15  	wsema uint32
    16  }
    17  
    18  // fdMutex.state is organized as follows:
    19  // 1 bit - whether FD is closed, if set all subsequent lock operations will fail.
    20  // 1 bit - lock for read operations.
    21  // 1 bit - lock for write operations.
    22  // 20 bits - total number of references (read+write+misc).
    23  // 20 bits - number of outstanding read waiters.
    24  // 20 bits - number of outstanding write waiters.
    25  const (
    26  	mutexClosed  = 1 << 0
    27  	mutexRLock   = 1 << 1
    28  	mutexWLock   = 1 << 2
    29  	mutexRef     = 1 << 3
    30  	mutexRefMask = (1<<20 - 1) << 3
    31  	mutexRWait   = 1 << 23
    32  	mutexRMask   = (1<<20 - 1) << 23
    33  	mutexWWait   = 1 << 43
    34  	mutexWMask   = (1<<20 - 1) << 43
    35  )
    36  
    37  const (
    38  	readlock  = true
    39  	writeLock = false
    40  	waitLock  = true
    41  	tryLock   = false
    42  )
    43  
    44  const overflowMsg = "too many concurrent operations on a single file or socket (max 1048575)"
    45  
    46  // Read operations must do rwlock(readlock, waitLock)/rwunlock(readlock).
    47  //
    48  // Write operations must do rwlock(writeLock, waitLock)/rwunlock(writeLock).
    49  //
    50  // Misc operations must do incref/decref.
    51  // Misc operations include functions like setsockopt and setDeadline.
    52  // They need to use incref/decref to ensure that they operate on the
    53  // correct fd in presence of a concurrent close call (otherwise fd can
    54  // be closed under their feet).
    55  //
    56  // Close operations must do increfAndClose/decref.
    57  
    58  // incref adds a reference to mu.
    59  // It reports whether mu is available for reading or writing.
    60  func (mu *fdMutex) incref() bool {
    61  	for {
    62  		old := atomic.LoadUint64(&mu.state)
    63  		if old&mutexClosed != 0 {
    64  			return false
    65  		}
    66  		new := old + mutexRef
    67  		if new&mutexRefMask == 0 {
    68  			panic(overflowMsg)
    69  		}
    70  		if atomic.CompareAndSwapUint64(&mu.state, old, new) {
    71  			return true
    72  		}
    73  	}
    74  }
    75  
    76  // increfAndClose sets the state of mu to closed.
    77  // It returns false if the file was already closed.
    78  func (mu *fdMutex) increfAndClose() bool {
    79  	for {
    80  		old := atomic.LoadUint64(&mu.state)
    81  		if old&mutexClosed != 0 {
    82  			return false
    83  		}
    84  		// Mark as closed and acquire a reference.
    85  		new := (old | mutexClosed) + mutexRef
    86  		if new&mutexRefMask == 0 {
    87  			panic(overflowMsg)
    88  		}
    89  		// Remove all read and write waiters.
    90  		new &^= mutexRMask | mutexWMask
    91  		if atomic.CompareAndSwapUint64(&mu.state, old, new) {
    92  			// Wake all read and write waiters,
    93  			// they will observe closed flag after wakeup.
    94  			for old&mutexRMask != 0 {
    95  				old -= mutexRWait
    96  				runtime_Semrelease(&mu.rsema)
    97  			}
    98  			for old&mutexWMask != 0 {
    99  				old -= mutexWWait
   100  				runtime_Semrelease(&mu.wsema)
   101  			}
   102  			return true
   103  		}
   104  	}
   105  }
   106  
   107  // decref removes a reference from mu.
   108  // It reports whether there is no remaining reference.
   109  func (mu *fdMutex) decref() bool {
   110  	for {
   111  		old := atomic.LoadUint64(&mu.state)
   112  		if old&mutexRefMask == 0 {
   113  			panic("inconsistent poll.fdMutex")
   114  		}
   115  		new := old - mutexRef
   116  		if atomic.CompareAndSwapUint64(&mu.state, old, new) {
   117  			return new&(mutexClosed|mutexRefMask) == mutexClosed
   118  		}
   119  	}
   120  }
   121  
   122  // lock adds a reference to mu and locks mu.
   123  // It reports whether mu is available for reading or writing.
   124  // If wait is false, lock fails immediately if mu is not available.
   125  func (mu *fdMutex) rwlock(read bool, wait bool) bool {
   126  	var mutexBit, mutexWait, mutexMask uint64
   127  	var mutexSema *uint32
   128  	if read {
   129  		mutexBit = mutexRLock
   130  		mutexWait = mutexRWait
   131  		mutexMask = mutexRMask
   132  		mutexSema = &mu.rsema
   133  	} else {
   134  		mutexBit = mutexWLock
   135  		mutexWait = mutexWWait
   136  		mutexMask = mutexWMask
   137  		mutexSema = &mu.wsema
   138  	}
   139  	for {
   140  		old := atomic.LoadUint64(&mu.state)
   141  		if old&mutexClosed != 0 {
   142  			return false
   143  		}
   144  		var new uint64
   145  		if old&mutexBit == 0 {
   146  			// Lock is free, acquire it.
   147  			new = (old | mutexBit) + mutexRef
   148  			if new&mutexRefMask == 0 {
   149  				panic(overflowMsg)
   150  			}
   151  		} else {
   152  			// Wait for lock.
   153  			if !wait {
   154  				return false
   155  			}
   156  			new = old + mutexWait
   157  			if new&mutexMask == 0 {
   158  				panic(overflowMsg)
   159  			}
   160  		}
   161  		if atomic.CompareAndSwapUint64(&mu.state, old, new) {
   162  			if old&mutexBit == 0 {
   163  				return true
   164  			}
   165  			runtime_Semacquire(mutexSema)
   166  			// The signaller has subtracted mutexWait.
   167  		}
   168  	}
   169  }
   170  
   171  // unlock removes a reference from mu and unlocks mu.
   172  // It reports whether there is no remaining reference.
   173  func (mu *fdMutex) rwunlock(read bool) bool {
   174  	var mutexBit, mutexWait, mutexMask uint64
   175  	var mutexSema *uint32
   176  	if read {
   177  		mutexBit = mutexRLock
   178  		mutexWait = mutexRWait
   179  		mutexMask = mutexRMask
   180  		mutexSema = &mu.rsema
   181  	} else {
   182  		mutexBit = mutexWLock
   183  		mutexWait = mutexWWait
   184  		mutexMask = mutexWMask
   185  		mutexSema = &mu.wsema
   186  	}
   187  	for {
   188  		old := atomic.LoadUint64(&mu.state)
   189  		if old&mutexBit == 0 || old&mutexRefMask == 0 {
   190  			panic("inconsistent poll.fdMutex")
   191  		}
   192  		// Drop lock, drop reference and wake read waiter if present.
   193  		new := (old &^ mutexBit) - mutexRef
   194  		if old&mutexMask != 0 {
   195  			new -= mutexWait
   196  		}
   197  		if atomic.CompareAndSwapUint64(&mu.state, old, new) {
   198  			if old&mutexMask != 0 {
   199  				runtime_Semrelease(mutexSema)
   200  			}
   201  			return new&(mutexClosed|mutexRefMask) == mutexClosed
   202  		}
   203  	}
   204  }
   205  
// runtime_Semacquire and runtime_Semrelease are declared here without
// bodies; they are implemented in the runtime package.
//
// Within this file, rwlock calls runtime_Semacquire after registering
// itself as a waiter, and rwunlock and increfAndClose call
// runtime_Semrelease once for every waiter they remove from the state
// word.
//
// NOTE(review): presumably runtime_Semacquire blocks until a matching
// runtime_Semrelease on the same address; confirm against the runtime
// implementation.
func runtime_Semacquire(sema *uint32)
func runtime_Semrelease(sema *uint32)
   209  
   210  // incref adds a reference to fd.
   211  // It returns an error when fd cannot be used.
   212  func (fd *FD) incref() error {
   213  	if !fd.fdmu.incref() {
   214  		return errClosing(fd.isFile)
   215  	}
   216  	return nil
   217  }
   218  
   219  // decref removes a reference from fd.
   220  // It also closes fd when the state of fd is set to closed and there
   221  // is no remaining reference.
   222  func (fd *FD) decref() error {
   223  	if fd.fdmu.decref() {
   224  		return fd.destroy()
   225  	}
   226  	return nil
   227  }
   228  
   229  // readLock adds a reference to fd and locks fd for reading.
   230  // It returns an error when fd cannot be used for reading.
   231  func (fd *FD) readLock() error {
   232  	if !fd.fdmu.rwlock(readlock, waitLock) {
   233  		return errClosing(fd.isFile)
   234  	}
   235  	return nil
   236  }
   237  
   238  // readUnlock removes a reference from fd and unlocks fd for reading.
   239  // It also closes fd when the state of fd is set to closed and there
   240  // is no remaining reference.
   241  func (fd *FD) readUnlock() {
   242  	if fd.fdmu.rwunlock(readlock) {
   243  		fd.destroy()
   244  	}
   245  }
   246  
   247  // writeLock adds a reference to fd and locks fd for writing.
   248  // It returns an error when fd cannot be used for writing.
   249  func (fd *FD) writeLock() error {
   250  	if !fd.fdmu.rwlock(writeLock, waitLock) {
   251  		return errClosing(fd.isFile)
   252  	}
   253  	return nil
   254  }
   255  
   256  // writeUnlock removes a reference from fd and unlocks fd for writing.
   257  // It also closes fd when the state of fd is set to closed and there
   258  // is no remaining reference.
   259  func (fd *FD) writeUnlock() {
   260  	if fd.fdmu.rwunlock(writeLock) {
   261  		fd.destroy()
   262  	}
   263  }
   264  
   265  // readWriteLock adds a reference to fd and locks fd for reading and writing.
   266  // It returns an error when fd cannot be used for reading and writing.
   267  func (fd *FD) readWriteLock() error {
   268  	if !fd.fdmu.rwlock(readlock, waitLock) {
   269  		return errClosing(fd.isFile)
   270  	}
   271  	if !fd.fdmu.rwlock(writeLock, waitLock) {
   272  		if fd.fdmu.rwunlock(readlock) {
   273  			fd.destroy()
   274  		}
   275  		return errClosing(fd.isFile)
   276  	}
   277  	return nil
   278  }
   279  
   280  // tryReadWriteLock tries to add a reference to fd and lock fd for reading and writing.
   281  // It returns (false, nil) when fd is not available for reading and writing but is not closing.
   282  // It returns (false, errClosing) when fd is closing.
   283  func (fd *FD) tryReadWriteLock() (bool, error) {
   284  	if !fd.fdmu.rwlock(readlock, tryLock) {
   285  		if fd.closing() {
   286  			return false, errClosing(fd.isFile)
   287  		}
   288  		return false, nil
   289  	}
   290  	if !fd.fdmu.rwlock(writeLock, tryLock) {
   291  		if fd.fdmu.rwunlock(readlock) {
   292  			fd.destroy()
   293  		}
   294  		if fd.closing() {
   295  			return false, errClosing(fd.isFile)
   296  		}
   297  		return false, nil
   298  	}
   299  	return true, nil
   300  }
   301  
   302  // readWriteUnlock removes a reference from fd and unlocks fd for reading and writing.
   303  // It also closes fd when the state of fd is set to closed and there
   304  // is no remaining reference.
   305  func (fd *FD) readWriteUnlock() {
   306  	fd.fdmu.rwunlock(readlock)
   307  	if fd.fdmu.rwunlock(writeLock) {
   308  		fd.destroy()
   309  	}
   310  }
   311  
   312  // closing returns true if fd is closing.
   313  func (fd *FD) closing() bool {
   314  	return atomic.LoadUint64(&fd.fdmu.state)&mutexClosed != 0
   315  }
   316  

View as plain text