00001
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 package gov.lanl.isr.sensix.discovery;
00036
00037 import java.net.*;
00038 import java.io.*;
00039 import java.util.*;
00040 import org.omg.CORBA.*;
00041 import org.omg.PortableServer.*;
00042 import com.sun.corba.se.impl.orbutil.*;
00043 import gov.lanl.isr.sensix.*;
00044
00045
00046 public class DiscoveryService extends SelfPOA
00047 {
00048 public static final int DISCOVERY_PORT = 2999;
00049 public static final String MCAST_ADDR_IPv6 = "FF18::178";
00050 public static final String MCAST_ADDR_IPv4 = "224.0.0.178";
00051 public static final String MCAST_ADDR = MCAST_ADDR_IPv6;
00052 public static final boolean INSTRUMENT = true;
00053 public static final boolean DUMP_TO_FILE = false;
00054
00055
00056 public static final int MEMORY_LIMIT = 8096;
00057 public static final int HEADER_SIZE = 8;
00058
00059
00060 public static final int TX_TIMEOUT = 12;
00061 public static final int RETRY_ITVL = 4;
00062 public static final int EMPTY_TIMEOUT = 1000;
00063
00064 public static final int MAX_PARENTS = 128;
00065 public static final int MAX_CHILDREN = 512;
00066 public static final int MAX_SIBLINGS = 128;
00067 public static final int MAX_PEERS = (MAX_CHILDREN + MAX_SIBLINGS);
00068 public static final int MAX_CAPABLE = 32;
00069
00070
00071 public static byte[] i2b(int i) {
00072 return i2b(i, 4);
00073 }
00074
00075 public static byte[] i2b(int i, int size) {
00076 byte[] b = new byte[size];
00077 for (int j = 0; j < size; j++)
00078 b[j] = (byte)(i >> (8 * ((size - 1) - j)));
00079 return b;
00080 }
00081
00082 public static int b2i(byte b[]) {
00083 return b2i(b, 4);
00084 }
00085
00086 public static int b2i(byte b[], int size) {
00087 int ret = 0;
00088 for (int j = 0; j < size; j++)
00089 ret += ((b[j] & 0x00000000ff) << (8 * ((size - 1) - j)));
00090 return ret;
00091 }
00092
00093 public static int b2i_le(byte b[]) {
00094 return b2i_le(b, 4);
00095 }
00096
00097 public static int b2i_le(byte b[], int size) {
00098 int ret = 0;
00099 for (int j = 0; j < size; j++)
00100 ret += ((b[j] & 0x00000000ff) << (8 * j));
00101 return ret;
00102 }
00103
00104
00105
00106 public static byte[] announceHeader() {
00107 byte[] hdr = new byte[HEADER_SIZE];
00108
00109 hdr[0] = (byte)('D' & 0xff);
00110 hdr[1] = (byte)0x10;
00111 hdr[2] = (byte)0x00;
00112 hdr[3] = (byte)0x01;
00113 byte[] len_part = DiscoveryService.i2b(5);
00114 for (int i = 0; i < 4; i++)
00115 hdr[4 + i] = len_part[i];
00116
00117 return hdr;
00118 }
00119
00120 public static byte[] reportHeader() {
00121 byte[] hdr = new byte[HEADER_SIZE];
00122
00123 hdr[0] = (byte)('D' & 0xff);
00124 hdr[1] = (byte)0x10;
00125 hdr[2] = (byte)0x00;
00126 hdr[3] = (byte)0x02;
00127 byte[] len_part = DiscoveryService.i2b(6);
00128 for (int i = 0; i < 4; i++)
00129 hdr[4 + i] = len_part[i];
00130
00131 return hdr;
00132 }
00133
00134 public static byte[] requireHeader() {
00135 byte[] hdr = new byte[HEADER_SIZE];
00136
00137 hdr[0] = (byte)('D' & 0xff);
00138 hdr[1] = (byte)0x10;
00139 hdr[2] = (byte)0x00;
00140 hdr[3] = (byte)0x03;
00141 byte[] len_part = DiscoveryService.i2b(10);
00142 for (int i = 0; i < 4; i++)
00143 hdr[4 + i] = len_part[i];
00144
00145 return hdr;
00146 }
00147
00148 public static byte[] shareHeader(Share shr) {
00149 byte[] ior_part = shr.ior.getBytes();
00150 byte[] hdr = new byte[HEADER_SIZE];
00151
00152 hdr[0] = (byte)('D' & 0xff);
00153 hdr[1] = (byte)0x10;
00154 hdr[2] = (byte)0x00;
00155 hdr[3] = (byte)0x04;
00156 byte[] len_part = DiscoveryService.i2b(6 + ior_part.length);
00157 for (int i = 0; i < 4; i++)
00158 hdr[4 + i] = len_part[i];
00159
00160 return hdr;
00161 }
00162
00163
00164 public static byte[] loadAnnounce(Announce ann) {
00165 byte[] msg = new byte[5];
00166
00167 byte[] id_part = DiscoveryService.i2b(ann.nodeIdent);
00168 for (int i = 0; i < 4; i++)
00169 msg[i] = id_part[i];
00170 msg[4] = ann.hLevel;
00171
00172 return msg;
00173 }
00174
00175 public static byte[] loadReport(Report rpt) {
00176 byte[] msg = new byte[6];
00177
00178 byte[] id_part = DiscoveryService.i2b(rpt.nodeIdent);
00179 for (int i = 0; i < 4; i++)
00180 msg[i] = id_part[i];
00181 msg[4] = rpt.hLevel;
00182 msg[5] = rpt.cType;
00183
00184 return msg;
00185 }
00186
00187 public static byte[] loadRequire(Require req) {
00188 byte[] msg = new byte[10];
00189
00190 byte[] id_part = DiscoveryService.i2b(req.nodeIdent);
00191 for (int i = 0; i < 4; i++)
00192 msg[i] = id_part[i];
00193 msg[4] = req.hLevel;
00194 msg[5] = req.cType;
00195 byte[] target_part = DiscoveryService.i2b(req.targetIdent);
00196 for (int i = 0; i < 4; i++)
00197 msg[6 + i] = target_part[i];
00198
00199 return msg;
00200 }
00201
00202 public static byte[] loadShare(Share shr) {
00203 byte[] ior_part = shr.ior.getBytes();
00204 byte[] msg = new byte[14 + ior_part.length];
00205
00206 byte[] id_part = DiscoveryService.i2b(shr.nodeIdent);
00207 for (int i = 0; i < 4; i++)
00208 msg[i] = id_part[i];
00209 msg[4] = shr.hLevel;
00210 msg[5] = shr.cType;
00211
00212 for (int i = 0; i < ior_part.length; i++)
00213 msg[6 + i] = ior_part[i];
00214
00215 return msg;
00216 }
00217
00218
00219 private class Listener extends Thread
00220 {
00221 private DiscoveryService serv;
00222 private MulticastSocket socket;
00223
00224 public Listener(DiscoveryService ds, MulticastSocket s) {
00225 serv = ds;
00226 socket = s;
00227 }
00228
00229 public void run() {
00230 List<Integer> heard = new ArrayList<Integer>();
00231 boolean keep_going;
00232
00233 synchronized(serv) {
00234 keep_going = serv.running;
00235 }
00236 while (keep_going) {
00237 DiscoverMessage in = null;
00238 DiscoverMessageHeader hdr = null;
00239
00240 try {
00241 byte[] raw_hdr = new byte[DiscoveryService.HEADER_SIZE];
00242 DatagramPacket packet =
00243 new DatagramPacket(raw_hdr, raw_hdr.length);
00244 socket.setSoTimeout(EMPTY_TIMEOUT);
00245 socket.receive(packet);
00246 SocketAddress sender = packet.getSocketAddress();
00247 hdr = new DiscoverMessageHeader(raw_hdr);
00248
00249 byte[] msg = new byte[hdr.announceSize];
00250 packet = new DatagramPacket(msg, hdr.announceSize);
00251 socket.connect(sender);
00252 socket.setSoTimeout(TX_TIMEOUT);
00253 socket.receive(packet);
00254 socket.disconnect();
00255 in = new DiscoverMessage(hdr, msg);
00256 serv.log(Logger.LOG.DEBUG, "Discovery msg ID:" +
00257 in.nodeIdent + " level:" + in.hLevel + " type:" +
00258 in.cType + " ior:" + in.ior);
00259 }
00260 catch (SocketTimeoutException e) {
00261 socket.disconnect();
00262 synchronized(serv) {
00263 keep_going = serv.running;
00264 }
00265 continue;
00266 }
00267 catch (DiscoverMessageException dme) {
00268 serv.log(Logger.LOG.ERROR, dme.toString());
00269 synchronized(serv) {
00270 keep_going = serv.running;
00271 }
00272 continue;
00273 }
00274 catch (IOException e) {
00275 serv.log(Logger.LOG.ERROR,
00276 "Discovery listener IO exception");
00277 e.printStackTrace(System.err);
00278 synchronized(serv) {
00279 keep_going = serv.running;
00280 }
00281 continue;
00282 }
00283 if (hdr != null && in != null) {
00284 if (in.nodeIdent == node_id) {
00285 serv.log(Logger.LOG.DEBUG,
00286 "Dropped my own discover msg");
00287 synchronized(serv) {
00288 keep_going = serv.running;
00289 }
00290 continue;
00291 }
00292
00293 switch (hdr.announceType) {
00294 case 1:
00295 if (in.hLevel >= h_level) {
00296 if (in.hLevel == h_level) {
00297 boolean found = false;
00298 for (int i = 0; i < serv.siblings_size; i++) {
00299 if (serv.siblings_seen[i] == in.nodeIdent) {
00300 found = true;
00301 break;
00302 }
00303 }
00304 if (found == false &&
00305 serv.siblings_size < MAX_SIBLINGS) {
00306 serv.siblings_seen[serv.siblings_size] =
00307 in.nodeIdent;
00308 serv.siblings_size++;
00309 serv.log(Logger.LOG.DEBUG, "Added sibling");
00310 }
00311 }
00312 else {
00313 boolean found = false;
00314 for (int i = 0; i < serv.ancestors_size; i++) {
00315 if (serv.ancestors_seen[i] == in.nodeIdent) {
00316 found = true;
00317 break;
00318 }
00319 }
00320 if (found == false &&
00321 serv.ancestors_size < MAX_PARENTS) {
00322 serv.ancestors_seen[serv.ancestors_size] =
00323 in.nodeIdent;
00324 serv.ancestors_size++;
00325 serv.log(Logger.LOG.DEBUG,
00326 "Added ancestor");
00327 }
00328 }
00329
00330
00331 for (int i = 0; i < serv.refs_size; i++) {
00332 byte type = serv.references[i].type;
00333 Report out = new Report(node_id, h_level, type);
00334 serv.sendDiscovery(out);
00335 }
00336 } else if (in.hLevel == (h_level - 1)) {
00337
00338 boolean found = false;
00339 for (int i = 0; i < serv.descendants_size; i++) {
00340 if (serv.descendants_seen[i] == in.nodeIdent) {
00341 found = true;
00342 break;
00343 }
00344 }
00345 if (found == false &&
00346 serv.descendants_size < MAX_CHILDREN) {
00347 serv.descendants_seen[serv.descendants_size] =
00348 in.nodeIdent;
00349 serv.descendants_size++;
00350 serv.log(Logger.LOG.DEBUG, "Added descendant");
00351 }
00352
00353
00354 Announce out = new Announce(node_id, h_level);
00355 serv.sendDiscovery(out);
00356 }
00357 break;
00358 case 2:
00359 {
00360 if (in.hLevel == (h_level - 1)) {
00361
00362 boolean found = false;
00363 for (int i = 0; i < serv.desc_db_size; i++) {
00364 if (serv.desc_database[i].id ==
00365 in.nodeIdent &&
00366 serv.desc_database[i].type == in.cType){
00367 found = true;
00368 break;
00369 }
00370 }
00371 if (found == false) {
00372 int idx = serv.desc_db_size;
00373 serv.desc_database[idx] =
00374 new ObjectReference();
00375 serv.desc_database[idx].id = in.nodeIdent;
00376 serv.desc_database[idx].type = in.cType;
00377 serv.desc_database[idx].ior = null;
00378 serv.desc_database[idx].valid = false;
00379 serv.desc_db_size++;
00380 serv.log(Logger.LOG.DEBUG,
00381 "Added descendant capability " +
00382 SenseUtil.capabilityToString(in.cType));
00383 }
00384 }
00385 else if (in.hLevel == h_level) {
00386 boolean found = false;
00387 for (int i = 0; i < serv.sib_db_size; i++) {
00388 if (serv.sib_database[i].id ==
00389 in.nodeIdent &&
00390 serv.sib_database[i].type == in.cType) {
00391 found = true;
00392 break;
00393 }
00394 }
00395 if (found == false) {
00396 int idx = serv.sib_db_size;
00397 serv.sib_database[idx] =
00398 new ObjectReference();
00399 serv.sib_database[idx].id = in.nodeIdent;
00400 serv.sib_database[idx].type = in.cType;
00401 serv.sib_database[idx].ior = null;
00402 serv.sib_database[idx].valid = false;
00403 serv.sib_db_size++;
00404 serv.log(Logger.LOG.DEBUG,
00405 "Added sibling capability " +
00406 SenseUtil.capabilityToString(in.cType));
00407 }
00408 }
00409
00410 Require out = new Require(node_id, h_level,
00411 in.cType, in.nodeIdent);
00412 serv.sendDiscovery(out);
00413 }
00414 break;
00415 case 3:
00416 if (in.targetIdent == node_id) {
00417 for (int i = 0; i < serv.refs_size; i++) {
00418 if (serv.references[i].type == in.cType) {
00419 Share out =
00420 new Share(node_id, h_level, in.cType,
00421 serv.references[i].ior);
00422 serv.sendDiscovery(out);
00423 }
00424 }
00425 }
00426 break;
00427 case 4:
00428 if (in.hLevel == (h_level - 1)) {
00429
00430 boolean found = false;
00431 for (int i = 0; i < serv.desc_db_size; i++) {
00432 if (serv.desc_database[i].id == in.nodeIdent &&
00433 serv.desc_database[i].type == in.cType) {
00434 found = true;
00435 if (serv.desc_database[i].ior == null) {
00436 serv.desc_database[i].ior = in.ior;
00437 serv.desc_database[i].valid = true;
00438 }
00439 break;
00440 }
00441 }
00442 if (found == false) {
00443 int idx = serv.desc_db_size;
00444 serv.desc_database[idx] = new ObjectReference();
00445 serv.desc_database[idx].id = in.nodeIdent;
00446 serv.desc_database[idx].type = in.cType;
00447 serv.desc_database[idx].ior = in.ior;
00448 serv.desc_database[idx].valid = true;
00449 serv.desc_db_size++;
00450 serv.log(Logger.LOG.DEBUG,
00451 "Added descendant IOR for " +
00452 SenseUtil.capabilityToString(in.cType));
00453 }
00454 }
00455 else if (in.hLevel == h_level) {
00456 boolean found = false;
00457 for (int i = 0; i < serv.sib_db_size; i++) {
00458 if (serv.sib_database[i].id == in.nodeIdent &&
00459 serv.sib_database[i].type == in.cType) {
00460 found = true;
00461 if (serv.sib_database[i].ior == null) {
00462 serv.sib_database[i].ior = in.ior;
00463 serv.sib_database[i].valid = true;
00464 }
00465 break;
00466 }
00467 }
00468 if (found == false) {
00469 int idx = serv.sib_db_size;
00470 serv.sib_database[idx] = new ObjectReference();
00471 serv.sib_database[idx].id = in.nodeIdent;
00472 serv.sib_database[idx].type = in.cType;
00473 serv.sib_database[idx].ior = in.ior;
00474 serv.sib_database[idx].valid = true;
00475 serv.sib_db_size++;
00476 serv.log(Logger.LOG.DEBUG,
00477 "Added sibling IOR for " +
00478 SenseUtil.capabilityToString(in.cType));
00479 }
00480 }
00481 break;
00482 default:
00483 break;
00484 }
00485 }
00486
00487 synchronized(serv) {
00488 keep_going = serv.running;
00489 }
00490 }
00491 socket.close();
00492 serv.log(Logger.LOG.DEBUG, "Shutting down Discovery");
00493 }
00494 }
00495
00496
/**
 * One entry of a capability table: which node offers which capability
 * type, and the stringified object reference for it once known.
 */
public class ObjectReference
{
    /** NOTE(review): not referenced in this section; presumably a size limit for an IOR. */
    public static final int SIZE = 1024;

    /** Identifier of the node that owns the capability. */
    public int id;
    /** Capability type code. */
    public byte type;
    /** Stringified object reference, or null while it is not yet known. */
    public String ior;
    /** Set to true by the code in this file whenever an IOR is stored. */
    public boolean valid;
}
00506
00507
00508
00509
00510
00511
00512 public ORB orb;
00513 public POA poa;
00514 public boolean running, initialized;
00515 public ObjectReference[] references;
00516 public int refs_size;
00517 public ObjectReference[] sib_database;
00518 public int sib_db_size;
00519 public ObjectReference[] desc_database;
00520 public int desc_db_size;
00521 public int[] ancestors_seen;
00522 public int ancestors_size;
00523 public int[] siblings_seen;
00524 public int siblings_size;
00525 public int[] descendants_seen;
00526 public int descendants_size;
00527
00528 private int node_id;
00529 private byte h_level;
00530 private OthersImpl others;
00531 private Thread launch;
00532 private Logger logger;
00533 private MulticastSocket[] mcast_sockets;
00534 private Listener[] m_threads;
00535 private int service_port;
00536
00537
/**
 * Creates a service named "DiscoveryService" with debug level 0 on the
 * default port.
 *
 * @param orb   ORB to use
 * @param id    this node's identifier
 * @param level this node's hierarchy level
 */
public DiscoveryService(ORB orb, int id, byte level)
    throws org.omg.CORBA.SystemException, org.omg.CORBA.UserException,
           DiscoveryException {
    this(orb, id, level, "DiscoveryService", (byte) 0,
         DiscoveryService.DISCOVERY_PORT);
}

/**
 * Creates a service with debug level 0 on the default port.
 *
 * @param orb   ORB to use
 * @param id    this node's identifier
 * @param level this node's hierarchy level
 * @param app   name handed to the logger
 */
public DiscoveryService(ORB orb, int id, byte level, String app)
    throws org.omg.CORBA.SystemException, org.omg.CORBA.UserException,
           DiscoveryException {
    this(orb, id, level, app, (byte) 0,
         DiscoveryService.DISCOVERY_PORT);
}

/**
 * Creates a service on the default port.
 *
 * @param orb         ORB to use
 * @param id          this node's identifier
 * @param level       this node's hierarchy level
 * @param app         name handed to the logger
 * @param debug_level level handed to the logger
 */
public DiscoveryService(ORB orb, int id, byte level,
                        String app, byte debug_level)
    throws org.omg.CORBA.SystemException, org.omg.CORBA.UserException,
           DiscoveryException {
    this(orb, id, level, app, debug_level,
         DiscoveryService.DISCOVERY_PORT);
}
00556
00557 public DiscoveryService(ORB orb_val, int id, byte level,
00558 String app, byte debug_level, int port)
00559 throws org.omg.CORBA.SystemException, org.omg.CORBA.UserException,
00560 DiscoveryException {
00561 running = false;
00562 initialized = false;
00563 node_id = id;
00564 h_level = level;
00565 service_port = port;
00566 orb = orb_val;
00567
00568 logger = new Logger(app, debug_level);
00569 others = new OthersImpl(this);
00570
00571 references = new ObjectReference[MAX_CAPABLE];
00572 refs_size = 0;
00573 sib_database = new ObjectReference[MAX_SIBLINGS];
00574 sib_db_size = 0;
00575 desc_database = new ObjectReference[MAX_CHILDREN];
00576 desc_db_size = 0;
00577
00578 ancestors_seen = new int[MAX_PARENTS];
00579 ancestors_size = 0;
00580 siblings_seen = new int[MAX_SIBLINGS];
00581 siblings_size = 0;
00582 descendants_seen = new int[MAX_CHILDREN];
00583 descendants_size = 0;
00584
00585 POA rootPOA =
00586 POAHelper.narrow(orb.resolve_initial_references("RootPOA"));
00587 rootPOA.the_POAManager().activate();
00588
00589 Policy[] poaPolicy = new Policy[3];
00590 poaPolicy[0] = rootPOA.create_lifespan_policy(
00591 LifespanPolicyValue.TRANSIENT);
00592 poaPolicy[1] = rootPOA.create_id_assignment_policy(
00593 IdAssignmentPolicyValue.SYSTEM_ID);
00594 poaPolicy[2] = rootPOA.create_servant_retention_policy(
00595 ServantRetentionPolicyValue.RETAIN);
00596
00597 poa = rootPOA.create_POA("DiscoveryService",
00598 rootPOA.the_POAManager(), poaPolicy);
00599 poa.the_POAManager().activate();
00600
00601 running = true;
00602 NetworkInterface[] nifs = getAllInterfaces();
00603 if (nifs.length < 1)
00604 throw new DiscoveryException(DiscoveryError.NoNetwork,
00605 "No external network interfaces found.");
00606
00607 mcast_sockets = new MulticastSocket[nifs.length];
00608 m_threads = new Listener[nifs.length];
00609
00610 log(Logger.LOG.INFO, "Discovery interfaces:");
00611 for (int i = 0; i < nifs.length; i++) {
00612 log(Logger.LOG.INFO, " " + nifs[i].getName());
00613 try {
00614 mcast_sockets[i] = new MulticastSocket(service_port);
00615 mcast_sockets[i].setReuseAddress(true);
00616 mcast_sockets[i].setNetworkInterface(nifs[i]);
00617 mcast_sockets[i].joinGroup(InetAddress.getByName(MCAST_ADDR));
00618
00619 m_threads[i] = new Listener(this, mcast_sockets[i]);
00620 m_threads[i].start();
00621 } catch (SocketException e) {
00622 throw new DiscoveryException(DiscoveryError.NoNetwork,
00623 "Socket error " + e.getMessage() +
00624 " on " + MCAST_ADDR);
00625 } catch (UnknownHostException e) {
00626 throw new DiscoveryException(DiscoveryError.NoNetwork,
00627 "Unknown host " + MCAST_ADDR +
00628 " " + e.getMessage());
00629 } catch (IOException e) {
00630 throw new DiscoveryException(DiscoveryError.NoNetwork,
00631 "IO error " + e.getMessage());
00632 }
00633 }
00634
00635 sendDiscovery(new Announce(node_id, h_level));
00636 }
00637
00638
00639 public void log(Logger.LOG type, String msg) {
00640 logger.log(type, msg);
00641 }
00642
00643 public void shutdown() {
00644 synchronized(this) {
00645 running = false;
00646 }
00647 try {
00648 for (int i = 0; i < m_threads.length; i++)
00649 m_threads[i].join();
00650 } catch (InterruptedException e) {}
00651 }
00652
00653
00654 public ObjectReference newObjRef() {
00655 return new ObjectReference();
00656 }
00657
00658 private NetworkInterface[] getAllInterfaces() {
00659 try {
00660 int size = 0, idx = 0;
00661 NetworkInterface[] nifs;
00662 Enumeration<NetworkInterface> ife;
00663
00664 ife = NetworkInterface.getNetworkInterfaces();
00665 for (NetworkInterface nif : Collections.list(ife)) {
00666 if (nif.getName().toLowerCase().startsWith("lo") == false)
00667 size++;
00668 }
00669 if (size < 1)
00670 return null;
00671
00672 nifs = new NetworkInterface[size];
00673 ife = NetworkInterface.getNetworkInterfaces();
00674 for (NetworkInterface nif : Collections.list(ife)) {
00675 if (nif.getName().toLowerCase().startsWith("lo") == false) {
00676 nifs[idx] = nif;
00677 idx++;
00678 }
00679 }
00680 return nifs;
00681 } catch (SocketException e) {
00682 log(Logger.LOG.ERROR, "Socket exception on getNetworkInterfaces");
00683 return null;
00684 }
00685 }
00686
00687
00688 public void sendDiscovery(Announce ann) {
00689 byte[] hdr = announceHeader();
00690 byte[] msg = loadAnnounce(ann);
00691 log(Logger.LOG.DEBUG, "Sending announce- ID:" + ann.nodeIdent +
00692 " level:" + ann.hLevel);
00693 sendAnnouncement(hdr, msg);
00694 }
00695
00696 public void sendDiscovery(Report report) {
00697 byte[] hdr = reportHeader();
00698 byte[] msg = loadReport(report);
00699 log(Logger.LOG.DEBUG, "Sending report- ID:" + report.nodeIdent +
00700 " level:" + report.hLevel + " type:" + report.cType);
00701 sendAnnouncement(hdr, msg);
00702 }
00703
00704 public void sendDiscovery(Require require) {
00705 byte[] hdr = requireHeader();
00706 byte[] msg = loadRequire(require);
00707 log(Logger.LOG.DEBUG, "Sending require- ID:" + require.nodeIdent +
00708 " level:" + require.hLevel + " type:" + require.cType +
00709 " target:" + require.targetIdent);
00710 sendAnnouncement(hdr, msg);
00711 }
00712
00713 public void sendDiscovery(Share share) {
00714 byte[] hdr = shareHeader(share);
00715 byte[] msg = loadShare(share);
00716 log(Logger.LOG.DEBUG, "Sending share- ID:" + share.nodeIdent +
00717 " level:" + share.hLevel + " type:" + share.cType +
00718 " ior:" + share.ior);
00719 sendAnnouncement(hdr, msg);
00720 }
00721
00722 private void sendAnnouncement(byte[] hdr, byte[] msg) {
00723 while (true) {
00724 long total_len = 0;
00725 DatagramPacket hpacket = null;
00726 DatagramPacket mpacket = null;
00727
00728 try {
00729 hpacket = new DatagramPacket(hdr, hdr.length,
00730 InetAddress.getByName(MCAST_ADDR),
00731 service_port);
00732 mpacket = new DatagramPacket(msg, msg.length,
00733 InetAddress.getByName(MCAST_ADDR),
00734 service_port);
00735 total_len = (hdr.length + msg.length) * mcast_sockets.length;
00736 for (int i = 0; i < mcast_sockets.length; i++) {
00737 mcast_sockets[i].send(hpacket);
00738 mcast_sockets[i].send(mpacket);
00739 }
00740 if (INSTRUMENT) {
00741 if (DUMP_TO_FILE) {
00742 String id = java.lang.management.ManagementFactory.getRuntimeMXBean().getName();
00743 FileWriter os = new FileWriter("jcorba." + id + ".msgct", true);
00744 os.write(total_len + " bytes\n");
00745 os.close();
00746 } else
00747 System.err.println(total_len + " bytes");
00748 }
00749 break;
00750 }
00751 catch (IllegalArgumentException iae) {
00752 try {
00753 Thread.sleep(TX_TIMEOUT + RETRY_ITVL);
00754 } catch (InterruptedException ie) {}
00755 }
00756 catch (IOException e) {
00757 log(Logger.LOG.ERROR, "Discovery IO exception");
00758 e.printStackTrace(System.err);
00759 break;
00760 }
00761 }
00762 }
00763
00764
00765
00766
00767
00768
00769 public byte level() {
00770 return h_level;
00771 }
00772
00773 public int identifier() {
00774 return node_id;
00775 }
00776
00777 public byte[] capabilities() {
00778 byte[] caps = new byte[refs_size];
00779 for (int i = 0; i < refs_size; i++)
00780 caps[i] = references[i].type;
00781 return caps;
00782 }
00783
00784 public void registerObject(byte capability, gov.lanl.isr.sensix.Request obj)
00785 throws DiscoveryException {
00786 registerObject(capability, obj, false);
00787 }
00788
00789 public void registerObject(byte capability, gov.lanl.isr.sensix.Request obj,
00790 boolean override) throws DiscoveryException {
00791 boolean found = false;
00792
00793 if (orb == null)
00794 throw new DiscoveryException(DiscoveryError.BadORB,
00795 "Discovery service ORB was lost");
00796 if (obj == null)
00797 throw new DiscoveryException(DiscoveryError.InvalidObject,
00798 "Attempting to register a null object");
00799 if (refs_size > MAX_CAPABLE)
00800 throw new DiscoveryException(DiscoveryError.RegistryFull,
00801 "Registry size exceeds " + MAX_CAPABLE);
00802
00803 for (int i = 0; i < refs_size; i++) {
00804 if (references[i].type == capability) {
00805 found = true;
00806 if (orb == null)
00807 throw new DiscoveryException(DiscoveryError.BadORB,
00808 "Discovery service ORB was lost");
00809 if (override == true)
00810 references[i].ior = orb.object_to_string(obj);
00811 else
00812 throw new DiscoveryException(
00813 DiscoveryError.AlreadyRegistered,
00814 "Prior registration found for " +
00815 SenseUtil.capabilityToString(capability) + " capability");
00816 }
00817 }
00818
00819 if (found == false) {
00820 references[refs_size] = new ObjectReference();
00821 references[refs_size].id = node_id;
00822 references[refs_size].ior = orb.object_to_string(obj);
00823 references[refs_size].type = capability;
00824 references[refs_size].valid = true;
00825 refs_size++;
00826 sendDiscovery(new Report(node_id, h_level, capability));
00827 }
00828 }
00829
00830
00831
00832
00833
00834 public boolean findNodeInFamily(int id) throws DiscoveryException {
00835 return others.findNode(id);
00836 }
00837
00838 public gov.lanl.isr.sensix.Request[] queryFamilyNetwork(byte capability)
00839 throws DiscoveryException {
00840 return others.queryNetwork(capability);
00841 }
00842
00843 public gov.lanl.isr.sensix.Request queryFamilyNode(int id, byte capability)
00844 throws DiscoveryException {
00845 return others.queryNode(id, capability);
00846 }
00847
00848 public boolean findNodeInDescendants(int id) throws DiscoveryException {
00849 return others.descendants().findNode(id);
00850 }
00851
00852 public gov.lanl.isr.sensix.Request[] queryDescendantNetwork(byte capability)
00853 throws DiscoveryException {
00854 return others.descendants().queryNetwork(capability);
00855 }
00856
00857 public gov.lanl.isr.sensix.Request queryDescendantNode(int id,
00858 byte capability)
00859 throws DiscoveryException {
00860 return others.descendants().queryNode(id, capability);
00861 }
00862
00863 public byte[] descendantCapabilities() throws DiscoveryException {
00864 return others.descendants().capabilities();
00865 }
00866
00867 public int[] descendantNodes() throws DiscoveryException {
00868 return others.descendants().nodes();
00869 }
00870
00871 public boolean findNodeInSiblings(int id) throws DiscoveryException {
00872 return others.siblings().findNode(id);
00873 }
00874
00875 public gov.lanl.isr.sensix.Request[] querySiblingNetwork(byte capability)
00876 throws DiscoveryException {
00877 return others.siblings().queryNetwork(capability);
00878 }
00879
00880 public gov.lanl.isr.sensix.Request querySiblingNode(int id, byte capability)
00881 throws DiscoveryException {
00882 return others.siblings().queryNode(id, capability);
00883 }
00884
00885 public byte[] siblingCapabilities() throws DiscoveryException {
00886 return others.siblings().capabilities();
00887 }
00888
00889 public int[] siblingNodes() throws DiscoveryException {
00890 return others.siblings().nodes();
00891 }
00892
00893 public boolean findNodeInAncestors(int id) throws DiscoveryException {
00894 return others.ancestors().findNode(id);
00895 }
00896
00897 public int[] ancestorNodes() throws DiscoveryException {
00898 return others.ancestors().nodes();
00899 }
00900 }