"""
Files
-----
A module containing common Windows file functions.
"""
from collections import namedtuple
from six import integer_types
from pywincffi.core import dist
from pywincffi.core.checks import Enums, input_check, error_check, NoneType
from pywincffi.exceptions import WindowsAPIError
# Result record returned by ``PeekNamedPipe``.  The field names mirror the
# output parameters of the Windows function of the same name.
PeekNamedPipeResult = namedtuple(
    "PeekNamedPipeResult",
    [
        "lpBuffer",
        "lpBytesRead",
        "lpTotalBytesAvail",
        "lpBytesLeftThisMessage",
    ]
)

# Integer value this module compares against to detect an invalid handle.
INVALID_HANDLE_VALUE = -1
def handle_from_file(python_file):
    """
    Converts a standard Python file object into a Windows handle
    object that can be used in Windows function calls.

    :param file python_file:
        The Python file object to convert to a Windows handle.

    :return:
        Returns the Windows handle which the library's ``handle_from_fd``
        function produced for the file descriptor of ``python_file``.
    """
    _, lib = dist.load()

    # WARNING:
    #   Handing an invalid file descriptor number to the library can
    #   crash Python.  The check below is expected to guard against
    #   that by making sure the file descriptor is valid before it is
    #   used.
    input_check("python_file", python_file, Enums.PYFILE)

    descriptor = python_file.fileno()
    return lib.handle_from_fd(descriptor)
def CreatePipe(nSize=0, lpPipeAttributes=None):
    """
    Creates an anonymous pipe and returns its read and write handles.

    .. seealso::

        https://msdn.microsoft.com/en-us/library/aa365152
        https://msdn.microsoft.com/en-us/library/aa379560

    >>> from pywincffi.core import dist
    >>> ffi, library = dist.load()
    >>> lpPipeAttributes = ffi.new(
    ...     "SECURITY_ATTRIBUTES[1]", [{
    ...         "nLength": ffi.sizeof("SECURITY_ATTRIBUTES"),
    ...         "bInheritHandle": True,
    ...         "lpSecurityDescriptor": ffi.NULL
    ...     }]
    ... )
    >>> reader, writer = CreatePipe(lpPipeAttributes=lpPipeAttributes)

    .. note::

        NOTE(review): the type check in this function lists only ``None``
        and ``dict`` as allowed types for ``lpPipeAttributes`` while the
        example above passes an object created by ``ffi.new``.  Confirm
        which of the two is intended.

    :keyword int nSize:
        The size of the buffer in bytes.  Passing in 0, which is the
        default, will cause the system to use the default buffer size.

    :keyword lpPipeAttributes:
        The security attributes to apply to the handle.  By default
        ``NULL`` will be passed in, meaning the handle we create
        cannot be inherited.  For more detailed information see the
        links above.

    :return:
        Returns a tuple of handles containing the reader and writer
        ends of the pipe that was created.  The user of this function
        is responsible for calling CloseHandle at some point.
    """
    input_check("nSize", nSize, integer_types)
    input_check("lpPipeAttributes", lpPipeAttributes, (NoneType, dict))
    ffi, library = dist.load()

    # Output parameters which receive the two ends of the pipe.
    read_end = ffi.new("PHANDLE")
    write_end = ffi.new("PHANDLE")

    # No attributes supplied means NULL is handed to the API.
    attributes = ffi.NULL if lpPipeAttributes is None else lpPipeAttributes

    code = library.CreatePipe(read_end, write_end, attributes, nSize)
    error_check("CreatePipe", code=code, expected=Enums.NON_ZERO)

    return read_end[0], write_end[0]
def SetNamedPipeHandleState(
        hNamedPipe,
        lpMode=None, lpMaxCollectionCount=None, lpCollectDataTimeout=None):
    """
    Sets the read and blocking mode of the specified ``hNamedPipe``.

    .. seealso::

        https://msdn.microsoft.com/en-us/library/aa365787

    :param handle hNamedPipe:
        A handle to the named pipe instance.

    :keyword int lpMode:
        The new pipe mode which is a combination of a read mode:

            * ``PIPE_READMODE_BYTE``
            * ``PIPE_READMODE_MESSAGE``

        And a wait-mode flag:

            * ``PIPE_WAIT``
            * ``PIPE_NOWAIT``

    :keyword int lpMaxCollectionCount:
        The maximum number of bytes collected.

    :keyword int lpCollectDataTimeout:
        The maximum time, in milliseconds, that can pass before a
        remote named pipe transfers information.

    Any keyword left as ``None`` is passed to the Windows API as ``NULL``.
    """
    input_check("hNamedPipe", hNamedPipe, Enums.HANDLE)
    ffi, library = dist.load()

    def to_dword_pointer(name, value):
        # ``None`` maps to NULL; anything else must be an integer and is
        # copied into a freshly allocated DWORD.
        if value is None:
            return ffi.NULL
        input_check(name, value, integer_types)
        return ffi.new("LPDWORD", value)

    # The conversions run in the same order as the parameters are declared
    # so validation errors surface for the first offending argument.
    mode = to_dword_pointer("lpMode", lpMode)
    max_collection_count = to_dword_pointer(
        "lpMaxCollectionCount", lpMaxCollectionCount)
    collect_data_timeout = to_dword_pointer(
        "lpCollectDataTimeout", lpCollectDataTimeout)

    code = library.SetNamedPipeHandleState(
        hNamedPipe, mode, max_collection_count, collect_data_timeout)
    error_check("SetNamedPipeHandleState", code=code, expected=Enums.NON_ZERO)
