The example used here is an application that is basically a socket server serving clients with data from a queue. The use case is to inspect the running app and query the number of clients connected or the number of objects in the queue. Pretty simple, huh?
Here is the code for the app.
We will use JConsole to connect to the app process and invoke the operations defined.
Now you might say that this can be done by logging the information (queue size and number of clients). True, but as I said earlier, using JMX gives you a way to check on things at runtime. You could also expose an operation to clear the queue for a secure user (how? Check later posts!). Besides, why waste CPU cycles and disk space continuously logging information that you may or may not ever look at?
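The Java artifacts you need are an interface that declares the methods you want to expose and a class that implements that interface. That's it. For a standard MBean, the interface must be named after the implementing class with an "MBean" suffix (here DataSocketServer and DataSocketServerMBean). TestJMX.java (declared as class Main in the listing below) just creates and registers the JMX bean.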
Interface....
package com.example.testJMX;
public interface DataSocketServerMBean {
// operations
public int numberofClients();
public int dataQueueSize();
}
The implementor...
package com.example.testJMX;

import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.log4j.Logger;

/**
 * Socket server that pushes objects from a queue to every connected client and
 * exposes two read-only statistics through {@link DataSocketServerMBean}.
 *
 * <p>Two threads cooperate: this thread accepts incoming connections, and the
 * nested {@link ConnectToNewClient} thread drains {@link #outputDataQueue} and
 * writes each queued object to all live clients.
 */
public class DataSocketServer extends Thread implements DataSocketServerMBean {

    private final static Logger log = Logger.getLogger(DataSocketServer.class);
    private final static int LISTENING_PORT = 3000;

    private ServerSocket weatherDataServer;

    /** Public flag kept for existing callers; this class does not read it. */
    public boolean shutdown = false;

    /** Queue of objects to send; may be null when the no-arg constructor is used. */
    protected LinkedBlockingQueue outputDataQueue;

    private ConnectToNewClient newClient;

    /** When true, clients get a raw byte stream instead of an object stream. Never set in this class. */
    private boolean byteData;

    /** Port used in thread names and log messages; mirrors the port the server binds to. */
    private int listeningPort = LISTENING_PORT;

    /**
     * Creates a server without a data queue and starts accepting connections.
     *
     * @throws Exception if the server socket cannot be opened
     */
    public DataSocketServer() throws Exception {
        weatherDataServer = new ServerSocket(LISTENING_PORT);
        log.info("Server listening on port " + LISTENING_PORT);
        // Create the writer thread BEFORE starting the accept thread so that
        // run() never observes a null newClient.
        newClient = new ConnectToNewClient();
        this.start();
    }

    /**
     * Creates a server that sends the contents of {@code dataQueue} to its
     * clients and starts accepting connections. A failure to open the server
     * socket is logged rather than thrown.
     *
     * @param dataQueue queue holding the objects to send to clients
     */
    public DataSocketServer(LinkedBlockingQueue dataQueue) {
        this.outputDataQueue = dataQueue;
        try {
            weatherDataServer = new ServerSocket(LISTENING_PORT);
            // Only claim to be listening when the bind actually succeeded.
            log.info("Server listening on port " + LISTENING_PORT);
        } catch (IOException e) {
            log.error("Error creating ServerSocket :", e);
        }
        this.setName("WeatherDataSocketServerfor" + listeningPort);
        // Writer thread first, accept thread second (see no-arg constructor).
        newClient = new ConnectToNewClient();
        this.start();
    }

    /**
     * Accept loop: waits for a client, wraps it in a {@link ClientSocket} and
     * hands it to the writer thread. Interrupting this thread stops both
     * threads and closes the server socket.
     */
    @Override
    public void run() {
        if (weatherDataServer == null) {
            // Binding failed in the constructor; there is nothing to accept on.
            log.error("No server socket available; not accepting connections");
            newClient.interrupt();
            return;
        }
        while (true) {
            try {
                log.debug("Waiting for connections.");
                Socket client = weatherDataServer.accept();
                log.info("Accepted a connection from: " + client.getInetAddress());
                newClient.addClient(new ClientSocket(client));
                Thread.sleep(500);
            } catch (InterruptedException ix) {
                log.error("Shutting down socket server", ix);
                newClient.interrupt();
                closeServerSocket();
                // Preserve the interrupt status for anyone inspecting this thread.
                Thread.currentThread().interrupt();
                break;
            } catch (Exception e) {
                log.error("Error accepting connections", e);
                if (weatherDataServer.isClosed()) {
                    // accept() can never succeed again; avoid a tight error loop.
                    break;
                }
            }
        }
    }

    /** Closes the server socket, logging (not propagating) any failure. */
    private void closeServerSocket() {
        try {
            weatherDataServer.close();
        } catch (IOException e) {
            log.debug("Error closing server socket", e);
        }
    }

    /**
     * Placeholder kept for API compatibility; currently does nothing.
     *
     * @param clientSocket ignored
     */
    public void ConnectToClient(Socket clientSocket) {
    }

    /**
     * Writer thread: takes the object at the head of the queue and sends it to
     * every connected client, dropping clients whose connection is gone.
     */
    class ConnectToNewClient extends Thread {

        // DECLARE
        private final List<ClientSocket> clientSockets;

        // INIT
        /** Creates the writer with an empty client list and starts it. */
        public ConnectToNewClient() {
            this.clientSockets = Collections.synchronizedList(new ArrayList<ClientSocket>());
            this.setName("ConnectToNewClientFor" + DataSocketServer.this.listeningPort);
            this.start();
        }

        /**
         * Creates the writer on top of an existing client list and starts it.
         *
         * @param clientList initial clients; wrapped in a synchronized view
         *                   because the accept thread adds to it concurrently
         */
        public ConnectToNewClient(ArrayList<ClientSocket> clientList) {
            this.clientSockets = Collections.synchronizedList(clientList);
            this.setName("ConnectToNewClientFor" + DataSocketServer.this.listeningPort);
            this.start();
        }

        /**
         * Registers a newly accepted client.
         *
         * @param clientSocket the client to start sending data to
         */
        public void addClient(ClientSocket clientSocket) {
            clientSockets.add(clientSocket);
        }

        // ACTION
        /**
         * Polls once a second; while there are clients and queued data, sends
         * the head of the queue to all clients and removes it once at least
         * one live client was written to.
         */
        @Override
        public void run() {
            Object qobj = null;
            try {
                while (true) {
                    LinkedBlockingQueue queue = DataSocketServer.this.outputDataQueue;
                    // queue is null when the server was built without a data queue.
                    while (queue != null && clientSockets.size() > 0
                            && (qobj = queue.peek()) != null) {
                        log.info("Number of clients =" + clientSockets.size());
                        if (broadcast(qobj)) {
                            queue.remove(qobj);
                        }
                    }
                    Thread.sleep(1000);
                }
            } catch (InterruptedException ie) {
                log.info("Client writer thread interrupted; stopping");
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                DataSocketServer.log.error("Error :", e);
                DataSocketServer.log.error("outputDataQueue =" + DataSocketServer.this.outputDataQueue);
                DataSocketServer.log.error("qobj = " + qobj);
            }
        }

        /**
         * Sends {@code payload} to every client and prunes dead clients.
         *
         * @param payload the object to send
         * @return true if at least one client with a live connection was written to
         */
        private boolean broadcast(Object payload) {
            boolean delivered = false;
            // Iterate over a snapshot: removing from the live list while
            // walking it by index would skip the element after each removal.
            ClientSocket[] snapshot = clientSockets.toArray(new ClientSocket[0]);
            for (ClientSocket clientSocket : snapshot) {
                try {
                    if (clientSocket.checkConnection()) {
                        DataSocketServer.log.info("Sending data to client " + clientSocket.getInetAddress());
                        if (DataSocketServer.this.byteData && payload instanceof byte[]) {
                            clientSocket.writeBytes((byte[]) payload);
                        } else {
                            clientSocket.writeObject(payload);
                        }
                        delivered = true;
                    } else {
                        // remove client from list
                        DataSocketServer.log.info("Removing client from list : " + clientSocket.getInetAddress());
                        clientSocket.close();
                        clientSockets.remove(clientSocket);
                    }
                } catch (RuntimeException e) {
                    DataSocketServer.log.error("Error sending data to client ", e);
                    // the client socket may be dead. remove it
                    clientSockets.remove(clientSocket);
                }
            }
            return delivered;
        }
    }

    /**
     * Socket wrapper for each client. Holds either an object stream or a raw
     * byte stream, depending on the enclosing server's byteData flag.
     */
    class ClientSocket {

        // DECLARE
        private Socket socket = null;
        private OutputStream sos = null;
        private ObjectOutputStream oos = null;
        private BufferedOutputStream bos = null;

        // INIT
        /**
         * Wraps an accepted socket and opens its output stream. On failure the
         * socket is closed so that {@link #checkConnection()} reports false.
         *
         * @param clientSocket the accepted client connection
         */
        public ClientSocket(Socket clientSocket) {
            this.socket = clientSocket;
            try {
                sos = socket.getOutputStream();
                if (DataSocketServer.this.byteData) {
                    bos = new BufferedOutputStream(sos);
                } else {
                    oos = new ObjectOutputStream(new BufferedOutputStream(sos));
                }
            } catch (Exception e1) {
                // Log the real cause; previously it was silently discarded.
                log.error("Error creating client socket :", e1);
                disconnect();
            }
        }

        /** Closes the output streams and the socket; safe to call repeatedly. */
        public void close() {
            try {
                if (oos != null) {
                    oos.close();
                }
                if (bos != null) {
                    bos.close();
                }
            } catch (IOException e) {
                // Expected when the peer is already gone; nothing to recover.
                log.debug("Error closing client output stream", e);
            }
            disconnect();
        }

        /** Closes the underlying socket (if any) and forgets it. */
        private void disconnect() {
            if (socket != null) {
                try {
                    socket.close();
                } catch (IOException e) {
                    log.debug("Error closing client socket", e);
                }
                socket = null;
            }
        }

        /**
         * Writes one object to the client. A byte array goes to the raw byte
         * stream when one exists; anything else (or a byte array in
         * object-stream mode) is serialized through the object stream.
         *
         * @param o the object to send
         */
        public void writeObject(Object o) {
            try {
                if (o instanceof byte[] && bos != null) {
                    bos.write((byte[]) o);
                    bos.flush();
                } else if (oos != null) {
                    oos.writeObject(o);
                    oos.flush();
                } else {
                    log.error("No suitable output stream to write object for client " + this.getInetAddress());
                }
            } catch (IOException e) {
                // close socket.. connection maybe lost. The only way to find
                // out is when we get a socketexception!
                log.error("Error writing to socket. closing socket...", e);
                disconnect();
            } catch (Exception ex) {
                log.error("Error writing data to client socket for client " + this.getInetAddress(), ex);
            }
        }

        // UTIL
        /**
         * @return true if the socket exists, is open, connected and bound
         */
        public boolean checkConnection() {
            return socket != null && !socket.isClosed() && socket.isConnected() && socket.isBound();
        }

        // ACCESS
        /**
         * @return the client's host name, or an empty string once the socket
         *         has been dropped
         */
        public String getInetAddress() {
            if (socket != null) {
                return socket.getInetAddress().getHostName();
            }
            return "";
        }

        /**
         * @return the wrapped socket, or null once it has been dropped
         */
        public Socket getSocket() {
            return socket;
        }

        /**
         * Writes raw bytes to the client; does nothing when the socket is gone
         * or no byte stream was opened.
         *
         * @param buf the bytes to send
         */
        public void writeBytes(byte[] buf) {
            try {
                if (socket != null && bos != null) {
                    log.info("writing data " + new String(buf));
                    bos.write(buf);
                    bos.flush();
                }
            } catch (IOException e) {
                // close socket.. connection maybe lost. The only way to find
                // out is when we get a socketexception!
                log.error("Error writing to socket. listening port: "
                        + DataSocketServer.this.listeningPort + " closing socket...", e);
                disconnect();
            } catch (Exception ex) {
                log.error("Error writing data to client socket for client " + this.getInetAddress(), ex);
            }
        }
    }

    /**
     * @return the number of objects waiting in the queue, or 0 when there is no queue
     */
    @Override
    public int dataQueueSize() {
        if (outputDataQueue != null) {
            return outputDataQueue.size();
        }
        return 0;
    }

    /**
     * @return the number of clients currently held by the writer thread, or 0
     *         when the writer has not been created
     */
    @Override
    public int numberofClients() {
        if (newClient != null) {
            return newClient.clientSockets.size();
        }
        return 0;
    }
}
And finally, the TestJMX class (declared as Main in the listing)...
package com.example.testJMX;
import java.lang.management.*;
import java.util.Date;
import java.util.concurrent.LinkedBlockingQueue;
import javax.management.*;
import org.json.JSONObject;
/**
 * Launcher: fills a queue with sample JSON messages, registers a
 * {@link DataSocketServer} as an MBean on the platform MBean server, and then
 * keeps the JVM alive so a JMX client can connect.
 */
public class Main {

    /** Number of sample messages placed on the queue at start-up. */
    private static final int SAMPLE_MESSAGE_COUNT = 12;

    /**
     * Program entry point.
     *
     * @param args unused
     * @throws Exception if MBean naming/registration fails or the wait is interrupted
     */
    public static void main(String[] args) throws Exception {
        // Get the Platform MBean Server
        MBeanServer platformServer = ManagementFactory.getPlatformMBeanServer();

        LinkedBlockingQueue<String> testQueue = buildSampleQueue();

        // Construct the ObjectName for the MBean we will register
        ObjectName beanName = new ObjectName("com.example.testJMX:type=DataSocketServer");

        // Create the jmx MBean, then register it
        DataSocketServer socketServerBean = new DataSocketServer(testQueue);
        platformServer.registerMBean(socketServerBean, beanName);

        // Wait forever
        System.out.println("Waiting forever...");
        Thread.sleep(Long.MAX_VALUE);
    }

    /**
     * Builds the demo queue. One JSON object is reused; its timestamp entry is
     * overwritten before each snapshot is serialized and queued.
     *
     * @return a queue holding {@code SAMPLE_MESSAGE_COUNT} JSON strings
     */
    private static LinkedBlockingQueue<String> buildSampleQueue() {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<String>();
        JSONObject sampleObj = new JSONObject();
        for (int i = 0; i < SAMPLE_MESSAGE_COUNT; i++) {
            sampleObj.put("The current time on host machine", new Date());
            queue.add(sampleObj.toString());
        }
        return queue;
    }
}
Now run the TestJMX Java class (declared as Main in the listing above).
Launch JConsole and connect to the process
Invoking dataQueueSize and numberofClients operations...
Connect to the socket server using telnet, netcat, or a similar tool.
On Linux...
netcat localhost 3000
Now invoking the same operations again gives you the following results...
No comments:
Post a Comment