Source code for ftrack_connect_pipeline.client

# :coding: utf-8
# :copyright: Copyright (c) 2014-2020 ftrack
import os
import time
import logging
import copy
import uuid

from six import string_types
import ftrack_api
from ftrack_connect_pipeline import constants
from ftrack_connect_pipeline.log import LogDB
from ftrack_connect_pipeline.log.log_item import LogItem
from ftrack_connect_pipeline.definition import definition_object

[docs]class HostConnection(object): ''' Host Connection Base class. This class is used to communicate between the client and the host. ''' @property def context_id(self): '''Returns the current context id as fetched from the host''' return self._context_id @context_id.setter def context_id(self, value): '''Set the context id for this host connection to *value*. Will notify the host and other active host connection through an event, and tell the client through callback.''' if value == self.context_id: return self._context_id = value @property def event_manager(self): '''Returns instance of :class:`~ftrack_connect_pipeline.event.EventManager`''' return self._event_manager @property def session(self): ''' Returns instance of :class:`ftrack_api.session.Session` ''' return self.event_manager.session @property def definitions(self): '''Returns the current definitions, filtered on discoverable.''' context_identifiers = [] if self.context_id: entity = self.session.query( 'TypedContext where id is {}'.format(self.context_id) ).first() if entity: # Task, Project,... context_identifiers.append(entity.get('context_type').lower()) if 'type' in entity: # Modeling, animation... context_identifiers.append( entity['type'].get('name').lower() ) # Name of the task or the project context_identifiers.append(entity.get('name').lower()) if context_identifiers: result = {} for schema_title in self._raw_host_data['definition'].keys(): result[schema_title] = self._filter_definitions( context_identifiers, self._raw_host_data['definition'][schema_title], ) # TODO: # This is a dictionary where the keys are the definition types like # publisher, opener, etc... and the values are a list of those. # But it also includes the key Schema, which contains a list of the # schemas for each type. This should be cleaned up in the future in # order to separate schemas from the definitions. return copy.deepcopy(result) return definition_object.DefinitionObject( self._raw_host_data['definition'] ) def _filter_definitions(self, context_identifiers, definitions): '''Filter *definitions* on *context_identifiers* and discoverable.''' result = [] for definition in definitions: match = False discoverable = definition.get('discoverable') if not discoverable: # Append if not discoverable, because that means should be # discovered always as the Asset Manager or the logger match = True else: # This is not in a list comprehension because it needs the break # once found for discover_name in discoverable: if discover_name.lower() in context_identifiers: # Add definition as it matches match = True break if not match: self.logger.debug( 'Excluding definition {} - context identifiers {} ' 'does not match schema discoverable: {}.'.format( definition.get('name'), context_identifiers, discoverable, ) ) if match: result.append(definition) # Convert the list to our custom DefinitionList so we can have get # method and automatically convert all definitions to definitionObject return definition_object.DefinitionList(result) @property def id(self): '''Returns the current host id.''' return self._raw_host_data['host_id'] @property def name(self): '''Returns the current host name.''' return self._raw_host_data['host_name'] @property def host_types(self): '''Returns the list of compatible host for the current definitions.''' return self._raw_host_data['host_id'].split("-")[0].split(".") def __del__(self): self.logger.debug('Closing {}'.format(self)) def __repr__(self): return '<HostConnection: {}>'.format( def __hash__(self): return hash( def __eq__(self, other): return self.__hash__() == other.__hash__()
[docs] def __init__(self, event_manager, host_data): '''Initialise HostConnection with instance of :class:`~ftrack_connect_pipeline.event.EventManager` , and *host_data* *host_data* : Dictionary containing the host information. :py:func:`` ''' self.logger = logging.getLogger( '{0}.{1}'.format(__name__, self.__class__.__name__) ) copy_data = copy.deepcopy(host_data) self._event_manager = event_manager self._raw_host_data = copy_data self._context_id = self._raw_host_data.get('context_id') self.subscribe_host_context_change()
[docs] def run(self, data, engine, callback=None): ''' Publish an event with the topic :py:const:`~ftrack_connect_pipeline.constants.PIPELINE_HOST_RUN` with the given *data* and *engine*. ''' event = ftrack_api.event.base.Event( topic=constants.PIPELINE_HOST_RUN, data={ 'pipeline': { 'host_id':, 'data': data, 'engine_type': engine, } }, ) self.event_manager.publish(event, callback)
[docs] def launch_client(self, name, source=None): '''Send a widget launch event, to be picked up by DCC.''' event = ftrack_api.event.base.Event( topic=constants.PIPELINE_CLIENT_LAUNCH, data={ 'pipeline': { 'host_id':, 'name': name, 'source': source, } }, ) self.event_manager.publish( event, )
[docs] def subscribe_host_context_change(self): '''Have host connection subscribe to context change events, to be able to notify client''' self.session.event_hub.subscribe( 'topic={} and data.pipeline.host_id={}'.format( constants.PIPELINE_HOST_CONTEXT_CHANGE, ), self._ftrack_host_context_id_changed, )
def _ftrack_host_context_id_changed(self, event): '''Set the new context ID based on data provided in *event*''' self.context_id = event['data']['pipeline']['context_id']
[docs] def change_host_context_id(self, context_id): event = ftrack_api.event.base.Event( topic=constants.PIPELINE_CLIENT_CONTEXT_CHANGE, data={'pipeline': {'host_id':, 'context_id': context_id}}, ) self.event_manager.publish( event, )
[docs]class Client(object): ''' Base client class. ''' ui_types = [constants.UI_TYPE] '''Compatible UI for this client.''' definition_filters = None '''Use only definitions that matches the definition_filters''' definition_extensions_filter = None '''(Open) Only show definitions and components capable of accept these filename extensions. ''' _host_connection = None '''The singleton host connection used by all clients within the process space / DCC''' _host_connections = [] '''The list of discovered host connections''' def __repr__(self): return '<Client:{0}>'.format(self.ui_types) def __del__(self): self.logger.debug('Closing {}'.format(self)) @property def session(self): ''' Returns instance of :class:`ftrack_api.session.Session` ''' return self._event_manager.session @property def event_manager(self): '''Returns instance of :class:`~ftrack_connect_pipeline.event.EventManager`''' return self._event_manager @property def connected(self): ''' Returns True if client is connected to a :class:``''' return self._connected @property def context_id(self): '''Returns the current context id from host''' if self.host_connection is None: raise Exception('No host connection available') return self.host_connection.context_id @context_id.setter def context_id(self, context_id): '''Sets the context id on current host connection, will throw an exception if no host connection is active''' if not isinstance(context_id, string_types): raise ValueError('Context should be in form of a string.') if self.host_connection is None: raise Exception('No host connection available') self.host_connection.change_host_context_id(context_id) @property def context(self): '''Returns the current context''' if self.host_connection is None: raise Exception('No host connection available') if self.host_connection.context_id is None: raise Exception('No host context id set') return self.session.query( 'Context where id={}'.format(self.context_id) ).first() @property def host_connections(self): '''Return the current list of host_connections''' return Client._host_connections @host_connections.setter def host_connections(self, value): '''Return the current list of host_connections''' Client._host_connections = value @property def host_connection(self): ''' Return instance of :class:`~ftrack_connect_pipeline.client.HostConnection` ''' return Client._host_connection @host_connection.setter def host_connection(self, value): ''' Assign the host_connection to the given *value* *value* : should be instance of :class:`~ftrack_connect_pipeline.client.HostConnection` ''' if value is None or ( self.host_connection and == ): return self.logger.debug('host connection: {}'.format(value)) Client._host_connection = value self.on_client_notification() self.subscribe_host_context_change() # Feed change of host and context to client self.on_host_changed(self.host_connection) self.on_context_changed(self.host_connection.context_id) @property def schema(self): '''Return the current schema.''' return self._schema @property def definition(self): '''Returns the current definition.''' return self._definition @property def definitions(self): '''Returns the definitions list of the current host connection''' if self.host_connection is None: raise Exception('No host connection available') return self.host_connection.definitions @property def engine_type(self): '''Return the current engine type''' return self._engine_type @property def logs(self): '''Return the log items''' self._init_logs() return self._logs.get_log_items( if not self.host_connection is None else None ) def _init_logs(self): '''Delayed initialization of logs, when we know host ID.''' if self._logs is None: self._logs = LogDB( if not self.host_connection is None else uuid.uuid4().hex ) @property def multithreading_enabled(self): '''Return True if DCC supports multithreading (write operations)''' return self._multithreading_enabled
[docs] def __init__(self, event_manager, multithreading_enabled=True): ''' Initialise Client with instance of :class:`~ftrack_connect_pipeline.event.EventManager` ''' self._current = {} self.context_change_subscribe_id = None self._connected = False self._logs = None self._schema = None self._definition = None self.__callback = None self.logger = logging.getLogger( __name__ + '.' + self.__class__.__name__ ) self._event_manager = event_manager self.logger.debug('Initialising {}'.format(self)) self._multithreading_enabled = multithreading_enabled
# Host
[docs] def discover_hosts(self, force_rediscover=False, time_out=3): ''' Find for available hosts during the optional *time_out* and Returns a list of discovered :class:`~ftrack_connect_pipeline.client.HostConnection`. Skip this and use existing singleton host connection if previously detected, unless *force_rediscover* is True. ''' if force_rediscover: self.host_connections = None self.host_connection = None if self.host_connection is not None: self.on_client_notification() self.subscribe_host_context_change() self.on_host_changed(self.host_connection) self.on_context_changed(self.host_connection.context_id) return # discovery host loop and timeout. start_time = time.time() self.logger.debug('time out set to {}:'.format(time_out)) if not time_out: self.logger.warning( 'Running client with no time out.' 'Will not stop until discover a host.' 'Terminate with: Ctrl-C' ) while not self.host_connections: delta_time = time.time() - start_time if time_out and delta_time >= time_out: self.logger.warning('Could not discover any host.') break self._discover_hosts() if self.__callback and self.host_connections: self.__callback(self.host_connections) # Feed host connections to the client self.on_hosts_discovered(self.host_connections)
def _discover_hosts(self): ''' Publish an event with the topic :py:data:`~ftrack_connect_pipeline.constants.PIPELINE_DISCOVER_HOST` with the callback py:meth:`~ftrack_connect_pipeline.client._host_discovered` ''' self.host_connections = [] # Start over discover_event = ftrack_api.event.base.Event( topic=constants.PIPELINE_DISCOVER_HOST ) self._event_manager.publish( discover_event, callback=self._host_discovered ) def _host_discovered(self, event): ''' Callback, add the :class:`~ftrack_connect_pipeline.client.HostConnection` of the new discovered :class:`` from the given *event*. *event*: :class:`ftrack_api.event.base.Event` ''' if not event['data']: return host_connection = HostConnection(self.event_manager, event['data']) if ( host_connection and host_connection not in self.host_connections and self.filter_host(host_connection) ): Client._host_connections.append(host_connection) self._connected = True
[docs] def filter_host(self, host_connection): '''Return True if the *host_connection* should be considered *host_connection*: :class:`ftrack_connect_pipeline.client.HostConnection` ''' # On the discovery time context id could be None, so we have to consider # hosts with non context id. This method could be useful later on to # filter hosts that not match a specific criteria. But considering all # hosts as valid for now. return True
[docs] def change_host(self, host_connection): '''Client(user) has chosen the host connection to use, set it to *host_connection*''' self.host_connection = host_connection
[docs] def on_hosts_discovered(self, host_connections): '''Callback, hosts has been discovered. To be overridden by the qt client''' pass
[docs] def on_host_changed(self, host_connection): '''Called when the host has been (re-)selected by the user. To be overridden by the qt client.''' pass
# Context
[docs] def subscribe_host_context_change(self): '''Have host connection subscribe to context change events, to be able to notify client''' if self.context_change_subscribe_id: self.session.unsubscribe(self.context_change_subscribe_id) self.context_change_subscribe_id = self.session.event_hub.subscribe( 'topic={} and data.pipeline.host_id={}'.format( constants.PIPELINE_HOST_CONTEXT_CHANGE,, ), self._host_context_id_changed, )
def _host_context_id_changed(self, event): '''Set the new context ID based on data provided in *event*''' # Feed the new context to the client self.on_context_changed(event['data']['pipeline']['context_id'])
[docs] def on_context_changed(self, context_id): '''Called when the context has been set or changed within the host connection, either from this client or remote (other client or the host). Should be overridden by client.''' pass
[docs] def unsubscribe_host_context_change(self): '''Unsubscribe to client context change events''' if self.context_change_subscribe_id: self.session.event_hub.unsubscribe( self.context_change_subscribe_id ) self.context_change_subscribe_id = None
# Definition
[docs] def run_definition(self, definition=None, engine_type=None): ''' Calls the :meth:`` to run the entire given *definition* with the given *engine_type*. Callback received at :meth:`_run_callback` ''' # If not definition or engine type passed use the original ones set up # in the client if not definition: definition = self.definition if not engine_type: engine_type = self.engine_type definition.to_dict(), engine_type, callback=self._run_callback, )
[docs] def get_schema_from_definition(self, definition): '''Return matching schema for the given *definition*''' if not self.host_connection: self.logger.error("please set the host connection first") return for schema in self.host_connection.definitions['schema']: if ( schema['properties']['type'].get('default') == definition['type'] ): return schema self.logger.debug( "Schema title: {} and type: {} does not match definition {}".format( schema['title'], schema['properties']['type'].get('default'), definition['name'], ) ) self.logger.error( "Can't find a matching schema for the given definition: {}".format( ) ) return
[docs] def change_definition(self, definition, schema=None): ''' Assign the given *schema* and the given *definition* as the current :obj:`schema` and :obj:`definition` ''' if not self.host_connection: self.logger.error("please set the host connection first") return if not schema: schema = self.get_schema_from_definition(definition) self._schema = schema self._definition = definition self.change_engine(self.definition['_config']['engine_type'])
# Plugin
[docs] def run_plugin(self, plugin_data, method, engine_type): ''' Calls the :meth:`` to run one single plugin. Callback received at :meth:`_run_callback` *plugin_data* : Dictionary with the plugin information. *method* : method of the plugin to be run ''' # Plugin type is constructed using the engine_type and the type of the plugin. # (publisher.collector). We have to make sure that plugin_type is in # the data argument passed to the host_connection, because we are only # passing data to the engine. And the engine_type is only available # on the definition. plugin_type = '{}.{}'.format(engine_type, plugin_data['type']) data = { 'plugin': plugin_data, 'plugin_type': plugin_type, 'method': method, } data, engine_type, callback=self._run_callback )
def _run_callback(self, event): '''Callback of the :meth:`~ftrack_connect_pipeline.client.run_plugin''' self.logger.debug("_run_callback event: {}".format(event))
[docs] def on_ready(self, callback, time_out=3): ''' calls the given *callback* method when host is been discovered with the optional *time_out* ''' self.__callback = callback self.discover_hosts(time_out=time_out)
[docs] def change_engine(self, engine_type): ''' Assign the given *engine_type* as the current :obj:`engine_type` ''' self._engine_type = engine_type
def _on_log_item_added(self, log_item): '''Called when a client notify event arrives.''' pass
[docs] def on_client_notification(self): ''' Subscribe to topic :const:`~ftrack_connect_pipeline.constants.PIPELINE_CLIENT_NOTIFICATION` to receive client notifications from the host in :meth:`_notify_client` ''' self.session.event_hub.subscribe( 'topic={} and data.pipeline.host_id={}'.format( constants.PIPELINE_CLIENT_NOTIFICATION, ), self._notify_client, )
def _notify_client(self, event): ''' Callback of the :const:`~ftrack_connect_pipeline.constants.PIPELINE_CLIENT_NOTIFICATION` event. *event*: :class:`ftrack_api.event.base.Event` ''' result = event['data']['pipeline']['result'] status = event['data']['pipeline']['status'] plugin_name = event['data']['pipeline']['plugin_name'] widget_ref = event['data']['pipeline']['widget_ref'] message = event['data']['pipeline']['message'] user_data = event['data']['pipeline'].get('user_data') or {} user_message = user_data.get('message') plugin_id = event['data']['pipeline'].get('plugin_id') self._on_log_item_added(LogItem(event['data']['pipeline'])) if constants.status_bool_mapping[status]: self.logger.debug( '\n plugin_name: {} \n status: {} \n result: {} \n ' 'message: {} \n user_message: {} \n plugin_id: {}'.format( plugin_name, status, result, message, user_message, plugin_id, ) ) if ( status == constants.ERROR_STATUS or status == constants.EXCEPTION_STATUS ): raise Exception( 'An error occurred during the execution of the ' 'plugin name {} \n message: {} \n user_message: {} ' '\n data: {} \n plugin_id: {}'.format( plugin_name, message, user_message, result, plugin_id ) )