"""Test resource factory for the botocore library."""
from __future__ import absolute_import
import contextlib
import functools
import inspect
import logging
import weakref
import botocore
import botocore.client
import botocore.config
import botocore.credentials
import botocore.regions
import botocore.session
import pytest
from pytest_localstack import _make_session, constants, exceptions, hookspecs, utils
from pytest_localstack.utils import mock
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
@hookspecs.pytest_localstack_hookimpl
def contribute_to_session(session):
    """Attach a :class:`BotocoreTestResourceFactory` to a :class:`.LocalstackSession`.

    The factory is stored on the session as ``session.botocore``.

    Args:
        session: The session object to extend.

    """
    logger.debug("patching session %r", session)
    factory = BotocoreTestResourceFactory(session)
    session.botocore = factory
@hookspecs.pytest_localstack_hookimpl
def contribute_to_module(pytest_localstack):
    """Expose :func:`patch_fixture` as an attribute of :mod:`pytest_localstack`.

    Args:
        pytest_localstack: The module object that receives the
            ``patch_fixture`` attribute.

    """
    logger.debug("patching module %r", pytest_localstack)
    setattr(pytest_localstack, "patch_fixture", patch_fixture)
class BotocoreTestResourceFactory(object):
    """Create botocore clients to interact with a :class:`.LocalstackSession`.

    Args:
        localstack_session (:class:`.LocalstackSession`):
            The session that this factory should create test resources for.

    """

    def __init__(self, localstack_session):
        logger.debug("BotocoreTestResourceFactory.__init__")
        self.localstack_session = localstack_session
        # Created lazily by the ``default_session`` property.
        self._default_session = None

    def session(self, *args, **kwargs):
        """Create a botocore Session that will use Localstack.

        Arguments are the same as :class:`botocore.session.Session`.

        Returns:
            A new :class:`Session` bound to this factory's Localstack session.

        """
        return Session(self.localstack_session, *args, **kwargs)

    def client(self, service_name, *args, **kwargs):
        """Create a botocore client that will use Localstack.

        Arguments are the same as
        :meth:`botocore.session.Session.create_client`.

        Returns:
            Whatever ``create_client`` of the default session returns.

        """
        return self.default_session.create_client(service_name, *args, **kwargs)

    @property
    def default_session(self):
        """Return a default botocore Localstack Session.

        Most applications only need one Session. The session is created on
        first access and reused afterwards.

        """
        if self._default_session is None:
            self._default_session = self.session()
        return self._default_session

    @contextlib.contextmanager
    def patch_botocore(self):
        """Context manager that will patch botocore to use Localstack.

        Since boto3 relies on botocore to perform API calls, this method
        also effectively patches boto3.

        The patches are applied in three steps:

        1. ``botocore.session.Session`` is patched so that sessions resolve
           endpoints and credentials through the Localstack-aware
           :class:`Session` implementation.
        2. ``BaseClient._convert_to_request_dict`` is wrapped with a safety
           check that rejects any request whose URL does not contain the
           Localstack hostname.
        3. ``BaseClient`` is patched so that clients created *before* the
           patch transparently delegate selected attributes to a Localstack
           proxy client.

        Raises:
            AssertionError: (from patched clients, while the patch is active)
                if a request would be sent to a URL that does not contain
                the Localstack hostname.

        """
        logger.debug("enter patch")
        try:
            factory = self
            patches = []

            # Step 1: patch botocore Session to use Localstack.
            attr = {}

            @property
            def localstack_session(self):
                # Simulate the 'localstack_session' attr from Session class below.
                # Patch this into the botocore Session class.
                if "localstack_session" in self.__dict__:
                    # We're patching this into the base botocore Session,
                    # but we don't want to override things for the Session
                    # subclass below.
                    return self.__dict__["localstack_session"]
                return factory.localstack_session

            @localstack_session.setter
            def localstack_session(self, value):
                # Developer invariant: only the Localstack Session subclass
                # assigns this attribute.
                assert isinstance(self, Session)
                self.__dict__["localstack_session"] = value

            attr["localstack_session"] = localstack_session

            @property
            def _components(self):
                if isinstance(self, Session):
                    # The Localstack Session subclass keeps its own locator
                    # in the instance dict.
                    try:
                        return self.__dict__["_components"]
                    except KeyError:
                        raise AttributeError("_components")
                # Plain botocore sessions get a separate locator for the
                # duration of the patch, registered on first access.
                proxy_components = botocore.session.Session._proxy_components
                if self not in proxy_components:
                    proxy_components[self] = botocore.session.ComponentLocator()
                    self._register_components()
                return proxy_components[self]

            @_components.setter
            def _components(self, value):
                self.__dict__["_components"] = value

            attr.update(
                {
                    "_components": _components,
                    # This probably isn't the best way to handle
                    # _internal_components, but w/e
                    "_internal_components": _components,
                    # Weak keys: proxy state must not keep sessions alive.
                    "_proxy_components": weakref.WeakKeyDictionary(),
                }
            )

            @property
            def _credentials(self):
                return self._proxy_credentials.get(self)

            @_credentials.setter
            def _credentials(self, value):
                self._proxy_credentials[self] = value

            attr.update(
                {
                    "_credentials": _credentials,
                    "_proxy_credentials": weakref.WeakKeyDictionary(),
                }
            )

            patches.append(
                mock.patch.multiple("botocore.session.Session", create=True, **attr)
            )
            patches.append(
                mock.patch.multiple(
                    botocore.session.Session,
                    _register_endpoint_resolver=utils.unbind(
                        Session._register_endpoint_resolver
                    ),
                    _register_credential_provider=utils.unbind(
                        Session._register_credential_provider
                    ),
                    create_client=utils.unbind(Session.create_client),
                )
            )

            # Step 2: Safety checks
            # Make absolutely sure we use Localstack and not AWS.
            _original_convert_to_request_dict = (
                botocore.client.BaseClient._convert_to_request_dict
            )

            @functools.wraps(_original_convert_to_request_dict)
            def _convert_to_request_dict(self, *args, **kwargs):
                request_dict = _original_convert_to_request_dict(self, *args, **kwargs)
                hostname = factory.localstack_session.hostname
                # Raise explicitly instead of using ``assert``: assert
                # statements are removed under ``python -O``, which would
                # silently disable this guard.
                if hostname not in request_dict["url"]:
                    raise AssertionError(
                        "refusing to send request to %r: URL does not contain "
                        "the Localstack hostname %r" % (request_dict["url"], hostname)
                    )
                return request_dict

            patches.append(
                mock.patch(
                    "botocore.client.BaseClient._convert_to_request_dict",
                    _convert_to_request_dict,
                )
            )

            # Step 3: Patch existing clients
            # Patching botocore Session doesn't help with existing
            # botocore Client objects. They will have already been created with
            # endpoints aimed at AWS. We need to patch botocore.client.BaseClient
            # to temporarily act like a Localstack client.
            original_init = botocore.client.BaseClient.__init__

            @functools.wraps(original_init)
            def new_init(self, *args, **kwargs):
                # Every client created during the patch is a Localstack client.
                # Set this flag so that the new_getattribute() proxying below
                # won't break during original_init().
                self._is_pytest_localstack = True
                original_init(self, *args, **kwargs)

            patches.append(
                mock.patch.multiple(botocore.client.BaseClient, __init__=new_init)
            )

            # Create a place to store proxy clients.
            patches.append(
                mock.patch(
                    "botocore.client.BaseClient._proxy_clients",
                    weakref.WeakKeyDictionary(),
                    create=True,
                )
            )

            # Attributes of pre-existing clients that are redirected to the
            # Localstack proxy client. Built once here rather than on every
            # attribute access.
            proxied_keys = frozenset(
                [
                    "_cache",
                    "_client_config",
                    "_endpoint",
                    "_exceptions_factory",
                    "_exceptions",
                    "exceptions",
                    "_loader",
                    "_request_signer",
                    "_response_parser",
                    "_serializer",
                    "meta",
                ]
            )

            def new_getattribute(self, key):
                if key.startswith("__"):
                    return object.__getattribute__(self, key)
                instance_dict = object.__getattribute__(self, "__dict__")
                if (
                    instance_dict.get("_is_pytest_localstack", False)
                    or key not in proxied_keys
                ):
                    # Don't proxy clients that are already Localstack clients
                    return object.__getattribute__(self, key)
                if self not in botocore.client.BaseClient._proxy_clients:
                    # First proxied access on this client: build a Localstack
                    # client for the same service and remember it.
                    try:
                        meta = instance_dict["meta"]
                    except KeyError:
                        raise AttributeError("meta")
                    proxy = factory.default_session.create_client(
                        meta.service_model.service_name,
                        config=instance_dict["_client_config"],
                    )
                    botocore.client.BaseClient._proxy_clients[self] = proxy
                return object.__getattribute__(
                    botocore.client.BaseClient._proxy_clients[self], key
                )

            patches.append(
                mock.patch(
                    "botocore.client.BaseClient.__getattribute__",
                    new_getattribute,
                    create=True,
                )
            )

            with utils.nested(*patches):
                yield
        finally:
            logger.debug("exit patch")
