# Source code for zhmcclient._notification

#!/usr/bin/env python
# Copyright 2017,2021 IBM Corp. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
The HMC supports the publishing of notifications for specific topics. This
includes for example asynchronous job completion, property or status changes,
and operating system messages issued in an LPAR or DPM partition.

The zhmcclient package supports receiving HMC notifications in an easy-to-use
way, as shown in the following example that receives and displays OS messages
for a DPM partition::

    import stomp
    import zhmcclient

    hmc = ...
    userid = ...
    password = ...

    session = zhmcclient.Session(hmc, userid, password)
    client = zhmcclient.Client(session)
    cpc = client.cpcs.find(name=cpcname)
    partition = cpc.partitions.find(name=partname)

    topic = partition.open_os_message_channel(include_refresh_messages=True)

    print("Subscribing for OS messages for partition %s on CPC %s using "
          "notifications..." % (partition.name, cpc.name))

    receiver = zhmcclient.NotificationReceiver(
        topic, hmc, session.session_id, session.session_credential)

    while True:
        try:
            for headers, message in receiver.notifications():
                print("HMC notification #%s:" % headers['session-sequence-nr'])
                os_msg_list = message['os-messages']
                for os_msg in os_msg_list:
                    msg_txt = os_msg['message-text'].strip('\\n')
                    msg_id = os_msg['message-id']
                    print("OS message #%s:\\n%s" % (msg_id, msg_txt))
        except zhmcclient.NotificationError as exc:
            print(f"Notification Error: {exc} - reconnecting")
            continue
        except stomp.exception.StompException as exc:
            print(f"STOMP Error: {exc} - reconnecting")
            continue
        except KeyboardInterrupt:
            print("Keyboard Interrupt - leaving")
            receiver.close()
            break
        else:
            print("Receiver has been closed  - leaving")
            break

When running this example code in one terminal, and stopping or starting
the partition in another terminal, one can monitor the shutdown or boot
messages issued by the operating system. The following commands use the
``zhmc`` CLI provided in the :term:`zhmccli project` to do that:

.. code-block:: text

    $ zhmc partition stop {cpc-name} {partition-name}
    $ zhmc partition start {cpc-name} {partition-name}
