# -*- coding: utf-8 -*-
# This file is a part of the AnyBlok project
#
# Copyright (C) 2014 Jean-Sebastien SUZANNE <jssuzanne@anybox.fr>
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file,You can
# obtain one at http://mozilla.org/MPL/2.0/.
from pkg_resources import iter_entry_points
from anyblok.imp import ImportManager
from .logging import log
from anyblok.environment import EnvironmentManager
from time import sleep
from sys import modules
from os.path import dirname
from logging import getLogger
from os.path import join
from datetime import datetime
logger = getLogger(__name__)
[docs]class BlokManagerException(LookupError):
""" Simple exception to BlokManager """
def __init__(self, *args, **kwargs):
EnvironmentManager.set('current_blok', None)
super(BlokManagerException, self).__init__(*args, **kwargs)
[docs]class BlokManager:
""" Manage the bloks for one process
A blok has a `setuptools` entrypoint, this entry point is defined
by the ``entry_points`` attribute in the first load
The ``bloks`` attribute is a dict with all the loaded entry points
Use this class to import all the bloks in the entrypoint::
BlokManager.load()
"""
bloks = {}
entry_points = None
ordered_bloks = []
auto_install = []
importers = {}
[docs] @classmethod
def list(cls):
""" Return the ordered bloks
:rtype: list of blok name ordered by loading
"""
return cls.ordered_bloks
[docs] @classmethod
def has(cls, blok):
""" Return True if the blok is loaded
:param blok: blok name
:rtype: bool
"""
return blok and blok in cls.ordered_bloks or False
[docs] @classmethod
def get(cls, blok):
""" Return the loaded blok
:param blok: blok name
:rtype: blok instance
:exception: BlokManagerException
"""
if not cls.has(blok):
raise BlokManagerException('%r not found' % blok)
return cls.bloks[blok]
[docs] @classmethod
def set(cls, blokname, blok):
""" Add a new blok
:param blokname: blok name
:param blok: blok instance
:exception: BlokManagerException
"""
if cls.has(blokname):
raise BlokManagerException('%r already present' % blokname)
cls.bloks[blokname] = blok
cls.ordered_bloks.append(blokname)
[docs] @classmethod
@log(logger, level='debug')
def reload(cls):
""" Reload the entry points
Empty the ``bloks`` dict and use the ``entry_points`` attribute to
load bloks
:exception: BlokManagerException
"""
if cls.entry_points is None:
raise BlokManagerException(
"""You must use the ``load`` classmethod before using """
"""``reload``""")
entry_points = []
entry_points += cls.entry_points
cls.unload()
cls.load(entry_points=entry_points)
[docs] @classmethod
@log(logger, level='debug')
def unload(cls):
""" Unload all the bloks but not the registry """
cls.bloks = {}
cls.ordered_bloks = []
cls.entry_points = None
cls.auto_install = []
from .registry import RegistryManager
RegistryManager.unload()
@classmethod
def get_need_blok_linked_bloks(cls, blok):
for required in cls.bloks[blok].required:
if not cls.get_need_blok(required):
raise BlokManagerException(
"Not %s required bloks found" % required)
cls.bloks[required].required_by.append(blok)
for optional in cls.bloks[blok].optional:
if cls.get_need_blok(optional):
cls.bloks[optional].optional_by.append(blok)
for conditional in cls.bloks[blok].conditional:
cls.bloks[conditional].conditional_by.append(blok)
for conflicting in cls.bloks[blok].conflicting:
cls.bloks[conflicting].conflicting_by.append(blok)
@classmethod
def get_need_blok(cls, blok):
if cls.has(blok):
return True
if blok not in cls.bloks:
return False
cls.get_need_blok_linked_bloks(blok)
cls.ordered_bloks.append(blok)
EnvironmentManager.set('current_blok', blok)
if not ImportManager.has(blok):
# Import only if not exist don't reload here
mod = ImportManager.add(blok)
mod.imports()
else:
mod = ImportManager.get(blok)
mod.reload()
if cls.bloks[blok].autoinstall:
cls.auto_install.append(blok)
return True
[docs] @classmethod
@log(logger, level='debug')
def load(cls, entry_points=('bloks',)):
""" Load all the bloks and import them
:param entry_points: Use by ``iter_entry_points`` to get the blok
:exception: BlokManagerException
"""
if not entry_points:
raise BlokManagerException("The entry_points mustn't be empty")
cls.entry_points = entry_points
if EnvironmentManager.get('current_blok'):
while EnvironmentManager.get('current_blok'):
sleep(0.1)
EnvironmentManager.set('current_blok', 'start')
bloks = []
for entry_point in entry_points:
count = 0
for i in iter_entry_points(entry_point):
count += 1
blok = i.load()
blok.required_by = []
blok.optional_by = []
blok.conditional_by = []
blok.conflicting_by = []
cls.set(i.name, blok)
blok.name = i.name
bloks.append((blok.priority, i.name))
if not count:
raise BlokManagerException(
"Invalid bloks group %r" % entry_point)
# Empty the ordered blok to reload it depending on the priority
cls.ordered_bloks = []
bloks.sort()
try:
while bloks:
blok = bloks.pop(0)[1]
cls.get_need_blok(blok)
finally:
EnvironmentManager.set('current_blok', None)
[docs] @classmethod
def getPath(cls, blok):
""" Return the path of the blok
:param blok: blok name in ``ordered_bloks``
:rtype: absolute path
"""
blok = cls.get(blok)
return dirname(modules[blok.__module__].__file__)
[docs] @classmethod
def add_importer(cls, key, cls_name):
""" Add a new importer
:param key: key of the importer
:param cls_name: name of the model to import
"""
cls.importers[key] = cls_name
[docs] @classmethod
def has_importer(cls, key):
""" Check if an importer """
return True if key in cls.importers else False
[docs] @classmethod
def get_importer(cls, key):
""" Get the importer class name
:param key: key of the importer
:rtype: name of the model to import
:exception: BlokManagerException
"""
if not cls.has_importer(key):
raise BlokManagerException(
"No importer found for the key %r" % key)
return cls.importers[key]
[docs]class Blok:
""" Super class for all the bloks
define the default value for:
* priority: order to load the blok
* required: list of the bloks needed to install this blok
* optional: list of the bloks to be installed if present in the blok list
* conditional: if all the bloks of this list are installed then install
this blok
"""
autoinstall = False
priority = 100
required = []
optional = []
conditional = []
conflicting = []
name = None # filled by the BlokManager
author = ''
logo = ''
def __init__(self, registry):
self.registry = registry
[docs] @classmethod
def import_declaration_module(cls):
""" Do the python import for the Declaration of the model or other
"""
[docs] def update(self, latest_version):
""" Call at the installation or update
:param latest_version: latest version installed, if the blok have not
been installing the latest_version will be None
"""
[docs] def pre_migration(self, latest_version):
"""Call at update, before the automigration
.. warning::
You can not use the ORM
:param latest_version: latest version installed, if the blok have not
been installing the latest_version will be None
"""
[docs] def post_migration(self, latest_version):
"""Call at update, after the automigration
:param latest_version: latest version installed, if the blok have not
been installing the latest_version will be None
"""
[docs] def uninstall(self):
""" Call at the uninstallation
"""
[docs] def load(self):
""" Call at the launch of the application
"""
[docs] def import_file(self, importer_name, model, *file_path, **kwargs):
""" Import data file
:param importer_name: Name of the importer (need installation of the
Blok which have the importer)
:param model: Model of the data to import
:param \*file_path: relative path of the path in this Blok
:param \*\*kwargs: Option for the importer
:rtype: return dict of result
"""
blok_path = BlokManager.getPath(self.name)
_file = join(blok_path, *file_path)
logger.info("import %r file: %r", importer_name, _file)
Importer = self.registry.get(BlokManager.get_importer(importer_name))
file_to_import = None
with open(_file, 'rb') as fp:
file_to_import = fp.read()
importer = Importer.insert(
model=model, file_to_import=file_to_import, **kwargs)
started_at = datetime.now()
res = importer.run(self.name)
stoped_at = datetime.now()
dt = stoped_at - started_at
logger.info("Create %d entries, Update %d entries (%d.%d sec)",
len(res['created_entries']), len(res['updated_entries']),
dt.seconds, dt.microseconds)
if 'error_found' in res and res['error_found']:
for error in res['error_found']:
logger.error(error)
else:
importer.delete()
return res