"""
Pipe
----
A module for working with pipe objects in Windows.
"""
from collections import namedtuple
from six import integer_types
from pywincffi.core import dist
from pywincffi.core.checks import NON_ZERO, input_check, error_check, NoneType
from pywincffi.wintypes import SECURITY_ATTRIBUTES, HANDLE, wintype_to_cdata
PeekNamedPipeResult = namedtuple(
"PeekNamedPipeResult",
("lpBuffer", "lpBytesRead", "lpTotalBytesAvail",
"lpBytesLeftThisMessage")
)
[docs]def CreatePipe(lpPipeAttributes=None, nSize=0):
"""
Creates an anonymous pipe and returns the read and write handles.
.. seealso::
https://msdn.microsoft.com/en-us/library/aa365152
https://msdn.microsoft.com/en-us/library/aa379560
>>> from pywincffi.core import dist
>>> from pywincffi.kernel32 import CreatePipe
>>> from pywincffi.wintypes import SECURITY_ATTRIBUTES
>>> lpPipeAttributes = SECURITY_ATTRIBUTES()
>>> lpPipeAttributes.bInheritHandle = True
>>> reader, writer = CreatePipe(lpPipeAttributes=lpPipeAttributes)
:keyword pywincffi.wintypes.SECURITY_ATTRIBUTES lpPipeAttributes:
The security attributes to apply to the handle. By default
``NULL`` will be passed in, meaning the handle we create
cannot be inherited. For more detailed information see the links
below.
:keyword int nSize:
The size of the buffer in bytes. Passing in 0, which is the default
will cause the system to use the default buffer size.
:return:
Returns a tuple of :class:`pywincffi.wintype.HANDLE` containing the
reader and writer ends of the pipe that was created. The user of this
function is responsible for calling CloseHandle at some point.
"""
input_check("nSize", nSize, integer_types)
input_check(
"lpPipeAttributes", lpPipeAttributes,
allowed_types=(NoneType, SECURITY_ATTRIBUTES)
)
lpPipeAttributes = wintype_to_cdata(lpPipeAttributes)
ffi, library = dist.load()
hReadPipe = ffi.new("PHANDLE")
hWritePipe = ffi.new("PHANDLE")
code = library.CreatePipe(hReadPipe, hWritePipe, lpPipeAttributes, nSize)
error_check("CreatePipe", code=code, expected=NON_ZERO)
return HANDLE(hReadPipe[0]), HANDLE(hWritePipe[0])
[docs]def SetNamedPipeHandleState(
hNamedPipe,
lpMode=None, lpMaxCollectionCount=None, lpCollectDataTimeout=None):
"""
Sets the read and blocking mode of the specified ``hNamedPipe``.
.. seealso::
https://msdn.microsoft.com/en-us/library/aa365787
:param pywincffi.wintypes.HANDLE hNamedPipe:
A handle to the named pipe instance.
:keyword int lpMode:
The new pipe mode which is a combination of read mode:
* ``PIPE_READMODE_BYTE``
* ``PIPE_READMODE_MESSAGE``
And a wait-mode flag:
* ``PIPE_WAIT``
* ``PIPE_NOWAIT``
:keyword int lpMaxCollectionCount:
The maximum number of bytes collected.
:keyword int lpCollectDataTimeout:
The maximum time, in milliseconds, that can pass before a
remote named pipe transfers information
"""
input_check("hNamedPipe", hNamedPipe, HANDLE)
ffi, library = dist.load()
if lpMode is None:
lpMode = ffi.NULL
else:
input_check("lpMode", lpMode, integer_types)
lpMode = ffi.new("LPDWORD", lpMode)
if lpMaxCollectionCount is None:
lpMaxCollectionCount = ffi.NULL
else:
input_check(
"lpMaxCollectionCount", lpMaxCollectionCount, integer_types)
lpMaxCollectionCount = ffi.new("LPDWORD", lpMaxCollectionCount)
if lpCollectDataTimeout is None:
lpCollectDataTimeout = ffi.NULL
else:
input_check(
"lpCollectDataTimeout", lpCollectDataTimeout, integer_types)
lpCollectDataTimeout = ffi.new("LPDWORD", lpCollectDataTimeout)
code = library.SetNamedPipeHandleState(
wintype_to_cdata(hNamedPipe),
lpMode,
lpMaxCollectionCount,
lpCollectDataTimeout
)
error_check("SetNamedPipeHandleState", code=code, expected=NON_ZERO)
[docs]def PeekNamedPipe(hNamedPipe, nBufferSize):
"""
Copies data from a pipe into a buffer without removing it
from the pipe.
.. seealso::
https://msdn.microsoft.com/en-us/library/aa365779
:param pywincffi.wintypes.HANDLE hNamedPipe:
The handele to the pipe object we want to peek into.
:param int nBufferSize:
The number of bytes to 'peek' into the pipe.
:rtype: PeekNamedPipeResult
:return:
Returns an instance of :class:`PeekNamedPipeResult` which
contains the buffer read, number of bytes read and the result.
"""
input_check("hNamedPipe", hNamedPipe, HANDLE)
input_check("nBufferSize", nBufferSize, integer_types)
ffi, library = dist.load()
# Outputs
lpBuffer = ffi.new("LPVOID[%d]" % nBufferSize)
lpBytesRead = ffi.new("LPDWORD")
lpTotalBytesAvail = ffi.new("LPDWORD")
lpBytesLeftThisMessage = ffi.new("LPDWORD")
code = library.PeekNamedPipe(
wintype_to_cdata(hNamedPipe),
lpBuffer,
nBufferSize,
lpBytesRead,
lpTotalBytesAvail,
lpBytesLeftThisMessage
)
error_check("PeekNamedPipe", code=code, expected=NON_ZERO)
return PeekNamedPipeResult(
lpBuffer=lpBuffer,
lpBytesRead=lpBytesRead[0],
lpTotalBytesAvail=lpTotalBytesAvail[0],
lpBytesLeftThisMessage=lpBytesLeftThisMessage[0]
)