"""Test resource factory for the botocore library."""
from __future__ import absolute_import
import contextlib
import functools
import inspect
import logging
import weakref
import botocore
import botocore.client
import botocore.config
import botocore.credentials
import botocore.regions
import botocore.session
import pytest
from pytest_localstack import (
_make_session,
constants,
exceptions,
hookspecs,
utils,
)
from pytest_localstack.utils import mock
logger = logging.getLogger(__name__)
@hookspecs.pytest_localstack_hookimpl
def contribute_to_session(session):
    """Attach a :class:`BotocoreTestResourceFactory` to a :class:`.LocalstackSession`.

    The factory is stored on the session as its ``botocore`` attribute.

    Args:
        session: The session object to attach the factory to.

    """
    logger.debug('patching session %r', session)
    botocore_factory = BotocoreTestResourceFactory(session)
    setattr(session, 'botocore', botocore_factory)
@hookspecs.pytest_localstack_hookimpl
def contribute_to_module(pytest_localstack):
    """Expose :func:`patch_fixture` as an attribute of :mod:`pytest_localstack`.

    Args:
        pytest_localstack: The module object that receives the
            ``patch_fixture`` attribute.

    """
    logger.debug('patching module %r', pytest_localstack)
    setattr(pytest_localstack, 'patch_fixture', patch_fixture)
