# This file is a part of the AnyBlok project
#
# Copyright (C) 2014 Jean-Sebastien SUZANNE <jssuzanne@anybox.fr>
# Copyright (C) 2018 Jean-Sebastien SUZANNE <jssuzanne@anybox.fr>
# Copyright (C) 2018 Georges RACINET <gracinet@anybox.fr>
# Copyright (C) 2019 Jean-Sebastien SUZANNE <jssuzanne@anybox.fr>
# Copyright (C) 2021 Jean-Sebastien SUZANNE <js.suzanne@gmail.com>
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file,You can
# obtain one at http://mozilla.org/MPL/2.0/.
"""Base classes for unit/integration tests with anyblok.
This module provides :class:`BlokTestCase`, which is the main one meant for
blok tests, and :class:`DBTestCase`, whose primary purpose is to test anyblok
itself, in so-called "framework tests".
"""
import os
import unittest
from contextlib import contextmanager
from copy import copy
from logging import CRITICAL, DEBUG, ERROR, INFO, WARNING, getLogger
import sqlalchemy
from sqlalchemy import event
from sqlalchemy.orm import clear_mappers
from sqlalchemy_utils.functions import create_database, database_exists, orm
from testfixtures import LogCapture as LC
from anyblok import (
configuration_post_load,
load_init_function_from_entry_points,
)
from anyblok.blok import BlokManager
from anyblok.config import Configuration, get_url
from anyblok.environment import EnvironmentManager
from anyblok.registry import RegistryManager
from .common import DATABASES_CACHED
from .common import sgdb_in as sgdb_in_
logger = getLogger(__name__)
class MockParser:
def _get_kwargs(self):
return []
def _get_args(self):
return False
def load_configuration():
load_init_function_from_entry_points(unittest=True)
Configuration.load_config_for_test()
Configuration.parse_options(MockParser())
configuration_post_load(unittest=True)
def drop_database(url):
url = copy(sqlalchemy.engine.url.make_url(url))
database = url.database
if url.drivername.startswith("postgresql"):
url.database = "postgres"
elif not url.drivername.startswith("sqlite"):
url.database = None
engine = sqlalchemy.create_engine(url)
if engine.dialect.name == "sqlite" and url.database != ":memory:":
os.remove(url.database)
else:
text = "DROP DATABASE {0}".format(orm.quote(engine, database))
cnx = engine.connect()
cnx.execute("ROLLBACK")
cnx.execute(text)
cnx.execute("commit")
cnx.close()
@contextmanager
def tmp_configuration(**values):
"""Add Configuration value only in the contextmanager
::
with TestCase.Configuration(db_name='a db name'):
assert Configuration.get('db_name') == 'a db name'
:param **values: values to update
"""
old_values = {x: Configuration.get(x) for x in values.keys()}
Configuration.update(**values)
try:
yield
finally:
Configuration.update(**old_values)
[docs]class TestCase(unittest.TestCase):
"""Common helpers, not meant to be used directly."""
_transaction_case_teared_down = False
[docs] @classmethod
def init_configuration_manager(cls, **env):
"""Initialise the configuration manager with environ variable
to launch the test
.. warning::
For the moment we not use the environ variable juste constante
:param env: add another dict to merge with environ variable
"""
db_name = Configuration.get("db_name") or "test_anyblok"
db_driver_name = Configuration.get("db_driver_name") or "postgresql"
env.update(
{
"db_name": db_name,
"db_driver_name": db_driver_name,
}
)
Configuration.update(**env)
[docs] @classmethod
def createdb(cls, keep_existing=False, with_demo=False):
"""Create the database specified in configuration.
::
cls.init_configuration_manager()
cls.createdb()
:param keep_existing: If false drop the previous db before create it
"""
url = get_url()
db_template_name = Configuration.get("db_template_name", None)
if database_exists(url):
if keep_existing:
return False
drop_database(url)
create_database(url, template=db_template_name)
registry = RegistryManager.get(Configuration.get("db_name", None))
if registry is None:
return
registry.System.Parameter.set(
"with-demo", Configuration.get("with_demo", with_demo)
)
return True
[docs] @classmethod
def dropdb(cls):
"""Drop the database specified in configuration.
::
cls.init_configuration_manager()
cls.dropdb()
"""
url = get_url()
if database_exists(url):
drop_database(url)
[docs] @classmethod
def additional_setting(cls):
return dict(unittest=True)
[docs] @classmethod
def getRegistry(cls):
"""Return the registry for the test database.
This assumes the database is created, and the registry has already
been initialized::
registry = self.getRegistry()
:rtype: registry instance
"""
additional_setting = cls.additional_setting()
return RegistryManager.get(
Configuration.get("db_name"), **additional_setting
)
[docs] def setUp(self):
super(TestCase, self).setUp()
self.addCleanup(self.callCleanUp)
[docs] def callCleanUp(self):
if not self._transaction_case_teared_down:
self.cleanUp()
[docs] def cleanUp(self):
self.tearDown()
[docs] def tearDown(self):
"""Roll back the session"""
super(TestCase, self).tearDown()
self._transaction_case_teared_down = True
Configuration = tmp_configuration
[docs]class DBTestCase(TestCase):
"""Base class for tests that need to work on an empty database.
.. warning::
The database is created and dropped with each unit test
For instance, this is the one used for Field, Column, RelationShip, and
more generally core framework tests.
The drawback of using this base class is that tests will be slow. The
advantage is ultimate test isolation.
Sample usage::
from anyblok.tests.testcase import DBTestCase
def simple_column(ColumnType=None, **kwargs):
@Declarations.register(Declarations.Model)
class Test:
id = Declarations.Column.Integer(primary_key=True)
col = ColumnType(**kwargs)
class TestColumns(DBTestCase):
def test_integer(self):
Integer = Declarations.Column.Integer
registry = self.init_registry(simple_column,
ColumnType=Integer)
test = registry.Test.insert(col=1)
self.assertEqual(test.col, 1)
"""
blok_entry_points = ("bloks",)
"""setuptools entry points to load blok """
[docs] @classmethod
def setUpClass(cls):
"""Intialialise the configuration manager"""
super(DBTestCase, cls).setUpClass()
cls.init_configuration_manager()
if cls.createdb(keep_existing=True):
BlokManager.load(entry_points=("bloks", "test_bloks"))
registry = cls.getRegistry()
registry.commit()
registry.close()
BlokManager.unload()
[docs] def setUp(self):
"""Create a database and load the blok manager"""
self.registry = None
super(DBTestCase, self).setUp()
BlokManager.load(entry_points=self.blok_entry_points)
[docs] def tearDown(self):
"""Clear the registry, unload the blok manager and drop the database"""
if self.registry:
self.registry.close()
RegistryManager.clear()
BlokManager.unload()
clear_mappers()
super(DBTestCase, self).tearDown()
[docs] def init_registry(self, function, **kwargs):
"""call a function to filled the blok manager with new model
:param function: function to call
:param kwargs: kwargs for the function
:rtype: registry instance
"""
return self.init_registry_with_bloks(None, function, **kwargs)
[docs] def init_registry_with_bloks(self, bloks, function, **kwargs):
"""call a function to filled the blok manager with new model and
bloks to install
:param bloks: list of blok's names
:param function: function to call
:param kwargs: kwargs for the function
:rtype: registry instance
"""
if bloks is None:
bloks = []
if isinstance(bloks, tuple):
bloks = list(bloks)
if "anyblok-test" not in bloks:
bloks.append("anyblok-test")
from copy import deepcopy
loaded_bloks = deepcopy(RegistryManager.loaded_bloks)
if function is not None:
EnvironmentManager.set("current_blok", "anyblok-test")
try:
function(**kwargs)
finally:
EnvironmentManager.set("current_blok", None)
try:
self.registry = registry = self.__class__.getRegistry()
if bloks:
registry.upgrade(install=bloks)
finally:
RegistryManager.loaded_bloks = loaded_bloks
return registry
[docs] def reload_registry(self, registry, function, **kwargs):
"""call a function to filled the blok manager with new model and
before reload the registry
:param bloks: list of blok's names
:param function: function to call
:param kwargs: kwargs for the function
:rtype: registry instance
"""
from copy import deepcopy
loaded_bloks = deepcopy(RegistryManager.loaded_bloks)
if function is not None:
EnvironmentManager.set("current_blok", "anyblok-test")
try:
function(**kwargs)
finally:
EnvironmentManager.set("current_blok", None)
try:
registry.reload()
finally:
RegistryManager.loaded_bloks = loaded_bloks
return registry
[docs]class BlokTestCase(unittest.TestCase):
"""Base class for tests meant to run on a preinstalled database.
The tests written with this class don't need to start afresh on a new
database, and therefore run much faster than those inheriting
:class:`DBTestCase`. Instead, they expect the tested bloks to be already
installed and up to date.
The session gets rollbacked after each test.
Such tests are appropriate for a typical blok developper workflow:
* create and install the bloks once
* run the tests of the blok under development repeatedly
* upgrade the bloks in database when needed (schema change, update of
dependencies)
They are also appropriate for on the fly testing while installing the
bloks: the tests of each blok get run on the database state they expect,
before dependent (downstream) bloks, that could. e.g., alter the database
schema, get themselves installed.
This is useful to test a whole stack at once using only one
database (typically in CI bots).
Sample usage::
from anyblok.tests.testcase import BlokTestCase
class MyBlokTest(BlokTestCase):
def test_1(self):
# access to the registry by ``self.registry``
...
"""
_transaction_case_teared_down = False
registry = None
"""The instance of :class:`anyblok.registry.Registry`` to use in tests.
The ``session_commit()`` method is disabled to avoid side effects from one
test to the other.
"""
[docs] @classmethod
def additional_setting(cls):
return dict(unittest=True)
[docs] @classmethod
def setUpClass(cls):
"""Initialize the registry."""
super(BlokTestCase, cls).setUpClass()
additional_setting = cls.additional_setting()
if cls.registry is None:
if len(BlokManager.list()) == 0:
BlokManager.load()
cls.registry = RegistryManager.get(
Configuration.get("db_name"), **additional_setting
)
cls.registry.commit()
[docs] def setUp(self):
super(BlokTestCase, self).setUp()
self.addCleanup(self.callCleanUp)
self.registry.begin_nested() # add SAVEPOINT
@event.listens_for(self.registry.session, "after_transaction_end")
def restart_savepoint(session, transaction):
# TODO gracinet: while working on SharedDataTestCase
# I noticed that this could keep listening long after everything
# has been teared down. See SharedDataTestCase for unregistration
# example.
if transaction.nested and not transaction._parent.nested:
session.expire_all()
session.begin_nested()
[docs] def callCleanUp(self):
if not self._transaction_case_teared_down:
self.cleanUp()
[docs] def cleanUp(self):
self.tearDown()
[docs] def tearDown(self):
"""Roll back the session"""
super(BlokTestCase, self).tearDown()
try:
self.registry.System.Cache.invalidate_all()
except sqlalchemy.exc.InvalidRequestError:
pass
finally:
self.registry.rollback()
self.registry.session.close()
self._transaction_case_teared_down = True
class SharedDataTestCase(BlokTestCase):
@classmethod
def setUpClass(cls):
super(SharedDataTestCase, cls).setUpClass()
cls.pre_data_savepoint = cls.registry.begin_nested()
try:
cls.setUpSharedData()
except Exception:
cls.tearDownClass()
raise
@classmethod
def setUpSharedData(cls):
"""To be implemented by concrete test classes."""
@classmethod
def make_case_savepoint(cls, session=None):
if session is None:
session = cls.registry
cls.case_savepoint = session.begin_nested()
def setUp(self):
# we don't want to execute BlokTestCase.setUp(), only its parent's:
super(BlokTestCase, self).setUp()
# tearDown is not called in case of errors in setUp, but these are:
self.addCleanup(self.callCleanUp)
self.make_case_savepoint()
@event.listens_for(self.registry.session, "after_transaction_end")
def restart_savepoint(session, transaction):
if transaction is self.case_savepoint:
session.expire_all()
self.make_case_savepoint()
self.savepoint_restarter = restart_savepoint
@classmethod
def tearDownClass(cls):
cls.pre_data_savepoint.rollback()
super(SharedDataTestCase, cls).tearDownClass()
def tearDown(self):
"""Roll back the session"""
super(BlokTestCase, self).tearDown()
self.case_savepoint.rollback()
self.registry.System.Cache.invalidate_all()
event.remove(
self.registry.session,
"after_transaction_end",
self.savepoint_restarter,
)
self._transaction_case_teared_down = True
[docs]class LogCapture(LC):
"""Overwrite ``testfixtures.LogCapture`` to add some helper methods"""
[docs] def get_messages(self, *levels):
"""Return the captured messages
::
with LogCapture() as logs:
messages = logs.get_messages(INFO, WARNING)
:param *levels: list of logging.level
:rtype: list of formated message
"""
return [
self.format(r)
for r in self.records
if (not levels or r.levelno in levels)
]
[docs] def get_debug_messages(self):
"""Return only the logging.DEBUG messages"""
return self.get_messages(DEBUG)
[docs] def get_info_messages(self):
"""Return only the logging.INFO messages"""
return self.get_messages(INFO)
[docs] def get_warning_messages(self):
"""Return only the logging.WARNING messages"""
return self.get_messages(WARNING)
[docs] def get_error_messages(self):
"""Return only the logging.ERROR messages"""
return self.get_messages(ERROR)
[docs] def get_critical_messages(self):
"""Return only the logging.CRITICAL messages"""
return self.get_messages(CRITICAL)
def skip_unless_bloks_installed(*bloks):
"""A decorator to skip a test if some Bloks aren't installed.
In cases of soft dependency between Bloks (i.e., the dependent Blok
has some of its features behaving differently if another Blok is installed
without actually requiring it), it's useful to write tests that will
be executed only if some Bloks are installed.
Here's an example taken from Anyblok / Wms Base,
where the ``wms-inventory`` Blok wants to take Reservation
into account if it's installed yet doesn't
want to introduce a hard requirement onto ``wms-reservation``
@skip_unless_bloks_installed('wms-reservation')
def test_choose_affected_with_reserved(self):
# this test can now safely assume that wms-reservation is installed
"""
def bloks_decorator(testmethod):
def wrapped(self):
Blok = self.registry.System.Blok
for blok_name in bloks:
blok = Blok.query().get(blok_name)
if blok.state != "installed":
raise unittest.SkipTest(
"Blok %r is not installed" % blok_name
)
return testmethod(self)
# necessary not to be ignored by test runner
wrapped.__name__ = testmethod.__name__
return wrapped
return bloks_decorator
def sgdb_in(databases):
if not DATABASES_CACHED:
load_configuration()
url = get_url(db_name="")
engine = sqlalchemy.create_engine(url)
return sgdb_in_(engine, databases)