def PeekNamedPipe(hNamedPipe, nBufferSize):
    """
    Copies data from a pipe into a buffer without removing it
    from the pipe.

    .. seealso::

        https://msdn.microsoft.com/en-us/library/aa365779

    :param handle hNamedPipe:
        The handle to the pipe object we want to peek into.

    :param int nBufferSize:
        The number of bytes to 'peek' into the pipe.  This value is also
        used as the element count of the ``LPVOID`` array allocated
        for the output buffer.

    :rtype: :class:`PeekNamedPipeResult`
    :return:
        Returns an instance of :class:`PeekNamedPipeResult` holding the
        buffer object that was passed to the API together with the
        values the API stored for the bytes read, the total bytes
        available and the bytes left in this message.
    """
    input_check("hNamedPipe", hNamedPipe, Enums.HANDLE)
    input_check("nBufferSize", nBufferSize, integer_types)
    ffi, library = dist.load()

    # Output parameters filled in by the API call.
    buffer_out = ffi.new("LPVOID[%d]" % nBufferSize)
    bytes_read = ffi.new("LPDWORD")
    total_bytes_avail = ffi.new("LPDWORD")
    bytes_left_this_message = ffi.new("LPDWORD")

    code = library.PeekNamedPipe(
        hNamedPipe, buffer_out, nBufferSize,
        bytes_read, total_bytes_avail, bytes_left_this_message)
    error_check("PeekNamedPipe", code=code, expected=Enums.NON_ZERO)

    # The buffer is returned as allocated; the DWORD outputs are
    # dereferenced so the caller receives their values.
    return PeekNamedPipeResult(
        buffer_out,
        bytes_read[0],
        total_bytes_avail[0],
        bytes_left_this_message[0])
def WriteFile(hFile, lpBuffer, lpOverlapped=None):
    """
    Writes data to ``hFile`` which may be an I/O device or file.

    .. seealso::

        https://msdn.microsoft.com/en-us/library/aa365747

    :param handle hFile:
        The handle to write to.

    :type lpBuffer: bytes, string or unicode.
    :param lpBuffer:
        The data to be written to the file or device.  We should be able
        to convert this value to unicode.  The data is copied into a
        ``wchar_t`` array and the size of that array, in bytes, is what
        gets passed to the Windows API as the amount to write.

    :type lpOverlapped: None or OVERLAPPED
    :param lpOverlapped:
        None or a pointer to a ``OVERLAPPED`` structure.  See Microsoft's
        documentation for intended usage and below for an example of this
        struct.

        >>> from pywincffi.core import dist
        >>> ffi, library = dist.load()
        >>> hFile = None # normally, this would be a handle
        >>> lpOverlapped = ffi.new(
        ...     "OVERLAPPED[1]", [{
        ...         "hEvent": hFile
        ...     }]
        ... )
        >>> bytes_written = WriteFile(
        ...     hFile, "Hello world", lpOverlapped=lpOverlapped)

    :returns:
        Returns the number of bytes written as reported by the API.
    """
    ffi, library = dist.load()

    # A missing OVERLAPPED structure is passed along as NULL; the
    # substitution happens before validation so NULL is what gets checked.
    overlapped = ffi.NULL if lpOverlapped is None else lpOverlapped

    input_check("hFile", hFile, Enums.HANDLE)
    input_check("lpBuffer", lpBuffer, Enums.UTF8)
    input_check("lpOverlapped", overlapped, Enums.OVERLAPPED)

    # Copy the input into a wchar_t array with one element per item
    # of ``lpBuffer``.
    element_count = len(lpBuffer)
    wide_buffer = ffi.new("wchar_t[%d]" % element_count, lpBuffer)
    written = ffi.new("LPDWORD")

    code = library.WriteFile(
        hFile, wide_buffer, ffi.sizeof(wide_buffer), written, overlapped)
    error_check("WriteFile", code=code, expected=Enums.NON_ZERO)

    return written[0]
