1

I'm fairly new to TCP messaging (and programming in general) and I am trying to develop a simple ROUTER/DEALER message pair with ZeroMQ but am struggling in getting the router to receive a message from the dealer and send one back.

I can do a simple REQ/REP pattern with no problem, where I can send one message from my machine to my VM.

However, when trying to develop a ROUTER/DEALER pair, I can't seem to get the ROUTER-instance to receive the message (ROUTER on VM, DEALER on main box). I have had some success where I could spam 50 messages in a while(){...} loop, but can't send a single message and have the ROUTER send one back.

So from what I have read, a TCP message in a ROUTER/DEALER pair are sent with a delimiter of 0 at the beginning, and this 0 must be sent first to the ROUTER to register an incoming message.

So I just want to send the message "ROUTER_TEST" to my server, and for my server to respond with "RECEIVED".

DEALER

#include <cstdlib>
#include <iostream>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <assert.h>
#include <stdio.h>

#include "zmq.h"

const char connection[] = "tcp://10.0.10.76:5555";
int main(void)
{
    int major, minor, patch;
    zmq_version(&major, &minor, &patch);
    printf("\nInstalled ZeroMQ version: %d.%d.%d\n", major, minor, patch);
    printf("Connecting to: %s\n", connection);

    void *context = zmq_ctx_new();

    void *requester = zmq_socket(context, ZMQ_DEALER);

    int zc = zmq_connect(requester, connection); 
    std::cout << "zmq_connect = " << zc << std::endl;

    int sm = zmq_socket_monitor(requester, connection, ZMQ_EVENT_ALL);
    std::cout << "zmq_socket_monitor = " << sm << std::endl;

    char messageSend[] = "ROUTER_TEST";

    int request_nbr;
    int n = zmq_send(requester, NULL, 0, ZMQ_DONTWAIT|ZMQ_SNDMORE );
    int ii = 0;
    if(n==0) {
        std::cout << "n = " << n << std::endl;
    while (ii < 50)
    {
        n = zmq_send(requester, messageSend, 31, ZMQ_DONTWAIT);

        ii++;
    }
    }

    return 0;
}

ROUTER

// SERVER
#include <cstdlib>
#include <iostream>
#include <string.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>

#include "zmq.h"

int main(void)
{
    void *context = zmq_ctx_new();
    void *responder = zmq_socket(context, ZMQ_ROUTER);
    printf("THIS IS WORKING - ROUTER\n");
    int rc = zmq_bind(responder, "tcp://*:5555");
    assert(rc == 0);

    zmq_pollitem_t pollItems[] = {
        {responder, 0, ZMQ_POLLIN, -1}};

    int sm = zmq_socket_monitor(responder, "tcp://*:5555", ZMQ_EVENT_LISTENING);
    std::cout << "zmq_socket_monitor = " << sm << std::endl;
    uint8_t buffer[15];
    while (1)
    {
        int rc = zmq_recv(responder, buffer, 5, ZMQ_DONTWAIT);
        if (rc == 0)
        {
            std::cout << "zmq_recv = " << rc << std::endl;
            zmq_send(responder, "RECIEVED", 9,0);
        }

        zmq_poll(pollItems, sizeof(pollItems), -1);

    }
    return 0;
}
user3666197
  • 1
  • 6
  • 45
  • 89

1 Answers1

0

Welcome to the Zen-of-Zero.

In case one has never worked with ZeroMQ,
one may here enjoy to first look at "ZeroMQ Principles in less than Five Seconds" before diving into further details


Your code calls, on the DEALER-side a series of:

void *requester = zmq_socket( context,
                              ZMQ_DEALER           // <--   .STO <ZMQ_DEALER>, *requester
                              );
...
int n   = zmq_send( requester, // <~~ <~~ <~~ <~~ <~~ <~~   .STO 0, n
                    NULL,     //                             NULL,sizeof(NULL)== 0
                    0,       //                              explicitly declared 0
                    ZMQ_DONTWAIT                   //           _DONTWAIT flag
                  | ZMQ_SNDMORE                    //---- 1x ZMQ_SNDMORE  flag== 
                    );                             //        1.Frame in 1st MSG
int ii  = 0;                                       //        MSG-under-CONSTRUCTION
if ( n == 0 )                                      //     ...until complete, not yet sent
{    
     std::cout << "PREVIOUS[" << ii << ".] CALL of zmq_send() has returned n = " << n << std::endl;

     while ( ii < 50 )
     {       ii++;
             n = zmq_send( requester,   //---------//---- 1x ZMQ_SNDMORE following
                           messageSend, //         //        2.Frame in 1st MSG
                           31,          //         //        MSG-under-CONSTRUCTION, but
                           ZMQ_DONTWAIT //         //        NOW complete & will get sent
                           );           //---------//----49x monoFrame MSGs follow
     }
}
...

What happens on the opposite side, the ROUTER-side code ?

...
while (1)
{       
        int  rc  = zmq_recv( responder, //----------------- 1st .recv()
                             buffer,
                             5,
                             ZMQ_DONTWAIT
                             );
        if ( rc == 0 )
        {
            std::cout << "zmq_recv = " << rc << std::endl;
            zmq_send( responder,  // _____________________ void  *socket
                      "RECEIVED", // _____________________ void  *buffer
                      9,  // _____________________________ size_t len
                      0   // _____________________________ int    flags
                      );
        }
        zmq_poll( pollItems,
                  sizeof( pollItems ),
                  -1 // __________________________________ block ( forever )
                  );//                                     till  ( if ever ) ...?
}

Here, most probably, the rc == 0 but once, if not missed, but never more

Kindly notice, that your code does not detect in any way if a .recv()-call is also being flagged by a ZMQ_RECVMORE - signaling a need to first also .recv()-all-the-rest multi-Frame parts of the first message, before becoming able to .send()-any-answer...

An application that processes multi-part messages must use the ZMQ_RCVMORE zmq_getsockopt(3) option after calling zmq_recv() to determine if there are further parts to receive.

Next, the buffer and messageSend message-"payloads" are a kind of fragile entities and ought be re-composed ( for details best read again all details about how to carefully initialise, work with and safely-touch any zmq_msg_t object(s) ), as after a successful .send()/.recv() the low level API ( since 2.11.x+ ) considers 'em disposed-off, not re-useable. Also note, that messageSend is not (as imperatively put into the code) a 31-char[]-long, was it? Was there any particular intention to do this?

The zmq_send() function shall return number of bytes in the message if successful. Otherwise it shall return -1 and set errno to one of the values defined below. { EAGAIN, ENOTSUP, EINVAL, EFSM, ETERM, ENOTSOCK, EINTR, EHOSTUNREACH }

Not testing error-state means knowing nothing about the actual state ( see EFSM and other potential trouble explainers ) of REQ/REP and DEALER/ROUTER (extended) .send()/.recv()/.send()/.recv()/... mandatory dFSA's order of these steps


"So from what I have read, a TCP message in a ROUTER/DEALER pair are sent with a delimiter of 0 at the beginning, and this 0 must be sent first to the ROUTER to register an incoming message."

This seems to be a mis-uderstood part. The app-side is free to compose any number of monoframe or multi-frame messages, yet the "trick" of a ROUTER pre-pended identity-frame is performed without users assistance ( message-labelling is performed automatically, before any ( now, principally all ) multi-frame(d) messages get delivered to the app-side ( using the receiver's side .recv()-method ). Due handling of multi-frame messages was noted above.

user3666197
  • 1
  • 6
  • 45
  • 89