# Source code for pywincffi.dev.testutil

"""
Test Utility
------------

This module is used by the unittests.
"""

import os
import socket
import subprocess
import sys
from random import choice
from string import ascii_lowercase, ascii_uppercase
from textwrap import dedent

from cffi import FFI
from mock import patch

try:
    # The setup.py file installs unittest2 for Python 2
    # which backports newer test framework features.
    from unittest2 import TestCase as _TestCase
except ImportError:  # pragma: no cover
    # pylint: disable=wrong-import-order
    from unittest import TestCase as _TestCase

from pywincffi.core import dist
from pywincffi.core.logger import get_logger

# Module-level logger, obtained from the project's ``get_logger`` helper
# under the name "core.testutil".
logger = get_logger("core.testutil")

# To keep lint on non-windows platforms happy.  Where the ``WindowsError``
# builtin is not defined, evaluating the bare name raises NameError and the
# name is bound to OSError instead so later references still resolve.
try:
    WindowsError
except NameError:  # pragma: no cover
    WindowsError = OSError  # pylint: disable=redefined-builtin


class LibraryWrapper(object):  # pylint: disable=too-few-public-methods
    """
    Proxy around a compiled library which lets selected attributes be
    overridden.  Used by :func:`mock_library`.

    Attribute lookups which are not satisfied by the instance itself are
    answered from the overrides first and fall through to the wrapped
    library otherwise.

    :param library:
        The object to wrap.

    :param dict attributes:
        Maps attribute names to their replacement values.  Every name must
        already exist on ``library``.

    :raises AttributeError:
        Raised if ``attributes`` names something ``library`` does not have.
    """
    def __init__(self, library, attributes):
        self.library = library
        self.attributes = {}

        for name, replacement in attributes.items():
            # Only allow overriding something which really exists so a
            # misspelled name fails loudly instead of being ignored.
            if hasattr(library, name):
                self.attributes[name] = replacement
                continue

            raise AttributeError(
                "No such attribute %r on library" % name)

    def __getattr__(self, item):
        # Only reached when normal attribute lookup on the wrapper fails.
        overrides = self.attributes
        if item not in overrides:
            return getattr(self.library, item)

        return overrides[item]
def mock_library(**attributes):
    """
    Used to replace attributes of the library that :func:`dist.load`
    returns.  Useful for replacing part of the compiled library as part
    of a test.

    :func:`dist.load` is called once, up front, to obtain the real ``ffi``
    and library objects.  The return value is the result of
    ``patch.object(dist, "load", ...)`` where the replacement ``load``
    takes no arguments and returns ``[ffi, wrapper]``.  The wrapper is a
    new :class:`LibraryWrapper` built on every call to the replacement, so
    an unknown attribute name raises :class:`AttributeError` at that point
    rather than when ``mock_library`` itself is called.

    :param attributes:
        Keyword arguments mapping library attribute names to the values
        which should be seen instead.
    """
    ffi, library = dist.load()

    def load_with_overrides():
        return [ffi, LibraryWrapper(library, attributes)]

    return patch.object(dist, "load", load_with_overrides)
class SharedState(object):  # pylint: disable=too-few-public-methods
    """
    Holder for state which is shared across multiple :class:`TestCase`
    classes.  The values live here, rather than on the test case class, so
    a test or fixture can't inadvertently modify them.
    """
    # Result of the internet connectivity probe.  None until
    # ``TestCase.setUpClass`` has performed the probe.
    HAS_INTERNET = None

    # FFI instance and the library handles opened from it.  All three are
    # None until ``TestCase.setUpClass`` populates them.
    ffi = kernel32 = ws2_32 = None