def patch_fixture(
    scope="function",
    services=None,
    autouse=False,
    docker_client=None,
    region_name=constants.DEFAULT_AWS_REGION,
    kinesis_error_probability=0.0,
    dynamodb_error_probability=0.0,
    container_log_level=logging.DEBUG,
    localstack_verison="latest",
    auto_remove=True,
    pull_image=True,
    container_name=None,
    **kwargs
):
    """Build a pytest fixture that temporarily points botocore at Localstack.

    This function is a fixture *factory*, not a fixture itself. Each fixture
    it returns opens a Localstack session via ``_make_session``, patches
    botocore with ``session.botocore.patch_botocore()`` and yields the
    session to the test. Since boto3 uses botocore to send requests, boto3
    is redirected as well.

    Args:
        scope (str, optional): The pytest scope which the fixture will use.
            Defaults to :const:`"function"`.
        services (list, dict, optional): One of

            - A :class:`list` of AWS service names to start in the
              Localstack container.
            - A :class:`dict` of service names to the port they should
              run on.

            Defaults to all services. Setting this can reduce container
            startup time and therefore test time.
        autouse (bool, optional): If :obj:`True`, automatically use the
            fixture in applicable tests. Default: :obj:`False`
        docker_client (:class:`~docker.client.DockerClient`, optional):
            Docker client to run the Localstack container with.
            Defaults to :func:`docker.client.from_env`.
        region_name (str, optional): Region name to assume.
            Each Localstack container acts like a single AWS region.
            Defaults to ``constants.DEFAULT_AWS_REGION``.
        kinesis_error_probability (float, optional): Decimal value between
            0.0 (default) and 1.0 to randomly inject
            ProvisionedThroughputExceededException errors
            into Kinesis API responses.
        dynamodb_error_probability (float, optional): Decimal value
            between 0.0 (default) and 1.0 to randomly inject
            ProvisionedThroughputExceededException errors into
            DynamoDB API responses.
        container_log_level (int, optional): The logging level to use
            for Localstack container logs. Defaults to :data:`logging.DEBUG`.
        localstack_verison (str, optional): The version of the Localstack
            image to use. Defaults to :const:`"latest"`.
        auto_remove (bool, optional): If :obj:`True`, delete the Localstack
            container when it stops. Default: :obj:`True`
        pull_image (bool, optional): If :obj:`True`, pull the Localstack
            image before running it. Default: :obj:`True`
        container_name (str, optional): The name for the Localstack
            container. Defaults to a randomly generated id.
        **kwargs: Additional kwargs will be passed to the
            :class:`.LocalstackSession`.

    Returns:
        A :func:`pytest fixture <_pytest.fixtures.fixture>`.

    """

    @pytest.fixture(scope=scope, autouse=autouse)
    def _fixture():
        # Collect every session option in one mapping; the extra kwargs can
        # never collide with the named options because those are bound as
        # explicit parameters of ``patch_fixture``.
        session_options = dict(
            kwargs,
            docker_client=docker_client,
            services=services,
            region_name=region_name,
            kinesis_error_probability=kinesis_error_probability,
            dynamodb_error_probability=dynamodb_error_probability,
            container_log_level=container_log_level,
            localstack_verison=localstack_verison,
            auto_remove=auto_remove,
            pull_image=pull_image,
            container_name=container_name,
        )
        with _make_session(**session_options) as localstack:
            with localstack.botocore.patch_botocore():
                yield localstack

    return _fixture
