00001
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 package gov.lanl.isr.sensix;
00036
00037 import java.util.*;
00038 import org.omg.CORBA.*;
00039 import org.omg.PortableServer.*;
00040 import org.omg.PortableServer.POAManagerPackage.*;
00041 import gov.lanl.isr.sensix.discovery.*;
00042 import gov.lanl.isr.sensix.sensing.*;
00043
00044
00045 public class SenseClient extends ResponsePOA
00046 {
00047 public static boolean EXTERNAL_DISCOVERY = false;
00048
00049 protected ORB orb;
00050 protected POA poa;
00051 protected DiscoveryService discover;
00052 protected TaskTracking track;
00053 protected SenseServer local_server;
00054 protected Response resp;
00055 protected Logger logger;
00056 protected boolean own_discover;
00057
00058
00059 public SenseClient(ORB orb_val, POA poa_val, DiscoveryService ds,
00060 String app, byte debug_level)
00061 throws org.omg.CORBA.SystemException,
00062 org.omg.CORBA.UserException {
00063 logger = new Logger(app, debug_level);
00064 orb = orb_val;
00065 poa = poa_val;
00066 discover = ds;
00067 own_discover = false;
00068 local_server = null;
00069 track = new TaskTracking();
00070
00071 org.omg.CORBA.Object ref = poa.servant_to_reference(this);
00072 resp = ResponseHelper.narrow(ref);
00073 }
00074
00075
00076 public SenseClient(int id, int level, String[] args, Properties props,
00077 String app, byte debug_level)
00078 throws org.omg.CORBA.SystemException, org.omg.CORBA.UserException,
00079 DiscoveryException {
00080 logger = new Logger(app, debug_level);
00081 orb = ORB.init(args, props);
00082 poa = POAHelper.narrow(orb.resolve_initial_references("RootPOA"));
00083 poa.the_POAManager().activate();
00084
00085 local_server = null;
00086 track = new TaskTracking();
00087
00088 if (EXTERNAL_DISCOVERY) {
00089 org.omg.CORBA.Object objref =
00090 orb.resolve_initial_references("DiscoveryService");
00091 discover = (DiscoveryService)SelfHelper.narrow(objref);
00092 own_discover = false;
00093 }
00094 else {
00095 discover = new DiscoveryService(orb, id, (byte)level, app,
00096 debug_level);
00097 own_discover = true;
00098 }
00099
00100 org.omg.CORBA.Object ref = poa.servant_to_reference(this);
00101 resp = ResponseHelper.narrow(ref);
00102 }
00103
00104
00105 public void log(Logger.LOG type, String msg) {
00106 logger.log(type, msg);
00107 }
00108
00109 public POA getPoa() {
00110 return poa;
00111 }
00112
00113 public synchronized void linkServer(SenseServer serv) {
00114 local_server = serv;
00115 }
00116
00117
00118 public synchronized void shutdown() {
00119 if (own_discover)
00120 discover.shutdown();
00121
00122 if (local_server == null) {
00123 try {
00124 poa.the_POAManager().deactivate(false, false);
00125 } catch (AdapterInactive e) {}
00126 orb.shutdown(false);
00127 orb.destroy();
00128 }
00129 log(Logger.LOG.INFO, "Shutdown");
00130 }
00131
00132
00133
00138 public void aggregate(Functor funct) {
00139 if (funct == null) {
00140 log(Logger.LOG.ERROR, "Aggregate on a null functor");
00141 return;
00142 }
00143 Functor f = re_evolve(funct);
00144 if (f != null)
00145 f._release();
00146 }
00147
00148
00149
00150
00151 protected synchronized Functor re_evolve(Functor funct) {
00152 log(Logger.LOG.DEBUG, "aggregate: " + funct.asString());
00153 TaskImpl task = track.get(funct);
00154 Functor tf = null;
00155
00156 if (task != null) {
00157 tf = task.f();
00158
00159 TaskImpl ti = track.get(tf);
00160 if (ti != null) {
00161 Functor tf_parent = ti.superf();
00162 if (tf_parent != null &&
00163 (tf_parent.identifier() == SenseCorba.THETA ||
00164 tf_parent.identifier() == SenseCorba.PSI))
00165 tf_parent.results(funct.results());
00166 }
00167
00168 track.num_peers(tf, track.num_peers(tf) - 1);
00169 if (track.num_peers(tf) <= 0) {
00170 Request[] array = task.subtasks();
00171 for (int i = 0; i < array.length; i++)
00172 array[i]._release();
00173
00174 Functor f_prime = tf;
00175 Functor f_prev = null;
00176 while (f_prime != null) {
00177 f_prev = f_prime;
00178 TaskImpl t = track.get(f_prime);
00179 track.remove(f_prime);
00180 if (t != null) {
00181 f_prime = t.superf();
00182 if (f_prime != null)
00183 log(Logger.LOG.DEBUG, "aggregate: " +
00184 f_prime.asString());
00185 }
00186 else
00187 f_prime = null;
00188 }
00189
00190 if (local_server != null)
00191 local_server.dataready(f_prev);
00192 tf = f_prev;
00193 }
00194 }
00195
00196 funct._release();
00197 return tf;
00198 }
00199
00200
00201
00202 protected boolean reduction_test(Sensory f_prime, Functor[] array) {
00203 if (array != null && array.length > 0 &&
00204 (f_prime.identifier() == SenseCorba.IOTA ||
00205 (discover.level() == 2 && (f_prime.level() == SenseCorba.INVALID ||
00206 f_prime.level() >= discover.level())) ||
00207 (f_prime.level() != SenseCorba.INVALID &&
00208 f_prime.level() >= discover.level())))
00209 return true;
00210 return false;
00211 }
00212
00213
00219 public int devolve(Functor funct) {
00220 if (funct == null) {
00221 log(Logger.LOG.ERROR, "Devolve on a null functor");
00222 return -1;
00223 }
00224 return select(null, funct);
00225 }
00226
00227
00228 protected synchronized int select(Functor prev, Functor funct) {
00229 log(Logger.LOG.DEBUG, "devolve: " + funct.asString());
00230
00231 TaskImpl task = new TaskImpl(prev, funct, null);
00232 track.add(task);
00233
00234 if (discover == null)
00235 return -2;
00236
00237 Sensory f_prime = SensoryHelper.narrow(funct);
00238 Functor[] array = f_prime.subfunctors();
00239 if (reduction_test(f_prime, array)) {
00240 int rv = 0;
00241 for (int i = 0; i < array.length; i++)
00242 rv += select(f_prime, array[i]);
00243 if ((rv / array.length) > -1)
00244 return 0;
00245 else
00246 return -1;
00247 }
00248
00249 Request[] reqs = null;
00250 try {
00251 reqs = discover.queryFamilyNetwork(f_prime.sensor());
00252 } catch (DiscoveryException de) {
00253 log(Logger.LOG.ERROR,
00254 "Discovery exception on queryNetwork for " +
00255 SenseUtil.capabilityToString(f_prime.sensor()) +
00256 ": " + de.description);
00257 return -1;
00258 }
00259
00260 for (int i = 0; i < reqs.length; i++) {
00261 try {
00262 task.addSubtask(reqs[i], f_prime);
00263 track.num_peers(funct, track.num_peers(funct) + 1);
00264 reqs[i].apply(f_prime, resp);
00265 } catch (Exception e) {
00266 log(Logger.LOG.ERROR, "Bad Param2: " + f_prime);
00267 e.printStackTrace(System.err);
00268 }
00269 }
00270 return 0;
00271 }
00272
00273
00274
00280 public int detask(Functor funct) {
00281 if (funct == null) {
00282 log(Logger.LOG.ERROR, "Detask on a null functor");
00283 return -1;
00284 }
00285 log(Logger.LOG.DEBUG, "detask: " + funct.asString());
00286 TaskImpl task = null;
00287 synchronized(this) {
00288 task = track.get(funct);
00289 }
00290 if (task == null)
00291 return -1;
00292
00293 if (discover == null)
00294 return -2;
00295
00296 Sensory f_prime = SensoryHelper.narrow(task.f());
00297 Functor[] array = f_prime.subfunctors();
00298 if (reduction_test(f_prime, array)) {
00299 int rv = 0;
00300 for (int i = 0; i < array.length; i++)
00301 rv += detask(array[i]);
00302 if ((rv / array.length) > -1)
00303 return 0;
00304 else
00305 return -1;
00306 }
00307
00308 Request[] subs = task.subtasks();
00309 for (int i = 0; i < subs.length; i++)
00310 subs[i].cancel(SensoryHelper.narrow(task.f()));
00311 synchronized(this) {
00312 track.remove(task.f());
00313 task.f().results(new Data[0]);
00314 }
00315 return 0;
00316 }
00317 }