00001
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 package sensix;
00036
00037 import java.io.*;
00038 import java.net.*;
00039 import java.util.*;
00040
00041
00042 public class SensixNetworking
00043 {
00044 public static final int SENSIX_PORT = 2989;
00045 public static final String MCAST_ADDR_IPv6 = "FF18::178";
00046 public static final String MCAST_ADDR_IPv4 = "224.0.0.178";
00047 public static final String MCAST_ADDR = MCAST_ADDR_IPv4;
00048
00049
00050 public static final int TX_TIMEOUT = 12;
00051 public static final int RETRY_ITVL = 4;
00052 public static final int EMPTY_TIMEOUT = 1000;
00053
00054
00055 private class Listener extends Thread
00056 {
00057 private SensixMarshalling marshal;
00058 private SensixNetworking net;
00059 private MulticastSocket socket;
00060
00061 public Listener(SensixNetworking n, SensixMarshalling m,
00062 MulticastSocket s) {
00063 net = n;
00064 marshal = m;
00065 socket = s;
00066 }
00067
00068 public void run() {
00069 boolean keep_going;
00070
00071 synchronized(net) {
00072 keep_going = net.running;
00073 }
00074 while (keep_going) {
00075 byte[] raw_msg = new byte[SensixMarshalling.MAX_CMD_SIZE];
00076 DatagramPacket packet = null;
00077 try {
00078 packet = new DatagramPacket(raw_msg, raw_msg.length);
00079 socket.setSoTimeout(EMPTY_TIMEOUT);
00080 socket.receive(packet);
00081 SocketAddress sender = packet.getSocketAddress();
00082 }
00083 catch (SocketTimeoutException e) {
00084 socket.disconnect();
00085 synchronized(net) {
00086 keep_going = net.running;
00087 }
00088 continue;
00089 }
00090 catch (IOException e) {
00091 net.log(Logger.LOG.ERROR,
00092 "Sensix network listener IO exception: " + e);
00093 e.printStackTrace(System.err);
00094 synchronized(net) {
00095 keep_going = net.running;
00096 }
00097 continue;
00098 }
00099 marshal.parse_packet(raw_msg, packet.getLength());
00100
00101 synchronized(net) {
00102 keep_going = net.running;
00103 }
00104 }
00105 socket.close();
00106 net.log(Logger.LOG.DEBUG, "Shutting down Sensix network");
00107 }
00108 }
00109
00110
00111 public boolean running;
00112 private SensixMarshalling marshal;
00113 private Logger logger;
00114 private MulticastSocket[] mcast_sockets;
00115 private Listener[] m_threads;
00116
00117 public SensixNetworking(SensixMarshalling m, String app, byte debug)
00118 throws Exception {
00119 marshal = m;
00120 logger = new Logger(app, debug);
00121
00122 NetworkInterface[] nifs = getAllInterfaces();
00123 if (nifs.length < 1)
00124 throw new Exception("No external network interfaces found.");
00125
00126 running = true;
00127 mcast_sockets = new MulticastSocket[nifs.length];
00128 m_threads = new Listener[nifs.length];
00129
00130 log(Logger.LOG.INFO, "Networking interfaces:");
00131 for (int i = 0; i < nifs.length; i++) {
00132 log(Logger.LOG.INFO, " " + nifs[i].getName());
00133 try {
00134 mcast_sockets[i] = new MulticastSocket(SENSIX_PORT);
00135 mcast_sockets[i].setReuseAddress(true);
00136 mcast_sockets[i].setNetworkInterface(nifs[i]);
00137 mcast_sockets[i].joinGroup(InetAddress.getByName(MCAST_ADDR));
00138
00139 m_threads[i] = new Listener(this, marshal, mcast_sockets[i]);
00140 m_threads[i].start();
00141 } catch (SocketException e) {
00142 throw new Exception("Socket error " + e.getMessage() +
00143 " on " + MCAST_ADDR);
00144 } catch (UnknownHostException e) {
00145 throw new Exception("Unknown host " + MCAST_ADDR +
00146 " " + e.getMessage());
00147 } catch (IOException e) {
00148 throw new Exception("IO error " + e.getMessage());
00149 }
00150 }
00151 }
00152
00153 public int identifier() {
00154 return marshal.identifier();
00155 }
00156
00157 public byte level() {
00158 return marshal.level();
00159 }
00160
00161 public void log(Logger.LOG type, String msg) {
00162 logger.log(type, msg);
00163 }
00164
00165 public void shutdown() {
00166 synchronized(this) {
00167 running = false;
00168 }
00169 try {
00170 for (int i = 0; i < m_threads.length; i++)
00171 m_threads[i].join();
00172 } catch (InterruptedException e) {}
00173 }
00174
00175
00176 private NetworkInterface[] getAllInterfaces() {
00177 try {
00178 int size = 0, idx = 0;
00179 NetworkInterface[] nifs;
00180 Enumeration<NetworkInterface> ife;
00181
00182 ife = NetworkInterface.getNetworkInterfaces();
00183 for (NetworkInterface nif : Collections.list(ife)) {
00184 if (nif.getName().toLowerCase().startsWith("lo") == false)
00185 size++;
00186 }
00187 if (size < 1)
00188 return null;
00189
00190 nifs = new NetworkInterface[size];
00191 ife = NetworkInterface.getNetworkInterfaces();
00192 for (NetworkInterface nif : Collections.list(ife)) {
00193 if (nif.getName().toLowerCase().startsWith("lo") == false) {
00194 nifs[idx] = nif;
00195 idx++;
00196 }
00197 }
00198 return nifs;
00199 } catch (SocketException e) {
00200 log(Logger.LOG.ERROR, "Socket exception on getNetworkInterfaces: "
00201 + e);
00202 return null;
00203 }
00204 }
00205
00206 private void send(byte[] msg) {
00207 while (true) {
00208 DatagramPacket mpacket = null;
00209 try {
00210 mpacket = new DatagramPacket(msg, msg.length,
00211 InetAddress.getByName(MCAST_ADDR),
00212 SENSIX_PORT);
00213 for (int i = 0; i < mcast_sockets.length; i++)
00214 mcast_sockets[i].send(mpacket);
00215 break;
00216 }
00217 catch (IllegalArgumentException iae) {
00218 try {
00219 Thread.sleep(TX_TIMEOUT + RETRY_ITVL);
00220 } catch (InterruptedException ie) {}
00221 }
00222 catch (IOException e) {
00223 log(Logger.LOG.ERROR, "Sensix network IO exception: " + e);
00224 e.printStackTrace(System.err);
00225 break;
00226 }
00227 }
00228 }
00229 }