00001
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 package gov.lanl.isr.sensix;
00036
00037 import java.applet.*;
00038 import java.util.*;
00039 import java.net.*;
00040 import java.io.*;
00041 import org.omg.CORBA.*;
00042 import org.omg.PortableServer.*;
00043 import gov.lanl.isr.sensix.discovery.*;
00044 import gov.lanl.isr.sensix.sensing.*;
00045
00046
00047 public class SenseCtrl extends SenseClient
00048 {
00049 private class inputThread implements Runnable
00050 {
00051 private Thread t;
00052 private SenseCtrl parent;
00053
00054
00055 public inputThread(SenseCtrl p) {
00056 parent = p;
00057 }
00058
00059 public void init() {
00060 t = new Thread(this);
00061 t.start();
00062 }
00063
00064 public void run() {
00065 boolean keep_going;
00066 synchronized(parent) {
00067 keep_going = parent.running;
00068 }
00069
00070 while (keep_going) {
00071 BufferedReader in;
00072
00073 try {
00074 parent.socket.setSoTimeout(1000);
00075 parent.client_socket = parent.socket.accept();
00076 in = new BufferedReader(
00077 new InputStreamReader(client_socket.getInputStream()));
00078 parent.out =
00079 new PrintWriter(client_socket.getOutputStream(), true);
00080 }
00081 catch (SocketTimeoutException e) {
00082 synchronized(parent) {
00083 keep_going = parent.running;
00084 }
00085 continue;
00086 }
00087 catch (IOException e) {
00088 parent.log(Logger.LOG.ERROR,
00089 "Exception accepting connection");
00090 e.printStackTrace(System.err);
00091 parent.client_socket = null;
00092 parent.out = null;
00093 synchronized(parent) {
00094 keep_going = parent.running;
00095 }
00096 continue;
00097 }
00098
00099 String input;
00100 try {
00101 while ((input = in.readLine()) != null) {
00102 String cmd = new String(input).trim();
00103
00104 parent.log(Logger.LOG.DEBUG, "Recvd cmd: " + cmd);
00105
00106 if (cmd.equalsIgnoreCase("quit") ||
00107 cmd.equalsIgnoreCase("exit"))
00108 parent.shutdown();
00109 else {
00110 Functor f = validateCmd(cmd);
00111 if (f != null)
00112 parent.devolve(f);
00113 }
00114 }
00115
00116 in.close();
00117 parent.out.close();
00118 parent.client_socket.close();
00119 }
00120 catch (IOException e) {}
00121
00122 parent.out = null;
00123 parent.client_socket = null;
00124
00125 synchronized(parent) {
00126 keep_going = parent.running;
00127 }
00128 }
00129 try {
00130 parent.socket.close();
00131 }
00132 catch (IOException e) {}
00133 }
00134 }
00135
00136
00137
00138
00139
00140
00141 public static final String appname = "SENSIX Control";
00142
00143 public ServerSocket socket;
00144 public Socket client_socket;
00145 public PrintWriter out;
00146
00147 private boolean running, use_gui;
00148 private Thread input;
00149
00150
00151 public SenseCtrl(int id, int level, String[] args, byte debug_level)
00152 throws org.omg.CORBA.SystemException,
00153 org.omg.CORBA.UserException {
00154 this(id, level, args, (java.util.Properties)null, appname,
00155 debug_level);
00156 }
00157
00158 public SenseCtrl(int id, int level, String[] args, Properties p,
00159 String name, byte debug_level)
00160 throws org.omg.CORBA.SystemException,
00161 org.omg.CORBA.UserException {
00162 super(id, level, args, null, name, debug_level);
00163
00164 client_socket = null;
00165 out = null;
00166
00167 running = true;
00168 try {
00169 socket = new ServerSocket(SenseCorba.SERVER_PORT);
00170 }
00171 catch (IOException e) {
00172 logger.log(Logger.LOG.ERROR, "Could not listen on port " +
00173 SenseCorba.SERVER_PORT);
00174 System.exit(1);
00175 }
00176
00177 inputThread it = new inputThread(this);
00178 it.init();
00179 }
00180
00181
00182
00183 public synchronized void aggregate(Functor funct) {
00184 log(Logger.LOG.DEBUG, "control aggregate: " + funct.asString());
00185 Functor f = re_evolve(funct);
00186 if (f == null)
00187 return;
00188
00189 switch (f.identifier()) {
00190 case SenseCorba.ALPHA:
00191 try {
00192 Sense s = SenseHelper.narrow(f);
00193 String res = "Sense result: " +
00194 SenseUtil.capabilityToString(s.sensor()) + "";
00195 Data d = s.results()[0];
00196 if (SenseUtil.isIntegerData(d.discriminator()))
00197 res += d.iresult();
00198 else if (SenseUtil.isFloatingPointData(d.discriminator()))
00199 res += d.fresult();
00200 else
00201 res = "Invalid, impossible result";
00202 outputResults(res);
00203 } catch (org.omg.CORBA.BAD_PARAM e) {
00204 log(Logger.LOG.ERROR, "Impossible situation: CORBA type and " +
00205 "SENSIX type do not match");
00206 }
00207 break;
00208 case SenseCorba.BETA:
00209 try {
00210 PeakSense s = PeakSenseHelper.narrow(f);
00211 String res = "PeakSense result: " +
00212 SenseUtil.capabilityToString(s.sensor()) + "";
00213 Data d = s.results()[0];
00214 if (SenseUtil.isIntegerData(d.discriminator()))
00215 res += d.iresult();
00216 else if (SenseUtil.isFloatingPointData(d.discriminator()))
00217 res += d.fresult();
00218 else
00219 res = "Invalid, impossible result";
00220 outputResults(res);
00221 } catch (org.omg.CORBA.BAD_PARAM e) {
00222 log(Logger.LOG.ERROR, "Impossible situation: CORBA type and " +
00223 "SENSIX type do not match");
00224 }
00225 break;
00226 case SenseCorba.THETA:
00227 try {
00228 TimeSeries series = TimeSeriesHelper.narrow(f);
00229 outputResults("TimeSeries results:");
00230 Sensory s = series.sense();
00231 for (int i = 0; i < series.results().length; i++)
00232 aggregate(s);
00233 } catch (org.omg.CORBA.BAD_PARAM e) {
00234 log(Logger.LOG.ERROR, "Impossible situation: CORBA type and " +
00235 "SENSIX type do not match");
00236 }
00237 break;
00238 case SenseCorba.PSI:
00239 try {
00240 SpatialSeries series = SpatialSeriesHelper.narrow(f);
00241 outputResults("SpatialSeries results:");
00242 Sensory s = series.sense();
00243 for (int i = 0; i < series.results().length; i++)
00244 aggregate(s);
00245 } catch (org.omg.CORBA.BAD_PARAM e) {
00246 log(Logger.LOG.ERROR, "Impossible situation: CORBA type and " +
00247 "SENSIX type do not match");
00248 }
00249 break;
00250 case SenseCorba.IOTA:
00251 try {
00252 Recite agg = ReciteHelper.narrow(f);
00253 String res = "Recite results ";
00254 for (int i = 0; i < agg.collectors().length; i++) {
00255 gov.lanl.isr.sensix.sensing.Collection c =
00256 agg.collectors()[i];
00257 if (c.identifier() == SenseCorba.THETA)
00258 res += "over TimeSeries ";
00259 else if (c.identifier() == SenseCorba.PSI)
00260 res += "over SpatialSeries ";
00261 res += "of " +
00262 SenseUtil.capabilityToString(c.sense().sensor());
00263 res += ": ";
00264 for (int j = 0; j < agg.results().length; j++) {
00265 Data d = agg.results()[j];
00266 if (SenseUtil.isIntegerData(d.discriminator()))
00267 res += d.iresult();
00268 else if (SenseUtil.isFloatingPointData(d.discriminator()))
00269 res += d.fresult();
00270 else
00271 res += "(Invalid, impossible result)";
00272 if (j < agg.results().length - 1)
00273 res += ", ";
00274 }
00275 if (i < agg.collectors().length - 1)
00276 res += " and ";
00277 }
00278 outputResults(res);
00279 } catch (org.omg.CORBA.BAD_PARAM e) {
00280 log(Logger.LOG.ERROR, "Impossible situation: CORBA type and " +
00281 "SENSIX type do not match");
00282 }
00283 break;
00284 case SenseCorba.SUMMA:
00285 try {
00286 Sum agg = SumHelper.narrow(f);
00287 String res = "Sum results ";
00288 for (int i = 0; i < agg.collectors().length; i++) {
00289 gov.lanl.isr.sensix.sensing.Collection c =
00290 agg.collectors()[i];
00291 if (c.identifier() == SenseCorba.THETA)
00292 res += "over TimeSeries ";
00293 else if (c.identifier() == SenseCorba.PSI)
00294 res += "over SpatialSeries ";
00295 res += "of " +
00296 SenseUtil.capabilityToString(c.sense().sensor());
00297 res += ": ";
00298 Data d = agg.results()[0];
00299 if (SenseUtil.isIntegerData(d.discriminator()))
00300 res += d.iresult();
00301 else if (SenseUtil.isFloatingPointData(d.discriminator()))
00302 res += d.fresult();
00303 else
00304 res += "(Invalid, impossible result)";
00305 if (i < agg.collectors().length - 1)
00306 res += " and ";
00307 }
00308 outputResults(res);
00309 } catch (org.omg.CORBA.BAD_PARAM e) {
00310 log(Logger.LOG.ERROR, "Impossible situation: CORBA type and " +
00311 "SENSIX type do not match");
00312 }
00313 break;
00314 case SenseCorba.DELTA:
00315 try {
00316 Delta agg = DeltaHelper.narrow(f);
00317 String res = "Delta results ";
00318 for (int i = 0; i < agg.collectors().length; i++) {
00319 gov.lanl.isr.sensix.sensing.Collection c =
00320 agg.collectors()[i];
00321 if (c.identifier() == SenseCorba.THETA)
00322 res += "over TimeSeries ";
00323 else if (c.identifier() == SenseCorba.PSI)
00324 res += "over SpatialSeries ";
00325 res += "of " +
00326 SenseUtil.capabilityToString(c.sense().sensor());
00327 res += ": ";
00328 Data d = agg.results()[0];
00329 if (SenseUtil.isIntegerData(d.discriminator()))
00330 res += d.iresult();
00331 else if (SenseUtil.isFloatingPointData(d.discriminator()))
00332 res += d.fresult();
00333 else
00334 res += "(Invalid, impossible result)";
00335 if (i < agg.collectors().length - 1)
00336 res += " and ";
00337 }
00338 outputResults(res);
00339 } catch (org.omg.CORBA.BAD_PARAM e) {
00340 log(Logger.LOG.ERROR, "Impossible situation: CORBA type and " +
00341 "SENSIX type do not match");
00342 }
00343 break;
00344 case SenseCorba.BARX:
00345 try {
00346 Mean agg = MeanHelper.narrow(f);
00347 String res = "Mean results ";
00348 for (int i = 0; i < agg.collectors().length; i++) {
00349 gov.lanl.isr.sensix.sensing.Collection c =
00350 agg.collectors()[i];
00351 if (c.identifier() == SenseCorba.THETA)
00352 res += "over TimeSeries ";
00353 else if (c.identifier() == SenseCorba.PSI)
00354 res += "over SpatialSeries ";
00355 res += "of " +
00356 SenseUtil.capabilityToString(c.sense().sensor());
00357 res += ": ";
00358 Data d = agg.results()[0];
00359 if (SenseUtil.isIntegerData(d.discriminator()))
00360 res += d.iresult();
00361 else if (SenseUtil.isFloatingPointData(d.discriminator()))
00362 res += d.fresult();
00363 else
00364 res += "(Invalid, impossible result)";
00365 if (i < agg.collectors().length - 1)
00366 res += " and ";
00367 }
00368 outputResults(res);
00369 } catch (org.omg.CORBA.BAD_PARAM e) {
00370 log(Logger.LOG.ERROR, "Impossible situation: CORBA type and " +
00371 "SENSIX type do not match");
00372 }
00373 break;
00374 case SenseCorba.SIGMA:
00375 try {
00376 Sigma agg = SigmaHelper.narrow(f);
00377 String res = "Sigma results ";
00378 for (int i = 0; i < agg.collectors().length; i++) {
00379 gov.lanl.isr.sensix.sensing.Collection c =
00380 agg.collectors()[i];
00381 if (c.identifier() == SenseCorba.THETA)
00382 res += "over TimeSeries ";
00383 else if (c.identifier() == SenseCorba.PSI)
00384 res += "over SpatialSeries ";
00385 res += "of " +
00386 SenseUtil.capabilityToString(c.sense().sensor());
00387 res += ": ";
00388 Data d = agg.results()[0];
00389 if (SenseUtil.isIntegerData(d.discriminator()))
00390 res += d.iresult();
00391 else if (SenseUtil.isFloatingPointData(d.discriminator()))
00392 res += d.fresult();
00393 else
00394 res += "(Invalid, impossible result)";
00395 if (i < agg.collectors().length - 1)
00396 res += " and ";
00397 }
00398 outputResults(res);
00399 } catch (org.omg.CORBA.BAD_PARAM e) {
00400 log(Logger.LOG.ERROR, "Impossible situation: CORBA type and " +
00401 "SENSIX type do not match");
00402 }
00403 break;
00404 case SenseCorba.LAMBDA:
00405 try {
00406 Lambda agg = LambdaHelper.narrow(f);
00407 String res = "Lambda results: ";
00408 for (int i = 0; i < agg.collectors().length; i++) {
00409 gov.lanl.isr.sensix.sensing.Collection c =
00410 agg.collectors()[i];
00411 if (c.identifier() == SenseCorba.THETA)
00412 res += "over TimeSeries ";
00413 else if (c.identifier() == SenseCorba.PSI)
00414 res += "over SpatialSeries ";
00415 res += "of " +
00416 SenseUtil.capabilityToString(c.sense().sensor());
00417 res += ": ";
00418 Data d = agg.results()[0];
00419 if (SenseUtil.isIntegerData(d.discriminator()))
00420 res += d.iresult();
00421 else if (SenseUtil.isFloatingPointData(d.discriminator()))
00422 res += d.fresult();
00423 else
00424 res += "(Invalid, impossible result)";
00425 if (i < agg.collectors().length - 1)
00426 res += " and ";
00427 }
00428 outputResults(res);
00429 } catch (org.omg.CORBA.BAD_PARAM e) {
00430 log(Logger.LOG.ERROR, "Impossible situation: CORBA type and " +
00431 "SENSIX type do not match");
00432 }
00433 break;
00434 default:
00435 log(Logger.LOG.ERROR, "Unknown functor");
00436 outputResults("Error: unknown functor");
00437 break;
00438 }
00439
00440 if (f.errors() != null) {
00441 for (int i = 0; i < f.errors().length; i++) {
00442 FunctorError err = f.errors()[i];
00443 String result = "";
00444
00445 if (err == FunctorError.Refused)
00446 result += "Request refused ";
00447 else if (err == FunctorError.Incapable)
00448 result += "Incapable of fulfilling request ";
00449 else
00450 return;
00451
00452 switch (f.identifier()) {
00453 case SenseCorba.ALPHA:
00454 result += "for Sense functor";
00455 break;
00456 case SenseCorba.BETA:
00457 result += "for PeakSense functor";
00458 break;
00459 case SenseCorba.THETA:
00460 result += "for TimeSeries functor";
00461 break;
00462 case SenseCorba.PSI:
00463 result += "for SpatialSeries functor";
00464 break;
00465 case SenseCorba.IOTA:
00466 result += "for Reort functor";
00467 break;
00468 case SenseCorba.SUMMA:
00469 result += "for Sum functor";
00470 break;
00471 case SenseCorba.DELTA:
00472 result += "for Delta functor";
00473 break;
00474 case SenseCorba.BARX:
00475 result += "for Mean functor";
00476 break;
00477 case SenseCorba.SIGMA:
00478 result += "for Sigma functor";
00479 break;
00480 case SenseCorba.LAMBDA:
00481 result += "for Lambda functor";
00482 break;
00483 default:
00484 result += "for invalid functor";
00485 break;
00486 }
00487 outputResults(result);
00488 }
00489 }
00490 f._release();
00491 }
00492
00493
00494 public void shutdown() {
00495 synchronized(this) {
00496 running = false;
00497 }
00498 super.shutdown();
00499 logger.log(Logger.LOG.INFO, "Shutdown");
00500 System.exit(0);
00501 }
00502
00503
00504 public void outputResults(String results_str) {
00505 if (client_socket != null) {
00506 logger.log(Logger.LOG.DEBUG, "Sending result: " + results_str);
00507 out.println(results_str);
00508 }
00509 else
00510 System.out.println(results_str);
00511 }
00512
00513
00514 public Functor validateCmd(String task_str) {
00515 if (task_str.equalsIgnoreCase("h") ||
00516 task_str.equalsIgnoreCase("?") ||
00517 task_str.equalsIgnoreCase("help")) {
00518 outputResults(service_usage());
00519 return null;
00520 }
00521 Sensory s = null;
00522 logger.log(Logger.LOG.DEBUG, "Task: " + task_str);
00523 try {
00524 s = SenseParser.senseParser(poa, task_str);
00525 }
00526 catch (IOException e) {
00527 s = null;
00528 logger.log(Logger.LOG.ERROR, "Validating/Parsing user input - " +
00529 "Bad command string (buffer string reader failed)");
00530 }
00531 catch (TokenMgrError e) {
00532 s = null;
00533 logger.log(Logger.LOG.ERROR, "Validating/Parsing user input - " +
00534 "Failed lexical analysis (invalid user string)");
00535 }
00536 catch (ParseException e) {
00537 s = null;
00538 logger.log(Logger.LOG.ERROR, "Validating/Parsing user input - " +
00539 "Failed parsing (improper user string)");
00540 }
00541 catch (NumberFormatException e) {
00542 s = null;
00543 logger.log(Logger.LOG.ERROR, "Validating/Parsing user input - " +
00544 "Bad/malformed parameter");
00545 }
00546 catch (UserException e) {
00547 s = null;
00548 logger.log(Logger.LOG.ERROR, "Validating/Parsing user input - " +
00549 "Servant not active");
00550 }
00551 return (Functor)s;
00552 }
00553
00554
00555 public String service_usage() {
00556 String prefix = " ";
00557 String use = " SENSIX task specification: (EBNF)\n";
00558 use += prefix + "task <- sense | aggregate\n";
00559 use += prefix + "sense <- s_type '(' s_params ')'\n";
00560 use += prefix + "collection <- c_type '(' c_params task ')'\n";
00561 use += prefix + "aggregate <- a_type '(' a_params collection ')'\n";
00562 use += prefix + "s_params <- {} | s_param ',' s_params*\n";
00563 use += prefix + "c_params <- {} | c_param ',' c_params*\n";
00564 use += prefix + "a_params <- {} | l_param ','\n";
00565 use += prefix + "s_type <- \"sense\" | \"peaksense\"\n";
00566 use += prefix + "c_type <- \"timeseries\" | \"spatialseries\"\n";
00567 use += prefix + "a_type <- \"recite\" | \"sum\" | \"delta\" | " +
00568 "\"mean\" | \"sigma\"\n";
00569 use += prefix + "l_param <- \"level\" \"=\" Integer\n";
00570 use += prefix + "s_param <- l_param | \"sensor\" \"=\" sensor | " +
00571 "\"rate\" \"=\" Double | \"threshold\" \"=\" Double\n";
00572 use += prefix + "c_param <- l_param | \"duration\" \"=\" Double | " +
00573 "\"angle\" \"=\" Double | \"distance\" \"=\" Double\n";
00574 use += prefix + "sensor <- \"light\" | \"temperature\" | " +
00575 "\"accelerometer\" | \"magnetometer\" | \"humidity\" | " +
00576 "\"GPS\" | \"battery\"\n";
00577 use += "\n";
00578 use += " Examples:\n";
00579 use += prefix + "mean(spatialseries(sense(sensor=temperature)))\n";
00580 use += prefix + " reports the average of all temperature " +
00581 "readings in the known network.\n";
00582 use += prefix + "sum(level=4,spatialseries(level=4, " +
00583 "mean(level=3,spatialseries(level=3, " +
00584 "delta(level=2,spatialseries(level=2, " +
00585 "(sigma(level=1,timeseries(level=1,duration=10," +
00586 "sense(level=1,sensor=magnetometer,rate=2))))))))))\n";
00587 use += prefix + " sums the averages of the maximum difference of " +
00588 "the standard deviations of magnetometer measurements over 5 " +
00589 "seconds by hierarchical level.\n";
00590 use += prefix + "sum(spatialseries(sense(level=1,sensor=light)))\n";
00591 use += prefix + " accumulates only leaf node light readings.\n";
00592 use += "\n";
00593 return use;
00594 }
00595
00596
00597
00598 public static void main(String[] args)
00599 {
00600 SenseUtil su = new SenseUtil(appname, "sensix-ctl.jar", args);
00601 SenseCtrl control = null;
00602 try {
00603 control = new SenseCtrl(su.id, su.level, su.corba_args,
00604 su.debug_level);
00605 }
00606 catch (DiscoveryException e) {
00607 System.err.println(SenseCtrl.appname +
00608 ": Fatal exception accessing the SENSIX Discovery service: ");
00609 e.printStackTrace(System.err);
00610 System.exit(1);
00611 }
00612 catch (org.omg.CORBA.SystemException e) {
00613 System.err.println(SenseCtrl.appname +
00614 ": Fatal CORBA system exception: ");
00615 e.printStackTrace(System.err);
00616 System.exit(1);
00617 }
00618 catch (org.omg.CORBA.UserException e) {
00619 System.err.println(SenseCtrl.appname +
00620 ": Fatal CORBA user exception: ");
00621 e.printStackTrace(System.err);
00622 System.exit(1);
00623 }
00624 catch (Exception e) {
00625 System.err.println(SenseCtrl.appname + ": Unhandled exception: ");
00626 e.printStackTrace(System.err);
00627 System.exit(1);
00628 }
00629
00630 control.log(Logger.LOG.INFO, "daemon started - node ID " + su.id +
00631 " at level " + su.level);
00632 }
00633 }