ZGOSSIP(3) | CZMQ Manual | ZGOSSIP(3) |
NAME¶
zgossip - Class for decentralized configuration management
SYNOPSIS¶
// To work with zgossip, use the CZMQ zactor API: // // Create new zgossip instance, passing logging prefix: // // zactor_t *zgossip = zactor_new (zgossip, "myname"); // // Destroy zgossip instance // // zactor_destroy (&zgossip); // // Enable verbose logging of commands and activity: // // zstr_send (zgossip, "VERBOSE"); // // Bind zgossip to specified endpoint. TCP endpoints may specify // the port number as "*" to acquire an ephemeral port: // // zstr_sendx (zgossip, "BIND", endpoint, NULL); // // Return assigned port number, specifically when BIND was done using an // an ephemeral port: // // zstr_sendx (zgossip, "PORT", NULL); // char *command, *port_str; // zstr_recvx (zgossip, &command, &port_str, NULL); // assert (streq (command, "PORT")); // // Specify configuration file to load, overwriting any previous loaded // configuration file or options: // // zstr_sendx (zgossip, "LOAD", filename, NULL); // // Set configuration path value: // // zstr_sendx (zgossip, "SET", path, value, NULL); // // Save configuration data to config file on disk: // // zstr_sendx (zgossip, "SAVE", filename, NULL); // // Send zmsg_t instance to zgossip: // // zactor_send (zgossip, &msg); // // Receive zmsg_t instance from zgossip: // // zmsg_t *msg = zactor_recv (zgossip); // // This is the zgossip constructor as a zactor_fn: // CZMQ_EXPORT void
zgossip (zsock_t *pipe, void *args); // Self test of this class CZMQ_EXPORT void
zgossip_test (bool verbose); Please add '@interface' section in './../src/zgossip.c'.
DESCRIPTION¶
Implements a gossip protocol for decentralized configuration management. Your applications nodes form a loosely connected network (which can have cycles), and publish name/value tuples. Each node re-distributes the new tuples it receives, so that the entire network eventually achieves a consistent state. The current design does not expire tuples.
Provides these commands (sent as multipart strings to the actor):
Returns these messages:
The gossip protocol distributes information around a loosely-connected network of gossip services. The information consists of name/value pairs published by applications at any point in the network. The goal of the gossip protocol is to create eventual consistency between all the using applications.
The name/value pairs (tuples) can be used for configuration data, for status updates, for presence, or for discovery. When used for discovery, the gossip protocol works as an alternative to e.g. UDP beaconing.
The gossip network consists of a set of loosely-coupled nodes that exchange tuples. Nodes can be connected across arbitrary transports, so the gossip network can have nodes that communicate over inproc, over IPC, and/or over TCP, at the same time.
Each node runs the same stack, which is a server-client hybrid using a modified Harmony pattern (from Chapter 8 of the Guide): http://zguide.zeromq.org/page:all#True-Peer-Connectivity-Harmony-Pattern
Each node provides a ROUTER socket that accepts client connections on an key defined by the application via a BIND command. The state machine for these connections is in zgossip.xml, and the generated code is in zgossip_engine.inc.
Each node additionally creates outbound connections via DEALER sockets to a set of servers ("remotes"), and under control of the calling app, which sends CONNECT commands for each configured remote.
The messages between client and server are defined in zgossip_msg.xml. We built this stack using the zeromq/zproto toolkit.
To join the gossip network, a node connects to one or more peers. Each peer acts as a forwarder. This loosely-coupled network can scale to thousands of nodes. However the gossip protocol is NOT designed to be efficient, and should not be used for application data, as the same tuples may be sent many times across the network.
The basic logic of the gossip service is to accept PUBLISH messages from its owning application, and to forward these to every remote, and every client it talks to. When a node gets a duplicate tuple, it throws it away. When a node gets a new tuple, it stores it, and forwards it as just described.
At present there is no way to expire tuples from the network.
The assumptions in this design are:
EXAMPLE¶
From zgossip_test method.
// Test basic client-to-server operation of the protocol zactor_t *server = zactor_new (zgossip, "server"); assert (server); if (verbose)
zstr_send (server, "VERBOSE"); zstr_sendx (server, "BIND", "inproc://zgossip", NULL); zsock_t *client = zsock_new (ZMQ_DEALER); assert (client); zsock_set_rcvtimeo (client, 2000); int rc = zsock_connect (client, "inproc://zgossip"); assert (rc == 0); // Send HELLO, which gets no message zgossip_msg_t *message = zgossip_msg_new (); zgossip_msg_set_id (message, ZGOSSIP_MSG_HELLO); zgossip_msg_send (message, client); // Send PING, expect PONG back zgossip_msg_set_id (message, ZGOSSIP_MSG_PING); zgossip_msg_send (message, client); zgossip_msg_recv (message, client); assert (zgossip_msg_id (message) == ZGOSSIP_MSG_PONG); zgossip_msg_destroy (&message); zactor_destroy (&server); zsock_destroy (&client); // Test peer-to-peer operations zactor_t *base = zactor_new (zgossip, "base"); assert (base); if (verbose)
zstr_send (base, "VERBOSE"); // Set a 100msec timeout on clients so we can test expiry zstr_sendx (base, "SET", "server/timeout", "100", NULL); zstr_sendx (base, "BIND", "inproc://base", NULL); zactor_t *alpha = zactor_new (zgossip, "alpha"); assert (alpha); if (verbose)
zstr_send (alpha, "VERBOSE"); zstr_sendx (alpha, "CONNECT", "inproc://base", NULL); zstr_sendx (alpha, "PUBLISH", "inproc://alpha-1", "service1", NULL); zstr_sendx (alpha, "PUBLISH", "inproc://alpha-2", "service2", NULL); zactor_t *beta = zactor_new (zgossip, "beta"); assert (beta); if (verbose)
zstr_send (beta, "VERBOSE"); zstr_sendx (beta, "CONNECT", "inproc://base", NULL); zstr_sendx (beta, "PUBLISH", "inproc://beta-1", "service1", NULL); zstr_sendx (beta, "PUBLISH", "inproc://beta-2", "service2", NULL); // got nothing zclock_sleep (200); zstr_send (alpha, "STATUS"); char *command, *status, *key, *value; zstr_recvx (alpha, &command, &key, &value, NULL); assert (streq (command, "DELIVER")); assert (streq (key, "inproc://alpha-1")); assert (streq (value, "service1")); zstr_free (&command); zstr_free (&key); zstr_free (&value); zstr_recvx (alpha, &command, &key, &value, NULL); assert (streq (command, "DELIVER")); assert (streq (key, "inproc://alpha-2")); assert (streq (value, "service2")); zstr_free (&command); zstr_free (&key); zstr_free (&value); zstr_recvx (alpha, &command, &key, &value, NULL); assert (streq (command, "DELIVER")); assert (streq (key, "inproc://beta-1")); assert (streq (value, "service1")); zstr_free (&command); zstr_free (&key); zstr_free (&value); zstr_recvx (alpha, &command, &key, &value, NULL); assert (streq (command, "DELIVER")); assert (streq (key, "inproc://beta-2")); assert (streq (value, "service2")); zstr_free (&command); zstr_free (&key); zstr_free (&value); zstr_recvx (alpha, &command, &status, NULL); assert (streq (command, "STATUS")); assert (atoi (status) == 4); zstr_free (&command); zstr_free (&status); zactor_destroy (&base); zactor_destroy (&alpha); zactor_destroy (&beta); #ifdef CZMQ_BUILD_DRAFT_API // DRAFT-API: Security // curve if (zsys_has_curve()) {
if (verbose)
printf("testing CURVE support");
zclock_sleep (2000);
zactor_t *auth = zactor_new(zauth, NULL);
assert (auth);
if (verbose) {
zstr_sendx (auth, "VERBOSE", NULL);
zsock_wait (auth);
}
zstr_sendx(auth,"ALLOW","127.0.0.1",NULL);
zsock_wait(auth);
zstr_sendx (auth, "CURVE", CURVE_ALLOW_ANY, NULL);
zsock_wait (auth);
server = zactor_new (zgossip, "server");
if (verbose)
zstr_send (server, "VERBOSE");
assert (server);
zcert_t *client1_cert = zcert_new ();
zcert_t *server_cert = zcert_new ();
zstr_sendx (server, "SET PUBLICKEY", zcert_public_txt (server_cert), NULL);
zstr_sendx (server, "SET SECRETKEY", zcert_secret_txt (server_cert), NULL);
zstr_sendx (server, "ZAP DOMAIN", "TEST", NULL);
zstr_sendx (server, "BIND", "tcp://127.0.0.1:*", NULL);
zstr_sendx (server, "PORT", NULL);
zstr_recvx (server, &command, &value, NULL);
assert (streq (command, "PORT"));
int port = atoi (value);
zstr_free (&command);
zstr_free (&value);
char endpoint [32];
sprintf (endpoint, "tcp://127.0.0.1:%d", port);
zactor_t *client1 = zactor_new (zgossip, "client");
if (verbose)
zstr_send (client1, "VERBOSE");
assert (client1);
zstr_sendx (client1, "SET PUBLICKEY", zcert_public_txt (client1_cert), NULL);
zstr_sendx (client1, "SET SECRETKEY", zcert_secret_txt (client1_cert), NULL);
zstr_sendx (client1, "ZAP DOMAIN", "TEST", NULL);
const char *public_txt = zcert_public_txt (server_cert);
zstr_sendx (client1, "CONNECT", endpoint, public_txt, NULL);
zstr_sendx (client1, "PUBLISH", "tcp://127.0.0.1:9001", "service1", NULL);
zclock_sleep (500);
zstr_send (server, "STATUS");
zclock_sleep (500);
zstr_recvx (server, &command, &key, &value, NULL);
assert (streq (command, "DELIVER"));
assert (streq (value, "service1"));
zstr_free (&command);
zstr_free (&key);
zstr_free (&value);
zstr_sendx (client1, "$TERM", NULL);
zstr_sendx (server, "$TERM", NULL);
zclock_sleep(500);
zcert_destroy (&client1_cert);
zcert_destroy (&server_cert);
zactor_destroy (&client1);
zactor_destroy (&server);
zactor_destroy (&auth); } #endif #if defined (__WINDOWS__) zsys_shutdown(); #endif
AUTHORS¶
The czmq manual was written by the authors in the AUTHORS file.
RESOURCES¶
Main web site:
Report bugs to the email <zeromq-dev@lists.zeromq.org[1]>
COPYRIGHT¶
Copyright (c) the Contributors as noted in the AUTHORS file. This file is part of CZMQ, the high-level C binding for 0MQ: http://czmq.zeromq.org. This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0. If a copy of the MPL was not distributed with this file, You can obtain one at http://mozilla.org/MPL/2.0/. LICENSE included with the czmq distribution.
NOTES¶
- 1.
- zeromq-dev@lists.zeromq.org
03/05/2021 | CZMQ 4.2.1 |