00001
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 package sensix.discovery;
00036
00037 import java.net.*;
00038 import java.io.*;
00039 import java.util.*;
00040 import sensix.*;
00041
00042
00043 public class DiscoveryService implements Self
00044 {
00045 public static final int DISCOVERY_PORT = 2999;
00046 public static final String MCAST_ADDR_IPv6 = "FF18::178";
00047 public static final String MCAST_ADDR_IPv4 = "224.0.0.178";
00048 public static final String MCAST_ADDR = MCAST_ADDR_IPv4;
00049
00050
00051 public static final int MEMORY_LIMIT = 8096;
00052 public static final int HEADER_SIZE = 8;
00053
00054
00055 public static final int TX_TIMEOUT = 12;
00056 public static final int RETRY_ITVL = 4;
00057 public static final int EMPTY_TIMEOUT = 1000;
00058
00059 public static final int MAX_PARENTS = 128;
00060 public static final int MAX_CHILDREN = 512;
00061 public static final int MAX_SIBLINGS = 128;
00062 public static final int MAX_PEERS = (MAX_CHILDREN + MAX_SIBLINGS);
00063 public static final int MAX_CAPABLE = 32;
00064
00065
00066 public static byte[] i2b(int i) {
00067 return i2b(i, 4);
00068 }
00069
00070 public static byte[] i2b(int i, int size) {
00071 byte[] b = new byte[size];
00072 for (int j = 0; j < size; j++)
00073 b[j] = (byte)(i >> (8 * ((size - 1) - j)));
00074 return b;
00075 }
00076
00077 public static int b2i(byte b[]) {
00078 return b2i(b, 4);
00079 }
00080
00081 public static int b2i(byte b[], int size) {
00082 int ret = 0;
00083 for (int j = 0; j < size; j++)
00084 ret += ((b[j] & 0x00000000ff) << (8 * ((size - 1) - j)));
00085 return ret;
00086 }
00087
00088 public static int b2i_le(byte b[]) {
00089 return b2i_le(b, 4);
00090 }
00091
00092 public static int b2i_le(byte b[], int size) {
00093 int ret = 0;
00094 for (int j = 0; j < size; j++)
00095 ret += ((b[j] & 0x00000000ff) << (8 * j));
00096 return ret;
00097 }
00098
00099
00100
00101 public static byte[] announceHeader() {
00102 byte[] hdr = new byte[HEADER_SIZE];
00103
00104 hdr[0] = (byte)'D';
00105 hdr[1] = (byte)0x10;
00106 hdr[2] = (byte)0x00;
00107 hdr[3] = (byte)0x01;
00108 byte[] len_part = DiscoveryService.i2b(5);
00109 for (int i = 0; i < 4; i++)
00110 hdr[4 + i] = len_part[i];
00111
00112 return hdr;
00113 }
00114
00115 public static byte[] reportHeader() {
00116 byte[] hdr = new byte[HEADER_SIZE];
00117
00118 hdr[0] = (byte)'D';
00119 hdr[1] = (byte)0x10;
00120 hdr[2] = (byte)0x00;
00121 hdr[3] = (byte)0x02;
00122 byte[] len_part = DiscoveryService.i2b(6);
00123 for (int i = 0; i < 4; i++)
00124 hdr[4 + i] = len_part[i];
00125
00126 return hdr;
00127 }
00128
00129 public static byte[] loadAnnounce(Announce ann) {
00130 byte[] msg = new byte[5];
00131
00132 byte[] id_part = DiscoveryService.i2b(ann.nodeIdent);
00133 for (int i = 0; i < 4; i++)
00134 msg[i] = id_part[i];
00135 msg[4] = ann.hLevel;
00136
00137 return msg;
00138 }
00139
00140 public static byte[] loadReport(Report rpt) {
00141 byte[] msg = new byte[6];
00142
00143 byte[] id_part = DiscoveryService.i2b(rpt.nodeIdent);
00144 for (int i = 0; i < 4; i++)
00145 msg[i] = id_part[i];
00146 msg[4] = rpt.hLevel;
00147 msg[5] = rpt.cType;
00148
00149 return msg;
00150 }
00151
00152 private class Listener extends Thread
00153 {
00154 private DiscoveryService serv;
00155 private MulticastSocket socket;
00156
00157 public Listener(DiscoveryService ds, MulticastSocket s) {
00158 serv = ds;
00159 socket = s;
00160 }
00161
00162 public void run() {
00163 List<Integer> heard = new ArrayList<Integer>();
00164 boolean keep_going;
00165
00166 synchronized(serv) {
00167 keep_going = serv.running;
00168 }
00169 while (keep_going) {
00170 DiscoverMessage in = null;
00171 DiscoverMessageHeader hdr = null;
00172
00173 try {
00174 byte[] raw_hdr = new byte[DiscoveryService.HEADER_SIZE];
00175 DatagramPacket packet =
00176 new DatagramPacket(raw_hdr, raw_hdr.length);
00177 socket.setSoTimeout(EMPTY_TIMEOUT);
00178 socket.receive(packet);
00179 SocketAddress sender = packet.getSocketAddress();
00180 hdr = new DiscoverMessageHeader(raw_hdr);
00181
00182 byte[] msg = new byte[hdr.announceSize];
00183 packet = new DatagramPacket(msg, hdr.announceSize);
00184 socket.connect(sender);
00185 socket.setSoTimeout(TX_TIMEOUT);
00186 socket.receive(packet);
00187 socket.disconnect();
00188 in = new DiscoverMessage(hdr, msg);
00189 serv.log(Logger.LOG.DEBUG, "Discovery msg ID:" +
00190 in.nodeIdent + " level:" + in.hLevel + " type:");
00191 }
00192 catch (SocketTimeoutException e) {
00193 socket.disconnect();
00194 synchronized(serv) {
00195 keep_going = serv.running;
00196 }
00197 continue;
00198 }
00199 catch (DiscoverMessageException dme) {
00200 serv.log(Logger.LOG.ERROR, dme.toString());
00201 synchronized(serv) {
00202 keep_going = serv.running;
00203 }
00204 continue;
00205 }
00206 catch (IOException e) {
00207 serv.log(Logger.LOG.ERROR,
00208 "Discovery listener IO exception");
00209 e.printStackTrace(System.err);
00210 synchronized(serv) {
00211 keep_going = serv.running;
00212 }
00213 continue;
00214 }
00215 if (hdr != null && in != null) {
00216 if (in.nodeIdent == node_id) {
00217 serv.log(Logger.LOG.DEBUG,
00218 "Dropped my own discover msg");
00219 synchronized(serv) {
00220 keep_going = serv.running;
00221 }
00222 continue;
00223 }
00224
00225 switch (hdr.announceType) {
00226 case 1:
00227 if (in.hLevel >= h_level) {
00228 if (in.hLevel == h_level) {
00229 boolean found = false;
00230 for (int i = 0; i < serv.siblings_size; i++) {
00231 if (serv.siblings_seen[i] == in.nodeIdent) {
00232 found = true;
00233 break;
00234 }
00235 }
00236 if (found == false &&
00237 serv.siblings_size < MAX_SIBLINGS) {
00238 serv.siblings_seen[serv.siblings_size] =
00239 in.nodeIdent;
00240 serv.siblings_size++;
00241 serv.log(Logger.LOG.DEBUG, "Added sibling");
00242 }
00243 }
00244 else {
00245 boolean found = false;
00246 for (int i = 0; i < serv.ancestors_size; i++) {
00247 if (serv.ancestors_seen[i] == in.nodeIdent) {
00248 found = true;
00249 break;
00250 }
00251 }
00252 if (found == false &&
00253 serv.ancestors_size < MAX_PARENTS) {
00254 serv.ancestors_seen[serv.ancestors_size] =
00255 in.nodeIdent;
00256 serv.ancestors_size++;
00257 serv.log(Logger.LOG.DEBUG,
00258 "Added ancestor");
00259 }
00260 }
00261
00262
00263 for (int i = 0; i < serv.refs_size; i++) {
00264 byte type = serv.references[i].type;
00265 Report out = new Report(node_id, h_level, type);
00266 serv.sendDiscovery(out);
00267 }
00268 } else if (in.hLevel == (h_level - 1)) {
00269
00270 boolean found = false;
00271 for (int i = 0; i < serv.descendants_size; i++) {
00272 if (serv.descendants_seen[i] == in.nodeIdent) {
00273 found = true;
00274 break;
00275 }
00276 }
00277 if (found == false &&
00278 serv.descendants_size < MAX_CHILDREN) {
00279 serv.descendants_seen[serv.descendants_size] =
00280 in.nodeIdent;
00281 serv.descendants_size++;
00282 serv.log(Logger.LOG.DEBUG, "Added descendant");
00283 }
00284
00285
00286 Announce out = new Announce(node_id, h_level);
00287 serv.sendDiscovery(out);
00288 }
00289 break;
00290 case 2:
00291 {
00292 if (in.hLevel == (h_level - 1)) {
00293
00294 boolean found = false;
00295 for (int i = 0; i < serv.desc_db_size; i++) {
00296 if (serv.desc_database[i].id ==
00297 in.nodeIdent &&
00298 serv.desc_database[i].type == in.cType){
00299 found = true;
00300 break;
00301 }
00302 }
00303 if (found == false) {
00304 int idx = serv.desc_db_size;
00305 serv.desc_database[idx] =
00306 new ObjectReference();
00307 serv.desc_database[idx].id = in.nodeIdent;
00308 serv.desc_database[idx].type = in.cType;
00309 serv.desc_db_size++;
00310 serv.log(Logger.LOG.DEBUG,
00311 "Added descendant capability " +
00312 SenseUtil.capabilityToString(in.cType));
00313 }
00314 }
00315 else if (in.hLevel == h_level) {
00316 boolean found = false;
00317 for (int i = 0; i < serv.sib_db_size; i++) {
00318 if (serv.sib_database[i].id ==
00319 in.nodeIdent &&
00320 serv.sib_database[i].type == in.cType) {
00321 found = true;
00322 break;
00323 }
00324 }
00325 if (found == false) {
00326 int idx = serv.sib_db_size;
00327 serv.sib_database[idx] =
00328 new ObjectReference();
00329 serv.sib_database[idx].id = in.nodeIdent;
00330 serv.sib_database[idx].type = in.cType;
00331 serv.sib_db_size++;
00332 serv.log(Logger.LOG.DEBUG,
00333 "Added sibling capability " +
00334 SenseUtil.capabilityToString(in.cType));
00335 }
00336 }
00337 }
00338 break;
00339 case 3:
00340 case 4:
00341 default:
00342 break;
00343 }
00344 }
00345
00346 synchronized(serv) {
00347 keep_going = serv.running;
00348 }
00349 }
00350 socket.close();
00351 serv.log(Logger.LOG.DEBUG, "Shutting down Discovery");
00352 }
00353 }
00354
00355
/**
 * One entry of a capability table: a node identifier paired with a
 * capability type.
 */
public class ObjectReference
{
    /** NOTE(review): meaning of this size is not evident from this file -- confirm. */
    public static final int SIZE = 1024;

    /** Identifier of the node that offers the capability. */
    public int id;

    /** Capability type code. */
    public byte type;
}
00363
00364
00365
00366
00367
00368
00369 public boolean running, initialized;
00370 public ObjectReference[] references;
00371 public int refs_size;
00372 public ObjectReference[] sib_database;
00373 public int sib_db_size;
00374 public ObjectReference[] desc_database;
00375 public int desc_db_size;
00376 public int[] ancestors_seen;
00377 public int ancestors_size;
00378 public int[] siblings_seen;
00379 public int siblings_size;
00380 public int[] descendants_seen;
00381 public int descendants_size;
00382
00383 private int node_id;
00384 private byte h_level;
00385 private Others others;
00386 private Thread launch;
00387 private Logger logger;
00388 private MulticastSocket[] mcast_sockets;
00389 private Listener[] m_threads;
00390 private int service_port;
00391
00392
/**
 * Creates a service named "DiscoveryService" with debug level 0 on
 * {@code DISCOVERY_PORT}.
 *
 * @param id    identifier of this node
 * @param level hierarchy level of this node
 * @throws DiscoveryException if the full constructor fails
 */
public DiscoveryService(int id, byte level) throws DiscoveryException {
    this(id,
         level,
         "DiscoveryService",
         (byte) 0,
         DISCOVERY_PORT);
}
00396
/**
 * Creates a service with debug level 0 on {@code DISCOVERY_PORT}.
 *
 * @param id    identifier of this node
 * @param level hierarchy level of this node
 * @param app   name handed to the logger
 * @throws DiscoveryException if the full constructor fails
 */
public DiscoveryService(int id, byte level, String app)
        throws DiscoveryException {
    this(id,
         level,
         app,
         (byte) 0,
         DISCOVERY_PORT);
}
00401
/**
 * Creates a service on {@code DISCOVERY_PORT}.
 *
 * @param id          identifier of this node
 * @param level       hierarchy level of this node
 * @param app         name handed to the logger
 * @param debug_level level handed to the logger
 * @throws DiscoveryException if the full constructor fails
 */
public DiscoveryService(int id, byte level, String app, byte debug_level)
        throws DiscoveryException {
    this(id,
         level,
         app,
         debug_level,
         DISCOVERY_PORT);
}
00406
00407 public DiscoveryService(int id, byte level, String app, byte debug_level,
00408 int port) throws DiscoveryException {
00409 running = false;
00410 initialized = false;
00411 node_id = id;
00412 h_level = level;
00413 service_port = port;
00414
00415 logger = new Logger(app, debug_level);
00416 others = new Others(this);
00417
00418 references = new ObjectReference[MAX_CAPABLE];
00419 refs_size = 0;
00420 sib_database = new ObjectReference[MAX_SIBLINGS];
00421 sib_db_size = 0;
00422 desc_database = new ObjectReference[MAX_CHILDREN];
00423 desc_db_size = 0;
00424
00425 ancestors_seen = new int[MAX_PARENTS];
00426 ancestors_size = 0;
00427 siblings_seen = new int[MAX_SIBLINGS];
00428 siblings_size = 0;
00429 descendants_seen = new int[MAX_CHILDREN];
00430 descendants_size = 0;
00431
00432 running = true;
00433 NetworkInterface[] nifs = getAllInterfaces();
00434 if (nifs.length < 1)
00435 throw new DiscoveryException(DiscoveryError.NoNetwork,
00436 "No external network interfaces found.");
00437
00438 mcast_sockets = new MulticastSocket[nifs.length];
00439 m_threads = new Listener[nifs.length];
00440
00441 log(Logger.LOG.INFO, "Discovery interfaces:");
00442 for (int i = 0; i < nifs.length; i++) {
00443 log(Logger.LOG.INFO, " " + nifs[i].getName());
00444 try {
00445 mcast_sockets[i] = new MulticastSocket(service_port);
00446 mcast_sockets[i].setReuseAddress(true);
00447 mcast_sockets[i].setNetworkInterface(nifs[i]);
00448 mcast_sockets[i].joinGroup(InetAddress.getByName(MCAST_ADDR));
00449
00450 m_threads[i] = new Listener(this, mcast_sockets[i]);
00451 m_threads[i].start();
00452 } catch (SocketException e) {
00453 throw new DiscoveryException(DiscoveryError.NoNetwork,
00454 "Socket error " + e.getMessage() +
00455 " on " + MCAST_ADDR);
00456 } catch (UnknownHostException e) {
00457 throw new DiscoveryException(DiscoveryError.NoNetwork,
00458 "Unknown host " + MCAST_ADDR +
00459 " " + e.getMessage());
00460 } catch (IOException e) {
00461 throw new DiscoveryException(DiscoveryError.NoNetwork,
00462 "IO error " + e.getMessage());
00463 }
00464 }
00465
00466 sendDiscovery(new Announce(node_id, h_level));
00467 }
00468
00469
00470 public void log(Logger.LOG type, String msg) {
00471 logger.log(type, msg);
00472 }
00473
00474 public void shutdown() {
00475 synchronized(this) {
00476 running = false;
00477 }
00478 try {
00479 for (int i = 0; i < m_threads.length; i++)
00480 m_threads[i].join();
00481 } catch (InterruptedException e) {}
00482 }
00483
00484
00485 private NetworkInterface[] getAllInterfaces() {
00486 try {
00487 int size = 0, idx = 0;
00488 NetworkInterface[] nifs;
00489 Enumeration<NetworkInterface> ife;
00490
00491 ife = NetworkInterface.getNetworkInterfaces();
00492 for (NetworkInterface nif : Collections.list(ife)) {
00493 if (nif.getName().toLowerCase().startsWith("lo") == false)
00494 size++;
00495 }
00496 if (size < 1)
00497 return null;
00498
00499 nifs = new NetworkInterface[size];
00500 ife = NetworkInterface.getNetworkInterfaces();
00501 for (NetworkInterface nif : Collections.list(ife)) {
00502 if (nif.getName().toLowerCase().startsWith("lo") == false) {
00503 nifs[idx] = nif;
00504 idx++;
00505 }
00506 }
00507 return nifs;
00508 } catch (SocketException e) {
00509 log(Logger.LOG.ERROR, "Socket exception on getNetworkInterfaces");
00510 return null;
00511 }
00512 }
00513
00514
00515 public void sendDiscovery(Announce ann) {
00516 byte[] hdr = announceHeader();
00517 byte[] msg = loadAnnounce(ann);
00518 log(Logger.LOG.DEBUG, "Sending announce- ID:" + ann.nodeIdent +
00519 " level:" + ann.hLevel);
00520 sendAnnouncement(hdr, msg);
00521 }
00522
00523 public void sendDiscovery(Report report) {
00524 byte[] hdr = reportHeader();
00525 byte[] msg = loadReport(report);
00526 log(Logger.LOG.DEBUG, "Sending report- ID:" + report.nodeIdent +
00527 " level:" + report.hLevel + " type:" + report.cType);
00528 sendAnnouncement(hdr, msg);
00529 }
00530
00531 private void sendAnnouncement(byte[] hdr, byte[] msg) {
00532 while (true) {
00533 DatagramPacket hpacket = null;
00534 DatagramPacket mpacket = null;
00535
00536 try {
00537 hpacket = new DatagramPacket(hdr, hdr.length,
00538 InetAddress.getByName(MCAST_ADDR),
00539 service_port);
00540 mpacket = new DatagramPacket(msg, msg.length,
00541 InetAddress.getByName(MCAST_ADDR),
00542 service_port);
00543 for (int i = 0; i < mcast_sockets.length; i++) {
00544 mcast_sockets[i].send(hpacket);
00545 mcast_sockets[i].send(mpacket);
00546 }
00547 break;
00548 }
00549 catch (IllegalArgumentException iae) {
00550 try {
00551 Thread.sleep(TX_TIMEOUT + RETRY_ITVL);
00552 } catch (InterruptedException ie) {}
00553 }
00554 catch (IOException e) {
00555 log(Logger.LOG.ERROR, "Discovery IO exception");
00556 e.printStackTrace(System.err);
00557 break;
00558 }
00559 }
00560 }
00561
00562
00563
00564
00565
00566
00567 public byte level() {
00568 return h_level;
00569 }
00570
00571 public int identifier() {
00572 return node_id;
00573 }
00574
00575 public byte[] capabilities() {
00576 byte[] caps = new byte[refs_size];
00577 for (int i = 0; i < refs_size; i++)
00578 caps[i] = references[i].type;
00579 return caps;
00580 }
00581
00582 public void registerObject(byte capability, sensix.Request obj)
00583 throws DiscoveryException {
00584 registerObject(capability, obj, false);
00585 }
00586
00587 public void registerObject(byte capability, sensix.Request obj,
00588 boolean override) throws DiscoveryException {
00589 boolean found = false;
00590
00591 if (obj == null)
00592 throw new DiscoveryException(DiscoveryError.InvalidObject,
00593 "Attempting to register a null object");
00594 if (refs_size > MAX_CAPABLE)
00595 throw new DiscoveryException(DiscoveryError.RegistryFull,
00596 "Registry size exceeds " + MAX_CAPABLE);
00597
00598 for (int i = 0; i < refs_size; i++) {
00599 if (references[i].type == capability) {
00600 found = true;
00601 if (override != true)
00602 throw new DiscoveryException(
00603 DiscoveryError.AlreadyRegistered,
00604 "Prior registration found for " +
00605 SenseUtil.capabilityToString(capability) + " capability");
00606 }
00607 }
00608
00609 if (found == false) {
00610 references[refs_size] = new ObjectReference();
00611 references[refs_size].id = node_id;
00612 references[refs_size].type = capability;
00613 refs_size++;
00614 sendDiscovery(new Report(node_id, h_level, capability));
00615 }
00616 }
00617
00618
00619
00620
00621
00622 public boolean findNodeInFamily(int id) throws DiscoveryException {
00623 return others.findNode(id);
00624 }
00625
00626 public sensix.Request[] queryFamilyNetwork(byte capability)
00627 throws DiscoveryException {
00628 return others.queryNetwork(capability);
00629 }
00630
00631 public sensix.Request queryFamilyNode(int id, byte capability)
00632 throws DiscoveryException {
00633 return others.queryNode(id, capability);
00634 }
00635
00636 public boolean findNodeInDescendants(int id) throws DiscoveryException {
00637 return others.descendants().findNode(id);
00638 }
00639
00640 public sensix.Request[] queryDescendantNetwork(byte capability)
00641 throws DiscoveryException {
00642 return others.descendants().queryNetwork(capability);
00643 }
00644
00645 public sensix.Request queryDescendantNode(int id, byte capability)
00646 throws DiscoveryException {
00647 return others.descendants().queryNode(id, capability);
00648 }
00649
00650 public byte[] descendantCapabilities() throws DiscoveryException {
00651 return others.descendants().capabilities();
00652 }
00653
00654 public int[] descendantNodes() throws DiscoveryException {
00655 return others.descendants().nodes();
00656 }
00657
00658 public boolean findNodeInSiblings(int id) throws DiscoveryException {
00659 return others.siblings().findNode(id);
00660 }
00661
00662 public sensix.Request[] querySiblingNetwork(byte capability)
00663 throws DiscoveryException {
00664 return others.siblings().queryNetwork(capability);
00665 }
00666
00667 public sensix.Request querySiblingNode(int id, byte capability)
00668 throws DiscoveryException {
00669 return others.siblings().queryNode(id, capability);
00670 }
00671
00672 public byte[] siblingCapabilities() throws DiscoveryException {
00673 return others.siblings().capabilities();
00674 }
00675
00676 public int[] siblingNodes() throws DiscoveryException {
00677 return others.siblings().nodes();
00678 }
00679
00680 public boolean findNodeInAncestors(int id) throws DiscoveryException {
00681 return others.ancestors().findNode(id);
00682 }
00683
00684 public int[] ancestorNodes() throws DiscoveryException {
00685 return others.ancestors().nodes();
00686 }
00687 }