Source code for sardana.pool.poolmeasurementgroup

#!/usr/bin/env python

##############################################################################
##
## This file is part of Sardana
##
## http://www.sardana-controls.org/
##
## Copyright 2011 CELLS / ALBA Synchrotron, Bellaterra, Spain
##
## Sardana is free software: you can redistribute it and/or modify
## it under the terms of the GNU Lesser General Public License as published by
## the Free Software Foundation, either version 3 of the License, or
## (at your option) any later version.
##
## Sardana is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
## GNU Lesser General Public License for more details.
##
## You should have received a copy of the GNU Lesser General Public License
## along with Sardana.  If not, see <http://www.gnu.org/licenses/>.
##
##############################################################################

"""This module is part of the Python Pool library. It defines the base classes
for"""

__all__ = ["PoolMeasurementGroup"]

__docformat__ = 'restructuredtext'

from taurus.core.taurusvalidator import AttributeNameValidator

from sardana import State, ElementType, \
    TYPE_EXP_CHANNEL_ELEMENTS, TYPE_TIMERABLE_ELEMENTS
from sardana.sardanaevent import EventType
from sardana.pool.pooldefs import AcqMode, AcqTriggerType
from sardana.pool.poolgroupelement import PoolGroupElement
from sardana.pool.poolacquisition import PoolAcquisition
from sardana.pool.poolexternal import PoolExternalObject

from sardana.taurus.core.tango.sardana import PlotType, Normalization

#----------------------------------------------
# Measurement Group Configuration information
#----------------------------------------------
# dict <str, obj> with (at least) keys:
#    - 'timer' : the timer channel name / timer channel id
#    - 'monitor' : the monitor channel name / monitor channel id
#    - 'controllers' : dict<Controller, dict> where:
#        - key: ctrl
#        - value: dict<str, dict> with (at least) keys:
#            - 'units': dict<str, dict> with (at least) keys:
#                - 'id' : the unit ID inside the controller
#                - 'timer' : the timer channel name / timer channel id
#                - 'monitor' : the monitor channel name / monitor channel id
#                - 'trigger_type' : 'Gate'/'Software'
#                - 'channels' where value is a dict<str, obj> with (at least) keys:
#                    - 'id' : the channel name ( channel id )
#                    optional keys:
#                    - 'enabled' : True/False (default is True)
#                    any hints:
#                    - 'output' : True/False (default is True)
#                    - 'plot_type' : 'No'/'1D'/'2D' (default is 'No')
#                    - 'plot_axes' : list<str> 'where str is channel name/'step#/'index#' (default is [])
#                    - 'label' : prefered label (default is channel name)
#                    - 'scale' : <float, float> with min/max (defaults to channel
#                                range if it is defined
#                    - 'plot_color' : int representing RGB
#    optional keys:
#    - 'label' : measurement group label (defaults to measurement group name)
#    - 'description' : measurement group description

# <MeasurementGroupConfiguration>
#  <timer>UxTimer</timer>
#  <monitor>CT1</monitor>
# </MeasurementGroupConfiguration>

# Example: 2 NI cards, where channel 1 of card 1 is wired to channel 1 of card 2
# at configuration time we should set:
# ctrl.setPar( <unit>, <parameter name>, <parameter value> )
# ni0ctrl.setCtrlPar(0, 'trigger_type', AcqTriggerType.Software)
# ni0ctrl.setCtrlPar(0, 'timer', 1) # channel 1 is the timer
# ni0ctrl.setCtrlPar(0, 'monitor', 4) # channel 4 is the monitor
# ni1ctrl.setCtrlPar(0, 'trigger_type', AcqTriggerType.ExternalTrigger)
# ni1ctrl.setCtrlPar(0, 'master', 0)

# when we count for 1.5 seconds:
# ni1ctrl.Load(1.5)
# ni0ctrl.Load(1.5)
# ni1ctrl.Start()
# ni0ctrl.Start()

"""

"""