class BotocoreTestResourceFactory(object):
    """Create botocore clients to interact with a :class:`.LocalstackSession`.

    Args:
        localstack_session (:class:`.LocalstackSession`):
            The session that this factory should create test resources for.

    """

    def __init__(self, localstack_session):
        logger.debug('BotocoreTestResourceFactory.__init__')
        self.localstack_session = localstack_session
        # Created on first access of the ``default_session`` property.
        self._default_session = None

    def session(self, *args, **kwargs):
        """Create a botocore Session that will use Localstack.

        Arguments are the same as :class:`botocore.session.Session`.

        Returns:
            A new :class:`Session` bound to this factory's Localstack session.

        """
        return Session(self.localstack_session, *args, **kwargs)

    def client(self, service_name, *args, **kwargs):
        """Create a botocore client that will use Localstack.

        Arguments are the same as
        :meth:`botocore.session.Session.create_client`.

        The client is created from :attr:`default_session`.
        """
        return self.default_session.create_client(
            service_name,
            *args,
            **kwargs
        )

    @property
    def default_session(self):
        """Return a default botocore Localstack Session.

        Most applications only need one Session, so it is created once on
        first access and then reused.
        """
        if self._default_session is None:
            self._default_session = self.session()
        return self._default_session

    @contextlib.contextmanager
    def patch_botocore(self):
        """Context manager that will patch botocore to use Localstack.

        Since boto3 relies on botocore to perform API calls, this method
        also effectively patches boto3.

        Three groups of patches are installed for the duration of the
        ``with`` block:

        1. ``botocore.session.Session`` is patched so that sessions use the
           endpoint resolver, credential provider and ``create_client`` of
           the :class:`Session` subclass defined in this module.
        2. ``BaseClient._convert_to_request_dict`` is wrapped with a check
           that every request URL contains the Localstack hostname.
        3. ``BaseClient`` attribute access is patched so that clients which
           are not flagged ``_is_pytest_localstack`` have a fixed set of
           attributes served by a proxy client created from
           :attr:`default_session`.

        Raises:
            AssertionError: (from patched code, while the patch is active)
                if a request URL does not contain the Localstack hostname,
                or if ``localstack_session`` is assigned on a session that
                is not a :class:`Session` instance.

        """
        logger.debug("enter patch")
        try:
            # The nested functions below take their own ``self`` (the patched
            # botocore object), so keep an explicit reference to the factory.
            factory = self
            patches = []

            # Step 1: patch botocore Session to use Localstack.

            @property
            def localstack_session(self):
                # Simulate the 'localstack_session' attr from Session class
                # below. Patch this into the botocore Session class.
                if 'localstack_session' in self.__dict__:
                    # We're patching this into the base botocore Session,
                    # but we don't want to override things for the Session
                    # subclass below.
                    return self.__dict__['localstack_session']
                return factory.localstack_session

            @localstack_session.setter
            def localstack_session(self, value):
                # Explicit raise rather than ``assert`` so the guard is not
                # stripped when Python runs with -O.
                if not isinstance(self, Session):
                    raise AssertionError(
                        'localstack_session may only be set on '
                        'pytest-localstack Session objects'
                    )
                self.__dict__['localstack_session'] = value

            @property
            def _components(self):
                if isinstance(self, Session):
                    # Session subclass instances keep their own components
                    # in the instance dict.
                    try:
                        return self.__dict__['_components']
                    except KeyError:
                        raise AttributeError('_components')
                # Any other session gets a separate ComponentLocator, stored
                # in the patched-in ``_proxy_components`` mapping and
                # populated on first access.
                proxy_components = botocore.session.Session._proxy_components
                if self not in proxy_components:
                    proxy_components[self] = botocore.session.ComponentLocator()
                    self._register_components()
                return proxy_components[self]

            @_components.setter
            def _components(self, value):
                self.__dict__['_components'] = value

            @property
            def _credentials(self):
                return self._proxy_credentials.get(self)

            @_credentials.setter
            def _credentials(self, value):
                self._proxy_credentials[self] = value

            patches.append(mock.patch.multiple(
                'botocore.session.Session',
                localstack_session=localstack_session,
                _proxy_components=weakref.WeakKeyDictionary(),
                _proxy_credentials=weakref.WeakKeyDictionary(),
                _credentials=_credentials,
                _components=_components,
                create=True,
            ))
            patches.append(mock.patch.multiple(
                botocore.session.Session,
                _register_endpoint_resolver=utils.unbind(Session._register_endpoint_resolver),
                _register_credential_provider=utils.unbind(Session._register_credential_provider),
                create_client=utils.unbind(Session.create_client),
            ))

            # Step 2: Safety checks
            # Make absolutely sure we use Localstack and not AWS.
            _original_convert_to_request_dict = botocore.client.BaseClient._convert_to_request_dict

            @functools.wraps(_original_convert_to_request_dict)
            def _convert_to_request_dict(self, *args, **kwargs):
                request_dict = _original_convert_to_request_dict(self, *args, **kwargs)
                hostname = factory.localstack_session.hostname
                # Explicit raise rather than ``assert``: this is the guard
                # against talking to real AWS and must survive ``python -O``.
                if hostname not in request_dict['url']:
                    raise AssertionError(
                        'request URL %r is not aimed at Localstack host %r'
                        % (request_dict['url'], hostname)
                    )
                return request_dict

            patches.append(mock.patch(
                'botocore.client.BaseClient._convert_to_request_dict',
                _convert_to_request_dict,
            ))

            # Step 3: Patch existing clients
            # Patching botocore Session doesn't help with existing
            # botocore Client objects. They will have already been created
            # with endpoints aimed at AWS. We need to patch
            # botocore.client.BaseClient to temporarily act like a
            # Localstack client.
            original_init = botocore.client.BaseClient.__init__

            @functools.wraps(original_init)
            def new_init(self, *args, **kwargs):
                # Every client created during the patch is a Localstack client.
                # Set this flag so that the new_getattribute() proxying below
                # won't break during original_init().
                self._is_pytest_localstack = True
                original_init(self, *args, **kwargs)

            patches.append(mock.patch.multiple(
                botocore.client.BaseClient,
                __init__=new_init,
            ))

            # Create a place to store proxy clients.
            patches.append(mock.patch(
                'botocore.client.BaseClient._proxy_clients',
                weakref.WeakKeyDictionary(),
                create=True,
            ))

            # Attributes that are served by the proxy client. Built once per
            # patch instead of on every attribute access.
            proxied_keys = frozenset([
                '_cache',
                '_client_config',
                '_endpoint',
                '_exceptions_factory',
                '_exceptions',
                '_loader',
                '_request_signer',
                '_response_parser',
                '_serializer',
                'meta',
            ])

            def new_getattribute(self, key):
                if key.startswith('__'):
                    return object.__getattribute__(self, key)
                instance_dict = object.__getattribute__(self, '__dict__')
                if (instance_dict.get('_is_pytest_localstack', False) or
                        key not in proxied_keys):
                    # Don't proxy clients that are already Localstack clients
                    return object.__getattribute__(self, key)
                if self not in botocore.client.BaseClient._proxy_clients:
                    try:
                        meta = instance_dict['meta']
                    except KeyError:
                        raise AttributeError('meta')
                    # Build the proxy for the same service, reusing the
                    # config stored on the existing client.
                    proxy = factory.default_session.create_client(
                        meta.service_model.service_name,
                        config=instance_dict['_client_config'],
                    )
                    botocore.client.BaseClient._proxy_clients[self] = proxy
                return object.__getattribute__(
                    botocore.client.BaseClient._proxy_clients[self],
                    key
                )

            patches.append(mock.patch(
                'botocore.client.BaseClient.__getattribute__',
                new_getattribute,
                create=True,
            ))

            with utils.nested(*patches):
                yield
        finally:
            logger.debug("exit patch")
