ZBEACON(3) | CZMQ Manual | ZBEACON(3) |
NAME¶
zbeacon - Class for LAN discovery and presence
SYNOPSIS¶
// Create new zbeacon actor instance: // // zactor_t *beacon = zactor_new (zbeacon, NULL); // // Destroy zbeacon instance: // // zactor_destroy (&beacon); // // Enable verbose logging of commands and activity: // // zstr_send (beacon, "VERBOSE"); // // Configure beacon to run on specified UDP port, and return the name of // the host, which can be used as endpoint for incoming connections. To // force the beacon to operate on a given interface, set the environment // variable ZSYS_INTERFACE, or call zsys_set_interface() before creating // the beacon. If the system does not support UDP broadcasts (lacking a // workable interface), returns an empty hostname: // // // Pictures: 's' = C string, 'i' = int // zsock_send (beacon, "si", "CONFIGURE", port_number); // char *hostname = zstr_recv (beacon); // // Start broadcasting a beacon at a specified interval in msec. The beacon // data can be at most UDP_FRAME_MAX bytes; this constant is defined in // zsys.h to be 255: // // // Pictures: 'b' = byte * data + size_t size // zsock_send (beacon, "sbi", "PUBLISH", data, size, interval); // // Stop broadcasting the beacon: // // zstr_sendx (beacon, "SILENCE", NULL); // // Start listening to beacons from peers. The filter is used to do a prefix // match on received beacons, to remove junk. Note that any received data // that is identical to our broadcast beacon_data is discarded in any case. // If the filter size is zero, we get all peer beacons: // // zsock_send (beacon, "sb", "SUBSCRIBE", filter_data, filter_size); // // Stop listening to other peers // // zstr_sendx (beacon, "UNSUBSCRIBE", NULL); // // Receive next beacon from a peer. Received beacons are always a 2-frame // message containing the ipaddress of the sender, and then the binary // beacon data as published by the sender: // // zmsg_t *msg = zmsg_recv (beacon); // // This is the zbeacon constructor as a zactor_fn: CZMQ_EXPORT void
zbeacon (zsock_t *pipe, void *unused); // Self test of this class CZMQ_EXPORT void
zbeacon_test (bool verbose); Please add '@interface' section in './../src/zbeacon.c'.
DESCRIPTION¶
The zbeacon class implements a peer-to-peer discovery service for local networks. A beacon can broadcast and/or capture service announcements using UDP messages on the local area network. This implementation uses IPv4 UDP broadcasts. You can define the format of your outgoing beacons, and set a filter that validates incoming beacons. Beacons are sent and received asynchronously in the background.
This class replaces zbeacon_v2, and is meant for applications that use the CZMQ v3 API (meaning, zsock).
EXAMPLE¶
From zbeacon_test method.
// Test 1 - two beacons, one speaking, one listening // Create speaker beacon to broadcast our service zactor_t *speaker = zactor_new (zbeacon, NULL); assert (speaker); if (verbose)
zstr_sendx (speaker, "VERBOSE", NULL); zsock_send (speaker, "si", "CONFIGURE", 9999); char *hostname = zstr_recv (speaker); if (!*hostname) {
printf ("OK (skipping test, no UDP broadcasting)\n");
zactor_destroy (&speaker);
freen (hostname);
return; } freen (hostname); // Create listener beacon on port 9999 to lookup service zactor_t *listener = zactor_new (zbeacon, NULL); assert (listener); if (verbose)
zstr_sendx (listener, "VERBOSE", NULL); zsock_send (listener, "si", "CONFIGURE", 9999); hostname = zstr_recv (listener); assert (*hostname); freen (hostname); // We will broadcast the magic value 0xCAFE byte announcement [2] = { 0xCA, 0xFE }; zsock_send (speaker, "sbi", "PUBLISH", announcement, 2, 100); // We will listen to anything (empty subscription) zsock_send (listener, "sb", "SUBSCRIBE", "", 0); // Wait for at most 1/2 second if there's no broadcasting zsock_set_rcvtimeo (listener, 500); char *ipaddress = zstr_recv (listener); if (ipaddress) {
zframe_t *content = zframe_recv (listener);
assert (zframe_size (content) == 2);
assert (zframe_data (content) [0] == 0xCA);
assert (zframe_data (content) [1] == 0xFE);
zframe_destroy (&content);
zstr_free (&ipaddress);
zstr_sendx (speaker, "SILENCE", NULL); } zactor_destroy (&listener); zactor_destroy (&speaker); // Test subscription filter using a 3-node setup zactor_t *node1 = zactor_new (zbeacon, NULL); assert (node1); zsock_send (node1, "si", "CONFIGURE", 5670); hostname = zstr_recv (node1); assert (*hostname); freen (hostname); zactor_t *node2 = zactor_new (zbeacon, NULL); assert (node2); zsock_send (node2, "si", "CONFIGURE", 5670); hostname = zstr_recv (node2); assert (*hostname); freen (hostname); zactor_t *node3 = zactor_new (zbeacon, NULL); assert (node3); zsock_send (node3, "si", "CONFIGURE", 5670); hostname = zstr_recv (node3); assert (*hostname); freen (hostname); zsock_send (node1, "sbi", "PUBLISH", "NODE/1", 6, 250); zsock_send (node2, "sbi", "PUBLISH", "NODE/2", 6, 250); zsock_send (node3, "sbi", "PUBLISH", "RANDOM", 6, 250); zsock_send (node1, "sb", "SUBSCRIBE", "NODE", 4); // Poll on three API sockets at once zpoller_t *poller = zpoller_new (node1, node2, node3, NULL); assert (poller); int64_t stop_at = zclock_mono () + 1000; while (zclock_mono () < stop_at) {
long timeout = (long) (stop_at - zclock_mono ());
if (timeout < 0)
timeout = 0;
void *which = zpoller_wait (poller, timeout);
if (which) {
assert (which == node1);
char *ipaddress, *received;
zstr_recvx (node1, &ipaddress, &received, NULL);
assert (streq (received, "NODE/2"));
zstr_free (&ipaddress);
zstr_free (&received);
} } zpoller_destroy (&poller); // Stop listening zstr_sendx (node1, "UNSUBSCRIBE", NULL); // Stop all node broadcasts zstr_sendx (node1, "SILENCE", NULL); zstr_sendx (node2, "SILENCE", NULL); zstr_sendx (node3, "SILENCE", NULL); // Destroy the test nodes zactor_destroy (&node1); zactor_destroy (&node2); zactor_destroy (&node3); // Unset multicast zsys_set_ipv4_mcast_address (NULL); // Test 1 - two beacons, one speaking, one listening // Create speaker beacon to broadcast our service zactor_t *speaker = zactor_new (zbeacon, NULL); assert (speaker); if (verbose)
zstr_sendx (speaker, "VERBOSE", NULL); zsock_send (speaker, "si", "CONFIGURE", 9999); char *hostname = zstr_recv (speaker); if (!*hostname) {
printf ("OK (skipping test, no UDP broadcasting)\n");
zactor_destroy (&speaker);
freen (hostname);
return; } freen (hostname); // Create listener beacon on port 9999 to lookup service zactor_t *listener = zactor_new (zbeacon, NULL); assert (listener); if (verbose)
zstr_sendx (listener, "VERBOSE", NULL); zsock_send (listener, "si", "CONFIGURE", 9999); hostname = zstr_recv (listener); assert (*hostname); freen (hostname); // We will broadcast the magic value 0xCAFE byte announcement [2] = { 0xCA, 0xFE }; zsock_send (speaker, "sbi", "PUBLISH", announcement, 2, 100); // We will listen to anything (empty subscription) zsock_send (listener, "sb", "SUBSCRIBE", "", 0); // Wait for at most 1/2 second if there's no broadcasting zsock_set_rcvtimeo (listener, 500); char *ipaddress = zstr_recv (listener); if (ipaddress) {
zframe_t *content = zframe_recv (listener);
assert (zframe_size (content) == 2);
assert (zframe_data (content) [0] == 0xCA);
assert (zframe_data (content) [1] == 0xFE);
zframe_destroy (&content);
zstr_free (&ipaddress);
zstr_sendx (speaker, "SILENCE", NULL); } zactor_destroy (&listener); zactor_destroy (&speaker); // Test subscription filter using a 3-node setup zactor_t *node1 = zactor_new (zbeacon, NULL); assert (node1); zsock_send (node1, "si", "CONFIGURE", 5670); hostname = zstr_recv (node1); assert (*hostname); freen (hostname); zactor_t *node2 = zactor_new (zbeacon, NULL); assert (node2); zsock_send (node2, "si", "CONFIGURE", 5670); hostname = zstr_recv (node2); assert (*hostname); freen (hostname); zactor_t *node3 = zactor_new (zbeacon, NULL); assert (node3); zsock_send (node3, "si", "CONFIGURE", 5670); hostname = zstr_recv (node3); assert (*hostname); freen (hostname); zsock_send (node1, "sbi", "PUBLISH", "NODE/1", 6, 250); zsock_send (node2, "sbi", "PUBLISH", "NODE/2", 6, 250); zsock_send (node3, "sbi", "PUBLISH", "RANDOM", 6, 250); zsock_send (node1, "sb", "SUBSCRIBE", "NODE", 4); // Poll on three API sockets at once zpoller_t *poller = zpoller_new (node1, node2, node3, NULL); assert (poller); int64_t stop_at = zclock_mono () + 1000; while (zclock_mono () < stop_at) {
long timeout = (long) (stop_at - zclock_mono ());
if (timeout < 0)
timeout = 0;
void *which = zpoller_wait (poller, timeout);
if (which) {
assert (which == node1);
char *ipaddress, *received;
zstr_recvx (node1, &ipaddress, &received, NULL);
assert (streq (received, "NODE/2"));
zstr_free (&ipaddress);
zstr_free (&received);
} } zpoller_destroy (&poller); // Stop listening zstr_sendx (node1, "UNSUBSCRIBE", NULL); // Stop all node broadcasts zstr_sendx (node1, "SILENCE", NULL); zstr_sendx (node2, "SILENCE", NULL); zstr_sendx (node3, "SILENCE", NULL); // Destroy the test nodes zactor_destroy (&node1); zactor_destroy (&node2); zactor_destroy (&node3); #if defined (__WINDOWS__) zsys_shutdown(); #endif
AUTHORS¶
The czmq manual was written by the authors in the AUTHORS file.
RESOURCES¶
Main web site:
Report bugs to the email <zeromq-dev@lists.zeromq.org[1]>
COPYRIGHT¶
Copyright (c) the Contributors as noted in the AUTHORS file. This file is part of CZMQ, the high-level C binding for 0MQ: http://czmq.zeromq.org. This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0. If a copy of the MPL was not distributed with this file, You can obtain one at http://mozilla.org/MPL/2.0/. LICENSE included with the czmq distribution.
NOTES¶
- 1.
- zeromq-dev@lists.zeromq.org
03/05/2021 | CZMQ 4.2.1 |