# Grab a reference to the unpatched ``create_client`` at import time to avoid
# breaking things during patching: ``patch_botocore`` replaces
# ``botocore.session.Session.create_client`` with ``Session.create_client``,
# which in turn calls this saved original.
_original_create_client = utils.unbind(botocore.session.Session.create_client)
class Session(botocore.session.Session):
    """A botocore Session subclass that talks to Localstack."""

    def __init__(self, localstack_session, *args, **kwargs):
        # Stored first; the base initializer runs afterwards with the
        # remaining arguments.
        self.localstack_session = localstack_session
        super(Session, self).__init__(*args, **kwargs)

    def _register_endpoint_resolver(self):
        """Lazily register a :class:`LocalstackEndpointResolver` component."""

        def _build_resolver():
            endpoint_data = self.get_component("data_loader").load_data("endpoints")
            return LocalstackEndpointResolver(self.localstack_session, endpoint_data)

        # The locator that receives the resolver depends on the botocore version.
        locator = (
            self._internal_components
            if constants.BOTOCORE_VERSION >= (1, 10, 58)
            else self._components
        )
        locator.lazy_register_component("endpoint_resolver", _build_resolver)

    def _register_credential_provider(self):
        """Lazily register the Localstack credential resolver component."""
        self._components.lazy_register_component(
            "credential_provider", create_credential_resolver
        )

    def create_client(self, *args, **kwargs):
        """Create a botocore client.

        Accepts the same arguments as the original
        ``botocore.session.Session.create_client``. A caller-supplied
        ``config`` is merged with a path-style S3 addressing config, and the
        returned client is flagged with ``_is_pytest_localstack = True``.
        """
        # Localstack doesn't use the virtual host addressing style.
        path_style = botocore.config.Config(s3={"addressing_style": "path"})
        bound = inspect.getcallargs(_original_create_client, self, *args, **kwargs)
        supplied = bound.get("config")
        bound["config"] = supplied.merge(path_style) if supplied else path_style
        localstack_client = _original_create_client(**bound)
        localstack_client._is_pytest_localstack = True
        return localstack_client