def patch_fixture(scope='function',
                  services=None,
                  autouse=False,
                  docker_client=None,
                  region_name=constants.DEFAULT_AWS_REGION,
                  kinesis_error_probability=0.0,
                  dynamodb_error_probability=0.0,
                  container_log_level=logging.DEBUG,
                  localstack_verison='latest',
                  auto_remove=True,
                  pull_image=True,
                  container_name=None,
                  **kwargs):
    """Build a pytest fixture that redirects botocore to Localstack.

    This is not a fixture itself; it is a factory that returns one.

    The returned fixture runs a Localstack container and patches botocore
    so that all sessions and clients send their traffic to that container
    for the duration of the tests. Because boto3 sends its requests through
    botocore, boto3 is redirected as well. The fixture value is the
    Localstack session.

    Args:
        scope (str, optional): The pytest scope which this fixture will use.
            Defaults to :const:`"function"`.
        services (list, dict, optional): One of

            - A :class:`list` of AWS service names to start in the
              Localstack container.
            - A :class:`dict` of service names to the port they should run on.

            Defaults to all services. Setting this
            can reduce container startup time and therefore test time.
        autouse (bool, optional): If :obj:`True`, automatically use this
            fixture in applicable tests. Default: :obj:`False`
        docker_client (:class:`~docker.client.DockerClient`, optional):
            Docker client to run the Localstack container with.
            Defaults to :func:`docker.client.from_env`.
        region_name (str, optional): Region name to assume.
            Each Localstack container acts like a single AWS region.
            Defaults to :const:`"us-east-1"`.
        kinesis_error_probability (float, optional): Decimal value between
            0.0 (default) and 1.0 to randomly inject
            ProvisionedThroughputExceededException errors
            into Kinesis API responses.
        dynamodb_error_probability (float, optional): Decimal value
            between 0.0 (default) and 1.0 to randomly inject
            ProvisionedThroughputExceededException errors into
            DynamoDB API responses.
        container_log_level (int, optional): The logging level to use
            for Localstack container logs. Defaults to :data:`logging.DEBUG`.
        localstack_verison (str, optional): The version of the Localstack
            image to use. Defaults to :const:`"latest"`. (The parameter
            name is spelled this way in the public signature.)
        auto_remove (bool, optional): If :obj:`True`, delete the Localstack
            container when it stops. Default: :obj:`True`
        pull_image (bool, optional): If :obj:`True`, pull the Localstack
            image before running it. Default: :obj:`True`
        container_name (str, optional): The name for the Localstack
            container. Defaults to a randomly generated id.
        **kwargs: Additional kwargs will be passed to the
            :class:`.LocalstackSession`.

    Returns:
        A :func:`pytest fixture <_pytest.fixtures.fixture>`.

    """
    @pytest.fixture(scope=scope, autouse=autouse)
    def _fixture():
        # Collect every session option in one mapping; the extra kwargs
        # cannot collide with the named options because those names are
        # already bound as parameters of patch_fixture().
        session_options = dict(
            docker_client=docker_client,
            services=services,
            region_name=region_name,
            kinesis_error_probability=kinesis_error_probability,
            dynamodb_error_probability=dynamodb_error_probability,
            container_log_level=container_log_level,
            localstack_verison=localstack_verison,
            auto_remove=auto_remove,
            pull_image=pull_image,
            container_name=container_name,
        )
        session_options.update(kwargs)
        with _make_session(**session_options) as session, \
                session.botocore.patch_botocore():
            yield session

    return _fixture
