
I made an example of my situation:

I have a Client producing double numbers (in reality I need to send bytes). On the other side I have a Server attending client requests (for this example, only one), which creates a Worker...

The problem is that the Worker (whose InputStream is obtained from the Socket accepted by the Server) does not detect when the Client terminates (closing its OutputStream and Socket).

Here is my code:

Client

class ProducerDouble extends Thread {
  private Socket consSocket = null;
  private OutputStream output = null;
  private byte[] innerBytes = null;
  private boolean isRunning = false;
  public ProducerDouble(String consHost , int consPort) {
    try {
      consSocket = new Socket(consHost, consPort);
    }
    catch (UnknownHostException e) {
      System.out.println("Unknown host!");
    }
    catch (IOException e) {
      System.out.println("Consumer :"+e.getMessage());
    }
    try {
      if (!(consSocket == null)) {
        output = consSocket.getOutputStream();
      }
    }
    catch (IOException e) { }
  }
  @Override
  public void run() {
    if (!(consSocket == null)) {
      isRunning = true;
      int read = 0;
      while (isRunning) {
        try {
          if (!(innerBytes == null)) {
            output.write(innerBytes, 0, innerBytes.length);
            System.out.println("ProducerDouble "+innerBytes.length +" bytes written");
            innerBytes = null;
          } else {
            try {
              Thread.sleep(2000);
              String sSending = "The generated double is:"+Double.toString(Math.random()*100.0);
              innerBytes = sSending.getBytes();
            } catch (InterruptedException ex) { }
          }
        } catch (IOException e) { }
      }
      try {
        output.close();
      } catch (IOException e) { }
      try {
        consSocket.close();
      } catch (IOException e) { }
      System.out.println("\n Exiting ProducerDouble...\n");
    }
  }

  public void stopExecute() {
    isRunning = false;
  }
}

Worker

class ConsumerDouble extends Thread {
  private Socket clientSocket = null;
  private InputStream input = null;
  private byte[] innerBytes = null;
  private boolean isRunning = false;

  public ConsumerDouble(Socket newClientSocket) {
    clientSocket = newClientSocket;
    try {
      input  = clientSocket.getInputStream();
    } catch (IOException e) { }
  }
  @Override
  public void run() {
    isRunning = true;
    while (isRunning) {
      System.out.println("ConsumerDouble running");
      try {
        if (innerBytes == null) {
          if (input.available() > 0) {
            innerBytes = new byte[input.available()];
            int read = input.read(innerBytes, 0, innerBytes.length);
            System.out.println("ConsumerDouble " + innerBytes.length 
              +" bytes read from Host ");
          } else {
            try {
              Thread.sleep(1500);
            } catch (InterruptedException ex) { }
          }
        }
      } catch (IOException e) {
        isRunning = false;
        System.out.println("\nConsumerDouble IOException:"+e.getMessage()+"\n");
      }
    }
    System.out.println("\n ConsumerDouble Request Terminated...\n");
    try {  if (input != null) input.close(); } catch (IOException e) { }
    try {  if (clientSocket != null) clientSocket.close(); } catch (IOException e) { }
  }

  public void stopExecute() {
    isRunning = false;
  }
}

Launching and Stopping the Server

if (jToggleButtonServer.isSelected()) {
  thrdSrvrDouble = new Thread() {
    @Override
    public void run() {
      try {
        SrvrSocketDouble = new ServerSocket(1023);
        System.out.println("ServerDouble Listening on port number: "+1023);
      } catch (IOException e) {
        System.out.println("Could not listen on port: "+1023);
      }
      bThrdServerDoubleRunning = true;
      while (bThrdServerDoubleRunning) {
        Socket clientSocket = null;
        try {
          clientSocket = SrvrSocketDouble.accept();
          System.out.println("New Client Address: " + clientSocket.getInetAddress() + "  Port:" + clientSocket.getPort());
        } catch (IOException e) {
          if(!bThrdServerDoubleRunning) {
            System.out.println("ServerDouble Stopped.") ;
            break;
          }
          throw new RuntimeException("Error: ServerDouble accepting client connections", e);
        }
        if (consumerDouble == null) {
          consumerDouble = new ConsumerDouble(clientSocket);
          consumerDouble.start();
        }
      }
      consumerDouble.stopExecute();
      System.out.println("\n  Exiting ServerDouble!!!");
    }
  };
  thrdSrvrDouble.start();
} else  {
  bThrdServerDoubleRunning = false;
  try {
    SrvrSocketDouble.close();
  } catch (IOException e) { }
}

Variables

static ServerSocket SrvrSocketDouble;
Thread thrdSrvrDouble = null;
static boolean bThrdServerDoubleRunning = false;
ProducerDouble producerDouble = null;

Launching and Stopping the Client (ProducerDouble)

if (jToggleButtonProducer.isSelected()) {
  String sConsHost = jtfSrClConsHost.getText();
  producerDouble = new ProducerDouble(sConsHost, 1023);
  producerDouble.start();
} else {
  producerDouble.stopExecute();
}

The OUTPUT

run:
ServerDouble Listening on port number: 1023
New Client Address: /192.168.0.16  Port:51056
ConsumerDouble running
ConsumerDouble running
ProducerDouble 41 bytes written
ConsumerDouble running
ConsumerDouble 41 bytes read from Host 
ConsumerDouble running
ConsumerDouble running
ConsumerDouble running
ConsumerDouble running
ConsumerDouble running
ConsumerDouble running
ConsumerDouble running
ConsumerDouble running
ProducerDouble 41 bytes written

 Exiting ProducerDouble...

ConsumerDouble running
ConsumerDouble running
ConsumerDouble running
ConsumerDouble running
ConsumerDouble running
ConsumerDouble running
ConsumerDouble running
ConsumerDouble running
ConsumerDouble running
ConsumerDouble running
ConsumerDouble running
ConsumerDouble running
ConsumerDouble running
ConsumerDouble running
ConsumerDouble running
ConsumerDouble running
ConsumerDouble running
ConsumerDouble running
ConsumerDouble running
ConsumerDouble running
ConsumerDouble running
ConsumerDouble running
ConsumerDouble running
ConsumerDouble running
ConsumerDouble running
ServerDouble Stopped.

  Exiting ServerDouble!!!

 ConsumerDouble Request Terminated...

BUILD SUCCESSFUL (total time: 2 minutes 0 seconds)

When ProducerDouble finishes (printing Exiting ProducerDouble...), I expected ConsumerDouble to terminate as well, printing the message ConsumerDouble IOException: (the exception message)!

Question

  1. Why is no exception thrown inside the while loop of the Worker (ConsumerDouble)?
  2. How can I solve this?

PS: I was reading...

How to detect a remote side socket close?, but for me the OutputStream does detect when the InputStream is closed. The problem is that the InputStream does not detect when the OutputStream is closed, even when the Socket that created that output (on the other side) is closed!

Socket close vs Inputstream close is not related to my question.

Thank you!

QA_Col

2 Answers

  1. Why is no exception thrown inside the while loop of the Worker (ConsumerDouble)?

Why should it be? What exception? What are you expecting? The way that InputStream.read() signals end of stream is by returning -1. However, as you never test or even display that value, you never detect end of stream. The problem is made worse by your misuse of available(), which you should just remove. At present you only call read() when there is data to be read immediately, which will never be the case at end of stream.
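To make the difference concrete, here is a minimal sketch, assuming a loopback connection on an OS-assigned port. The class name PeerCloseDemo and the method observePeerClose are hypothetical names chosen for illustration. The client connects and closes without sending anything; the server side then calls available() and read() on the accepted socket.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

public class PeerCloseDemo {
    // Returns { value of available(), value of read() } seen by the accepting side
    // when the peer connects and closes without sending any data.
    static int[] observePeerClose() throws IOException {
        // Port 0 asks the OS for any free port; loopback keeps the demo local.
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress())) {
            // The client connects and closes immediately.
            new Socket(server.getInetAddress(), server.getLocalPort()).close();
            try (Socket accepted = server.accept();
                 InputStream in = accepted.getInputStream()) {
                int available = in.available(); // no bytes can be read without blocking
                int read = in.read();           // blocks until the close arrives, then signals end of stream
                return new int[] { available, read };
            }
        }
    }

    public static void main(String[] args) throws IOException {
        int[] result = observePeerClose();
        System.out.println("available() = " + result[0] + ", read() = " + result[1]);
    }
}
```

A loop that only calls read() when available() is greater than zero therefore never reaches the call that would report the close.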

  2. How can I solve this?

Test for read() returning -1.

You also have a misleading message where you claim that the length of the buffer is the number of bytes that were just read. It isn't. You are also ignoring IOException in the client. Don't do that.
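The points above could be sketched roughly as follows. This is only an illustration, not the asker's final code: the class name WorkerReadLoop and the method readUntilEndOfStream are hypothetical, and a ByteArrayInputStream stands in for clientSocket.getInputStream() so that the example runs on its own.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public class WorkerReadLoop {
    // Reads until end of stream and returns the total number of bytes received.
    // IOException is propagated to the caller rather than swallowed.
    static long readUntilEndOfStream(InputStream input) throws IOException {
        byte[] buffer = new byte[1024]; // fixed size, reused for every read
        long total = 0;
        int count;
        // read() blocks until data arrives; -1 signals end of stream
        // (for a socket stream: the peer has closed its side).
        while ((count = input.read(buffer, 0, buffer.length)) != -1) {
            // Report the value returned by read(), not the buffer length.
            System.out.println("ConsumerDouble " + count + " bytes read from Host");
            total += count;
        }
        System.out.println("ConsumerDouble: end of stream");
        return total;
    }

    public static void main(String[] args) throws IOException {
        byte[] sample = "The generated double is:42.0".getBytes();
        readUntilEndOfStream(new ByteArrayInputStream(sample));
    }
}
```

No call to available() and no sleep are needed, because read() itself waits for data and returns -1 once the stream has ended.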

user207421
  • Reading https://docs.oracle.com/javase/8/docs/api/java/io/InputStream.html#available-- I was misled by: "A subclass' implementation of this method may choose to throw an IOException if this input stream has been closed by invoking the close() method." – QA_Col Feb 25 '16 at 23:18
  • The question is: what happens when the OutputStream on the other side is not sending but remains open? What value does `inputstream.read()` return, 0 or -1? – QA_Col Feb 25 '16 at 23:19
  • The size of the byte array passed to `inputstream.read(arrayBytes)` is calculated (I know it is not exact) from the `inputstream.available()` method. – QA_Col Feb 25 '16 at 23:35
  • @QA_Col 1. The input stream *hasn't* been closed by invoking the `close()` method. The peer has closed the connection. A remote host can't close your input stream. – user207421 Feb 26 '16 at 03:23
  • @QA_Col 2. This is not 'the question', it is a *new* question, but the answer is 'neither'. While the other side isn't sending, `read()` *blocks.* This also is in the Javadoc. – user207421 Feb 26 '16 at 03:24
  • @QA_Col 3. You can't assume that the value returned by a prior `available()` call equals the number of bytes just read by `read()`. And unless you check that value you will never detect end of stream. There is no point in sizing the receiving array via `available()` for every read. Just use a fixed size, and the same buffer for all reads. Saves garbage too. – user207421 Feb 26 '16 at 03:24

You need to change your Worker so that it detects when the other side has closed its OutputStream or Socket (the counterpart of the InputStream you are reading), for example by provoking an exception.

FIRST OPTION

Since ConsumerDouble does not otherwise use its OutputStream, you can force an exception by writing a byte to it.

isRunning = true;
while (isRunning) {
  try {
    clientSocket.getOutputStream().write(0);
  } catch (IOException e) {
    isRunning = false;
    //  Software caused connection abort: socket write error
  }
  System.out.println("ConsumerDouble running");
  // from here on, the rest of the code stays the same as in yours.

SECOND OPTION

You need to change the receive logic.

class ConsumerDouble extends Thread {
  private Socket clientSocket = null;
  private InputStream input = null;
  private byte[] innerBytes = new byte [1024];  // Is not null!
  private boolean isRunning = false;
  private int read = 0;  // moved from while loop!

Then change the while loop completely:

    while (isRunning) {
      System.out.println("ConsumerDouble running");
      try {
        read = input.read(innerBytes, 0, innerBytes.length);
        System.out.println("ConsumerDouble " + read 
          +" bytes read from Host ");
        byte[] nBytes = new byte[read];
        System.arraycopy(innerBytes, 0, nBytes, 0, read);
        System.out.println("Received: " + new String(nBytes));
      } catch (IOException e) {
        isRunning = false;
        System.out.println("\nConsumerDouble IOException:"+e.getMessage()+"\n");
      } catch (NegativeArraySizeException e) {
        isRunning = false;
        System.out.println("\nConsumerDouble NegativeArraySizeException:"+e.getMessage()+"\n");
      }
    }

Following the recommendation in the comments (test for read() returning -1):

    while (isRunning) {
      System.out.println("ConsumerDouble running");
      try {
        read = input.read(innerBytes, 0, innerBytes.length);
        if (read != -1) {
          System.out.println("ConsumerDouble " + read 
              +" bytes read from Host ");
          // this code is an example showing the message
          byte[] nBytes = new byte[read];
          System.arraycopy(innerBytes, 0, nBytes, 0, read);
          System.out.println("Received: " + new String(nBytes));
        } else {
          isRunning = false;  // the Stream was closed!
        }
      } catch (IOException e) {
        isRunning = false;
        System.out.println("\nConsumerDouble IOException:"+e.getMessage()+"\n");
      }
    }
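As a side note to the loop above, the intermediate nBytes copy is not strictly needed just to print the message. A minimal sketch, assuming the text is UTF-8 (the code in the question uses the platform default charset), with DecodeInPlace and decode as hypothetical names:

```java
import java.nio.charset.StandardCharsets;

public class DecodeInPlace {
    // Decodes the first `count` bytes of `buffer` without allocating a second byte array.
    // UTF-8 is an assumption made for this sketch.
    static String decode(byte[] buffer, int count) {
        return new String(buffer, 0, count, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] buffer = new byte[1024];
        byte[] payload = "The generated double is:42.0".getBytes(StandardCharsets.UTF_8);
        // Simulate a read() that filled only the start of the buffer.
        System.arraycopy(payload, 0, buffer, 0, payload.length);
        System.out.println("Received: " + decode(buffer, payload.length));
    }
}
```

Inside the loop this would be called only after checking that read is not -1, for example decode(innerBytes, read).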
joseluisbz
  • This still does not detect end of stream. You have to test for `read == -1`. The suggestion of writing a zero byte will ultimately stall if the peer isn't reading. – user207421 Feb 26 '16 at 18:57
  • You are right, sorry, I had omitted part of the code. Solved! When `read == -1`, a NegativeArraySizeException will be thrown... – joseluisbz Feb 26 '16 at 21:23
  • Relying on a side-effect of an unnecessary piece of code is not an acceptable way to detect anything, let alone end of stream. Allocating another array and copying the data isn't necessary; is therefore liable to be removed during code review; and is therefore not an acceptable way to detect end of stream. There is only one correct way, and I've given it several times here. – user207421 Feb 27 '16 at 00:13
  • Sure! But how to apply it is each person's decision; I just showed you an example... – joseluisbz Feb 27 '16 at 02:30
  • I don't consider that you've shown an example at all. You've shown some code that will accidentally work as long as some unnecessarily complex tracing code remains in the system. This isn't an example of anything except how not to write computer software. You could always consider fixing your code, instead of just arguing about it. – user207421 Feb 27 '16 at 09:07
  • Instead of commenting negatively on everything, you should apply your assessment to the code in the question. Otherwise, one would assume we should all do things exactly as you think because your way is the only right one. I invite you to post your code (embedded in QA_Col's code) showing the correct way to solve his (or her) question. – joseluisbz Feb 27 '16 at 16:14
  • It *is* the right way. Period. Any code reviewer would tell you the same thing, and for the same reasons I've already given you. You haven't even attempted to discuss them, let alone defend against them. If this came up in my company I would call you in and tell you, not ask you, not to write any more code like that on my client's dollar. I have already provided my own answer, and it is idle of you to now pretend otherwise. – user207421 Feb 28 '16 at 11:18