wikipedia

Support Wikipedia

Monday, September 21, 2009

Bare bones JMX

Some times you wish there was a way to monitor objects in a app while it is running to check for potential problems, or just to troubleshoot a problem. Well JMX can do that and much more. JMX can also do notification for example if a Queue is full or database is out of space, things like that. But I wanted to get started with just being able to monitor an object whenever I wished to.

The example used here is an application which is basically a Socket server serving clients with data from a queue. The use case here is to look at the app and query for number of clients connected or number of objects in the queue. pretty simple huh?

Here is the code for the app.
We will use JConsole to connect to the app process and invoke the operations defined.

Now you might say that this can be done by logging the information (queue size and number of clients). True but like I said earlier using JMX gives you a way to check on things at runtime. also you could have an operation to clear the queue for a secure user (how? check for later posts!). Also why do you want to waste cpu cycles and disk space to keep logging stuff which you may or not look for.

The Java artifacts you need are an interface that declares the methods that you want to expose, a class that implements that interface. That's it. TestJMX.java is just to create and register a JMX bean.

Interface....
package com.example.testJMX;

public interface DataSocketServerMBean {
 // operations
 public int numberofClients();
 public int dataQueueSize();
}

The implementor...

package com.example.testJMX;

import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
 

 

 

 
import org.apache.log4j.Logger;

public class DataSocketServer extends Thread implements DataSocketServerMBean {
 private final static Logger log = Logger.getLogger(DataSocketServer.class);
 private final static int LISTENING_PORT = 3000;
 private ServerSocket weatherDataServer;
 public boolean shutdown = false;
 protected LinkedBlockingQueue outputDataQueue;
 private ConnectToNewClient newClient;
 private boolean byteData;
 private int listeningPort;

 public DataSocketServer() throws Exception {
  weatherDataServer = new ServerSocket(LISTENING_PORT);
  log.info("Server listening on port " + LISTENING_PORT);
  this.start();
  //start a new thread for writing data to each client socket
  newClient = new ConnectToNewClient();
 }

 public DataSocketServer(LinkedBlockingQueue dataQueue) { 
  this.outputDataQueue = dataQueue;

  try {
   weatherDataServer = new ServerSocket(LISTENING_PORT);
  } catch (IOException e) {
   log.error("Error creating ServerSocket :", e);
  }
  log.info("Server listening on port " + LISTENING_PORT);
  this.setName("WeatherDataSocketServerfor" + listeningPort);
  this.start();
  //start a new thread for writing data to each client socket
  newClient = new ConnectToNewClient();
 }

 public void run() {
  while (true) {
   try {
    log.debug("Waiting for connections.");
    if (weatherDataServer != null){
     Socket client = weatherDataServer.accept();
     log.info("Accepted a connection from: " + client.getInetAddress());
     newClient.addClient(new ClientSocket(client));
    }
    Thread.sleep(500);
   } catch (InterruptedException ix) {
    log.error("Shutting down socket server", ix);
    newClient.interrupt();
    break;
   } catch (Exception e) {
    log.error("Error accepting connections", e);
   }
  }
 }

 public void ConnectToClient(Socket clientSocket) {

 }

 /**
  * Helper class to handle each client as a separate thread.
  */
 class ConnectToNewClient extends Thread {
  // DECLARE
  private List clientSockets = Collections.synchronizedList(new ArrayList());

  // INIT
  public ConnectToNewClient() {
   this.setName("ConnectToNewClientFor" + DataSocketServer.this.listeningPort);
   this.start();
  }

  public void addClient(ClientSocket clientSocket) {
   clientSockets.add(clientSocket);
  }

  public ConnectToNewClient(ArrayList clientList) {
   this.clientSockets = clientList;
   this.setName("ConnectToNewClientFor" + DataSocketServer.this.listeningPort);
   this.start();
  }

