"""Test resource factory for the botocore library."""
import contextlib
import functools
import inspect
import logging
import socket
import weakref
import botocore
import botocore.client
import botocore.config
import botocore.credentials
import botocore.regions
import botocore.session
import pytest
from pytest_localstack import _make_session, constants, exceptions, hookspecs, utils
from pytest_localstack.utils import mock
try:
import boto3
except ImportError:
boto3 = None
logger = logging.getLogger(__name__)
[docs]@hookspecs.pytest_localstack_hookimpl
def contribute_to_session(session):
"""Add :class:`BotocoreTestResourceFactory` to :class:`.LocalstackSession`."""
logger.debug("patching session %r", session)
session.botocore = BotocoreTestResourceFactory(session)
[docs]@hookspecs.pytest_localstack_hookimpl
def contribute_to_module(pytest_localstack):
"""Add :func:`patch_fixture` to :mod:`pytest_localstack`."""
logger.debug("patching module %r", pytest_localstack)
pytest_localstack.patch_fixture = patch_fixture
[docs]class BotocoreTestResourceFactory:
"""Create botocore clients to interact with a :class:`.LocalstackSession`.
Args:
localstack_session (:class:`.LocalstackSession`):
The session that this factory should create test resources for.
"""
def __init__(self, localstack_session):
logger.debug("BotocoreTestResourceFactory.__init__")
self.localstack_session = localstack_session
self._default_session = None
[docs] def session(self, *args, **kwargs):
"""Create a botocore Session that will use Localstack.
Arguments are the same as :class:`botocore.session.Session`.
"""
return Session(self.localstack_session, *args, **kwargs)
[docs] def client(self, service_name, *args, **kwargs):
"""Create a botocore client that will use Localstack.
Arguments are the same as
:meth:`botocore.session.Session.create_client`.
"""
return self.default_session.create_client(service_name, *args, **kwargs)
@property
def default_session(self):
"""Return a default botocore Localstack Session.
Most applications only need one Session.
"""
if self._default_session is None:
self._default_session = self.session()
return self._default_session
[docs] @contextlib.contextmanager
def patch_botocore(self):
"""Context manager that will patch botocore to use Localstack.
Since boto3 relies on botocore to perform API calls, this method
also effectively patches boto3.
"""
# Q: Why is this method so complicated?
# A: Because the most common usecase is something like this::
#
# >>> import boto3
# >>>
# >>> S3 = boto3.resource('s3')
# >>>
# >>> def do_stuff():
# >>> bucket = S3.Bucket('foobar')
# >>> bucket.create()
# ...
#
# The `S3` resource creates a botocore Client when the module
# is loaded. It's hard to patch existing Client instances since
# there isn't a good way to find them.
# You must add a descriptor to the Client class
# that overrides specific properties of the Client instances.
# TODO: Could we use use `gc.get_referrers()` to find instances?
logger.debug("enter patch")
if boto3 is not None:
preexisting_boto3_session = boto3.DEFAULT_SESSION
try:
factory = self
patches = []
# Step 1: patch botocore Session to use Localstack.
attr = {}
@property
def localstack_session(self):
# Simlate the 'localstack_session' attr from Session class below.
# Patch this into the botocore Session class.
if "localstack_session" in self.__dict__:
# We're patching this into the base botocore Session,
# but we don't want to override things for the Session
# subclass below.
return self.__dict__["localstack_session"]
return factory.localstack_session
@localstack_session.setter
def localstack_session(self, value):
assert isinstance(self, Session)
self.__dict__["localstack_session"] = value
attr["localstack_session"] = localstack_session
@property
def _components(self):
if isinstance(self, Session):
try:
return self.__dict__["_components"]
except KeyError:
raise AttributeError("_components")
proxy_components = botocore.session.Session._proxy_components
if self not in proxy_components:
proxy_components[self] = botocore.session.ComponentLocator()
self._register_components()
return proxy_components[self]
@_components.setter
def _components(self, value):
self.__dict__["_components"] = value
@property
def _internal_components(self):
if isinstance(self, Session):
try:
return self.__dict__["_internal_components"]
except KeyError:
raise AttributeError("_internal_components")
proxy_components = botocore.session.Session._proxy_components
if self not in proxy_components:
proxy_components[self] = DebugComponentLocator()
self._register_components()
return proxy_components[self]
@_internal_components.setter
def _internal_components(self, value):
self.__dict__["_internal_components"] = value
attr.update(
{
"_components": _components,
"_internal_components": _internal_components,
"_proxy_components": weakref.WeakKeyDictionary(),
}
)
@property
def _credentials(self):
return self._proxy_credentials.get(self)
@_credentials.setter
def _credentials(self, value):
self._proxy_credentials[self] = value
attr.update(
{
"_credentials": _credentials,
"_proxy_credentials": weakref.WeakKeyDictionary(),
}
)
patches.append(
mock.patch.multiple("botocore.session.Session", create=True, **attr)
)
patches.append(
mock.patch.multiple(
botocore.session.Session,
_register_endpoint_resolver=utils.unbind(
Session._register_endpoint_resolver
),
_register_credential_provider=utils.unbind(
Session._register_credential_provider
),
create_client=utils.unbind(Session.create_client),
)
)
# Step 2: Safety checks
# Make absolutly sure we use Localstack and not AWS.
_original_convert_to_request_dict = (
botocore.client.BaseClient._convert_to_request_dict
)
@functools.wraps(_original_convert_to_request_dict)
def _convert_to_request_dict(self, *args, **kwargs):
request_dict = _original_convert_to_request_dict(self, *args, **kwargs)
assert any(
(
factory.localstack_session.hostname in request_dict["url"],
socket.gethostname() in request_dict["url"],
)
)
return request_dict
patches.append(
mock.patch(
"botocore.client.BaseClient._convert_to_request_dict",
_convert_to_request_dict,
)
)
# Step 3: Patch existing clients
# Patching botocore Session doesn't help with an existing
# botocore Clients objects. They will have already been created with
# endpoints aimed at AWS. We need to patch botocore.client.BaseClient
# to temporarially act like a Localstack.
original_init = botocore.client.BaseClient.__init__
@functools.wraps(original_init)
def new_init(self, *args, **kwargs):
# Every client created during the patch is a Localstack client.
# Set this flag so that the proxy_client_attr() stuff below
# won't break during original_init().
self._is_pytest_localstack = True
original_init(self, *args, **kwargs)
patches.append(
mock.patch.multiple(botocore.client.BaseClient, __init__=new_init)
)
# Create a place to store proxy clients.
patches.append(
mock.patch(
"botocore.client.BaseClient._proxy_clients",
weakref.WeakKeyDictionary(),
create=True,
)
)
def new_getattribute(self, key):
if key.startswith("__"):
return object.__getattribute__(self, key)
proxied_keys = [
"_cache",
"_client_config",
"_endpoint",
"_exceptions_factory",
"_exceptions",
"exceptions",
"_loader",
"_request_signer",
"_response_parser",
"_serializer",
"meta",
]
__dict__ = object.__getattribute__(self, "__dict__")
if (
__dict__.get("_is_pytest_localstack", False)
or key not in proxied_keys
):
# Don't proxy clients that are already Localstack clients
return object.__getattribute__(self, key)
if self not in botocore.client.BaseClient._proxy_clients:
try:
meta = __dict__["meta"]
except KeyError:
raise AttributeError("meta")
proxy = factory.default_session.create_client(
meta.service_model.service_name,
# config=config,
config=__dict__["_client_config"],
)
botocore.client.BaseClient._proxy_clients[self] = proxy
return object.__getattribute__(
botocore.client.BaseClient._proxy_clients[self], key
)
patches.append(
mock.patch(
"botocore.client.BaseClient.__getattribute__",
new_getattribute,
create=True,
)
)
# STS is sneaky and even after patching the endpoint it has a final custom check
# to see whether it should override with the global endpoint url... patch that too
patches.append(
mock.patch(
"botocore.args.ClientArgsCreator._should_set_global_sts_endpoint",
lambda *args, **kwargs: False,
)
)
with utils.nested(*patches):
yield
finally:
logger.debug("exit patch")
if boto3 is not None:
boto3.DEFAULT_SESSION = preexisting_boto3_session
[docs]def patch_fixture(
scope="function",
services=None,
autouse=False,
docker_client=None,
region_name=None,
kinesis_error_probability=0.0,
dynamodb_error_probability=0.0,
container_log_level=logging.DEBUG,
localstack_version="latest",
auto_remove=True,
pull_image=True,
container_name=None,
**kwargs
):
"""Create a pytest fixture that temporarially redirects all botocore
sessions and clients to a Localstack container.
This is not a fixture! It is a factory to create them.
The fixtures that are created by this function will run a Localstack
container and patch botocore to direct traffic there for the duration
of the tests.
Since boto3 uses botocore to send requests, boto3 will also be redirected.
Args:
scope (str, optional): The pytest scope which this fixture will use.
Defaults to :const:`"function"`.
services (list, dict, optional): One of
- A :class:`list` of AWS service names to start in the
Localstack container.
- A :class:`dict` of service names to the port they should run on.
Defaults to all services. Setting this
can reduce container startup time and therefore test time.
autouse (bool, optional): If :obj:`True`, automatically use this
fixture in applicable tests. Default: :obj:`False`
docker_client (:class:`~docker.client.DockerClient`, optional):
Docker client to run the Localstack container with.
Defaults to :func:`docker.client.from_env`.
region_name (str, optional): Region name to assume.
Each Localstack container acts like a single AWS region.
Defaults to :const:`"us-east-1"`.
kinesis_error_probability (float, optional): Decimal value between
0.0 (default) and 1.0 to randomly inject
ProvisionedThroughputExceededException errors
into Kinesis API responses.
dynamodb_error_probability (float, optional): Decimal value
between 0.0 (default) and 1.0 to randomly inject
ProvisionedThroughputExceededException errors into
DynamoDB API responses.
container_log_level (int, optional): The logging level to use
for Localstack container logs. Defaults to :data:`logging.DEBUG`.
localstack_version (str, optional): The version of the Localstack
image to use. Defaults to :const:`"latest"`.
auto_remove (bool, optional): If :obj:`True`, delete the Localstack
container when it stops. Default: :obj:`True`
pull_image (bool, optional): If :obj:`True`, pull the Localstack
image before running it. Default: :obj:`True`
container_name (str, optional): The name for the Localstack
container. Defaults to a randomly generated id.
**kwargs: Additional kwargs will be passed to the
:class:`.LocalstackSession`.
Returns:
A :func:`pytest fixture <_pytest.fixtures.fixture>`.
"""
@pytest.fixture(scope=scope, autouse=autouse)
def _fixture(pytestconfig):
if not pytestconfig.pluginmanager.hasplugin("localstack"):
pytest.skip("skipping because localstack plugin isn't loaded")
with _make_session(
docker_client=docker_client,
services=services,
region_name=region_name,
kinesis_error_probability=kinesis_error_probability,
dynamodb_error_probability=dynamodb_error_probability,
container_log_level=container_log_level,
localstack_version=localstack_version,
auto_remove=auto_remove,
pull_image=pull_image,
container_name=container_name,
**kwargs
) as session:
with session.botocore.patch_botocore():
yield session
return _fixture
# Grab a reference here to avoid breaking things during patching.
_original_create_client = utils.unbind(botocore.session.Session.create_client)
[docs]class Session(botocore.session.Session):
"""A botocore Session subclass that talks to Localstack."""
def __init__(self, localstack_session, *args, **kwargs):
self.localstack_session = localstack_session
super(Session, self).__init__(*args, **kwargs)
def _register_endpoint_resolver(self):
def create_default_resolver():
loader = self.get_component("data_loader")
endpoints = loader.load_data("endpoints")
return LocalstackEndpointResolver(self.localstack_session, endpoints)
if constants.BOTOCORE_VERSION >= (1, 10, 58):
self._internal_components.lazy_register_component(
"endpoint_resolver", create_default_resolver
)
else:
self._components.lazy_register_component(
"endpoint_resolver", create_default_resolver
)
def _register_credential_provider(self):
self._components.lazy_register_component(
"credential_provider", create_credential_resolver
)
[docs] def create_client(self, *args, **kwargs):
"""Create a botocore client."""
# Localstack doesn't use the virtual host addressing style.
config = botocore.config.Config(s3={"addressing_style": "path"})
callargs = inspect.getcallargs(_original_create_client, self, *args, **kwargs)
if callargs.get("config"):
config = callargs["config"].merge(config)
callargs["config"] = config
with mock.patch(
"botocore.args.ClientArgsCreator._should_set_global_sts_endpoint",
lambda *args, **kwargs: False,
):
client = _original_create_client(**callargs)
client._is_pytest_localstack = True
return client
[docs]def create_credential_resolver():
"""Create a credentials resolver for Localstack."""
env_provider = botocore.credentials.EnvProvider()
default = DefaultCredentialProvider()
resolver = botocore.credentials.CredentialResolver(
providers=[env_provider, default]
)
return resolver
[docs]class DefaultCredentialProvider(botocore.credentials.CredentialProvider):
"""Provide some default credentials for Localstack clients."""
METHOD = "localstack-default"
[docs] def load(self):
"""Return credentials."""
return botocore.credentials.Credentials(
access_key=constants.DEFAULT_AWS_ACCESS_KEY_ID,
secret_key=constants.DEFAULT_AWS_SECRET_ACCESS_KEY,
token=constants.DEFAULT_AWS_SESSION_TOKEN,
method=self.METHOD,
)
[docs]class LocalstackEndpointResolver(botocore.regions.EndpointResolver):
"""Resolve AWS service endpoints based on a LocalstackSession."""
def __init__(self, localstack_session, endpoints):
self.localstack_session = localstack_session
super(LocalstackEndpointResolver, self).__init__(endpoints)
@property
def valid_regions(self):
"""Return a list of regions we can resolve endpoints for."""
return set([self.localstack_session.region_name, "aws-global"])
[docs] def get_available_partitions(self):
"""List the partitions available to the endpoint resolver."""
return ["aws"]
[docs] def get_available_endpoints(
self, service_name, partition_name="aws", allow_non_regional=False
):
"""List the endpoint names of a particular partition."""
if partition_name != "aws":
raise exceptions.UnsupportedPartitionError(partition_name)
result = []
for partition in self._endpoint_data["partitions"]:
if partition["partition"] != "aws":
continue
services = partition["services"]
if service_name not in services:
continue
for endpoint_name in services[service_name]["endpoints"]:
if allow_non_regional or endpoint_name in self.valid_regions:
result.append(endpoint_name)
return result
[docs] def construct_endpoint(self, service_name, region_name=None):
"""Resolve an endpoint for a service and region combination."""
if region_name is None:
region_name = self.localstack_session.region_name
elif region_name not in self.valid_regions:
raise exceptions.RegionError(
region_name, self.localstack_session.region_name
)
for partition in self._endpoint_data["partitions"]:
if partition["partition"] != "aws":
continue
result = self._endpoint_for_partition(partition, service_name, region_name)
if result:
result["hostname"] = self.localstack_session.service_hostname(
service_name
)
result["protocols"] = (
result["protocols"] if self.localstack_session.use_ssl else ["http"]
)
if not self.localstack_session.use_ssl:
result.pop("sslCommonName", None)
result["dnsSuffix"] = self.localstack_session.hostname
return result