class TestCase(_TestCase):  # pylint: disable=too-many-public-methods
    """
    A base class for all test cases.  By default the
    core test case just provides some extra functionality.
    """
    # A list of hosts and port to check for internet access on.  If we fail
    # to reach any of the hosts in `INTERNET_HOSTS` on `INTERNET_PORT` then
    # `HAS_INTERNET` will be set to False.
    INTERNET_PORT = 80
    INTERNET_HOSTS = ("github.com", "readthedocs.org", "example.com")

    # Subclasses set this to True to have the connectivity probe run and to
    # be skipped (or failed, when the CI environment variable is set) if no
    # host can be reached.
    REQUIRES_INTERNET = False
    HAS_INTERNET = None

    # Class level attributes used to access some specific Windows API
    # functions when testing.  This is kept separate from what `dist.load()`
    # produces so problems in the build don't break parts of the base
    # TestCase.
    ffi = None
    kernel32 = None
    ws2_32 = None

    @classmethod
    def setUpClass(cls):
        """
        Populates the class level attributes from :class:`SharedState`,
        filling in :class:`SharedState` first where it is still empty.
        """
        # Reset everything back to the default values first.
        cls.ffi = None
        cls.kernel32 = None
        cls.ws2_32 = None
        cls.HAS_INTERNET = None

        # First run and this test case requires internet access.  Determine
        # if we have access to the internet then cache the value.
        if cls.REQUIRES_INTERNET and SharedState.HAS_INTERNET is None:
            # The default timeout is process wide so it is always restored
            # once the probe has finished.
            original_timeout = socket.getdefaulttimeout()
            socket.setdefaulttimeout(1)

            try:
                for hostname in cls.INTERNET_HOSTS:
                    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                    try:
                        sock.connect((hostname, cls.INTERNET_PORT))
                        SharedState.HAS_INTERNET = True
                        break

                    # Any failure just means "try the next host".
                    # pylint: disable=broad-except
                    except Exception:  # pragma: no cover
                        pass

                    finally:
                        sock.close()

                # Only reached when the loop did not ``break``: none of
                # the hosts could be reached.
                else:  # pragma: no cover
                    SharedState.HAS_INTERNET = False

            finally:
                socket.setdefaulttimeout(original_timeout)

        if os.name == "nt" and SharedState.ffi is None:
            try:
                ffi = FFI()
                ffi.set_unicode(True)
                ffi.cdef(dedent("""
                // kernel32 functions
                DWORD GetLastError(void);
                void SetLastError(DWORD);

                // ws2_32 functions
                void WSASetLastError(int);
                int WSAGetLastError(void);
                """))
                SharedState.ffi = ffi
                SharedState.kernel32 = ffi.dlopen("kernel32")
                SharedState.ws2_32 = ffi.dlopen("ws2_32")

            # Keep the exception around instead of raising it here so that
            # setUp() can report it as a test failure.
            # pylint: disable=broad-except
            except Exception as error:  # pragma: no cover
                SharedState.ffi = error

        cls.HAS_INTERNET = SharedState.HAS_INTERNET
        cls.ffi = SharedState.ffi
        cls.kernel32 = SharedState.kernel32
        cls.ws2_32 = SharedState.ws2_32

    def setUp(self):  # pragma: no cover
        """
        Skips or fails tests which need an unavailable internet connection
        and, on Windows, verifies the API handles then resets the last
        error codes to 0.
        """
        if self.REQUIRES_INTERNET and not self.HAS_INTERNET:
            # A missing connection is treated as a hard failure when the
            # CI environment variable is set and as a skip otherwise.
            if os.environ.get("CI"):
                self.fail(
                    "%s requires internet but we do not seem to be "
                    "connected." % self.__class__.__name__)

            self.skipTest("Internet unavailable")

        if os.name != "nt":
            return

        # setUpClass() stores the exception here if loading failed.
        if isinstance(self.ffi, Exception):
            self.fail("FFI module setup failed: %s" % self.ffi)

        self.assertIsNotNone(
            self.kernel32, "setUp() failed: missing kernel32")
        self.assertIsNotNone(
            self.ws2_32, "setUp() failed: missing ws2_32")

        self.addCleanup(self.unhandled_error_check)

        # Always reset the last error to 0 between tests.  This ensures
        # that if an unhandled API error occurs it won't impact the
        # currently running test.  The cleanup step above will ensure that
        # tests that do not exit cleanly will cause a failure.
        self.kernel32.SetLastError(0)
        self.ws2_32.WSASetLastError(0)

    def GetLastError(self):  # pylint: disable=invalid-name
        """
        Returns a two item tuple.  The first item is the output from the
        Windows GetLastError function.  The second item is whatever
        ``ffi.getwinerror()`` returns for that error number, or None if
        GetLastError() returns 0.
        """
        errno = self.kernel32.GetLastError()
        return errno, self.ffi.getwinerror(errno) if errno != 0 else None

    def WSAGetLastError(self):  # pylint: disable=invalid-name
        """
        Returns a two item tuple.  The first item is the output from the
        Windows WSAGetLastError function.  The second item is whatever
        ``ffi.getwinerror()`` returns for that error number, or None if
        WSAGetLastError() returns 0.
        """
        errno = self.ws2_32.WSAGetLastError()
        return errno, self.ffi.getwinerror(errno) if errno != 0 else None

    def WSASetLastError(self, errno):  # pylint: disable=invalid-name
        """Wrapper for WSASetLastError()"""
        self.ws2_32.WSASetLastError(errno)

    def SetLastError(self, errno):  # pylint: disable=invalid-name
        """Wrapper for SetLastError()"""
        self.kernel32.SetLastError(errno)

    def unhandled_error_check(self):
        """
        A cleanup step which ensures that there are not any uncaught API
        errors left over.  Unhandled errors could be a sign of an unhandled
        testing artifact, improper API usage or other problem.  In any case,
        unhandled errors are often a source of test flake.
        """
        # Check for kernel32 errors.
        k32_errno, k32_message = self.GetLastError()
        self.assertEqual(
            k32_errno, 0,
            msg="Unhandled kernel32 error. Errno: %r. Message: %r" % (
                k32_errno, k32_message))

        # Check for ws2_32 errors.
        ws2_errno, ws2_message = self.WSAGetLastError()
        self.assertEqual(
            ws2_errno, 0,
            msg="Unhandled ws2_32 error. Errno: %r. Message: %r" % (
                ws2_errno, ws2_message))

    def _terminate_process(self, process):  # pylint: disable=no-self-use
        """
        Calls terminate() on ``process`` and ignores any errors produced.
        """
        try:
            process.terminate()

        # Best effort only: this runs as a cleanup step and must not turn
        # an otherwise passing test into a failure.
        # pylint: disable=broad-except
        except Exception:  # pragma: no cover
            pass

    def create_python_process(self, command):
        """
        Creates a Python process that runs ``command`` and returns the
        :class:`subprocess.Popen` object.

        The process is started with ``sys.executable -c <command>`` with
        stdout and stderr captured through pipes.  A cleanup step is
        registered which terminates the process when the test ends.
        """
        process = subprocess.Popen(
            [sys.executable, "-c", command],
            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        self.addCleanup(self._terminate_process, process)
        return process

    def random_string(self, length):
        """
        Returns a random string as long as ``length``.  The first character
        will always be a lowercase letter.  All other characters will be
        a-z, A-Z or 0-9.

        :param int length:
            The number of characters to produce.  Must be at least 1, the
            test is failed otherwise.  A non-integral value is truncated
            towards zero.
        """
        if length < 1:  # pragma: no cover
            self.fail("Length must be at least 1.")

        # First character should always be a letter so the string
        # can be used in object names.
        first = choice(ascii_lowercase)

        # A bounded loop is used instead of counting ``length`` down to
        # zero.  The countdown never terminated for a non-integral length
        # such as 2.5 because it stepped straight past zero.
        alphabet = ascii_lowercase + ascii_uppercase + "0123456789"
        remaining = int(length) - 1
        return first + "".join(choice(alphabet) for _ in range(remaining))

    def assert_last_error(self, errno):
        """
        This function will assert that the last unhandled error was
        ``errno``.  After the check the last error will be reset to zero.

        :param int errno:
            The expected value from GetLastError.
        """
        last_error, _ = self.GetLastError()
        self.assertEqual(last_error, errno)
        self.SetLastError(0)

    def maybe_assert_last_error(self, errno):
        """
        This function is similar to :meth:`assert_last_error` except
        it won't fail if the current error number is already 0.

        :param int errno:
            The value GetLastError may return besides 0.
        """
        last_error, _ = self.GetLastError()
        self.assertIn(last_error, (0, errno))
        self.SetLastError(0)