00001
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 package sensix;
00036
00037 import java.applet.*;
00038 import java.util.*;
00039 import java.net.*;
00040 import java.io.*;
00041 import sensix.sensing.*;
00042
00043
00044 public class SenseCtrl extends SenseClient
00045 {
00046 private class inputThread implements Runnable
00047 {
00048 private Thread t;
00049 private SenseCtrl parent;
00050
00051
00052 public inputThread(SenseCtrl p) {
00053 parent = p;
00054 }
00055
00056 public void init() {
00057 t = new Thread(this);
00058 t.start();
00059 }
00060
00061 public void run() {
00062 boolean keep_going;
00063 synchronized(parent) {
00064 keep_going = parent.running;
00065 }
00066
00067 while (keep_going) {
00068 BufferedReader in;
00069
00070 try {
00071 parent.socket.setSoTimeout(1000);
00072 parent.client_socket = parent.socket.accept();
00073 in = new BufferedReader(
00074 new InputStreamReader(client_socket.getInputStream()));
00075 parent.out =
00076 new PrintWriter(client_socket.getOutputStream(), true);
00077 }
00078 catch (SocketTimeoutException e) {
00079 synchronized(parent) {
00080 keep_going = parent.running;
00081 }
00082 continue;
00083 }
00084 catch (IOException e) {
00085 parent.log(Logger.LOG.ERROR,
00086 "Exception accepting connection");
00087 e.printStackTrace(System.err);
00088 parent.client_socket = null;
00089 parent.out = null;
00090 synchronized(parent) {
00091 keep_going = parent.running;
00092 }
00093 continue;
00094 }
00095
00096 String input;
00097 try {
00098 while ((input = in.readLine()) != null) {
00099 String cmd = new String(input).trim();
00100
00101 parent.log(Logger.LOG.DEBUG, "Recvd cmd: " + cmd);
00102
00103 if (cmd.equalsIgnoreCase("quit") ||
00104 cmd.equalsIgnoreCase("exit"))
00105 parent.shutdown();
00106 else {
00107 Functor f = validateCmd(cmd);
00108 if (f != null)
00109 parent.devolve(f);
00110 }
00111 }
00112
00113 in.close();
00114 parent.out.close();
00115 parent.client_socket.close();
00116 }
00117 catch (IOException e) {}
00118
00119 parent.out = null;
00120 parent.client_socket = null;
00121
00122 synchronized(parent) {
00123 keep_going = parent.running;
00124 }
00125 }
00126 try {
00127 parent.socket.close();
00128 }
00129 catch (IOException e) {}
00130 }
00131 }
00132
00133
00134
00135
00136
00137
00138 public static final String appname = "SENSIX Control";
00139
00140 public ServerSocket socket;
00141 public Socket client_socket;
00142 public PrintWriter out;
00143
00144 private boolean running, use_gui;
00145 private Thread input;
00146
00147
/**
 * Creates a controller under the default application name.
 *
 * All arguments are handed unchanged to the four-argument constructor,
 * with {@link #appname} supplied as the name.
 *
 * @throws Exception whatever the four-argument constructor throws
 */
public SenseCtrl(int id, int level, byte debug_level)
    throws Exception
{
    this(id, level, appname, debug_level);
}
00152
/**
 * Creates a controller, opens the control port and starts the input thread.
 *
 * The arguments are forwarded unchanged to the superclass constructor.
 * If the server socket cannot be bound to Sensix.SERVER_PORT the failure
 * is logged, including its cause, and the JVM exits with status 1.
 *
 * @throws Exception whatever the superclass constructor throws
 */
public SenseCtrl(int id, int level, String name, byte debug_level)
    throws Exception {
    super(id, level, name, debug_level);

    client_socket = null;
    out = null;

    running = true;
    try {
        socket = new ServerSocket(Sensix.SERVER_PORT);
    }
    catch (IOException e) {
        // Keep the cause: "address in use" and "permission denied" need
        // different remedies and were indistinguishable in the log before.
        logger.log(Logger.LOG.ERROR, "Could not listen on port " +
                   Sensix.SERVER_PORT + ": " + e.getMessage());
        System.exit(1);
    }

    // The worker receives this instance before construction has returned;
    // it only touches fields that have already been assigned above.
    inputThread it = new inputThread(this);
    it.init();
}
00173
00174
00175
00176 public synchronized void aggregate(Functor funct) {
00177 log(Logger.LOG.DEBUG, "control aggregate: " + funct.asString());
00178 Functor f = re_evolve(funct);
00179 if (f == null)
00180 return;
00181
00182 switch (f.identifier()) {
00183 case Sensix.ALPHA:
00184 {
00185 Sense s = (Sense)f;
00186 String res = "Sense result: " +
00187 SenseUtil.capabilityToString(s.sensor()) + "";
00188 Data d = s.results()[0];
00189 if (SenseUtil.isIntegerData(d.discriminator()))
00190 res += d.iresult();
00191 else if (SenseUtil.isFloatingPointData(d.discriminator()))
00192 res += d.fresult();
00193 else
00194 res = "Invalid, impossible result";
00195 outputResults(res);
00196 }
00197 break;
00198 case Sensix.BETA:
00199 {
00200 PeakSense s = (PeakSense)f;
00201 String res = "PeakSense result: " +
00202 SenseUtil.capabilityToString(s.sensor()) + "";
00203 Data d = s.results()[0];
00204 if (SenseUtil.isIntegerData(d.discriminator()))
00205 res += d.iresult();
00206 else if (SenseUtil.isFloatingPointData(d.discriminator()))
00207 res += d.fresult();
00208 else
00209 res = "Invalid, impossible result";
00210 outputResults(res);
00211 }
00212 break;
00213 case Sensix.THETA:
00214 {
00215 TimeSeries series = (TimeSeries)f;
00216 outputResults("TimeSeries results:");
00217 Sensory s = series.sense();
00218 for (int i = 0; i < series.results().length; i++)
00219 aggregate(s);
00220 }
00221 break;
00222 case Sensix.PSI:
00223 {
00224 SpatialSeries series = (SpatialSeries)f;
00225 outputResults("SpatialSeries results:");
00226 Sensory s = series.sense();
00227 for (int i = 0; i < series.results().length; i++)
00228 aggregate(s);
00229 }
00230 break;
00231 case Sensix.IOTA:
00232 {
00233 Recite agg = (Recite)f;
00234 String res = "Recite results ";
00235 for (int i = 0; i < agg.collectors().length; i++) {
00236 sensix.sensing.Collection c = agg.collectors()[i];
00237 if (c.identifier() == Sensix.THETA)
00238 res += "over TimeSeries ";
00239 else if (c.identifier() == Sensix.PSI)
00240 res += "over SpatialSeries ";
00241 res += "of " +
00242 SenseUtil.capabilityToString(c.sense().sensor());
00243 res += ": ";
00244 for (int j = 0; j < agg.results().length; j++) {
00245 Data d = agg.results()[j];
00246 if (SenseUtil.isIntegerData(d.discriminator()))
00247 res += d.iresult();
00248 else if (SenseUtil.isFloatingPointData(d.discriminator()))
00249 res += d.fresult();
00250 else
00251 res += "(Invalid, impossible result)";
00252 if (j < agg.results().length - 1)
00253 res += ", ";
00254 }
00255 if (i < agg.collectors().length - 1)
00256 res += " and ";
00257 }
00258 outputResults(res);
00259 }
00260 break;
00261 case Sensix.SUMMA:
00262 {
00263 Sum agg = (Sum)f;
00264 String res = "Sum results ";
00265 for (int i = 0; i < agg.collectors().length; i++) {
00266 sensix.sensing.Collection c = agg.collectors()[i];
00267 if (c.identifier() == Sensix.THETA)
00268 res += "over TimeSeries ";
00269 else if (c.identifier() == Sensix.PSI)
00270 res += "over SpatialSeries ";
00271 res += "of " +
00272 SenseUtil.capabilityToString(c.sense().sensor());
00273 res += ": ";
00274 Data d = agg.results()[0];
00275 if (SenseUtil.isIntegerData(d.discriminator()))
00276 res += d.iresult();
00277 else if (SenseUtil.isFloatingPointData(d.discriminator()))
00278 res += d.fresult();
00279 else
00280 res += "(Invalid, impossible result)";
00281 if (i < agg.collectors().length - 1)
00282 res += " and ";
00283 }
00284 outputResults(res);
00285 }
00286 break;
00287 case Sensix.DELTA:
00288 {
00289 Delta agg = (Delta)f;
00290 String res = "Delta results ";
00291 for (int i = 0; i < agg.collectors().length; i++) {
00292 sensix.sensing.Collection c = agg.collectors()[i];
00293 if (c.identifier() == Sensix.THETA)
00294 res += "over TimeSeries ";
00295 else if (c.identifier() == Sensix.PSI)
00296 res += "over SpatialSeries ";
00297 res += "of " +
00298 SenseUtil.capabilityToString(c.sense().sensor());
00299 res += ": ";
00300 Data d = agg.results()[0];
00301 if (SenseUtil.isIntegerData(d.discriminator()))
00302 res += d.iresult();
00303 else if (SenseUtil.isFloatingPointData(d.discriminator()))
00304 res += d.fresult();
00305 else
00306 res += "(Invalid, impossible result)";
00307 if (i < agg.collectors().length - 1)
00308 res += " and ";
00309 }
00310 outputResults(res);
00311 }
00312 break;
00313 case Sensix.BARX:
00314 {
00315 Mean agg = (Mean)f;
00316 String res = "Mean results ";
00317 for (int i = 0; i < agg.collectors().length; i++) {
00318 sensix.sensing.Collection c = agg.collectors()[i];
00319 if (c.identifier() == Sensix.THETA)
00320 res += "over TimeSeries ";
00321 else if (c.identifier() == Sensix.PSI)
00322 res += "over SpatialSeries ";
00323 res += "of " +
00324 SenseUtil.capabilityToString(c.sense().sensor());
00325 res += ": ";
00326 Data d = agg.results()[0];
00327 if (SenseUtil.isIntegerData(d.discriminator()))
00328 res += d.iresult();
00329 else if (SenseUtil.isFloatingPointData(d.discriminator()))
00330 res += d.fresult();
00331 else
00332 res += "(Invalid, impossible result)";
00333 if (i < agg.collectors().length - 1)
00334 res += " and ";
00335 }
00336 outputResults(res);
00337 }
00338 break;
00339 case Sensix.SIGMA:
00340 {
00341 Sigma agg = (Sigma)f;
00342 String res = "Sigma results ";
00343 for (int i = 0; i < agg.collectors().length; i++) {
00344 sensix.sensing.Collection c = agg.collectors()[i];
00345 if (c.identifier() == Sensix.THETA)
00346 res += "over TimeSeries ";
00347 else if (c.identifier() == Sensix.PSI)
00348 res += "over SpatialSeries ";
00349 res += "of " +
00350 SenseUtil.capabilityToString(c.sense().sensor());
00351 res += ": ";
00352 Data d = agg.results()[0];
00353 if (SenseUtil.isIntegerData(d.discriminator()))
00354 res += d.iresult();
00355 else if (SenseUtil.isFloatingPointData(d.discriminator()))
00356 res += d.fresult();
00357 else
00358 res += "(Invalid, impossible result)";
00359 if (i < agg.collectors().length - 1)
00360 res += " and ";
00361 }
00362 outputResults(res);
00363 }
00364 break;
00365 case Sensix.LAMBDA:
00366 {
00367 Lambda agg = (Lambda)f;
00368 String res = "Lambda results: ";
00369 for (int i = 0; i < agg.collectors().length; i++) {
00370 sensix.sensing.Collection c = agg.collectors()[i];
00371 if (c.identifier() == Sensix.THETA)
00372 res += "over TimeSeries ";
00373 else if (c.identifier() == Sensix.PSI)
00374 res += "over SpatialSeries ";
00375 res += "of " +
00376 SenseUtil.capabilityToString(c.sense().sensor());
00377 res += ": ";
00378 Data d = agg.results()[0];
00379 if (SenseUtil.isIntegerData(d.discriminator()))
00380 res += d.iresult();
00381 else if (SenseUtil.isFloatingPointData(d.discriminator()))
00382 res += d.fresult();
00383 else
00384 res += "(Invalid, impossible result)";
00385 if (i < agg.collectors().length - 1)
00386 res += " and ";
00387 }
00388 outputResults(res);
00389 }
00390 break;
00391 default:
00392 log(Logger.LOG.ERROR, "Unknown functor");
00393 outputResults("Error: unknown functor");
00394 break;
00395 }
00396
00397 if (f.errors() != null) {
00398 for (int i = 0; i < f.errors().length; i++) {
00399 FunctorError err = f.errors()[i];
00400 String result = "";
00401
00402 if (err == new FunctorError(FunctorError.Refused))
00403 result += "Request refused ";
00404 else if (err == new FunctorError(FunctorError.Incapable))
00405 result += "Incapable of fulfilling request ";
00406 else
00407 return;
00408
00409 switch (f.identifier()) {
00410 case Sensix.ALPHA:
00411 result += "for Sense functor";
00412 break;
00413 case Sensix.BETA:
00414 result += "for PeakSense functor";
00415 break;
00416 case Sensix.THETA:
00417 result += "for TimeSeries functor";
00418 break;
00419 case Sensix.PSI:
00420 result += "for SpatialSeries functor";
00421 break;
00422 case Sensix.IOTA:
00423 result += "for Reort functor";
00424 break;
00425 case Sensix.SUMMA:
00426 result += "for Sum functor";
00427 break;
00428 case Sensix.DELTA:
00429 result += "for Delta functor";
00430 break;
00431 case Sensix.BARX:
00432 result += "for Mean functor";
00433 break;
00434 case Sensix.SIGMA:
00435 result += "for Sigma functor";
00436 break;
00437 case Sensix.LAMBDA:
00438 result += "for Lambda functor";
00439 break;
00440 default:
00441 result += "for invalid functor";
00442 break;
00443 }
00444 outputResults(result);
00445 }
00446 }
00447 f = null;
00448 }
00449
00450
00451 public void shutdown() {
00452 synchronized(this) {
00453 running = false;
00454 }
00455 super.shutdown();
00456 logger.log(Logger.LOG.INFO, "Shutdown");
00457 System.exit(0);
00458 }
00459
00460
00461 public void outputResults(String results_str) {
00462 if (client_socket != null) {
00463 logger.log(Logger.LOG.DEBUG, "Sending result: " + results_str);
00464 out.println(results_str);
00465 }
00466 else
00467 System.out.println(results_str);
00468 }
00469
00470
00471 public Functor validateCmd(String task_str) {
00472 if (task_str.equalsIgnoreCase("h") ||
00473 task_str.equalsIgnoreCase("?") ||
00474 task_str.equalsIgnoreCase("help")) {
00475 outputResults(service_usage());
00476 return null;
00477 }
00478 Sensory s = null;
00479 logger.log(Logger.LOG.DEBUG, "Task: " + task_str);
00480 try {
00481 s = SenseParser.senseParser(task_str);
00482 }
00483 catch (IOException e) {
00484 s = null;
00485 logger.log(Logger.LOG.ERROR, "Validating/Parsing user input - " +
00486 "Bad command string (buffer string reader failed)");
00487 }
00488 catch (TokenMgrError e) {
00489 s = null;
00490 logger.log(Logger.LOG.ERROR, "Validating/Parsing user input - " +
00491 "Failed lexical analysis (invalid user string)");
00492 }
00493 catch (sensix.sensing.ParseException e) {
00494 s = null;
00495 logger.log(Logger.LOG.ERROR, "Validating/Parsing user input - " +
00496 "Failed parsing (improper user string)");
00497 }
00498 catch (NumberFormatException e) {
00499 s = null;
00500 logger.log(Logger.LOG.ERROR, "Validating/Parsing user input - " +
00501 "Bad/malformed parameter");
00502 }
00503 return (Functor)s;
00504 }
00505
00506
00507 public String service_usage() {
00508 String prefix = " ";
00509 String use = " SENSIX task specification: (EBNF)\n";
00510 use += prefix + "task <- sense | aggregate\n";
00511 use += prefix + "sense <- s_type '(' s_params ')'\n";
00512 use += prefix + "collection <- c_type '(' c_params task ')'\n";
00513 use += prefix + "aggregate <- a_type '(' a_params collection ')'\n";
00514 use += prefix + "s_params <- {} | s_param ',' s_params*\n";
00515 use += prefix + "c_params <- {} | c_param ',' c_params*\n";
00516 use += prefix + "a_params <- {} | l_param ','\n";
00517 use += prefix + "s_type <- \"sense\" | \"peaksense\"\n";
00518 use += prefix + "c_type <- \"timeseries\" | \"spatialseries\"\n";
00519 use += prefix + "a_type <- \"recite\" | \"sum\" | \"delta\" | " +
00520 "\"mean\" | \"sigma\"\n";
00521 use += prefix + "l_param <- \"level\" \"=\" Integer\n";
00522 use += prefix + "s_param <- l_param | \"sensor\" \"=\" sensor | " +
00523 "\"rate\" \"=\" Double | \"threshold\" \"=\" Double\n";
00524 use += prefix + "c_param <- l_param | \"duration\" \"=\" Double | " +
00525 "\"angle\" \"=\" Double | \"distance\" \"=\" Double\n";
00526 use += prefix + "sensor <- \"light\" | \"temperature\" | " +
00527 "\"accelerometer\" | \"magnetometer\" | \"humidity\" | " +
00528 "\"GPS\" | \"battery\"\n";
00529 use += "\n";
00530 use += " Examples:\n";
00531 use += prefix + "mean(spatialseries(sense(sensor=temperature)))\n";
00532 use += prefix + " reports the average of all temperature " +
00533 "readings in the known network.\n";
00534 use += prefix + "sum(level=4,spatialseries(level=4, " +
00535 "mean(level=3,spatialseries(level=3, " +
00536 "delta(level=2,spatialseries(level=2, " +
00537 "(sigma(level=1,timeseries(level=1,duration=10," +
00538 "sense(level=1,sensor=magnetometer,rate=2))))))))))\n";
00539 use += prefix + " sums the averages of the maximum difference of " +
00540 "the standard deviations of magnetometer measurements over 5 " +
00541 "seconds by hierarchical level.\n";
00542 use += prefix + "sum(spatialseries(sense(level=1,sensor=light)))\n";
00543 use += prefix + " accumulates only leaf node light readings.\n";
00544 use += "\n";
00545 return use;
00546 }
00547
00548
00549
00550 public static void main(String[] args)
00551 {
00552 SenseUtil su = new SenseUtil(appname, "sensix-ctl.jar", args);
00553 SenseCtrl control = null;
00554 try {
00555 control = new SenseCtrl(su.id, su.level, su.debug_level);
00556 }
00557 catch (Exception e) {
00558 System.err.println(SenseCtrl.appname + ": Unhandled exception: ");
00559 e.printStackTrace(System.err);
00560 System.exit(1);
00561 }
00562
00563 control.log(Logger.LOG.INFO, "daemon started - node ID " + su.id +
00564 " at level " + su.level);
00565 }
00566 }