# Capture the unpatched ``create_client`` once, at import time.
# ``patch_botocore`` replaces ``botocore.session.Session.create_client`` with
# ``Session.create_client`` below, so looking the attribute up while the patch
# is active would resolve to the replacement instead of the original.
_original_create_client = utils.unbind(botocore.session.Session.create_client)
class Session(botocore.session.Session):
    """botocore Session subclass whose clients talk to Localstack.

    Args:
        localstack_session: The Localstack session used to resolve endpoints.
        *args: Passed through to :class:`botocore.session.Session`.
        **kwargs: Passed through to :class:`botocore.session.Session`.

    """

    def __init__(self, localstack_session, *args, **kwargs):
        self.localstack_session = localstack_session
        super(Session, self).__init__(*args, **kwargs)

    def _register_endpoint_resolver(self):
        """Lazily register a :class:`LocalstackEndpointResolver` component."""
        def build_localstack_resolver():
            endpoint_data = self.get_component('data_loader').load_data('endpoints')
            return LocalstackEndpointResolver(
                self.localstack_session,
                endpoint_data,
            )

        self._components.lazy_register_component(
            'endpoint_resolver',
            build_localstack_resolver,
        )

    def _register_credential_provider(self):
        """Lazily register the Localstack credential resolver component."""
        self._components.lazy_register_component(
            'credential_provider',
            create_credential_resolver,
        )

    def create_client(self, *args, **kwargs):
        """Create a botocore client.

        Arguments are those of the original
        ``botocore.session.Session.create_client``. Any ``config`` supplied
        by the caller is merged with a config that forces S3 path-style
        addressing, and the resulting client is flagged with
        ``_is_pytest_localstack = True``.
        """
        # Localstack doesn't use the virtual host addressing style.
        path_style = botocore.config.Config(s3={'addressing_style': 'path'})
        # Normalise positional/keyword arguments into one name -> value
        # mapping (including ``self``) so ``config`` can be found however
        # the caller passed it.
        bound = inspect.getcallargs(_original_create_client, self, *args, **kwargs)
        caller_config = bound.get('config')
        if caller_config:
            bound['config'] = caller_config.merge(path_style)
        else:
            bound['config'] = path_style
        localstack_client = _original_create_client(**bound)
        localstack_client._is_pytest_localstack = True
        return localstack_client
def create_credential_resolver():
    """Create a credentials resolver for Localstack.

    Returns:
        A :class:`botocore.credentials.CredentialResolver` whose providers
        are, in order, botocore's ``EnvProvider`` and this module's
        :class:`DefaultCredentialProvider`.

    """
    provider_chain = [
        botocore.credentials.EnvProvider(),
        DefaultCredentialProvider(),
    ]
    return botocore.credentials.CredentialResolver(providers=provider_chain)