  // ACTION
  public void run() {
   Object qobj = null;
   try {
    while (true) {
     while (clientSockets.size() > 0 && (qobj = DataSocketServer.this.outputDataQueue.peek()) != null) {
      log.info("Number of clients =" + clientSockets.size());
      //loop through each client and write to it.
      for (int i = 0; i < clientSockets.size(); i++) {
       ClientSocket clientSocket = null;
       try {
        //check if the socket is active.
        clientSocket = clientSockets.get(i);
        if (clientSocket.checkConnection()) {
         DataSocketServer.log.info("Sending data to client "
           + clientSocket.getInetAddress());
         if (DataSocketServer.this.byteData && qobj instanceof byte[])
          clientSocket.writeBytes((byte[]) qobj);
         else
          clientSocket.writeObject(qobj);
         DataSocketServer.this.outputDataQueue.remove(qobj);
        } else {
         //remove client from list
         DataSocketServer.log.info("Removing client from list : "
           + clientSocket.getInetAddress());
         clientSocket.close();
         clientSockets.remove(clientSocket);
         clientSocket = null;
        }
       } catch (RuntimeException e) {
        DataSocketServer.log.error("Error sending data to client ", e);
        //the client socket may be dead. remove it
        clientSockets.remove(clientSocket);
        clientSocket = null;
       }
      }
     }
     Thread.sleep(1000);
    }
   } catch (Exception e) {
    DataSocketServer.log.error("Error :", e);
    DataSocketServer.log.error("outputDataQueue =" + DataSocketServer.this.outputDataQueue);
    DataSocketServer.log.error("qobj = " + qobj);
   }
}
}

/**
* Socket wrapper for each client.
*/
class ClientSocket {
// DECLARE
private Socket socket = null;
private ObjectInputStream ois = null;
private OutputStream sos = null;
private ExecutorService executor;
private ObjectOutputStream oos = null;
private BufferedOutputStream bos = null;

// INIT
public ClientSocket(Socket clientSocket) {
this.socket = clientSocket;
try {
sos = socket.getOutputStream();
if (DataSocketServer.this.byteData)
bos   = new BufferedOutputStream(sos);
else
oos = new ObjectOutputStream(new BufferedOutputStream(sos));
} catch (Exception e1) {
try {
socket.close();
} catch (Exception e) {
log.error("Error creating client socket :" , e);
}
}
}

public void close() {
try {
socket.close();
if (oos != null)
oos.close();
if (bos != null)
bos.close();
} catch (Exception e) {
}
socket = null;
}

public void writeObject(Object o) {
try {
oos.flush();
if (o instanceof byte[]){
byte[] buf = (byte[])o;
bos.write(buf);
} else {
oos.writeObject(o);
}
oos.flush();
} catch (IOException e) {
// close socket.. connection maybe lost. The only way to find out is when
//we get a socketexception!!!!!!
log.error("Error writing to socket. closing socket...", e);
try {
socket.close();
} catch (IOException e1) {
}
socket = null;
} catch (Exception ex) {
log.error("Error writing data to client socket for client " + this.getInetAddress(),ex);
}
}

// UTIL
public boolean checkConnection() {
if (socket != null && !socket.isClosed() && socket.isConnected() && socket.isBound())
return true;
return false;
}

// ACCESS
public String getInetAddress() {
if (socket != null)
return socket.getInetAddress().getHostName();
return "";
}

public Socket getSocket() {
return socket;
}

public void writeBytes(byte[] buf) {
try {
if (socket != null) {
log.info("writing data " + new String(buf));
bos.flush();
bos.write(buf);
bos.flush();
}
} catch (IOException e) {
// close socket.. connection maybe lost. The only way to find out is when
//we get a socketexception!!!!!!
log.error("Error writing to socket. listening port: " + DataSocketServer.this.listeningPort + " closing socket...", e);
try {
socket.close();
} catch (IOException e1) {
}
socket = null;
} catch (Exception ex) {
log.error("Error writing data to client socket for client " + this.getInetAddress(),ex);
}
}
}

public int dataQueueSize() {
if (outputDataQueue != null)
return outputDataQueue.size();
return 0;
}

public int numberofClients() {
if (newClient != null)
return newClient.clientSockets.size();
return 0;
}

}

and finally the TestJMX class...
package com.example.testJMX;

import java.lang.management.*;
import java.util.Date;
import java.util.concurrent.LinkedBlockingQueue;

import javax.management.*;

import org.json.JSONObject;

public class Main {
    public static void main(String[] args) throws Exception {
    // Get the Platform MBean Server
    MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();

    LinkedBlockingQueue testQueue = new LinkedBlockingQueue();
    JSONObject sampleObj = new JSONObject();
    sampleObj.put("The current time on host machine", new Date());
    testQueue.add(sampleObj.toString());
    sampleObj.put("The current time on host machine", new Date());
    testQueue.add(sampleObj.toString());
    sampleObj.put("The current time on host machine", new Date());
    testQueue.add(sampleObj.toString());
    sampleObj.put("The current time on host machine", new Date());
    testQueue.add(sampleObj.toString());
    sampleObj.put("The current time on host machine", new Date());
    testQueue.add(sampleObj.toString());
    sampleObj.put("The current time on host machine", new Date());
    testQueue.add(sampleObj.toString());
    sampleObj.put("The current time on host machine", new Date());
    testQueue.add(sampleObj.toString());
    sampleObj.put("The current time on host machine", new Date());
    testQueue.add(sampleObj.toString());
    sampleObj.put("The current time on host machine", new Date());
    testQueue.add(sampleObj.toString());
    sampleObj.put("The current time on host machine", new Date());
    testQueue.add(sampleObj.toString());
    sampleObj.put("The current time on host machine", new Date());
    testQueue.add(sampleObj.toString());
    sampleObj.put("The current time on host machine", new Date());
    testQueue.add(sampleObj.toString());
    
    // Construct the ObjectName for the MBean we will register
    ObjectName name = new ObjectName("com.example.testJMX:type=DataSocketServer");

    // Create the jmx MBean
    DataSocketServer mbean = new DataSocketServer(testQueue);

    // Register the jmx MBean
    mbs.registerMBean(mbean, name);

    // Wait forever
    System.out.println("Waiting forever...");
    Thread.sleep(Long.MAX_VALUE);
    }
} 

Now run the TestJMX java class.
Launch JConsole and connect to the process
Invoking dataQueueSize and numberofClients operations...













connect to the socket server using telnet/netcat or any of those tools.
On linux ...
netcat localhost 4233
Now invoking the same operations again gives you the following results...


No comments:

Post a Comment