def ReadFile(hFile, nNumberOfBytesToRead, lpOverlapped=None):
    """
    Reads data from ``hFile`` into a buffer sized by
    ``nNumberOfBytesToRead``.

    .. seealso::

        https://msdn.microsoft.com/en-us/library/aa365467

    :param handle hFile:
        The handle to read from.

    :param int nNumberOfBytesToRead:
        Used as the number of ``wchar_t`` elements in the buffer that
        receives the data.  The byte count requested from the Windows
        API is the size of that buffer in bytes, not this number itself.

    :type lpOverlapped: None or OVERLAPPED
    :param lpOverlapped:
        None or a pointer to a ``OVERLAPPED`` structure.  See Microsoft's
        documentation for intended usage and below for an example of this
        struct.

        >>> from pywincffi.core import dist
        >>> ffi, library = dist.load()
        >>> hFile = None # normally, this would be a handle
        >>> lpOverlapped = ffi.new(
        ...     "OVERLAPPED[1]", [{
        ...         "hEvent": hFile
        ...     }]
        ... )
        >>> read_data = ReadFile(  # buffer of 12 wchar_t elements
        ...     hFile, 12, lpOverlapped=lpOverlapped)

    :returns:
        Returns the data read from ``hFile``, converted from the
        ``wchar_t`` buffer with ``ffi.string``.
    """
    ffi, library = dist.load()

    # A missing OVERLAPPED structure is passed along as NULL; the
    # substitution happens before validation so NULL is what gets checked.
    overlapped = ffi.NULL if lpOverlapped is None else lpOverlapped

    input_check("hFile", hFile, Enums.HANDLE)
    input_check("nNumberOfBytesToRead", nNumberOfBytesToRead, integer_types)
    input_check("lpOverlapped", overlapped, Enums.OVERLAPPED)

    # Destination buffer plus the DWORD receiving the API's byte count.
    wide_buffer = ffi.new("wchar_t[%d]" % nNumberOfBytesToRead)
    bytes_read = ffi.new("LPDWORD")

    code = library.ReadFile(
        hFile, wide_buffer, ffi.sizeof(wide_buffer), bytes_read, overlapped)
    error_check("ReadFile", code=code, expected=Enums.NON_ZERO)

    return ffi.string(wide_buffer)
def CloseHandle(hObject):
    """
    Closes an open object handle.

    .. seealso::

        https://msdn.microsoft.com/en-us/library/ms724211

    :param handle hObject:
        The handle object to close.

    The value returned by the Windows API is passed to ``error_check``
    with a non-zero result expected.
    """
    input_check("hObject", hObject, Enums.HANDLE)
    _, lib = dist.load()

    result = lib.CloseHandle(hObject)
    error_check("CloseHandle", code=result, expected=Enums.NON_ZERO)
def GetStdHandle(nStdHandle):
    """
    Retrieves a handle to the specified standard
    device (standard input, standard output, or standard error).

    .. seealso::

        https://msdn.microsoft.com/en-us/library/ms683231

    :param int nStdHandle:
        The standard device to retrieve.  Must be one of the library's
        ``STD_INPUT_HANDLE``, ``STD_OUTPUT_HANDLE`` or
        ``STD_ERROR_HANDLE`` values.

    :raises WindowsAPIError:
        Raised if the handle returned by the Windows API is the invalid
        handle value.

    :rtype: handle
    :return:
        Returns a handle to the standard device retrieved.
    """
    ffi, library = dist.load()
    input_check(
        "nStdHandle", nStdHandle,
        allowed_values=(
            library.STD_INPUT_HANDLE,
            library.STD_OUTPUT_HANDLE,
            library.STD_ERROR_HANDLE))

    handle = library.GetStdHandle(nStdHandle)

    # A cffi pointer object never compares equal to a plain Python
    # integer, so testing ``handle == -1`` directly cannot detect a
    # failure when the library hands back the handle as a pointer.
    # Converting the handle to a signed, pointer sized integer first
    # makes the comparison meaningful (an all-ones address becomes -1).
    handle_value = int(ffi.cast("intptr_t", handle))
    if handle_value == INVALID_HANDLE_VALUE:  # pragma: no cover
        raise WindowsAPIError(
            "GetStdHandle", "Invalid Handle", INVALID_HANDLE_VALUE,
            "not %s" % INVALID_HANDLE_VALUE)

    return handle