# :coding: utf-8
# :copyright: Copyright (c) 2014-2020 ftrack
import threading
import logging
import ftrack_api
from ftrack_connect_pipeline import constants
import uuid
logger = logging.getLogger(__name__)
class _EventHubThread(threading.Thread):
'''Listen for events from ftrack's event hub.'''
def __repr__(self):
return "<{0}:{1}>".format(self.__class__.__name__, self.name)
def __init__(self, session):
self.logger = logging.getLogger(
__name__ + '.' + self.__class__.__name__
)
_name = str(hash(session))
super(_EventHubThread, self).__init__(name=_name)
self.logger.debug('Name set for the thread: {}'.format(_name))
self._session = session
def start(self):
'''Start thread for *_session*.'''
self.logger.debug(
'starting event hub thread for session {}'.format(self._session)
)
super(_EventHubThread, self).start()
def run(self):
'''Listen for events.'''
self.logger.debug(
'hub thread started for session {}'.format(self._session)
)
self._session.event_hub.wait()
[docs]class EventManager(object):
'''Manages the events handling.'''
def __repr__(self):
return '<EventManager:{}:{}>'.format(self.mode, self.id)
def __del__(self):
self.logger.debug('Closing {}'.format(self))
@property
def id(self):
return uuid.uuid4().hex
@property
def session(self):
return self._session
@property
def connected(self):
_connected = False
try:
_connected = self.session.event_hub.connected
except Exception as e:
self.logger.error(
"Error checking event hub connected {}".format(e)
)
return _connected
@property
def mode(self):
return self._mode
def _connect(self):
# If is not already connected, connect to event hub.
while not self.connected:
self.session.event_hub.connect()
def _wait(self):
for thread in threading.enumerate():
if thread.getName() == str(hash(self.session)):
self._event_hub_thread = thread
break
if not self._event_hub_thread:
# self.logger.debug('Initializing new hub thread {}'.format(self))
self._event_hub_thread = _EventHubThread(self.session)
if not self._event_hub_thread.is_alive():
# self.logger.debug('Starting new hub thread for {}'.format(self))
self._event_hub_thread.start()
[docs] def __init__(self, session, mode=constants.LOCAL_EVENT_MODE):
self.logger = logging.getLogger(
__name__ + '.' + self.__class__.__name__
)
self._event_hub_thread = None
self._mode = mode
self._session = session
if mode == constants.REMOTE_EVENT_MODE:
# TODO: Bring this back when API event hub properly can differentiate between local and remote mode
self._connect()
self._wait()
# self.logger.debug('Initialising {}'.format(self))
[docs] def publish(self, event, callback=None, mode=None):
'''Emit *event* and provide *callback* function.'''
mode = mode or self.mode
# self.logger.debug(
# 'Publishing event topic {} in {} mode'.format(
# event.get('topic'), mode
# )
# )
if mode is constants.LOCAL_EVENT_MODE:
result = self.session.event_hub.publish(
event,
synchronous=True,
)
if result:
result = result[0]
# Mock async event reply.
new_event = ftrack_api.event.base.Event(
topic='ftrack.meta.reply',
data=result,
in_reply_to_event=event['id'],
)
if callback:
callback(new_event)
else:
self.session.event_hub.publish(event, on_reply=callback)
[docs] def subscribe(self, topic, callback):
self.session.event_hub.subscribe('topic={}'.format(topic), callback)