[docs]class PoolMeasurementGroup(PoolGroupElement): DFT_DESC = 'General purpose measurement group' def __init__(self, **kwargs): self._integration_time = None self._monitor_count = None self._acquisition_mode = AcqMode.Timer self._config = None self._config_dirty = True kwargs['elem_type'] = ElementType.MeasurementGroup PoolGroupElement.__init__(self, **kwargs) self.set_configuration(kwargs.get('configuration')) def _create_action_cache(self): acq_name = "%s.Acquisition" % self._name return PoolAcquisition(self, acq_name) def _calculate_element_state(self, elem, elem_state_info): if elem.get_type() == ElementType.ZeroDExpChannel: if elem_state_info[0] == State.Moving: elem_state_info = State.On, elem_state_info[1] return PoolGroupElement._calculate_element_state(self, elem, elem_state_info)
[docs] def on_element_changed(self, evt_src, evt_type, evt_value): name = evt_type.name if name == 'state': if evt_src.get_type() == ElementType.ZeroDExpChannel: # 0D channels are "passive", which means they cannot contribute # to set the measurement group into a moving state if evt_value in (State.On, State.Moving): return state, status = self._calculate_states() self.set_state(state, propagate=2) self.set_status("\n".join(status))
[docs] def get_pool_controllers(self): return self.get_acquisition().get_pool_controllers()
[docs] def get_pool_controller_by_name(self, name): name = name.lower() for ctrl in self.get_pool_controllers(): if ctrl.name.lower() == name or ctrl.full_name.lower() == name: return ctrl
# -------------------------------------------------------------------------- # configuration # -------------------------------------------------------------------------- def _is_managed_element(self, element): return element.get_type() in TYPE_EXP_CHANNEL_ELEMENTS def _build_channel_defaults(self, channel_data, channel): """Fills the channel default values for the given channel dictionary""" external_from_name = isinstance(channel, (str, unicode)) ndim = None instrument = None if external_from_name: name = full_name = source = channel else: name = channel.name full_name = channel.full_name source = channel.get_source() ndim = None instrument = None ctype = channel.get_type() if ctype != ElementType.External: instrument = channel.instrument if ctype == ElementType.CTExpChannel: ndim = 0 elif ctype == ElementType.PseudoCounter: ndim = 0 elif ctype == ElementType.ZeroDExpChannel: ndim = 0 elif ctype == ElementType.OneDExpChannel: ndim = 1 elif ctype == ElementType.TwoDExpChannel: ndim = 2 elif ctype == ElementType.External: config = channel.get_config() if config is not None: ndim = int(config.data_format) elif ctype == ElementType.IORegister: ndim = 0 # Definitively should be initialized by measurement group # index MUST be here already (asserting this in the following line) channel_data['index'] = channel_data['index'] channel_data['name'] = channel_data.get('name', name) channel_data['full_name'] = channel_data.get('full_name', full_name) channel_data['source'] = channel_data.get('source', source) channel_data['enabled'] = channel_data.get('enabled', True) channel_data['label'] = channel_data.get('label', channel_data['name']) channel_data['instrument'] = channel_data.get('instrument', getattr(instrument, 'name', None)) channel_data['ndim'] = ndim # Probably should be initialized by measurement group channel_data['output'] = channel_data.get('output', True) # Perhaps should NOT be initialized by measurement group channel_data['plot_type'] = channel_data.get('plot_type', PlotType.No) channel_data['plot_axes'] = channel_data.get('plot_axes', []) channel_data['conditioning'] = channel_data.get('conditioning', '') channel_data['normalization'] = channel_data.get('normalization', Normalization.No) return channel_data def _build_configuration(self): """Builds a configuration object from the list of elements""" config = {} user_elements = self.get_user_elements() ctrls = self.get_pool_controllers() # find the first CT first_timerable = None for elem in user_elements: if elem.get_type() in TYPE_TIMERABLE_ELEMENTS: first_timerable = elem break if first_timerable is None: raise Exception("It is not possible to construct a measurement " "group without at least one timer able channel " "(Counter/timer, 1D or 2D)") g_timer = g_monitor = first_timerable config['timer'] = g_timer config['monitor'] = g_monitor config['controllers'] = controllers = {} external_user_elements = [] for index, element in enumerate(user_elements): elem_type = element.get_type() if elem_type == ElementType.External: external_user_elements.append((index, element)) continue ctrl = element.controller ctrl_data = controllers.get(ctrl) # attention: following lines are only prepared for 1 unit per # controller if ctrl_data is None: controllers[ctrl] = ctrl_data = {} ctrl_data['units'] = units = {} units['0'] = unit_data = {} unit_data['id'] = 0 unit_data['channels'] = channels = {} if elem_type in TYPE_TIMERABLE_ELEMENTS: elements = ctrls[ctrl] if g_timer in elements: unit_data['timer'] = g_timer else: unit_data['timer'] = elements[0] if g_monitor in elements: unit_data['monitor'] = g_monitor else: unit_data['monitor'] = elements[0] unit_data['trigger_type'] = AcqTriggerType.Software else: channels = ctrl_data['units']['0']['channels'] channels[element] = channel_data = {} channel_data['index'] = user_elements.index(element) channel_data = self._build_channel_defaults(channel_data, element) config['label'] = self.name config['description'] = self.DFT_DESC if len(external_user_elements) > 0: controllers['__tango__'] = ctrl_data = {} ctrl_data['units'] = units = {} units['0'] = unit_data = {} unit_data['id'] = 0 unit_data['channels'] = channels = {} for index, element in external_user_elements: channels[element] = channel_data = {} channel_data['index'] = index channel_data = self._build_channel_defaults(channel_data, element) return config
[docs] def set_configuration(self, config=None, propagate=1): if config is None: config = self._build_configuration() else: # create a configuration based on a new configuration user_elem_ids = {} pool = self.pool for c, c_data in config['controllers'].items(): external = isinstance(c, (str, unicode)) # attention: following line only prepared for 1 unit per controller for channel_data in c_data['units']['0']['channels'].values(): if external: element = id = channel_data['full_name'] channel_data['source'] = id else: element = pool.get_element_by_full_name(channel_data['full_name']) id = element.id user_elem_ids[channel_data['index']] = id channel_data = self._build_channel_defaults(channel_data, element) indexes = sorted(user_elem_ids.keys()) assert indexes == range(len(indexes)) self.set_user_element_ids([ user_elem_ids[idx] for idx in indexes ]) # checks g_timer, g_monitor = config['timer'], config['monitor'] # attention: following line only prepared for 1 unit per controller timer_ctrl_data = config['controllers'][g_timer.controller]['units']['0'] if timer_ctrl_data['timer'] != g_timer: self.warning('unit timer and global timer mismatch. ' 'Using global timer') self.debug('For controller %s, timer is defined as channel %s. ' 'The global timer is set to channel %s which belongs ' 'to the same controller', g_timer.controller.name, timer_ctrl_data['timer'].name, g_timer.name) timer_ctrl_data['timer'] = g_timer # attention: following line only prepared for 1 unit per controller monitor_ctrl_data = config['controllers'][g_monitor.controller]['units']['0'] if monitor_ctrl_data['monitor'] != g_monitor: self.warning('unit monitor and global monitor mismatch. ' 'Using global monitor') self.debug('For controller %s, monitor is defined as channel %s. ' 'The global timer is set to channel %s which belongs ' 'to the same controller', g_monitor.controller.name, monitor_ctrl_data['monitor'].name, g_monitor.name) monitor_ctrl_data['monitor'] != g_monitor self._config = config self._config_dirty = True if not propagate: return self.fire_event(EventType("configuration", priority=propagate), config)
[docs] def set_configuration_from_user(self, cfg, propagate=1): config = {} user_elements = self.get_user_elements() pool = self.pool timer_name = cfg.get('timer', user_elements[0].full_name) monitor_name = cfg.get('monitor', user_elements[0].full_name) config['timer'] = pool.get_element_by_full_name(timer_name) config['monitor'] = pool.get_element_by_full_name(monitor_name) config['controllers'] = controllers = {} for c_name, c_data in cfg['controllers'].items(): # discard controllers which don't have items (garbage) ch_count = 0 for u_data in c_data['units'].values(): ch_count += len(u_data['channels']) if ch_count == 0: continue external = c_name.startswith('__') if external: ctrl = c_name else: ctrl = pool.get_element_by_full_name(c_name) assert ctrl.get_type() == ElementType.Controller controllers[ctrl] = ctrl_data = {} ctrl_data['units'] = units = {} for u_id, u_data in c_data['units'].items(): # discard units which don't have items (garbage) if len(u_data['channels']) == 0: continue units[u_id] = unit_data = dict(u_data) unit_data['id'] = u_data.get('id', u_id) if not external and ctrl.is_timerable(): unit_data['timer'] = pool.get_element_by_full_name(u_data['timer']) unit_data['monitor'] = pool.get_element_by_full_name(u_data['monitor']) unit_data['trigger_type'] = u_data['trigger_type'] unit_data['channels'] = channels = {} for ch_name, ch_data in u_data['channels'].items(): if external: validator = AttributeNameValidator() params = validator.getParams(ch_data['full_name']) params['pool'] = self.pool channel = PoolExternalObject(**params) else: channel = pool.get_element_by_full_name(ch_name) channels[channel] = dict(ch_data) config['label'] = cfg.get('label', self.name) config['description'] = cfg.get('description', self.DFT_DESC) self.set_configuration(config, propagate=propagate)
[docs] def get_configuration(self): return self._config
[docs] def get_user_configuration(self): cfg = self.get_configuration() config = {} config['timer'] = cfg['timer'].full_name config['monitor'] = cfg['monitor'].full_name config['controllers'] = controllers = {} for c, c_data in cfg['controllers'].items(): ctrl_name = c if not isinstance(c, (str, unicode)): ctrl_name = c.full_name external = ctrl_name.startswith('__') controllers[ctrl_name] = ctrl_data = {} ctrl_data['units'] = units = {} for u_id, u_data in c_data['units'].items(): units[u_id] = unit_data = {} unit_data['id'] = u_data['id'] if not external and c.is_timerable(): if u_data.has_key('timer'): unit_data['timer'] = u_data['timer'].full_name if u_data.has_key('monitor'): unit_data['monitor'] = u_data['monitor'].full_name if u_data.has_key('trigger_type'): unit_data['trigger_type'] = u_data['trigger_type'] unit_data['channels'] = channels = {} for ch, ch_data in u_data['channels'].items(): channels[ch.full_name] = dict(ch_data) config['label'] = cfg['label'] config['description'] = cfg['description'] return config
[docs] def load_configuration(self, force=False): """Loads the current configuration to all involved controllers""" cfg = self.get_configuration() g_timer, g_monitor = cfg['timer'], cfg['monitor'] for ctrl, ctrl_data in cfg['controllers'].items(): # skip external channels if type(ctrl) is str: continue if ctrl.operator == self and not force and not self._config_dirty: continue ctrl.operator = self if ctrl.is_timerable(): for unit, unit_data in ctrl_data['units'].items(): #if ctrl == g_timer.controller: # ctrl.set_ctrl_par('timer', g_timer.axis) #if ctrl == g_monitor.controller: # ctrl.set_ctrl_par('monitor', g_monitor.axis) ctrl.set_ctrl_par('timer', unit_data['timer'].axis) ctrl.set_ctrl_par('monitor', unit_data['monitor'].axis) ctrl.set_ctrl_par('trigger_type', unit_data['trigger_type']) self._config_dirty = False
[docs] def get_timer(self): return self.get_configuration()['timer']
timer = property(get_timer) # -------------------------------------------------------------------------- # integration time # --------------------------------------------------------------------------
[docs] def get_integration_time(self): return self._integration_time
[docs] def set_integration_time(self, integration_time, propagate=1): self._integration_time = integration_time if not propagate: return self.fire_event(EventType("integration_time", priority=propagate), integration_time)
integration_time = property(get_integration_time, set_integration_time, doc="the current integration time") # -------------------------------------------------------------------------- # integration time # --------------------------------------------------------------------------
[docs] def get_monitor_count(self): return self._monitor_count
[docs] def set_monitor_count(self, monitor_count, propagate=1): self._monitor_count = monitor_count if not propagate: return self.fire_event(EventType("monitor_count", priority=propagate), monitor_count)
monitor_count = property(get_monitor_count, set_monitor_count, doc="the current monitor count") # -------------------------------------------------------------------------- # acquisition mode # --------------------------------------------------------------------------
[docs] def get_acquisition_mode(self): return self._acquisition_mode
[docs] def set_acquisition_mode(self, acquisition_mode, propagate=1): self._acquisition_mode = acquisition_mode if not propagate: return self.fire_event(EventType("acquisition_mode", priority=propagate), acquisition_mode)
acquisition_mode = property(get_acquisition_mode, set_acquisition_mode, doc="the current acquisition mode") # -------------------------------------------------------------------------- # acquisition # --------------------------------------------------------------------------
[docs] def start_acquisition(self, value=None, multiple=1): self._aborted = False if not self._simulation_mode: # load configuration into controller(s) if necessary self.load_configuration() # start acquisition kwargs = dict(head=self, config=self._config, multiple=multiple) if self.acquisition_mode == AcqMode.Timer: kwargs["integ_time"] = self._integration_time elif self.acquisition_mode == AcqMode.Monitor: kwargs["monitor_count"] = self._monitor_count self.acquisition.run(**kwargs)
[docs] def set_acquisition(self, acq_cache): self.set_action_cache(acq_cache)
[docs] def get_acquisition(self): return self.get_action_cache()
acquisition = property(get_acquisition, doc="acquisition object")