"""


import os
import json
import ssl
import queue
from collections import namedtuple
import logging
import uuid

from ._logging import logged_api_call
from ._constants import DEFAULT_STOMP_PORT, DEFAULT_STOMP_CONNECT_TIMEOUT, \
    DEFAULT_STOMP_CONNECT_RETRIES, DEFAULT_STOMP_RECONNECT_SLEEP_INITIAL, \
    DEFAULT_STOMP_RECONNECT_SLEEP_INCREASE, DEFAULT_STOMP_RECONNECT_SLEEP_MAX, \
    DEFAULT_STOMP_RECONNECT_SLEEP_JITTER, DEFAULT_STOMP_KEEPALIVE, \
    DEFAULT_STOMP_HEARTBEAT_SEND_CYCLE, DEFAULT_STOMP_HEARTBEAT_RECEIVE_CYCLE, \
    DEFAULT_STOMP_HEARTBEAT_RECEIVE_CHECK, STOMP_MIN_CONNECTION_CHECK_TIME, \
    JMS_LOGGER_NAME
from ._exceptions import NotificationJMSError, NotificationParseError, \
    SubscriptionNotFound, NotificationConnectionError, \
    NotificationSubscriptionError
from ._utils import get_stomp_rt_kwargs, get_headers_message

__all__ = ['NotificationReceiver', 'StompRetryTimeoutConfig']

# Write a log message for each STOMP heartbeat sent or received
DEBUG_HEARTBEATS = False

JMS_LOGGER = logging.getLogger(JMS_LOGGER_NAME)


class StompRetryTimeoutConfig:
    # pylint: disable=too-few-public-methods
    """
    Retry and timeout related settings for the STOMP connections to the HMC
    that are used for receiving notifications.

    HMC/SE version requirements: None
    """

    # Names of the configuration attributes, in the same order as the
    # parameters of __init__().
    _attrs = (
        'connect_timeout',
        'connect_retries',
        'reconnect_sleep_initial',
        'reconnect_sleep_increase',
        'reconnect_sleep_max',
        'reconnect_sleep_jitter',
        'keepalive',
        'heartbeat_send_cycle',
        'heartbeat_receive_cycle',
        'heartbeat_receive_check',
    )

    def __init__(self, connect_timeout=None, connect_retries=None,
                 reconnect_sleep_initial=None, reconnect_sleep_increase=None,
                 reconnect_sleep_max=None, reconnect_sleep_jitter=None,
                 keepalive=None, heartbeat_send_cycle=None,
                 heartbeat_receive_cycle=None, heartbeat_receive_check=None):
        """
        A value of `None` for any parameter means that this object does not
        specify that parameter, so that a default value gets used (see
        :ref:`Constants`).

        Every parameter is stored as an instance attribute of the same name.

        Parameters:

          connect_timeout (:term:`number`): STOMP connect timeout in seconds,
            applied when making the connection at the socket level. The
            special value 0 means that no timeout is set.

          connect_retries (:term:`integer`): Number of retries (after the
            initial attempt) for STOMP connection-related issues. Retries are
            performed for failed DNS lookups, failed socket connections, and
            socket connection timeouts. The special value -1 means infinite
            retries.

          reconnect_sleep_initial (:term:`number`): Initial STOMP reconnect
            sleep delay in seconds, i.e. the time to wait before
            reconnecting.

          reconnect_sleep_increase (:term:`number`): Factor by which the
            reconnect sleep delay grows after each connection attempt. For
            example, 0.5 waits 50% longer than before the previous attempt,
            1.0 waits twice as long, and 0.0 keeps the delay constant.

          reconnect_sleep_max (:term:`number`): Upper limit for the reconnect
            sleep delay in seconds, regardless of the
            `reconnect_sleep_increase` value.

          reconnect_sleep_jitter (:term:`number`): Random additional time to
            wait before a reconnect to avoid stampeding, as a percentage of
            the current reconnect sleep delay. For example, 0.1 means to wait
            an extra 0%-10% of the delay calculated using the previous three
            parameters.

          keepalive (bool): Enable keepalive at the socket level.

          heartbeat_send_cycle (:term:`number`): Cycle time in seconds in
            which the client sends heartbeats to the HMC. This time is sent
            to the HMC as the minimum cycle time the client can do, and the
            HMC returns it as the cycle time in which it wants to receive
            heartbeats. It should not be less than 0.2 sec; a few seconds is
            a reasonable value. The special value 0 disables the sending of
            heartbeats to the HMC.

          heartbeat_receive_cycle (:term:`number`): Cycle time in seconds in
            which the HMC sends heartbeats to the client. This time is sent
            to the HMC as the cycle time in which the client wants to receive
            heartbeats, and the HMC uses it to send heartbeats. It should not
            be less than 0.2 sec; a few seconds is a reasonable value. The
            special value 0 disables heartbeat sending by the HMC and the
            checking on the client side.

          heartbeat_receive_check (:term:`number`): Additional time for
            checking the heartbeats received from the HMC on the client, as a
            percentage of the 'heartbeat_receive_cycle' time. For example,
            0.5 means to wait an extra 50% of the 'heartbeat_receive_cycle'
            time. This value should not be less than 0.5; 1 or 2 is a
            reasonable value.
        """
        # The tuple below lists the arguments in the order of _attrs, so that
        # each argument ends up in the attribute with the same name.
        given_values = (
            connect_timeout,
            connect_retries,
            reconnect_sleep_initial,
            reconnect_sleep_increase,
            reconnect_sleep_max,
            reconnect_sleep_jitter,
            keepalive,
            heartbeat_send_cycle,
            heartbeat_receive_cycle,
            heartbeat_receive_check,
        )
        for attr_name, attr_value in zip(
                StompRetryTimeoutConfig._attrs, given_values):
            setattr(self, attr_name, attr_value)

    def override_with(self, override_config):
        """
        Build a new configuration object in which this object supplies the
        defaults and `override_config` replaces each attribute for which it
        has a value other than `None`.

        Neither this object nor `override_config` is modified.

        Parameters:

          override_config (:class:`~zhmcclient.StompRetryTimeoutConfig`):
            The configuration object whose non-`None` attributes take
            precedence over the attributes of this object. May be `None`,
            in which case the result is a copy of this object.

        Returns:

          :class:`~zhmcclient.StompRetryTimeoutConfig`:
            A new configuration object representing this configuration
            object, overridden by the specified configuration object.
        """
        merged = StompRetryTimeoutConfig()
        for attr_name in StompRetryTimeoutConfig._attrs:
            chosen = getattr(self, attr_name)
            if override_config:
                candidate = getattr(override_config, attr_name)
                if candidate is not None:
                    chosen = candidate
            setattr(merged, attr_name, chosen)
        return merged
class NotificationReceiver:
    """
    A class for receiving HMC notifications that are published to particular
    HMC notification topics.

    **Experimental:** This class is considered experimental at this point, and
    its API may change incompatibly as long as it is experimental.

    Creating an object of this class only stores the connection parameters.
    The STOMP connection to the HMC is established, and the specified HMC
    notification topic(s) are subscribed for, when
    :meth:`~zhmcclient.NotificationReceiver.connect` or
    :meth:`~zhmcclient.NotificationReceiver.notifications` is called.

    Notification topic strings are created by the HMC in context of a
    particular client session (i.e. :class:`~zhmcclient.Session` object).
    However, these topic strings can be used by any JMS message listener that
    knows the topic string and that authenticates under some valid HMC userid.
    The HMC userid used by the JMS listener does not need to be the one that
    was used for the client session in which the notification topic was
    originally created.

    HMC/SE version requirements: None
    """

    # Default retry/timeout configuration. Attributes that are `None` in the
    # configuration passed to __init__() are taken from here.
    default_stomp_rt_config = StompRetryTimeoutConfig(
        connect_timeout=DEFAULT_STOMP_CONNECT_TIMEOUT,
        connect_retries=DEFAULT_STOMP_CONNECT_RETRIES,
        reconnect_sleep_initial=DEFAULT_STOMP_RECONNECT_SLEEP_INITIAL,
        reconnect_sleep_increase=DEFAULT_STOMP_RECONNECT_SLEEP_INCREASE,
        reconnect_sleep_max=DEFAULT_STOMP_RECONNECT_SLEEP_MAX,
        reconnect_sleep_jitter=DEFAULT_STOMP_RECONNECT_SLEEP_JITTER,
        keepalive=DEFAULT_STOMP_KEEPALIVE,
        heartbeat_send_cycle=DEFAULT_STOMP_HEARTBEAT_SEND_CYCLE,
        heartbeat_receive_cycle=DEFAULT_STOMP_HEARTBEAT_RECEIVE_CYCLE,
        heartbeat_receive_check=DEFAULT_STOMP_HEARTBEAT_RECEIVE_CHECK,
    )

    def __init__(self, topic_names, host, userid, password,
                 port=DEFAULT_STOMP_PORT, stomp_rt_config=None):
        """
        Parameters:

          topic_names (:term:`string` or list/tuple thereof): Name(s) of the
            HMC notification topic(s).
            Must not be `None`.

          host (:term:`string`):
            HMC host. For valid formats, see the
            :attr:`~zhmcclient.Session.host` property.
            Must not be `None`.

          userid (:term:`string`):
            Userid for logging on to the HMC message broker.
            Must not be `None`.

            If the HMC userid is configured to use MFA, this must be the
            session ID of a session that user has with the HMC.
            Otherwise, it can either be the session ID, or the HMC userid.

          password (:term:`string`):
            Password for logging on to the HMC message broker.
            Must not be `None`.

            If `userid` specifies a session ID, this must be the session
            credential for that session ID.
            If `userid` specifies an HMC userid, this must be the password
            for that userid.

          port (:term:`integer`):
            STOMP TCP port. Defaults to
            :attr:`~zhmcclient._constants.DEFAULT_STOMP_PORT`.

          stomp_rt_config (:class:`~zhmcclient.StompRetryTimeoutConfig`):
            The STOMP retry/timeout configuration for this session,
            overriding any defaults.

            `None` for an attribute in that configuration object means that
            the default value will be used for that attribute.

            `None` for the entire `stomp_rt_config` parameter means that a
            default configuration will be used with the default values for
            all of its attributes.

            See :ref:`Constants` for the default values.
        """
        if not isinstance(topic_names, (list, tuple)):
            topic_names = [topic_names]
        self._topic_names = topic_names
        self._host = host
        self._userid = userid
        self._password = password
        self._port = port

        # Effective configuration: the class defaults, overridden by any
        # non-None attribute of the specified configuration.
        self._rt_config = self.default_stomp_rt_config.override_with(
            stomp_rt_config)

        # Subscription ID numbers that are in use.
        # Each subscription for a topic gets its own unique ID.
        # - key: topic name
        # - value: Subscription ID number
        self._sub_ids = {}

        # Next subscription ID number to be used.
        # After allocating a subscription ID number, this number is increased.
        # It is never decreased again.
        self._next_sub_id = 1

        # Process PID, used to ensure uniqueness of subscription ID
        self._process_pid = os.getpid()

        # Thread-safe handover queue between listener thread and receiver
        # thread
        self._handover_queue = queue.Queue(10)

        # STOMP connection
        self._conn = None

        # Open/closed state of the receiver
        self._closed = False

        # Lazy importing of the stomp module, because the import is slow in
        # some versions.
        # pylint: disable=import-outside-toplevel
        import stomp
        self._stomp = stomp

    @logged_api_call
    def connect(self):
        """
        Create a listener, connect to the HMC and subscribe for the topics
        that were specified when this receiver was created.

        If a connection exists with the HMC, it is first closed.

        Note: STOMP does not nicely recover when just performing a STOMP
        connect after a connection loss, because there are occurrences of
        ssl.SSLError: PROTOCOL_IS_SHUTDOWN. Therefore, we create a new
        listener as well.

        Raises:
            NotificationConnectionError: STOMP connection failed.
            NotificationSubscriptionError: STOMP subscription failed.
        """

        # In case of reconnect, close the previous connection. This will also
        # stop the listener thread.
        if self._conn:
            JMS_LOGGER.info(
                "Disconnecting previous STOMP connection")
            # The receipt is passed as a string, because STOMP header values
            # are strings and the receipt ID that comes back in the RECEIPT
            # frame is a string as well.
            self._conn.disconnect(receipt=str(uuid.uuid4()))

        # Set up the STOMP listener
        JMS_LOGGER.info("Setting up a STOMP connection")
        rt_kwargs = get_stomp_rt_kwargs(self._rt_config)
        self._conn = self._stomp.Connection(
            [(self._host, self._port)], **rt_kwargs)
        self._conn.set_ssl(
            for_hosts=[(self._host, self._port)],
            ssl_version=ssl.PROTOCOL_TLS_CLIENT)
        listener = _NotificationListener(self._handover_queue)
        self._conn.set_listener('', listener)

        connected = self.is_connected()
        JMS_LOGGER.info(
            "Connecting via STOMP to the HMC (currently connected: %s)",
            connected)
        try:
            # wait=True causes the connection to be retried for some times
            # and finally raises stomp.ConnectFailedException
            self._conn.connect(self._userid, self._password, wait=True)
        except Exception as exc:
            msg = f"STOMP connection failed: {exc.__class__.__name__}: {exc}"
            JMS_LOGGER.warning(msg)
            raise NotificationConnectionError(msg) from exc
        JMS_LOGGER.info("STOMP connection successfully established")

        for topic_name in self._topic_names:
            self.subscribe(topic_name)

    @logged_api_call
    def is_connected(self):
        """
        Return whether this notification receiver is currently connected to
        the HMC.

        Returns:
            bool: `False` if no STOMP connection object has been created yet,
            otherwise the connected state reported by the STOMP connection.
        """
        if self._conn:
            return self._conn.is_connected()
        return False

    def _id_value(self, sub_id):
        """
        Create the subscription ID from the subscription ID number.

        The ID combines the process PID, the identity of this receiver object
        and the subscription ID number.
        """
        id_value = f'zhmcclient.{self._process_pid}.{id(self)}.{sub_id}'
        return id_value

    @logged_api_call
    def subscribe(self, topic_name):
        """
        Subscribe this notification receiver for a topic.

        Parameters:

          topic_name (:term:`string`): Name of the HMC notification topic.
            Must not be `None`.

        Returns:
            string: Subscription ID

        Raises:
            NotificationSubscriptionError: STOMP subscription failed.
        """
        dest = "/topic/" + topic_name
        sub_id = self._next_sub_id
        self._next_sub_id += 1
        id_value = self._id_value(sub_id)
        JMS_LOGGER.info(
            "Subscribing via STOMP for object notification topic '%s'",
            topic_name)
        try:
            self._conn.subscribe(destination=dest, id=id_value, ack='auto')
        except Exception as exc:
            msg = f"STOMP subscription failed: {exc.__class__.__name__}: {exc}"
            JMS_LOGGER.warning(msg)
            raise NotificationSubscriptionError(msg) from exc

        # Record the subscription only once it has succeeded, so that
        # is_subscribed() does not report a subscription that failed.
        self._sub_ids[topic_name] = sub_id
        return id_value

    @logged_api_call
    def unsubscribe(self, topic_name):
        """
        Unsubscribe this notification receiver from a topic.

        If the topic is not currently subscribed for by this receiver,
        SubscriptionNotFound is raised.

        Parameters:

          topic_name (:term:`string`): Name of the HMC notification topic.
            Must not be `None`.

        Raises:
            SubscriptionNotFound: Topic is not currently subscribed for.
            NotificationSubscriptionError: STOMP unsubscription failed.
        """
        try:
            sub_id = self._sub_ids[topic_name]
        except KeyError:
            raise SubscriptionNotFound(
                f"Subscription topic {topic_name!r} is not currently "
                "subscribed for") from None
        id_value = self._id_value(sub_id)
        JMS_LOGGER.info(
            "Unsubscribing via STOMP from object notification topic '%s'",
            topic_name)
        try:
            self._conn.unsubscribe(id=id_value)
        except Exception as exc:
            msg = (
                f"STOMP unsubscription failed: {exc.__class__.__name__}: "
                f"{exc}")
            JMS_LOGGER.warning(msg)
            raise NotificationSubscriptionError(msg) from exc

        # Forget the subscription, so that is_subscribed() returns False and
        # a repeated unsubscribe raises SubscriptionNotFound as documented.
        del self._sub_ids[topic_name]

    @logged_api_call
    def is_subscribed(self, topic_name):
        """
        Return whether this notification receiver is currently subscribed for
        a topic.

        Parameters:

          topic_name (:term:`string`): Name of the HMC notification topic.
            Must not be `None`.

        Returns:
            bool: Whether a subscription is recorded for the topic.
        """
        return topic_name in self._sub_ids

    @logged_api_call
    def get_subscription(self, topic_name):
        """
        Return the subscription ID for a topic this notification receiver is
        subscribed for.

        If the topic is not currently subscribed for by this receiver,
        SubscriptionNotFound is raised.

        Parameters:

          topic_name (:term:`string`): Name of the HMC notification topic.
            Must not be `None`.

        Returns:
            string: Subscription ID

        Raises:
            SubscriptionNotFound: Topic is not currently subscribed for.
        """
        try:
            sub_id = self._sub_ids[topic_name]
        except KeyError:
            raise SubscriptionNotFound(
                f"Subscription topic {topic_name!r} is not currently "
                "subscribed for") from None
        return self._id_value(sub_id)

    @logged_api_call
    def notifications(self):
        """
        Generator method that yields all HMC notifications (= JMS messages)
        received by this notification receiver.

        The method connects to the HMC if needed, so after raising
        :exc:`~zhmcclient.NotificationConnectionError` or
        :exc:`stomp.exception.StompException`, it can simply be called again
        to reconnect and resume waiting for notifications.

        This method returns only when the receiver is closed (using
        :meth:`~zhmcclient.NotificationReceiver.close`) by some other thread;
        any errors do not cause the method to return but always cause an
        exception to be raised.

        For an example how to use this method, see
        :ref:`Notifications` or the example scripts.

        Yields:

          : A tuple (headers, message) representing one HMC notification, with:

          * headers (dict): The notification header fields.

            Some important header fields (dict items) are:

            * 'notification-type' (string): The HMC notification type (e.g.
              'os-message', 'job-completion', or others).

            * 'session-sequence-nr' (string): The sequence number of this HMC
              notification within the session created by this notification
              receiver object. This number starts at 0 when this receiver
              object is created, and is incremented each time an HMC
              notification is published to this receiver.

            * 'class' (string): The class name of the HMC resource publishing
              the HMC notification (e.g. 'partition').

            * 'object-id' (string) or 'element-id' (string): The ID of the HMC
              resource publishing the HMC notification.

            For a complete list of notification header fields, see section
            "Message format" in chapter 4. "Asynchronous notification" in the
            :term:`HMC API` book.

          * message (:term:`JSON object`): Body of the HMC notification,
            converted into a JSON object. `None` for notifications that
            have no content in their response body.

            The properties of the JSON object vary by notification type.

            For a description of the JSON properties, see the sub-sections
            for each notification type within section "Notification message
            formats" in chapter 4. "Asynchronous notification" in the
            :term:`HMC API` book.

        Returns:
            None

        Raises:
            :exc:`~zhmcclient.NotificationJMSError`: Received JMS error from
              the HMC.
            :exc:`~zhmcclient.NotificationParseError`: Cannot parse JMS message
              body as JSON.
            :exc:`~zhmcclient.NotificationConnectionError`: Issue with STOMP
              connection to HMC. Detecting lost connections requires that
              heartbeating is enabled in the stomp retry/timeout configuration.
            :exc:`~zhmcclient.NotificationSubscriptionError`: STOMP
              subscription failed.
        """

        # The timeout for getting an item from the handover queue. If the
        # timeout expires, a check for connection loss is performed and then
        # a new get from the handover queue. Since the connection loss
        # detection is based on heartbeat loss, it does not make sense to check
        # more often than the heartbeat receive cycle.
        ho_get_timeout = STOMP_MIN_CONNECTION_CHECK_TIME
        if self._rt_config:
            ho_get_timeout = max(
                ho_get_timeout, self._rt_config.heartbeat_receive_cycle + 1)

        self.connect()

        while True:

            # Get an item from the listener
            while True:
                if self._closed:
                    return
                try:
                    item = self._handover_queue.get(timeout=ho_get_timeout)
                except queue.Empty:
                    # This check detects a disconnect only when heartbeating
                    # is enabled in the stomp retry/timeout configuration.
                    if not self._conn.is_connected():
                        raise NotificationConnectionError(
                            "Lost STOMP connection to HMC")
                    continue
                break

            # Now we have an item from the listener

            if item.msgtype == 'message':
                if item.message:
                    try:
                        msg_obj = json.loads(item.message)
                    except Exception as exc:
                        raise NotificationParseError(
                            "Cannot convert JMS message body to JSON: "
                            f"{exc.__class__.__name__}: {exc}",
                            item.message) from exc
                else:
                    # A notification without content in its body is yielded
                    # with message=None, as documented above.
                    msg_obj = None
            elif item.msgtype == 'error':
                if 'message' in item.headers:
                    # Not sure that is always the case, but it was the case
                    # in issue #770.
                    details = f": {item.headers['message'].strip()}"
                else:
                    details = ""
                raise NotificationJMSError(
                    f"Received JMS error from HMC{details}",
                    item.headers, item.message)
            elif item.msgtype in ('disconnected', 'heartbeat_timeout'):

                # Get all contiguous such entries to handle them just once
                num_disc = 0
                num_hbto = 0
                if item.msgtype == 'disconnected':
                    num_disc += 1
                elif item.msgtype == 'heartbeat_timeout':
                    num_hbto += 1
                while True:
                    try:
                        item_ = self._handover_queue.get(
                            timeout=ho_get_timeout)
                    except queue.Empty:
                        break
                    if item_.msgtype == 'disconnected':
                        num_disc += 1
                    elif item_.msgtype == 'heartbeat_timeout':
                        num_hbto += 1
                    else:
                        # Put the item back.
                        # TODO: Find way to put it to the front of the queue.
                        # A non-blocking put is used, because this thread is
                        # the only consumer of the queue: a blocking put on a
                        # full queue would never return, and queue.Full would
                        # never be raised.
                        try:
                            self._handover_queue.put_nowait(item_)
                        except queue.Full:
                            JMS_LOGGER.error(
                                "Handover queue is full (put-back) - "
                                "dropping %s event", item_.msgtype)
                        break

                raise NotificationConnectionError(
                    f"STOMP received {num_hbto} heartbeat timeouts and "
                    f"{num_disc} disconnect messages")
            else:
                raise RuntimeError(
                    f"Invalid handover item: {item.msgtype}")

            yield item.headers, msg_obj

    @logged_api_call
    def close(self):
        """
        Close the receiver and cause its
        :meth:`~zhmcclient.NotificationReceiver.notifications` method to
        return.

        This also disconnects the STOMP session from the HMC, unsubscribing
        for any topics. If no STOMP connection has been created yet, the
        receiver is just marked as closed.

        Raises:
            stomp.exception.StompException: From stomp.Connection.disconnect()
        """
        self._closed = True
        # The connection object only exists once connect() has been called.
        if self._conn is not None:
            self._conn.disconnect()
# Item that is handed over from the listener thread to the receiver thread.
_NotificationItem = namedtuple(
    '_NotificationItem',
    [
        'msgtype',  # str: message type: 'message', 'error', 'disconnected',
                    # 'heartbeat_timeout'
        'headers',  # dict: STOMP headers (for msgtype 'message' and 'error')
        'message',  # str: STOMP frame body (for msgtype 'message' and
                    # 'error'); in JSON for msgtype 'message'
    ]
)


class _NotificationListener:
    """
    A notification listener class for use by the Python `stomp-py` package.

    This is an internal class that does not need to be accessed or created by
    the user. An object of this class is automatically created by the
    :class:`~zhmcclient.NotificationReceiver` class, for its notification
    topic.

    Note: In the stomp examples, this class inherits from
    stomp.ConnectionListener. However, since we want to import the stomp
    module in a lazy manner, we are not inheriting from that class, but
    repeat its methods here.
    """

    def __init__(self, handover_queue):
        """
        Parameters:

          handover_queue (Queue): Thread-safe queue between this listener
            object in the listener thread and the notification receiver
            object in the main thread. The queue items are _NotificationItem
            objects.
        """
        self._handover_queue = handover_queue

        # Lazy importing of the stomp module, because the import is slow in
        # some versions.
        # pylint: disable=import-outside-toplevel
        import stomp
        self._stomp = stomp

    def _enqueue(self, msgtype, frame_args):
        """
        Put the STOMP frame as an item of the specified message type into the
        handover queue.

        If the queue is still full after 5 seconds, the event is dropped and
        an error is logged.

        Parameters:

          msgtype (str): Message type for the handover item.

          frame_args: The STOMP frame. For details, see get_headers_message().
        """
        headers, message = get_headers_message(frame_args)
        item = _NotificationItem(
            headers=headers, message=message, msgtype=msgtype)
        try:
            self._handover_queue.put(item, timeout=5)
        except queue.Full:
            JMS_LOGGER.error(
                "Handover queue is full - dropping '%s' event", msgtype)

    def on_connecting(self, host_and_port):
        """
        Event method that gets called when the TCP/IP connection to the
        HMC has been established or re-established.

        Note that at this point, no connection has been established at the
        STOMP protocol level.

        Parameters:

          host_and_port (tuple(str, int)): Host name and port number to which
            the TCP/IP connection has been established.
        """
        pass

    def on_connected(self, *frame_args):
        # pylint: disable=no-self-use
        """
        Event method that gets called when a STOMP CONNECTED frame has been
        received from the HMC (after a TCP/IP connection has been established
        or re-established).

        Logs the heartbeat settings returned by the HMC in the 'heart-beat'
        header.

        Parameters:

          frame_args: The STOMP frame. For details, see get_headers_message().
        """
        headers, _ = get_headers_message(frame_args)
        # A missing 'heart-beat' header is treated like '0,0'.
        heartbeat = headers.get('heart-beat', '0,0')
        can_send, want_receive = map(int, heartbeat.split(','))
        if can_send == 0:
            can_send_str = "cannot send heartbeats"
        else:
            can_send_str = f"can send heartbeats every {can_send} msec"
        if want_receive == 0:
            want_receive_str = "does not want to receive heartbeats"
        else:
            want_receive_str = \
                f"wants to receive heartbeats every {want_receive} msec"
        JMS_LOGGER.info(
            "Connected. The HMC %s and %s", can_send_str, want_receive_str)

    def on_disconnecting(self):
        """
        Event method that gets called before a STOMP DISCONNECT frame is sent
        to the HMC.
        """
        pass

    def on_disconnected(self):
        """
        Event method that gets called when the TCP/IP connection to the HMC
        has been lost.

        No messages should be sent via the connection until it has been
        re-established.
        """
        # We detect disconnects in the notifications() method by checking the
        # connection status, because this event method is called not when the
        # disconnect happens, but when the connection is re-established.
        # And it is called twice.
        pass

    def on_heartbeat_timeout(self):
        """
        Event method that gets called when a STOMP heartbeat has not been
        received from the HMC within the specified period.
        """
        # We detect heartbeat issues in the notifications() method by checking
        # the connection status, because this heartbeat event method is called
        # not when the heartbeat is missing, but when the connection is
        # re-established.
        pass

    def on_before_message(self, *frame_args):
        """
        Event method that gets called when a STOMP MESSAGE frame has been
        received from the HMC, but before the on_message() method is called.

        Parameters:

          frame_args: The STOMP frame. For details, see get_headers_message().
        """
        pass

    def on_message(self, *frame_args):
        """
        Event method that gets called when a STOMP MESSAGE frame has been
        received from the HMC (representing an HMC notification).

        The frame is handed over to the receiver as a 'message' item.

        Parameters:

          frame_args: The STOMP frame. For details, see get_headers_message().
        """
        self._enqueue('message', frame_args)

    def on_receipt(self, *frame_args):
        """
        Event method that gets called when a STOMP RECEIPT frame has been
        received from the HMC.

        This is sent by the HMC if requested by the client using the 'receipt'
        header.

        Parameters:

          frame_args: The STOMP frame. For details, see get_headers_message().
        """
        pass

    def on_error(self, *frame_args):
        """
        Event method that gets called when a STOMP ERROR frame has been
        received from the HMC.

        This happens for example when the client registers for a non-existing
        HMC notification topic.

        The frame is handed over to the receiver as an 'error' item.

        Parameters:

          frame_args: The STOMP frame. For details, see get_headers_message().
        """
        self._enqueue('error', frame_args)

    def on_send(self, *frame_args):
        # pylint: disable=no-self-use
        """
        Event method that gets called when the STOMP connection is in the
        process of sending a message.

        Parameters:

          frame_args: The STOMP frame. For details, see get_headers_message().
        """
        _, message = get_headers_message(frame_args)
        # A frame without message is logged as a heartbeat (debug only).
        if message is None and DEBUG_HEARTBEATS:
            JMS_LOGGER.info("Sending STOMP heartbeat to HMC")

    def on_heartbeat(self):
        # pylint: disable=no-self-use
        """
        Event method that gets called when a STOMP heartbeat has been received.
        """
        if DEBUG_HEARTBEATS:
            JMS_LOGGER.info("Received STOMP heartbeat from HMC")

    def on_receiver_loop_completed(self, *frame_args):
        """
        Event method that gets called when the connection receiver_loop has
        finished.

        Parameters:

          frame_args: The STOMP frame. For details, see get_headers_message().
        """
        pass