def create_credential_resolver():
    """Create a credentials resolver for Localstack.

    The resolver is given two providers, in this order: botocore's
    ``EnvProvider`` and then :class:`DefaultCredentialProvider`.
    """
    providers = [
        botocore.credentials.EnvProvider(),
        DefaultCredentialProvider(),
    ]
    return botocore.credentials.CredentialResolver(providers=providers)
class DefaultCredentialProvider(botocore.credentials.CredentialProvider):
    """Provide some default credentials for Localstack clients."""

    METHOD = "localstack-default"

    def load(self):
        """Return credentials built from the package's default constants."""
        default_fields = {
            "access_key": constants.DEFAULT_AWS_ACCESS_KEY_ID,
            "secret_key": constants.DEFAULT_AWS_SECRET_ACCESS_KEY,
            "token": constants.DEFAULT_AWS_SESSION_TOKEN,
            "method": self.METHOD,
        }
        return botocore.credentials.Credentials(**default_fields)
class LocalstackEndpointResolver(botocore.regions.EndpointResolver):
    """Resolve AWS service endpoints based on a LocalstackSession."""

    def __init__(self, localstack_session, endpoints):
        self.localstack_session = localstack_session
        super(LocalstackEndpointResolver, self).__init__(endpoints)

    @property
    def valid_regions(self):
        """Return the set of regions we can resolve endpoints for.

        The set holds the Localstack session's region and ``"aws-global"``.
        """
        return {self.localstack_session.region_name, "aws-global"}

    def get_available_partitions(self):
        """List the partitions available to the endpoint resolver."""
        return ["aws"]

    def get_available_endpoints(
        self, service_name, partition_name="aws", allow_non_regional=False
    ):
        """List the endpoint names of a particular partition.

        Only the ``"aws"`` partition is supported. Endpoint names outside
        :attr:`valid_regions` are included only when ``allow_non_regional``
        is true.

        Raises:
            exceptions.UnsupportedPartitionError: If ``partition_name`` is
                anything other than ``"aws"``.

        """
        if partition_name != "aws":
            raise exceptions.UnsupportedPartitionError(partition_name)
        names = []
        for partition in self._endpoint_data["partitions"]:
            if (
                partition["partition"] == "aws"
                and service_name in partition["services"]
            ):
                names.extend(
                    name
                    for name in partition["services"][service_name]["endpoints"]
                    if allow_non_regional or name in self.valid_regions
                )
        return names

    def construct_endpoint(self, service_name, region_name=None):
        """Resolve an endpoint for a service and region combination.

        When ``region_name`` is omitted the Localstack session's region is
        used. The first endpoint found in the ``"aws"`` partition is
        rewritten to point at Localstack and returned; if none is found
        the method returns :obj:`None`.

        Raises:
            exceptions.RegionError: If ``region_name`` is given but is not
                one of :attr:`valid_regions`.

        """
        session = self.localstack_session
        if region_name is None:
            region_name = session.region_name
        elif region_name not in self.valid_regions:
            raise exceptions.RegionError(region_name, session.region_name)

        for partition in self._endpoint_data["partitions"]:
            if partition["partition"] != "aws":
                continue
            endpoint = self._endpoint_for_partition(
                partition, service_name, region_name
            )
            if not endpoint:
                continue
            # Redirect the resolved endpoint to the Localstack host.
            endpoint["hostname"] = session.service_hostname(service_name)
            endpoint["protocols"] = (
                endpoint["protocols"] if session.use_ssl else ["http"]
            )
            if not session.use_ssl:
                endpoint.pop("sslCommonName", None)
            endpoint["dnsSuffix"] = session.hostname
            return endpoint