zmq_socket_monitor_versioned - Man Page

monitor socket events

Synopsis

int zmq_socket_monitor_versioned (void *socket, char *endpoint, uint64_t events, int event_version, int type);

int zmq_socket_monitor_pipes_stats (void *socket);

Description

The zmq_socket_monitor_versioned() method lets an application thread track socket events (such as connects) on a ZeroMQ socket. Each call to this method creates a monitoring socket of the requested type (ZMQ_PAIR in the simplest case) and binds it to the specified inproc:// endpoint. To collect the socket events, you must create your own compatible socket (a ZMQ_PAIR socket for a ZMQ_PAIR monitor) and connect it to the endpoint.

The events argument is a bitmask of the socket events you wish to monitor; see Supported events below. To monitor all events for a version, use the event value ZMQ_EVENT_ALL_V<version>, e.g. ZMQ_EVENT_ALL_V1. For non-DRAFT event versions, this value will not change in the future, so new event types will only be added in new versions (note that this differs from zmq_socket_monitor and the unversioned ZMQ_EVENT_ALL).
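As a rough illustration of how such a bitmask is composed and tested, the sketch below ORs individual event flags together. The EV_* names and both helper functions are hypothetical stand-ins; their values are assumed to mirror the corresponding ZMQ_EVENT_* constants, and zmq.h remains the authoritative source.

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-ins; the values are assumed to match the
   corresponding ZMQ_EVENT_* constants in zmq.h. */
#define EV_CONNECTED UINT64_C (0x0001)
#define EV_LISTENING UINT64_C (0x0008)
#define EV_ACCEPTED UINT64_C (0x0020)
#define EV_DISCONNECTED UINT64_C (0x0200)

/* Build a mask that selects only connection lifecycle events. */
static uint64_t
connection_events_mask (void)
{
    return EV_CONNECTED | EV_ACCEPTED | EV_DISCONNECTED;
}

/* Return true if the given event number is selected by the mask. */
static bool
mask_selects (uint64_t mask, uint64_t event)
{
    return (mask & event) != 0;
}
```

In an application the mask would be built from the real constants, for example ZMQ_EVENT_CONNECTED | ZMQ_EVENT_ACCEPTED | ZMQ_EVENT_DISCONNECTED, and passed as the events argument.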

Note that event_version 2 is currently in DRAFT mode. The protocol may be changed at any time for event_versions in DRAFT.

ZMQ_CURRENT_EVENT_VERSION and ZMQ_CURRENT_EVENT_VERSION_DRAFT are always defined to the most recent stable and DRAFT event versions, which are currently 1 and 2, respectively.

This page describes the protocol for event_version 2 only. For the protocol used with event_version 1, please refer to zmq_socket_monitor(3).

Each event is sent in multiple frames. The first frame contains an event number (64 bits). The number and content of further frames depend on this event number.

Unless specified otherwise, the second frame contains the number of value frames that follow it, as a 64-bit integer. Each subsequent value frame contains a 64-bit event value that provides additional data according to the event number. Each event type might have a different number of values. The second-to-last and last frames contain strings that specify the affected connection or endpoint: the former denotes the local endpoint, the latter the remote endpoint. Either of these may be empty, depending on the event type and whether the connection uses a bound or connected local endpoint.
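The frame layout described here can be sketched as a small decoder over frames that have already been received. This is only an illustration: frame_t, event_v2_t, frame_u64() and decode_event_v2() are hypothetical names, and the integers are assumed to be in host byte order.

```c
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical in-memory view of one received frame. */
typedef struct
{
    const void *data;
    size_t size;
} frame_t;

/* Hypothetical decoded form of one version 2 event message. */
typedef struct
{
    uint64_t event;        /* event number from the first frame */
    uint64_t value_count;  /* number of value frames, from the second frame */
    const frame_t *values; /* first value frame (value_count entries) */
    const frame_t *local;  /* local endpoint string, not NUL-terminated */
    const frame_t *remote; /* remote endpoint string, not NUL-terminated */
} event_v2_t;

/* Read a 64-bit integer from a frame; returns 0 on success, -1 if the
   frame does not hold exactly 8 bytes. */
static int
frame_u64 (const frame_t *f, uint64_t *out)
{
    if (f->size != sizeof (uint64_t))
        return -1;
    memcpy (out, f->data, sizeof (uint64_t));
    return 0;
}

/* Split a complete multipart message into its parts. Returns 0 on
   success, -1 if the frame count or frame sizes do not fit the layout. */
static int
decode_event_v2 (const frame_t *frames, size_t n, event_v2_t *ev)
{
    /* Smallest message: event, count (0), local address, remote address */
    if (n < 4)
        return -1;
    if (frame_u64 (&frames[0], &ev->event) != 0
        || frame_u64 (&frames[1], &ev->value_count) != 0)
        return -1;
    /* The announced value count must account for all remaining frames */
    if (ev->value_count != n - 4)
        return -1;
    for (size_t i = 0; i < n - 4; ++i)
        if (frames[2 + i].size != sizeof (uint64_t))
            return -1;
    ev->values = &frames[2];
    ev->local = &frames[n - 2];
    ev->remote = &frames[n - 1];
    return 0;
}
```

Validating the announced value count against the number of frames actually received keeps the decoder from reading past the message if the layout changes for future events.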

Note that the format of the second and further frames, and also the number of frames, may be different for events added in the future.

The type argument specifies the type of the monitoring socket. Supported types are ZMQ_PAIR, ZMQ_PUB and ZMQ_PUSH. Consumers of the events must use a compatible socket type; for instance, a monitoring socket of type ZMQ_PUB requires consumers of type ZMQ_SUB. When the monitoring socket is of type ZMQ_PUB, the multipart message topic is the event number, so consumers should subscribe to the events they want to receive.
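For a ZMQ_PUB monitor, a subscriber filters by prefix on the first frame, which holds the event number. The sketch below builds such a prefix and mimics the prefix match locally. It assumes the event number is written as 8 bytes in host byte order; event_topic() and topic_matches() are hypothetical helpers, not part of libzmq.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define TOPIC_SIZE sizeof (uint64_t)

/* Write the subscription prefix for one event number into topic.
   Assumption: the monitor emits the number in host byte order. */
static void
event_topic (uint64_t event, unsigned char topic[TOPIC_SIZE])
{
    memcpy (topic, &event, TOPIC_SIZE);
}

/* Mimic a prefix-based subscription filter against a first frame. */
static bool
topic_matches (const void *frame, size_t frame_size,
               const unsigned char *topic, size_t topic_size)
{
    return frame_size >= topic_size
           && memcmp (frame, topic, topic_size) == 0;
}
```

In an application, the prefix would be handed to zmq_setsockopt() with the ZMQ_SUBSCRIBE option on the ZMQ_SUB consumer, once per event of interest.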

The zmq_socket_monitor_pipes_stats() method triggers an event of type ZMQ_EVENT_PIPES_STATS for each connected peer of the monitored socket. NOTE: zmq_socket_monitor_pipes_stats() is in DRAFT state.

Monitoring events are only generated by some transports: at the moment these
are SOCKS, TCP, IPC, and TIPC. Note that it is not an error to call
zmq_socket_monitor_versioned() on a socket that is connected or bound to some
other transport, as this may not be known at the time
zmq_socket_monitor_versioned() is called. Also, a socket can be connected/bound
to multiple endpoints using different transports.
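Because the call succeeds regardless of transport, an application may want its own check for whether an endpoint can be expected to produce events at all. A minimal sketch follows, assuming the transport is identified by the endpoint's URI scheme; endpoint_emits_events() is a hypothetical helper.

```c
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Return true if the endpoint's scheme is one of the transports listed
   above as generating monitor events. SOCKS is omitted because it is
   assumed to be reached through a tcp:// endpoint rather than through
   a scheme of its own. */
static bool
endpoint_emits_events (const char *endpoint)
{
    static const char *const schemes[] = {"tcp://", "ipc://", "tipc://"};
    for (size_t i = 0; i < sizeof schemes / sizeof schemes[0]; ++i)
        if (strncmp (endpoint, schemes[i], strlen (schemes[i])) == 0)
            return true;
    return false;
}
```

A check like this is only advisory, since the set of supported transports may change between releases.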

Supported events (v1)

ZMQ_EVENT_CONNECTED
~~~~~~~~~~~~~~~~~~~
The socket has successfully connected to a remote peer. The event value
is the file descriptor (FD) of the underlying network socket. Warning:
there is no guarantee that the FD is still valid by the time your code
receives this event.

ZMQ_EVENT_CONNECT_DELAYED
~~~~~~~~~~~~~~~~~~~~~~~~~
A connect request on the socket is pending. The event value is unspecified.

ZMQ_EVENT_CONNECT_RETRIED
~~~~~~~~~~~~~~~~~~~~~~~~~
A connect request failed, and is now being retried. The event value is the
reconnect interval in milliseconds. Note that the reconnect interval is
recalculated for each retry.

ZMQ_EVENT_LISTENING
~~~~~~~~~~~~~~~~~~~
The socket was successfully bound to a network interface. The event value
is the FD of the underlying network socket. Warning: there is no guarantee
that the FD is still valid by the time your code receives this event.

ZMQ_EVENT_BIND_FAILED
~~~~~~~~~~~~~~~~~~~~~
The socket could not bind to a given interface. The event value is the
errno generated by the system bind call.

ZMQ_EVENT_ACCEPTED
~~~~~~~~~~~~~~~~~~
The socket has accepted a connection from a remote peer. The event value is
the FD of the underlying network socket. Warning: there is no guarantee that
the FD is still valid by the time your code receives this event.

ZMQ_EVENT_ACCEPT_FAILED
~~~~~~~~~~~~~~~~~~~~~~~
The socket has rejected a connection from a remote peer. The event value is
the errno generated by the accept call.

ZMQ_EVENT_CLOSED
~~~~~~~~~~~~~~~~
The socket was closed. The event value is the FD of the (now closed) network
socket.

ZMQ_EVENT_CLOSE_FAILED
~~~~~~~~~~~~~~~~~~~~~~
The socket close failed. The event value is the errno returned by the system
call. Note that this event occurs only on IPC transports.

ZMQ_EVENT_DISCONNECTED
~~~~~~~~~~~~~~~~~~~~~~
The socket was disconnected unexpectedly. The event value is the FD of the
underlying network socket. Warning: this socket will be closed.

ZMQ_EVENT_MONITOR_STOPPED
~~~~~~~~~~~~~~~~~~~~~~~~~
Monitoring on this socket ended.

ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Unspecified error during handshake.
The event value is an errno.

ZMQ_EVENT_HANDSHAKE_SUCCEEDED
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The ZMTP security mechanism handshake succeeded.
The event value is unspecified.

ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The ZMTP security mechanism handshake failed due to a mechanism protocol
error, either between the ZMTP mechanism peers, or between the mechanism
server and the ZAP handler. This indicates a configuration or implementation
error in one of the peers or in the ZAP handler.
The event value is one of the ZMQ_PROTOCOL_ERROR_* values:
ZMQ_PROTOCOL_ERROR_ZMTP_UNSPECIFIED
ZMQ_PROTOCOL_ERROR_ZMTP_UNEXPECTED_COMMAND
ZMQ_PROTOCOL_ERROR_ZMTP_INVALID_SEQUENCE
ZMQ_PROTOCOL_ERROR_ZMTP_KEY_EXCHANGE
ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_UNSPECIFIED
ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_MESSAGE
ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_HELLO
ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_INITIATE
ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_ERROR
ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_READY
ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_WELCOME
ZMQ_PROTOCOL_ERROR_ZMTP_INVALID_METADATA
ZMQ_PROTOCOL_ERROR_ZMTP_CRYPTOGRAPHIC
ZMQ_PROTOCOL_ERROR_ZMTP_MECHANISM_MISMATCH
ZMQ_PROTOCOL_ERROR_ZAP_UNSPECIFIED
ZMQ_PROTOCOL_ERROR_ZAP_MALFORMED_REPLY
ZMQ_PROTOCOL_ERROR_ZAP_BAD_REQUEST_ID
ZMQ_PROTOCOL_ERROR_ZAP_BAD_VERSION
ZMQ_PROTOCOL_ERROR_ZAP_INVALID_STATUS_CODE
ZMQ_PROTOCOL_ERROR_ZAP_INVALID_METADATA

ZMQ_EVENT_HANDSHAKE_FAILED_AUTH
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The ZMTP security mechanism handshake failed due to an authentication failure.
The event value is the status code returned by the ZAP handler (i.e. 300,
400 or 500).
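When logging authentication failures, the status code carried in the event value can be mapped to a short description. The sketch below is illustrative only: zap_status_text() is a hypothetical helper, and the wording of each description follows the usual meaning of the ZAP status codes, which is an assumption not stated on this page.

```c
#include <stdint.h>

/* Map the event value of an authentication failure to a short
   description. The descriptions are an assumption of this sketch. */
static const char *
zap_status_text (uint64_t status)
{
    switch (status) {
        case 300:
            return "temporary error";
        case 400:
            return "authentication failure";
        case 500:
            return "internal error";
        default:
            return "unknown status";
    }
}
```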

Supported events (v2)

ZMQ_EVENT_PIPES_STATS
~~~~~~~~~~~~~~~~~~~~~
This event provides two values, the number of messages in each of the two
queues associated with the returned endpoint (respectively egress and ingress).
This event only triggers after calling the function
zmq_socket_monitor_pipes_stats().
NOTE: this measurement is asynchronous, so by the time the message is received
the internal state might have already changed.
NOTE: when the monitored socket and the monitor are not used in a poll, the
event might not be delivered until an API has been called on the monitored
socket, like zmq_getsockopt for example (the option is irrelevant).
NOTE: in DRAFT state, not yet available in stable releases.
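Once the value frames of a ZMQ_EVENT_PIPES_STATS event have been copied into an array, the two counters can be given names. The following is a minimal sketch; pipes_stats_t and pipes_stats_from_values() are hypothetical, and the order (egress first, then ingress) is taken from the description above.

```c
#include <stddef.h>
#include <stdint.h>

/* Hypothetical holder for the two values of a pipes stats event. */
typedef struct
{
    uint64_t egress;  /* messages in the outbound queue */
    uint64_t ingress; /* messages in the inbound queue */
} pipes_stats_t;

/* Interpret the decoded values; returns 0 on success, -1 if the event
   did not carry exactly two values. */
static int
pipes_stats_from_values (const uint64_t *values,
                         size_t count,
                         pipes_stats_t *out)
{
    if (values == NULL || count != 2)
        return -1;
    out->egress = values[0];
    out->ingress = values[1];
    return 0;
}
```

Since the measurement is asynchronous, the counters are best treated as a snapshot rather than as an exact current state.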



Return Value

The zmq_socket_monitor_versioned() and zmq_socket_monitor_pipes_stats() functions return a value of 0 or greater if successful. Otherwise they return -1 and set errno to one of the values defined below.

Errors - zmq_socket_monitor_versioned()

ETERM

The 0MQ context associated with the specified socket was terminated.

EPROTONOSUPPORT

The transport protocol of the monitor endpoint is not supported. Monitor sockets are required to use the inproc:// transport.

EINVAL

The monitor endpoint supplied does not exist or the specified socket type is not supported.

Errors - zmq_socket_monitor_pipes_stats()

ENOTSOCK

The socket parameter was not a valid 0MQ socket.

EINVAL

The socket did not have monitoring enabled.

EAGAIN

The monitored socket did not have any connections to monitor yet.
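The three error codes above suggest different reactions from the caller. One possible mapping is sketched below; the stats_action_t enumeration and pipes_stats_action() are hypothetical, and the chosen reactions are a suggestion rather than documented behaviour.

```c
#include <errno.h>

/* Hypothetical follow-up actions after a failed call. */
typedef enum
{
    STATS_RETRY_LATER,
    STATS_ENABLE_MONITOR,
    STATS_GIVE_UP
} stats_action_t;

/* Choose a reaction based on the errno value of the failed call. */
static stats_action_t
pipes_stats_action (int err)
{
    switch (err) {
        case EAGAIN: /* no connections yet: ask again later */
            return STATS_RETRY_LATER;
        case EINVAL: /* monitoring was not enabled on the socket */
            return STATS_ENABLE_MONITOR;
        case ENOTSOCK: /* not a valid socket */
        default:
            return STATS_GIVE_UP;
    }
}
```

A caller would pass the result of zmq_errno() after zmq_socket_monitor_pipes_stats() has returned -1.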

Example

Monitoring client and server sockets.

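//  The versioned monitor API is in DRAFT state; this example assumes a
//  libzmq build with draft support, exposed via ZMQ_BUILD_DRAFT_API.
#define ZMQ_BUILD_DRAFT_API
#include <zmq.h>
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

//  Read one event off the monitor socket; return values and addresses
//  by reference, if not null, and event number by value. Returns -1
//  in case of error.

static int64_t
get_monitor_event (void *monitor, uint64_t **value, char **local_address_, char **remote_address_)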
{
    //  First frame in message contains event number
    zmq_msg_t msg;
    zmq_msg_init (&msg);
    if (zmq_msg_recv (&msg, monitor, 0) == -1)
        return -1;              //  Interrupted, presumably
    assert (zmq_msg_more (&msg));

    uint64_t event;
    memcpy (&event, zmq_msg_data (&msg), sizeof (event));
    zmq_msg_close (&msg);

    //  Second frame in message contains the number of values
    zmq_msg_init (&msg);
    if (zmq_msg_recv (&msg, monitor, 0) == -1)
        return -1;              //  Interrupted, presumably
    assert (zmq_msg_more (&msg));

    uint64_t value_count;
    memcpy (&value_count, zmq_msg_data (&msg), sizeof (value_count));
    zmq_msg_close (&msg);

    if (value) {
        *value = (uint64_t *) malloc (value_count * sizeof (uint64_t));
        assert (*value);
    }

    for (uint64_t i = 0; i < value_count; ++i) {
        //  Subsequent frames in message contain event values
        zmq_msg_init (&msg);
        if (zmq_msg_recv (&msg, monitor, 0) == -1)
            return -1;              //  Interrupted, presumably
        assert (zmq_msg_more (&msg));

        if (value && *value)
            memcpy (&(*value)[i], zmq_msg_data (&msg), sizeof (uint64_t));
        zmq_msg_close (&msg);
    }

    //  Second-to-last frame in message contains local address
    zmq_msg_init (&msg);
    if (zmq_msg_recv (&msg, monitor, 0) == -1)
        return -1;              //  Interrupted, presumably
    assert (zmq_msg_more (&msg));

    if (local_address_) {
        uint8_t *data = (uint8_t *) zmq_msg_data (&msg);
        size_t size = zmq_msg_size (&msg);
        *local_address_ = (char *) malloc (size + 1);
        memcpy (*local_address_, data, size);
        (*local_address_)[size] = 0;
    }
    zmq_msg_close (&msg);

    //  Last frame in message contains remote address
    zmq_msg_init (&msg);
    if (zmq_msg_recv (&msg, monitor, 0) == -1)
        return -1;              //  Interrupted, presumably
    assert (!zmq_msg_more (&msg));

    if (remote_address_) {
        uint8_t *data = (uint8_t *) zmq_msg_data (&msg);
        size_t size = zmq_msg_size (&msg);
        *remote_address_ = (char *) malloc (size + 1);
        memcpy (*remote_address_, data, size);
        (*remote_address_)[size] = 0;
    }
    zmq_msg_close (&msg);

    return event;
}

int main (void)
{
    void *ctx = zmq_ctx_new ();
    assert (ctx);

    //  We'll monitor these two sockets
    void *client = zmq_socket (ctx, ZMQ_DEALER);
    assert (client);
    void *server = zmq_socket (ctx, ZMQ_DEALER);
    assert (server);

    //  Socket monitoring only works over inproc://
    int rc = zmq_socket_monitor_versioned (client, "tcp://127.0.0.1:9999", 0, 2, ZMQ_PAIR);
    assert (rc == -1);
    assert (zmq_errno () == EPROTONOSUPPORT);

    //  Monitor all version 2 events on client and server sockets
    rc = zmq_socket_monitor_versioned (client, "inproc://monitor-client", ZMQ_EVENT_ALL_V2, 2, ZMQ_PAIR);
    assert (rc == 0);
    rc = zmq_socket_monitor_versioned (server, "inproc://monitor-server", ZMQ_EVENT_ALL_V2, 2, ZMQ_PAIR);
    assert (rc == 0);

    //  Create two sockets for collecting monitor events
    void *client_mon = zmq_socket (ctx, ZMQ_PAIR);
    assert (client_mon);
    void *server_mon = zmq_socket (ctx, ZMQ_PAIR);
    assert (server_mon);

    //  Connect these to the inproc endpoints so they'll get events
    rc = zmq_connect (client_mon, "inproc://monitor-client");
    assert (rc == 0);
    rc = zmq_connect (server_mon, "inproc://monitor-server");
    assert (rc == 0);

    //  Now do a basic ping test
    rc = zmq_bind (server, "tcp://127.0.0.1:9998");
    assert (rc == 0);
    rc = zmq_connect (client, "tcp://127.0.0.1:9998");
    assert (rc == 0);
    //  bounce() and close_zero_linger() are helpers that are not shown here
    bounce (client, server);

    //  Close client and server
    close_zero_linger (client);
    close_zero_linger (server);

    //  Now collect and check events from both sockets
    int event = get_monitor_event (client_mon, NULL, NULL, NULL);
    if (event == ZMQ_EVENT_CONNECT_DELAYED)
        event = get_monitor_event (client_mon, NULL, NULL, NULL);
    assert (event == ZMQ_EVENT_CONNECTED);
    event = get_monitor_event (client_mon, NULL, NULL, NULL);
    assert (event == ZMQ_EVENT_MONITOR_STOPPED);

    //  This is the flow of server events
    event = get_monitor_event (server_mon, NULL, NULL, NULL);
    assert (event == ZMQ_EVENT_LISTENING);
    event = get_monitor_event (server_mon, NULL, NULL, NULL);
    assert (event == ZMQ_EVENT_ACCEPTED);
    event = get_monitor_event (server_mon, NULL, NULL, NULL);
    assert (event == ZMQ_EVENT_CLOSED);
    event = get_monitor_event (server_mon, NULL, NULL, NULL);
    assert (event == ZMQ_EVENT_MONITOR_STOPPED);

    //  Close down the sockets
    close_zero_linger (client_mon);
    close_zero_linger (server_mon);
    zmq_ctx_term (ctx);

    return 0;
}

See Also

zmq(7)

Authors

This page was written by the 0MQ community. To make a change please read the 0MQ Contribution Policy at http://www.zeromq.org/docs:contributing.

Referenced By

zmq_socket_monitor(3).

10/09/2023 0MQ 4.3.5 0MQ Manual