Source code for ftrack_connect_pipeline.host.engine

# :coding: utf-8
# :copyright: Copyright (c) 2014-2020 ftrack

import logging
import ftrack_api
import copy

from ftrack_connect_pipeline import constants
from ftrack_connect_pipeline.log.log_item import LogItem
from ftrack_connect_pipeline.asset import FtrackObjectManager
from ftrack_connect_pipeline.asset.dcc_object import DccObject


def getEngine(baseClass, engineType):
    '''
    Return the subclass of *baseClass*, at any depth, whose class name is
    equal to *engineType*.

    The subclass tree is walked depth first: a subclass is checked before its
    own descendants, and those descendants are checked before the next
    sibling. The first match is returned. *baseClass* itself is never
    considered. Returns None when no subclass matches.
    '''
    # Explicit stack instead of recursion. Children are pushed reversed so
    # that they are popped in declaration order.
    pending = list(reversed(baseClass.__subclasses__()))
    while pending:
        candidate = pending.pop()
        if candidate.__name__ == engineType:
            return candidate
        pending.extend(reversed(candidate.__subclasses__()))
    return None
class BaseEngine(object):
    '''
    Base engine class.

    Runs a single plugin (:meth:`run`) or a complete definition
    (:meth:`run_definition`, which goes through :meth:`run_step` and
    :meth:`run_stage`) and notifies the client about plugin results and
    progress by publishing events.
    '''

    engine_type = 'base'
    '''Engine type for this engine class'''

    FtrackObjectManager = FtrackObjectManager
    '''FtrackObjectManager class to use'''

    DccObject = DccObject
    '''DccObject class to use'''

    @property
    def ftrack_object_manager(self):
        '''
        Initializes and returns an instance of
        :class:`~ftrack_connect_pipeline.asset.FtrackObjectManager`

        The instance is created on first access (or whenever the stored
        value is not an instance of :attr:`FtrackObjectManager`) and reused
        afterwards.
        '''
        if not isinstance(
            self._ftrack_object_manager, self.FtrackObjectManager
        ):
            self._ftrack_object_manager = self.FtrackObjectManager(
                self.event_manager
            )
        return self._ftrack_object_manager

    @property
    def dcc_object(self):
        '''
        Returns the :obj:`dcc_object` from the
        :class:`~ftrack_connect_pipeline.asset.FtrackObjectManager`
        '''
        return self.ftrack_object_manager.dcc_object

    @dcc_object.setter
    def dcc_object(self, value):
        '''
        Sets the :obj:`dcc_object` to the
        :class:`~ftrack_connect_pipeline.asset.FtrackObjectManager`
        '''
        self.ftrack_object_manager.dcc_object = value

    @property
    def asset_info(self):
        '''
        Returns the :obj:`asset_info` from the
        :class:`~ftrack_connect_pipeline.asset.FtrackObjectManager`
        '''
        return self.ftrack_object_manager.asset_info

    @asset_info.setter
    def asset_info(self, value):
        '''
        Sets the :obj:`asset_info` to the
        :class:`~ftrack_connect_pipeline.asset.FtrackObjectManager`
        '''
        self.ftrack_object_manager.asset_info = value

    @property
    def host_id(self):
        '''Returns the current host id.'''
        return self._host_id

    @property
    def host_types(self):
        '''Returns the host types given at initialisation.'''
        return self._host_types

    def __init__(self, event_manager, host_types, host_id, asset_type_name):
        '''
        Initialise the engine with an instance of
        :class:`~ftrack_connect_pipeline.event.EventManager` , and
        *host_types*, *host_id* and *asset_type_name*

        *event_manager* : Event manager used to publish notifications; its
        session is used to publish the plugin run events.

        *host_types* : Host types (ex: python, maya, nuke....). They are
        tried in reversed order when running a plugin.

        *host_id* : Host id.

        *asset_type_name* : If engine is initialized to publish or load, the
        asset type should be specified.
        '''
        super(BaseEngine, self).__init__()

        self.asset_type_name = asset_type_name
        self.session = event_manager.session
        self._host_types = host_types
        self._host_id = host_id
        # Set by run_definition; run_event reports its name when available.
        self._definition = None
        # Created lazily by the ftrack_object_manager property.
        self._ftrack_object_manager = None

        self.logger = logging.getLogger(
            __name__ + '.' + self.__class__.__name__
        )
        self.event_manager = event_manager

    def run_event(
        self,
        plugin_name,
        plugin_type,
        host_type,
        data,
        options,
        context_data,
        method,
    ):
        '''
        Returns an :class:`ftrack_api.event.base.Event` with the topic
        :const:`~ftrack_connect_pipeline.constants.PIPELINE_RUN_PLUGIN_TOPIC`
        with the data of the given *plugin_name*, *plugin_type*,
        *host_type*, *data*, *options*, *context_data*, *method*

        The event is only built here, it is not published.

        *plugin_name* : Name of the plugin.

        *plugin_type* : Type of plugin.

        *host_type* : Host type.

        *data* : data to pass to the plugin.

        *options* : options to pass to the plugin

        *context_data* : result of the context plugin containing the
        context_id, asset_name... Or None

        *method* : Method of the plugin to be executed.
        '''
        definition_name = (
            self._definition['name'] if self._definition else None
        )
        return ftrack_api.event.base.Event(
            topic=constants.PIPELINE_RUN_PLUGIN_TOPIC,
            data={
                'pipeline': {
                    'plugin_name': plugin_name,
                    'plugin_type': plugin_type,
                    'method': method,
                    'category': 'plugin',
                    'host_type': host_type,
                    'definition': definition_name,
                },
                'settings': {
                    'data': data,
                    'options': options,
                    'context_data': context_data,
                },
            },
        )

    def _run_plugin(
        self,
        plugin,
        plugin_type,
        options=None,
        data=None,
        context_data=None,
        method='run',
    ):
        '''
        Returns the result of running the plugin with the event returned from
        :meth:`run_event` using the given *plugin*, *plugin_type*,
        *options*, *data*, *context_data*, *method*

        The client is notified twice through :meth:`_notify_client`: once
        with a running status before execution and once with the final
        result. If no host type produces a reply, the returned dictionary
        carries the unknown status and a None result.

        *plugin* : Plugin definition, a dictionary with the plugin
        information.

        *plugin_type* : Type of plugin.

        *options* : options to pass to the plugin

        *data* : data to pass to the plugin.

        *context_data* : result of the context plugin containing the
        context_id, asset_name... Or None

        *method* : Method of the plugin to be executed.
        '''
        plugin_name = plugin['plugin']

        start_data = {
            'plugin_name': plugin_name,
            'plugin_type': plugin_type,
            'method': method,
            'status': constants.RUNNING_STATUS,
            'result': None,
            'execution_time': 0,
            'message': None,
        }

        self._notify_client(plugin, start_data)

        # Copied after the notification on purpose: _notify_client has added
        # host_id, widget_ref and plugin_id, which the fallback result keeps.
        result_data = copy.deepcopy(start_data)
        result_data['status'] = constants.UNKNOWN_STATUS

        # Try each host type, last one first, and keep the first reply.
        for host_type in reversed(self._host_types):
            event = self.run_event(
                plugin_name,
                plugin_type,
                host_type,
                data,
                options,
                context_data,
                method,
            )

            plugin_result_data = self.session.event_hub.publish(
                event, synchronous=True
            )

            if plugin_result_data:
                result_data = plugin_result_data[0]
                break

        self._notify_client(plugin, result_data)

        return result_data

    def _notify_client(self, plugin, result_data):
        '''
        Publish an :class:`ftrack_api.event.base.Event` with the topic
        :const:`~ftrack_connect_pipeline.constants.PIPELINE_CLIENT_NOTIFICATION`
        to notify the client of the given *plugin* result *result_data*.

        *result_data* is modified in place: the keys host_id, widget_ref and
        plugin_id are added to it before it is published.

        *plugin* : Plugin definition, a dictionary with the plugin
        information. May be empty or None, in which case widget_ref and
        plugin_id are set to None.

        *result_data* : Result of the plugin execution.
        '''
        result_data['host_id'] = self.host_id
        result_data['widget_ref'] = (
            plugin.get('widget_ref') if plugin else None
        )
        result_data["plugin_id"] = plugin.get('plugin_id') if plugin else None

        event = ftrack_api.event.base.Event(
            topic=constants.PIPELINE_CLIENT_NOTIFICATION,
            data={'pipeline': result_data},
        )

        self.event_manager.publish(
            event,
        )

    def run(self, data):
        '''
        Executes the :meth:`_run_plugin` with the provided *data*.
        Returns the result of the mentioned method, or None if *data*
        contains no plugin.

        Raises an Exception if the status of the plugin result maps to False
        in :const:`~ftrack_connect_pipeline.constants.status_bool_mapping`.

        *data* : pipeline['data'] provided from the client host connection at
        :meth:`~ftrack_connect_pipeline.client.HostConnection.run`
        '''
        method = data.get('method', 'run')
        plugin = data.get('plugin', None)
        plugin_type = data.get('plugin_type', None)

        if not plugin:
            return None

        plugin_result = self._run_plugin(
            plugin,
            plugin_type,
            data=plugin.get('plugin_data'),
            options=plugin['options'],
            context_data=None,
            method=method,
        )
        status = plugin_result['status']
        if not constants.status_bool_mapping[status]:
            raise Exception(
                'An error occurred during the execution of the plugin {}'
                '\n status: {} \n result: {}'.format(
                    plugin['plugin'], status, plugin_result
                )
            )
        return plugin_result

    # Base functions for loader, opener and publisher

    def run_stage(
        self,
        stage_name,
        plugins,
        stage_context,
        stage_options,
        stage_data,
        plugins_order=None,
        step_type=None,
        step_name=None,
    ):
        '''
        Returns the bool status and the result list of dictionaries of
        executing all the plugins in the stage.
        This function executes all the defined plugins for this stage using
        the :meth:`_run_plugin`

        Every plugin is executed even if a previous one failed; the returned
        status is False if any of them failed.

        *stage_name* : Name of the stage that's executing.

        *plugins* : List of plugins that has to execute.

        *stage_context* : Context dictionary with the result of the context
        plugin containing the context_id, asset_name... Or None

        *stage_options* : Options dictionary to be passed to each plugin.
        Merged on top of a copy of each plugin's own options.

        *stage_data* : Data list of dictionaries to be passed to each stage.

        *plugins_order* : Order of the plugins to be executed. Currently not
        used by this method.

        *step_type* : Type of the step.

        *step_name* : Name of the step, only used for progress notifications.
        '''
        plugin_type = '{}.{}'.format(self.engine_type, stage_name)

        stage_status = True
        stage_results = []

        # We don't want to pass the information of the previous plugin, so that
        # is why we only pass the data of the previous stage.
        data = stage_data

        total_plugins = len(plugins)
        for plugin_index, plugin in enumerate(plugins, start=1):
            self._notify_progress_client(
                step_type=step_type,
                step_name=step_name,
                stage_name=stage_name,
                total_plugins=total_plugins,
                current_plugin_index=plugin_index,
                status=constants.RUNNING_STATUS,
                results=None,
            )

            result = None
            plugin_name = plugin['plugin']
            # Merge the stage_options into a copy of the plugin options so the
            # definition passed in by the caller is not modified.
            plugin_options = copy.copy(plugin['options'])
            plugin_options.update(stage_options)
            category = plugin['category']
            plugin_definition_type = plugin['type']
            default_method = plugin['default_method']

            plugin_result = self._run_plugin(
                plugin,
                plugin_type,
                data=data,
                options=plugin_options,
                context_data=stage_context,
                method=default_method,
            )

            bool_status = constants.status_bool_mapping[
                plugin_result['status']
            ]
            if not bool_status:
                stage_status = False
                result = plugin_result['result']
                # We log an error if a plugin on the stage failed.
                self.logger.error(
                    "Execution of the plugin {} failed.".format(plugin_name)
                )
            elif plugin_result['result']:
                # On success only the result of the executed method is kept.
                result = plugin_result['result'].get(default_method)
                if step_type == constants.CONTEXT:
                    if result is not None:
                        result['asset_type_name'] = self.asset_type_name
                    else:
                        # Nothing to annotate; report it instead of failing
                        # with a TypeError on the item assignment.
                        self.logger.warning(
                            'Context plugin {} returned no result for method '
                            '{}, asset type name not set.'.format(
                                plugin_name, default_method
                            )
                        )

            # widget_ref, host_id and plugin_id are added to the plugin result
            # by _notify_client.
            stage_results.append(
                {
                    "name": plugin_name,
                    "options": plugin_options,
                    "result": result,
                    "status": bool_status,
                    "category": category,
                    "type": plugin_definition_type,
                    "plugin_type": plugin_result['plugin_type'],
                    "method": plugin_result['method'],
                    "user_data": plugin_result.get('user_data') or {},
                    "message": plugin_result['message'],
                    "widget_ref": plugin_result['widget_ref'],
                    "host_id": plugin_result['host_id'],
                    "plugin_id": plugin_result['plugin_id'],
                }
            )
        return stage_status, stage_results

    def run_step(
        self,
        step_name,
        stages,
        step_context,
        step_options,
        step_data,
        stages_order,
        step_type,
    ):
        '''
        Returns the bool status and the result list of dictionaries of
        executing all the stages in the step.
        This function executes the defined stages for this step using
        the :meth:`run_stage`.

        Stages are executed in the order in which they appear in *stages*;
        a stage is only executed if its name is listed in *stages_order* and
        it has plugins. Execution stops at the first failing stage.

        *step_name* : Name of the step that's executing.

        *stages* : List of stages that has to execute.

        *step_context* : Context dictionary with the result of the context
        plugin containing the context_id, asset_name... Or None

        *step_options* : Options dictionary to be passed to each stage.

        *step_data* : Data list of dictionaries to be passed to each stage.
        Note that the dictionary of the current step is appended to this
        list in place.

        *stages_order* : Names of the stages allowed to be executed.

        *step_type* : Type of the step.
        '''
        step_status = True
        step_results = []

        # data will be, the previous step data, plus the current step dictionary
        # plus the stage result filled on every loop
        data = step_data
        current_step_dict = {
            "name": step_name,
            "result": step_results,
            "type": step_type,
        }
        data.append(current_step_dict)

        for stage in stages:
            for stage_name in stages_order:
                if stage_name != stage['name']:
                    continue

                stage_plugins = stage['plugins']
                category = stage['category']
                stage_type = stage['type']

                if not stage_plugins:
                    continue

                stage_status, stage_result = self.run_stage(
                    stage_name=stage_name,
                    plugins=stage_plugins,
                    stage_context=step_context,
                    stage_options=step_options,
                    stage_data=data,
                    plugins_order=None,
                    step_type=step_type,
                    step_name=step_name,
                )
                if not stage_status:
                    step_status = False
                    # Log an error if the execution of a stage has failed.
                    self.logger.error(
                        "Execution of the stage {} failed.".format(stage_name)
                    )

                # step_results is the same list referenced by
                # current_step_dict, so later stages see this result in data.
                step_results.append(
                    {
                        "name": stage_name,
                        "result": stage_result,
                        "status": stage_status,
                        "category": category,
                        "type": stage_type,
                    }
                )

                # We stop the loop if the stage failed. To raise an error on
                # run_definitions
                if not step_status:
                    self._notify_progress_client(
                        step_type,
                        step_name,
                        stage_name,
                        None,
                        None,
                        constants.ERROR_STATUS,
                        step_results,
                    )
                    return step_status, step_results

                self._notify_progress_client(
                    step_type,
                    step_name,
                    stage_name,
                    None,
                    None,
                    constants.SUCCESS_STATUS,
                    step_results,
                )

        # Step level notification: no stage name.
        self._notify_progress_client(
            step_type,
            step_name,
            None,
            None,
            None,
            constants.SUCCESS_STATUS,
            step_results,
        )
        return step_status, step_results

    def _notify_progress_client(
        self,
        step_type,
        step_name,
        stage_name,
        total_plugins,
        current_plugin_index,
        status,
        results,
    ):
        '''
        Publish an :class:`ftrack_api.event.base.Event` with the topic
        :const:`~ftrack_connect_pipeline.constants.PIPELINE_CLIENT_PROGRESS_NOTIFICATION`
        to notify the client of the progress of the current step, stage and
        plugin.

        *step_type* : Type of the step being executed.

        *step_name* : Name of the step being executed.

        *stage_name* : Name of the stage being executed, or None for a step
        level notification.

        *total_plugins* : Number of plugins in the stage, or None.

        *current_plugin_index* : Index (starting at 1) of the plugin about to
        be executed, or None.

        *status* : Status to report.

        *results* : Results collected so far, or None.
        '''
        data = {
            'host_id': self.host_id,
            'step_type': step_type,
            'step_name': step_name,
            'stage_name': stage_name,
            'total_plugins': total_plugins,
            'current_plugin_index': current_plugin_index,
            'status': status,
            'results': results,
        }

        event = ftrack_api.event.base.Event(
            topic=constants.PIPELINE_CLIENT_PROGRESS_NOTIFICATION,
            data={'pipeline': data},
        )

        self.event_manager.publish(
            event,
        )

    def run_definition(self, data):
        '''
        Runs the whole definition from the provided *data*.
        Call the method :meth:`run_step` for each enabled step of every group
        in :const:`~ftrack_connect_pipeline.constants.STEP_GROUPS`.

        Returns the results of the finalizers group. These start from a copy
        of the components results in which only importer, exporter and post
        importer stages are kept.

        Raises an Exception as soon as a step of any group fails.

        *data* : pipeline['data'] provided from the client host connection at
        :meth:`~ftrack_connect_pipeline.client.HostConnection.run` Should be a
        valid definition.
        '''
        self._definition = data

        context_data = None
        components_output = []
        finalizers_output = []

        for step_group in constants.STEP_GROUPS:
            group_steps = data[step_group]
            group_results = []

            if step_group == constants.FINALIZERS:
                # Finalizers receive the (filtered) components output as data.
                group_results = copy.deepcopy(components_output)

            group_status = True

            for step in group_steps:
                step_name = step['name']
                step_stages = step['stages']
                step_enabled = step['enabled']
                step_stage_order = step['stage_order']
                step_category = step['category']
                step_type = step['type']
                step_options = {}

                if step_group == constants.COMPONENTS:
                    if 'file_formats' in step:
                        # Pass on to collector
                        step_options['file_formats'] = step['file_formats']

                if not step_enabled:
                    self.logger.debug(
                        'Skipping step {} as it has been disabled'.format(
                            step_name
                        )
                    )
                    continue

                # run_step appends to the data it receives, so give it a copy.
                step_data = copy.deepcopy(group_results)

                step_status, step_result = self.run_step(
                    step_name=step_name,
                    stages=step_stages,
                    step_context=context_data,
                    step_options=step_options,
                    step_data=step_data,
                    stages_order=step_stage_order,
                    step_type=step_type,
                )

                if not step_status:
                    group_status = False
                    # Log an error if the execution of a step has failed.
                    self.logger.error(
                        "Execution of the step {} failed.".format(step_name)
                    )

                group_results.append(
                    {
                        "name": step_name,
                        "result": step_result,
                        "status": step_status,
                        "category": step_category,
                        "type": step_type,
                    }
                )

                # Stop if context results are false to raise a proper error.
                if not group_status:
                    break

            if not group_status:
                raise Exception(
                    'An error occurred during the execution of the a {} and '
                    'can not continue, please, check the plugin logs'.format(
                        step_group
                    )
                )

            if step_group == constants.CONTEXTS:
                # The context data is the result of the last plugin of the
                # last stage of the last executed context step. Indexing with
                # -1 raises IndexError if any of those lists is empty.
                context_latest_step = group_results[-1]
                context_latest_stage = context_latest_step.get('result')[-1]
                context_latest_plugin = context_latest_stage.get('result')[-1]
                context_data = context_latest_plugin.get('result')

            elif step_group == constants.COMPONENTS:
                components_output = copy.deepcopy(group_results)
                kept_stage_types = (
                    constants.IMPORTER,
                    constants.EXPORTER,
                    constants.POST_IMPORTER,
                )
                # Iterate the original results while removing from the copy,
                # so the list being iterated is never modified.
                for step_index, component_step in enumerate(group_results):
                    for component_stage in component_step.get("result"):
                        self.logger.debug(
                            "Checking stage name {} of type {}".format(
                                component_stage.get("name"),
                                component_stage.get("type"),
                            )
                        )
                        if component_stage.get("type") not in kept_stage_types:
                            self.logger.debug(
                                "Removing stage name {} of type {}".format(
                                    component_stage.get("name"),
                                    component_stage.get("type"),
                                )
                            )
                            components_output[step_index]['result'].remove(
                                component_stage
                            )

            elif step_group == constants.FINALIZERS:
                finalizers_output = group_results

        return finalizers_output
from ftrack_connect_pipeline.host.engine.publish import * from ftrack_connect_pipeline.host.engine.load import * from ftrack_connect_pipeline.host.engine.open import * from ftrack_connect_pipeline.host.engine.asset_manager import *