class DefaultCredentialProvider(botocore.credentials.CredentialProvider):
    """Credential provider that hands out fixed defaults for Localstack clients."""

    METHOD = 'localstack-default'

    def load(self):
        """Return credentials built from the package's default constants."""
        default_credentials = botocore.credentials.Credentials(
            access_key=constants.DEFAULT_AWS_ACCESS_KEY_ID,
            secret_key=constants.DEFAULT_AWS_SECRET_ACCESS_KEY,
            token=constants.DEFAULT_AWS_SESSION_TOKEN,
            method=self.METHOD,
        )
        return default_credentials
class LocalstackEndpointResolver(botocore.regions.EndpointResolver):
    """Resolve AWS service endpoints based on a LocalstackSession.

    Args:
        localstack_session: The Localstack session that supplies the region
            name, hostnames and SSL setting.
        endpoints: Endpoint data passed to
            :class:`botocore.regions.EndpointResolver`.

    """

    def __init__(self, localstack_session, endpoints):
        self.localstack_session = localstack_session
        super(LocalstackEndpointResolver, self).__init__(endpoints)

    @property
    def valid_regions(self):
        """Return the set of regions we can resolve endpoints for.

        This is the Localstack session's region plus ``'aws-global'``.
        """
        return {self.localstack_session.region_name, 'aws-global'}

    def get_available_partitions(self):
        """List the partitions available to the endpoint resolver."""
        return ['aws']

    def get_available_endpoints(self, service_name, partition_name='aws',
                                allow_non_regional=False):
        """List the endpoint names of a particular partition.

        Args:
            service_name: Name of the service to list endpoints for.
            partition_name: Only ``'aws'`` is supported.
            allow_non_regional: If true, include every endpoint name of the
                service instead of only those in :attr:`valid_regions`.

        Raises:
            exceptions.UnsupportedPartitionError: If ``partition_name`` is
                not ``'aws'``.

        """
        if partition_name != 'aws':
            raise exceptions.UnsupportedPartitionError(partition_name)
        endpoint_names = []
        for partition in self._endpoint_data['partitions']:
            if (partition['partition'] == 'aws'
                    and service_name in partition['services']):
                service_endpoints = partition['services'][service_name]['endpoints']
                endpoint_names.extend(
                    name for name in service_endpoints
                    if allow_non_regional or name in self.valid_regions
                )
        return endpoint_names

    def construct_endpoint(self, service_name, region_name=None):
        """Resolve an endpoint for a service and region combination.

        Args:
            service_name: Name of the service to resolve.
            region_name: Region to resolve for. Defaults to the Localstack
                session's region.

        Returns:
            The endpoint dict for the ``'aws'`` partition with its
            ``hostname``, ``protocols`` and ``dnsSuffix`` entries rewritten
            from the Localstack session, or :obj:`None` if no endpoint was
            found.

        Raises:
            exceptions.RegionError: If ``region_name`` is given and is not
                in :attr:`valid_regions`.

        """
        localstack = self.localstack_session
        if region_name is None:
            region_name = localstack.region_name
        elif region_name not in self.valid_regions:
            raise exceptions.RegionError(region_name, localstack.region_name)

        for partition in self._endpoint_data['partitions']:
            if partition['partition'] != 'aws':
                continue
            endpoint = self._endpoint_for_partition(
                partition,
                service_name,
                region_name,
            )
            if not endpoint:
                continue
            endpoint['hostname'] = localstack.service_hostname(service_name)
            use_ssl = localstack.use_ssl
            # Without SSL only plain HTTP is offered and the SSL common
            # name is dropped.
            endpoint['protocols'] = endpoint['protocols'] if use_ssl else ['http']
            if not use_ssl:
                endpoint.pop('sslCommonName', None)
            endpoint['dnsSuffix'] = localstack.hostname
            return endpoint