pipe.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421
  1. // +build windows
  2. package winio
  3. import (
  4. "errors"
  5. "io"
  6. "net"
  7. "os"
  8. "syscall"
  9. "time"
  10. "unsafe"
  11. )
  12. //sys connectNamedPipe(pipe syscall.Handle, o *syscall.Overlapped) (err error) = ConnectNamedPipe
  13. //sys createNamedPipe(name string, flags uint32, pipeMode uint32, maxInstances uint32, outSize uint32, inSize uint32, defaultTimeout uint32, sa *syscall.SecurityAttributes) (handle syscall.Handle, err error) [failretval==syscall.InvalidHandle] = CreateNamedPipeW
  14. //sys createFile(name string, access uint32, mode uint32, sa *syscall.SecurityAttributes, createmode uint32, attrs uint32, templatefile syscall.Handle) (handle syscall.Handle, err error) [failretval==syscall.InvalidHandle] = CreateFileW
  15. //sys getNamedPipeInfo(pipe syscall.Handle, flags *uint32, outSize *uint32, inSize *uint32, maxInstances *uint32) (err error) = GetNamedPipeInfo
  16. //sys getNamedPipeHandleState(pipe syscall.Handle, state *uint32, curInstances *uint32, maxCollectionCount *uint32, collectDataTimeout *uint32, userName *uint16, maxUserNameSize uint32) (err error) = GetNamedPipeHandleStateW
  17. //sys localAlloc(uFlags uint32, length uint32) (ptr uintptr) = LocalAlloc
  18. const (
  19. cERROR_PIPE_BUSY = syscall.Errno(231)
  20. cERROR_NO_DATA = syscall.Errno(232)
  21. cERROR_PIPE_CONNECTED = syscall.Errno(535)
  22. cERROR_SEM_TIMEOUT = syscall.Errno(121)
  23. cPIPE_ACCESS_DUPLEX = 0x3
  24. cFILE_FLAG_FIRST_PIPE_INSTANCE = 0x80000
  25. cSECURITY_SQOS_PRESENT = 0x100000
  26. cSECURITY_ANONYMOUS = 0
  27. cPIPE_REJECT_REMOTE_CLIENTS = 0x8
  28. cPIPE_UNLIMITED_INSTANCES = 255
  29. cNMPWAIT_USE_DEFAULT_WAIT = 0
  30. cNMPWAIT_NOWAIT = 1
  31. cPIPE_TYPE_MESSAGE = 4
  32. cPIPE_READMODE_MESSAGE = 2
  33. )
  34. var (
  35. // ErrPipeListenerClosed is returned for pipe operations on listeners that have been closed.
  36. // This error should match net.errClosing since docker takes a dependency on its text.
  37. ErrPipeListenerClosed = errors.New("use of closed network connection")
  38. errPipeWriteClosed = errors.New("pipe has been closed for write")
  39. )
  40. type win32Pipe struct {
  41. *win32File
  42. path string
  43. }
  44. type win32MessageBytePipe struct {
  45. win32Pipe
  46. writeClosed bool
  47. readEOF bool
  48. }
  49. type pipeAddress string
  50. func (f *win32Pipe) LocalAddr() net.Addr {
  51. return pipeAddress(f.path)
  52. }
  53. func (f *win32Pipe) RemoteAddr() net.Addr {
  54. return pipeAddress(f.path)
  55. }
  56. func (f *win32Pipe) SetDeadline(t time.Time) error {
  57. f.SetReadDeadline(t)
  58. f.SetWriteDeadline(t)
  59. return nil
  60. }
  61. // CloseWrite closes the write side of a message pipe in byte mode.
  62. func (f *win32MessageBytePipe) CloseWrite() error {
  63. if f.writeClosed {
  64. return errPipeWriteClosed
  65. }
  66. err := f.win32File.Flush()
  67. if err != nil {
  68. return err
  69. }
  70. _, err = f.win32File.Write(nil)
  71. if err != nil {
  72. return err
  73. }
  74. f.writeClosed = true
  75. return nil
  76. }
  77. // Write writes bytes to a message pipe in byte mode. Zero-byte writes are ignored, since
  78. // they are used to implement CloseWrite().
  79. func (f *win32MessageBytePipe) Write(b []byte) (int, error) {
  80. if f.writeClosed {
  81. return 0, errPipeWriteClosed
  82. }
  83. if len(b) == 0 {
  84. return 0, nil
  85. }
  86. return f.win32File.Write(b)
  87. }
  88. // Read reads bytes from a message pipe in byte mode. A read of a zero-byte message on a message
  89. // mode pipe will return io.EOF, as will all subsequent reads.
  90. func (f *win32MessageBytePipe) Read(b []byte) (int, error) {
  91. if f.readEOF {
  92. return 0, io.EOF
  93. }
  94. n, err := f.win32File.Read(b)
  95. if err == io.EOF {
  96. // If this was the result of a zero-byte read, then
  97. // it is possible that the read was due to a zero-size
  98. // message. Since we are simulating CloseWrite with a
  99. // zero-byte message, ensure that all future Read() calls
  100. // also return EOF.
  101. f.readEOF = true
  102. } else if err == syscall.ERROR_MORE_DATA {
  103. // ERROR_MORE_DATA indicates that the pipe's read mode is message mode
  104. // and the message still has more bytes. Treat this as a success, since
  105. // this package presents all named pipes as byte streams.
  106. err = nil
  107. }
  108. return n, err
  109. }
  110. func (s pipeAddress) Network() string {
  111. return "pipe"
  112. }
  113. func (s pipeAddress) String() string {
  114. return string(s)
  115. }
  116. // DialPipe connects to a named pipe by path, timing out if the connection
  117. // takes longer than the specified duration. If timeout is nil, then we use
  118. // a default timeout of 5 seconds. (We do not use WaitNamedPipe.)
  119. func DialPipe(path string, timeout *time.Duration) (net.Conn, error) {
  120. var absTimeout time.Time
  121. if timeout != nil {
  122. absTimeout = time.Now().Add(*timeout)
  123. } else {
  124. absTimeout = time.Now().Add(time.Second * 2)
  125. }
  126. var err error
  127. var h syscall.Handle
  128. for {
  129. h, err = createFile(path, syscall.GENERIC_READ|syscall.GENERIC_WRITE, 0, nil, syscall.OPEN_EXISTING, syscall.FILE_FLAG_OVERLAPPED|cSECURITY_SQOS_PRESENT|cSECURITY_ANONYMOUS, 0)
  130. if err != cERROR_PIPE_BUSY {
  131. break
  132. }
  133. if time.Now().After(absTimeout) {
  134. return nil, ErrTimeout
  135. }
  136. // Wait 10 msec and try again. This is a rather simplistic
  137. // view, as we always try each 10 milliseconds.
  138. time.Sleep(time.Millisecond * 10)
  139. }
  140. if err != nil {
  141. return nil, &os.PathError{Op: "open", Path: path, Err: err}
  142. }
  143. var flags uint32
  144. err = getNamedPipeInfo(h, &flags, nil, nil, nil)
  145. if err != nil {
  146. return nil, err
  147. }
  148. f, err := makeWin32File(h)
  149. if err != nil {
  150. syscall.Close(h)
  151. return nil, err
  152. }
  153. // If the pipe is in message mode, return a message byte pipe, which
  154. // supports CloseWrite().
  155. if flags&cPIPE_TYPE_MESSAGE != 0 {
  156. return &win32MessageBytePipe{
  157. win32Pipe: win32Pipe{win32File: f, path: path},
  158. }, nil
  159. }
  160. return &win32Pipe{win32File: f, path: path}, nil
  161. }
  162. type acceptResponse struct {
  163. f *win32File
  164. err error
  165. }
  166. type win32PipeListener struct {
  167. firstHandle syscall.Handle
  168. path string
  169. securityDescriptor []byte
  170. config PipeConfig
  171. acceptCh chan (chan acceptResponse)
  172. closeCh chan int
  173. doneCh chan int
  174. }
  175. func makeServerPipeHandle(path string, securityDescriptor []byte, c *PipeConfig, first bool) (syscall.Handle, error) {
  176. var flags uint32 = cPIPE_ACCESS_DUPLEX | syscall.FILE_FLAG_OVERLAPPED
  177. if first {
  178. flags |= cFILE_FLAG_FIRST_PIPE_INSTANCE
  179. }
  180. var mode uint32 = cPIPE_REJECT_REMOTE_CLIENTS
  181. if c.MessageMode {
  182. mode |= cPIPE_TYPE_MESSAGE
  183. }
  184. sa := &syscall.SecurityAttributes{}
  185. sa.Length = uint32(unsafe.Sizeof(*sa))
  186. if securityDescriptor != nil {
  187. len := uint32(len(securityDescriptor))
  188. sa.SecurityDescriptor = localAlloc(0, len)
  189. defer localFree(sa.SecurityDescriptor)
  190. copy((*[0xffff]byte)(unsafe.Pointer(sa.SecurityDescriptor))[:], securityDescriptor)
  191. }
  192. h, err := createNamedPipe(path, flags, mode, cPIPE_UNLIMITED_INSTANCES, uint32(c.OutputBufferSize), uint32(c.InputBufferSize), 0, sa)
  193. if err != nil {
  194. return 0, &os.PathError{Op: "open", Path: path, Err: err}
  195. }
  196. return h, nil
  197. }
  198. func (l *win32PipeListener) makeServerPipe() (*win32File, error) {
  199. h, err := makeServerPipeHandle(l.path, l.securityDescriptor, &l.config, false)
  200. if err != nil {
  201. return nil, err
  202. }
  203. f, err := makeWin32File(h)
  204. if err != nil {
  205. syscall.Close(h)
  206. return nil, err
  207. }
  208. return f, nil
  209. }
  210. func (l *win32PipeListener) makeConnectedServerPipe() (*win32File, error) {
  211. p, err := l.makeServerPipe()
  212. if err != nil {
  213. return nil, err
  214. }
  215. // Wait for the client to connect.
  216. ch := make(chan error)
  217. go func(p *win32File) {
  218. ch <- connectPipe(p)
  219. }(p)
  220. select {
  221. case err = <-ch:
  222. if err != nil {
  223. p.Close()
  224. p = nil
  225. }
  226. case <-l.closeCh:
  227. // Abort the connect request by closing the handle.
  228. p.Close()
  229. p = nil
  230. err = <-ch
  231. if err == nil || err == ErrFileClosed {
  232. err = ErrPipeListenerClosed
  233. }
  234. }
  235. return p, err
  236. }
  237. func (l *win32PipeListener) listenerRoutine() {
  238. closed := false
  239. for !closed {
  240. select {
  241. case <-l.closeCh:
  242. closed = true
  243. case responseCh := <-l.acceptCh:
  244. var (
  245. p *win32File
  246. err error
  247. )
  248. for {
  249. p, err = l.makeConnectedServerPipe()
  250. // If the connection was immediately closed by the client, try
  251. // again.
  252. if err != cERROR_NO_DATA {
  253. break
  254. }
  255. }
  256. responseCh <- acceptResponse{p, err}
  257. closed = err == ErrPipeListenerClosed
  258. }
  259. }
  260. syscall.Close(l.firstHandle)
  261. l.firstHandle = 0
  262. // Notify Close() and Accept() callers that the handle has been closed.
  263. close(l.doneCh)
  264. }
  265. // PipeConfig contain configuration for the pipe listener.
  266. type PipeConfig struct {
  267. // SecurityDescriptor contains a Windows security descriptor in SDDL format.
  268. SecurityDescriptor string
  269. // MessageMode determines whether the pipe is in byte or message mode. In either
  270. // case the pipe is read in byte mode by default. The only practical difference in
  271. // this implementation is that CloseWrite() is only supported for message mode pipes;
  272. // CloseWrite() is implemented as a zero-byte write, but zero-byte writes are only
  273. // transferred to the reader (and returned as io.EOF in this implementation)
  274. // when the pipe is in message mode.
  275. MessageMode bool
  276. // InputBufferSize specifies the size the input buffer, in bytes.
  277. InputBufferSize int32
  278. // OutputBufferSize specifies the size the input buffer, in bytes.
  279. OutputBufferSize int32
  280. }
  281. // ListenPipe creates a listener on a Windows named pipe path, e.g. \\.\pipe\mypipe.
  282. // The pipe must not already exist.
  283. func ListenPipe(path string, c *PipeConfig) (net.Listener, error) {
  284. var (
  285. sd []byte
  286. err error
  287. )
  288. if c == nil {
  289. c = &PipeConfig{}
  290. }
  291. if c.SecurityDescriptor != "" {
  292. sd, err = SddlToSecurityDescriptor(c.SecurityDescriptor)
  293. if err != nil {
  294. return nil, err
  295. }
  296. }
  297. h, err := makeServerPipeHandle(path, sd, c, true)
  298. if err != nil {
  299. return nil, err
  300. }
  301. // Create a client handle and connect it. This results in the pipe
  302. // instance always existing, so that clients see ERROR_PIPE_BUSY
  303. // rather than ERROR_FILE_NOT_FOUND. This ties the first instance
  304. // up so that no other instances can be used. This would have been
  305. // cleaner if the Win32 API matched CreateFile with ConnectNamedPipe
  306. // instead of CreateNamedPipe. (Apparently created named pipes are
  307. // considered to be in listening state regardless of whether any
  308. // active calls to ConnectNamedPipe are outstanding.)
  309. h2, err := createFile(path, 0, 0, nil, syscall.OPEN_EXISTING, cSECURITY_SQOS_PRESENT|cSECURITY_ANONYMOUS, 0)
  310. if err != nil {
  311. syscall.Close(h)
  312. return nil, err
  313. }
  314. // Close the client handle. The server side of the instance will
  315. // still be busy, leading to ERROR_PIPE_BUSY instead of
  316. // ERROR_NOT_FOUND, as long as we don't close the server handle,
  317. // or disconnect the client with DisconnectNamedPipe.
  318. syscall.Close(h2)
  319. l := &win32PipeListener{
  320. firstHandle: h,
  321. path: path,
  322. securityDescriptor: sd,
  323. config: *c,
  324. acceptCh: make(chan (chan acceptResponse)),
  325. closeCh: make(chan int),
  326. doneCh: make(chan int),
  327. }
  328. go l.listenerRoutine()
  329. return l, nil
  330. }
  331. func connectPipe(p *win32File) error {
  332. c, err := p.prepareIo()
  333. if err != nil {
  334. return err
  335. }
  336. defer p.wg.Done()
  337. err = connectNamedPipe(p.handle, &c.o)
  338. _, err = p.asyncIo(c, nil, 0, err)
  339. if err != nil && err != cERROR_PIPE_CONNECTED {
  340. return err
  341. }
  342. return nil
  343. }
  344. func (l *win32PipeListener) Accept() (net.Conn, error) {
  345. ch := make(chan acceptResponse)
  346. select {
  347. case l.acceptCh <- ch:
  348. response := <-ch
  349. err := response.err
  350. if err != nil {
  351. return nil, err
  352. }
  353. if l.config.MessageMode {
  354. return &win32MessageBytePipe{
  355. win32Pipe: win32Pipe{win32File: response.f, path: l.path},
  356. }, nil
  357. }
  358. return &win32Pipe{win32File: response.f, path: l.path}, nil
  359. case <-l.doneCh:
  360. return nil, ErrPipeListenerClosed
  361. }
  362. }
  363. func (l *win32PipeListener) Close() error {
  364. select {
  365. case l.closeCh <- 1:
  366. <-l.doneCh
  367. case <-l.doneCh:
  368. }
  369. return nil
  370. }
  371. func (l *win32PipeListener) Addr() net.Addr {
  372. return pipeAddress(l.path)
  373. }