00001
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 package gov.lanl.isr.sensix;
00036
00037 import java.util.*;
00038 import java.lang.Math;
00039 import gov.lanl.isr.sensix.discovery.DiscoveryService;
00040 import gov.lanl.isr.sensix.sensing.*;
00041 import org.omg.CORBA.*;
00042 import org.omg.PortableServer.*;
00043
00044
00045 public class SensixMarshalling
00046 {
00047 public static final byte MAGIC = (byte)0xff;
00048 public static final byte VERSION = (byte)0x01;
00049 public static final byte HEADER_SIZE = 5;
00050 public static final byte WORDSIZE = 4;
00051 public static final byte LITTLE_ENDIAN = (byte)0x80;
00052
00053 public static final byte METADATA = 1;
00054 public static final byte DATA = 2;
00055 public static final byte CMD = 3;
00056
00057 public static final int MAX_CMD_SIZE = 4096;
00058 public static final int REDISCOVER_TIME = 100000;
00059 public static final int REDISCOVER_RESET = (3 * REDISCOVER_TIME / 4);
00060
00061
00062 private Client client;
00063 private long last_ancestor, last_sibling, last_descendant;
00064 private Logger logger;
00065 private POA poa;
00066
/**
 * Creates a marshaller bound to one client.
 *
 * @param c     client to marshal for; its POA is fetched once and cached
 * @param app   forwarded to the Logger constructor
 * @param debug forwarded to the Logger constructor
 */
public SensixMarshalling(Client c, String app, byte debug) {
    this.client = c;
    this.poa = c.poa();
    this.logger = new Logger(app, debug);
    // Nothing has been transmitted yet, so every metadata kind is due immediately.
    this.last_ancestor = 0;
    this.last_sibling = 0;
    this.last_descendant = 0;
}
00075
00076
00077 public static byte[] i2b(int i) {
00078 return i2b(i, 4);
00079 }
00080
00081 public static byte[] i2b(int i, int size) {
00082 byte[] array = new byte[size];
00083 for (int j = 0; j < size; j++)
00084 array[j] = (byte)(i >> (8 * ((size - 1) - j)));
00085 return array;
00086 }
00087
00088 public static int b2i(byte[] array) {
00089 return b2i(array, 4);
00090 }
00091
00092 public static int b2i(byte[] array, int size) {
00093 int ret = 0;
00094 for (int j = 0; j < size; j++)
00095 ret += ((array[j] & 0xff) << (8 * ((size - 1) - j)));
00096 return ret;
00097 }
00098
00099 public static int b2i_le(byte[] array) {
00100 return b2i_le(array, 4);
00101 }
00102
00103 public static int b2i_le(byte[] array, int size) {
00104 int ret = 0;
00105 for (int j = 0; j < size; j++)
00106 ret += ((array[j] & 0xff) << (8 * j));
00107 return ret;
00108 }
00109
00110
00111 public static void short2byte_arraycopy(short[] shorts, int sidx,
00112 byte[] bytes, int bidx, int len) {
00113 for (int i = 0; i < len; i++)
00114 bytes[bidx + i] = (byte)(shorts[sidx + i] & 0xff);
00115 }
00116
00117 public static void byte2short_arraycopy(byte[] bytes, int bidx,
00118 short[] shorts, int sidx, int len) {
00119 for (int i = 0; i < len; i++)
00120 shorts[sidx + i] = (short)(bytes[bidx + i] & 0xff);
00121 }
00122
00123
00124
00125 public void log(Logger.LOG type, String msg) {
00126 logger.log(type, msg);
00127 }
00128
00129
00130 public byte[] load_header(int what, int data_size) {
00131 byte[] hdr = new byte[HEADER_SIZE];
00132 hdr[0] = MAGIC;
00133 hdr[1] = VERSION;
00134 hdr[2] = WORDSIZE;
00135
00136 hdr[3] = (byte)(what & 0xff);
00137 hdr[4] = (byte)(data_size & 0xff);
00138 return hdr;
00139 }
00140
00141
00142 public void xmit_data(Functor f) {
00143 xmit_data(f, false);
00144 }
00145
00146 public void xmit_data(Functor f, boolean peer) {
00147 log(Logger.LOG.DEBUG, "xmit data: --");
00148 Data[] results = f.results();
00149 int fsize = results.length;
00150 for (int i = 0; i < fsize; i++ ) {
00151 char d = results[i].discriminator();
00152 if (d == 'd' || d == 'i' || d == 'u' || d == 'x')
00153 log(Logger.LOG.DEBUG, "" + results[i].iresult());
00154 else
00155 log(Logger.LOG.DEBUG, "" + results[i].fresult());
00156 }
00157 log(Logger.LOG.DEBUG, " --");
00158
00159 Sensory s = (Sensory)f;
00160 int msg_size = (fsize + 2) * WORDSIZE + 1;
00161 byte[] msg = new byte[msg_size];
00162 int idx = 0;
00163
00164 byte[] id = i2b(client.identity());
00165 System.arraycopy(id, 0, msg, idx, WORDSIZE);
00166 idx += WORDSIZE;
00167
00168 byte[] req = i2b(0);
00169 System.arraycopy(req, 0, msg, idx, WORDSIZE);
00170 idx += WORDSIZE;
00171
00172 msg[idx] = SenseCorba.ENERGY;
00173 idx++;
00174
00175 byte[] e = i2b((int)(s.energyused() * 1000));
00176 System.arraycopy(e, 0, msg, idx, WORDSIZE);
00177 idx += WORDSIZE;
00178
00179 msg[idx] = SenseCorba.TIME;
00180 idx++;
00181 byte[] t = i2b((int)s.timeused());
00182 System.arraycopy(t, 0, msg, idx, WORDSIZE);
00183 idx += WORDSIZE;
00184
00185 msg[idx] = s.identifier();
00186 idx++;
00187 msg[idx] = (byte)(s.sequencer() & 0xff);
00188 idx++;
00189 byte[] s_bytes = i2b(fsize);
00190 System.arraycopy(s_bytes, 0, msg, idx, WORDSIZE);
00191 idx += WORDSIZE;
00192 for (int i = 0; i < fsize; i++) {
00193 byte[] d_bytes = null;
00194 char d = results[i].discriminator();
00195 if (d == 'd' || d == 'i' || d == 'u' || d == 'x')
00196 d_bytes = i2b((int)results[i].iresult());
00197 else
00198 d_bytes = i2b((int)results[i].fresult());
00199 System.arraycopy(d_bytes, 0, msg, idx, WORDSIZE);
00200 idx += WORDSIZE;
00201 }
00202
00203 if (peer)
00204 client.bcast(DATA, msg_size, msg);
00205 else
00206 client.send_up(DATA, msg_size, msg);
00207 }
00208
00209
00210 public void xmit_metadata(byte meta) {
00211 if (meta == SenseCorba.ANCESTORS) {
00212 long now = System.currentTimeMillis();
00213 if ((now - last_ancestor) > REDISCOVER_RESET) {
00214 int asize = ((client.discovery().ancestors_size + 1)
00215 * (WORDSIZE + 1)) + 1 + WORDSIZE;
00216 byte[] anc_msg = new byte[asize];
00217
00218 anc_msg[0] = meta;
00219 int idx = 1;
00220 int anc_size = client.discovery().ancestors_size + 1;
00221 byte[] sizebytes = i2b(anc_size);
00222 System.arraycopy(sizebytes, 0, anc_msg, idx, WORDSIZE);
00223 idx += WORDSIZE;
00224
00225
00226 anc_msg[idx] = WORDSIZE;
00227 idx++;
00228 byte[] id = i2b(client.identity());
00229 System.arraycopy(id, 0, anc_msg, idx, WORDSIZE);
00230 idx += WORDSIZE;
00231
00232
00233 for (int i = 0; i < client.discovery().ancestors_size; i++) {
00234 anc_msg[idx] = WORDSIZE;
00235 idx++;
00236 byte[] o_id =
00237 i2b(client.discovery().ancestors_seen[i]);
00238 System.arraycopy(o_id, 0, anc_msg, idx, WORDSIZE);
00239 idx += WORDSIZE;
00240 }
00241 client.send_dn(METADATA, asize, anc_msg);
00242 last_ancestor = now;
00243 }
00244 }
00245 else if (meta == SenseCorba.SIBLINGS) {
00246 long now = System.currentTimeMillis();
00247 if ((now - last_sibling) > REDISCOVER_RESET) {
00248 int cap_size = 0;
00249 SortedMap<Byte,Capability> capabilities = client.capabilities();
00250 if (capabilities != null)
00251 cap_size = capabilities.size();
00252 int ssize = WORDSIZE + 3 + cap_size;
00253 byte[] sib_msg = new byte[ssize];
00254
00255 sib_msg[0] = meta;
00256 sib_msg[1] = (byte)1;
00257 int idx = 2;
00258 byte[] id = i2b(client.identity());
00259 System.arraycopy(id, 0, sib_msg, idx, WORDSIZE);
00260 idx += WORDSIZE;
00261 sib_msg[idx] = (byte)(cap_size & 0xff);
00262 idx++;
00263 if (capabilities != null) {
00264 Iterator iter = capabilities.keySet().iterator();
00265 while (iter.hasNext()) {
00266 sib_msg[idx] = ((Byte)iter.next()).byteValue();
00267 idx++;
00268 }
00269 }
00270 client.bcast(METADATA, ssize, sib_msg);
00271 last_sibling = now;
00272 }
00273 }
00274 else if (meta == SenseCorba.DESCENDANTS) {
00275 long now = System.currentTimeMillis();
00276 if ((now - last_descendant) > REDISCOVER_RESET) {
00277 int cap_size = 0;
00278 SortedMap<Byte,Capability> capabilities = client.capabilities();
00279 if (capabilities != null)
00280 cap_size = capabilities.size();
00281 int dsize = (3 * WORDSIZE) + 3 + cap_size;
00282 byte[] desc_msg = new byte[dsize];
00283
00284 desc_msg[0] = meta;
00285 desc_msg[1] = (byte)1;
00286 int idx = 2;
00287 byte[] id = i2b(client.identity());
00288 System.arraycopy(id, 0, desc_msg, idx, WORDSIZE);
00289 idx += WORDSIZE;
00290 desc_msg[idx] = (byte)(cap_size & 0xff);
00291 idx++;
00292 if (capabilities != null) {
00293 Iterator iter = capabilities.keySet().iterator();
00294 while (iter.hasNext()) {
00295 desc_msg[idx] = ((Byte)iter.next()).byteValue();
00296 idx++;
00297 }
00298 }
00299 desc_msg[idx] = SenseCorba.ENERGY;
00300 idx++;
00301 byte[] en = i2b((int)(client.check_energy()
00302 * 1000));
00303 System.arraycopy(en, 0, desc_msg, idx, WORDSIZE);
00304 idx += WORDSIZE;
00305
00306 client.send_up(METADATA, dsize, desc_msg);
00307 last_descendant = now;
00308 }
00309 }
00310 }
00311
00312
00313 public void xmit_command(Functor f) {
00314 xmit_command(f, false);
00315 }
00316
00317 public void xmit_command(Functor f, boolean peer) {
00318 int hdr_size = WORDSIZE + 2;
00319 byte[] params = compile_params(f.identifier(), f);
00320 int msg_size = params.length + hdr_size;
00321 int idx = 0;
00322 byte[] msg = new byte[msg_size];
00323
00324 log(Logger.LOG.DEBUG, "xmit command");
00325 byte[] id = i2b(client.identity());
00326 System.arraycopy(id, 0, msg, idx, WORDSIZE);
00327 idx += WORDSIZE;
00328 msg[idx] = 0;
00329 idx++;
00330 msg[idx] = f.priority();
00331 idx++;
00332 System.arraycopy(params, 0, msg, hdr_size, params.length);
00333 if (peer)
00334 client.bcast(CMD, msg_size, msg);
00335 else
00336 client.send_dn(CMD, msg_size, msg);
00337 }
00338
00339 private byte[] compile_params(byte cmd, Functor f) {
00340 int size = 2;
00341 byte[] params = new byte[MAX_CMD_SIZE];
00342
00343 log(Logger.LOG.DEBUG, "compile params for " + f.asString());
00344 params[0] = cmd;
00345 params[1] = (byte)(f.sequencer() & 0xff);
00346
00347 switch(cmd) {
00348 case SenseCorba.ALPHA:
00349 {
00350 Sense s = SenseHelper.narrow(f);
00351 params[2] = 2;
00352 params[3] = SenseCorba.SENSOR;
00353 params[4] = s.sensor();
00354 params[5] = SenseCorba.FREQ;
00355 size += 4;
00356 int rt = (int)s.rate();
00357 if (rt == 0)
00358 rt = 1;
00359 byte[] rt_bytes = i2b(rt);
00360 System.arraycopy(rt_bytes, 0, params, size, WORDSIZE);
00361 size += WORDSIZE;
00362 }
00363 break;
00364 case SenseCorba.BETA:
00365 {
00366 PeakSense s = PeakSenseHelper.narrow(f);
00367 params[2] = 4;
00368 params[3] = SenseCorba.SENSOR;
00369 params[4] = s.sensor();
00370 params[5] = SenseCorba.FREQ;
00371 size += 4;
00372 int rt = (int)s.rate();
00373 if (rt == 0)
00374 rt = 1;
00375 byte[] rt_bytes = i2b(rt);
00376 System.arraycopy(rt_bytes, 0, params, size, WORDSIZE);
00377 size += WORDSIZE;
00378
00379 params[size] = SenseCorba.THR_PLUS;
00380 size++;
00381 byte[] ht = i2b((int)s.highthreshold());
00382 System.arraycopy(ht, 0, params, size, WORDSIZE);
00383 size += WORDSIZE;
00384 params[size] = SenseCorba.THR_MINUS;
00385 size++;
00386 byte[] lt = i2b((int)s.lowthreshold());
00387 System.arraycopy(lt, 0, params, size, WORDSIZE);
00388 size += WORDSIZE;
00389 }
00390 break;
00391 case SenseCorba.THETA:
00392 {
00393 TimeSeries s = TimeSeriesHelper.narrow(f);
00394 params[2] = 2;
00395 params[3] = SenseCorba.TIME;
00396 size += 2;
00397 byte[] dur = i2b((int)s.duration());
00398 System.arraycopy(dur, 0, params, size, WORDSIZE);
00399 size += WORDSIZE;
00400 byte[] subp = compile_params(s.subfunctors()[0].identifier(),
00401 s.subfunctors()[0]);
00402 System.arraycopy(subp, 0, params, size, subp.length);
00403 size += subp.length;
00404 }
00405 break;
00406 case SenseCorba.PSI:
00407 {
00408 SpatialSeries s = SpatialSeriesHelper.narrow(f);
00409 params[2] = 3;
00410 size++;
00411 params[size] = SenseCorba.DIST;
00412 size++;
00413 byte[] dist = i2b((int)s.distance());
00414 System.arraycopy(dist, 0, params, size, WORDSIZE);
00415 size += WORDSIZE;
00416 params[size] = SenseCorba.ANGLE;
00417 size++;
00418 byte[] angle = i2b((int)s.angle());
00419 System.arraycopy(angle, 0, params, size, WORDSIZE);
00420 size += WORDSIZE;
00421 byte[] subp = compile_params(s.subfunctors()[0].identifier(),
00422 s.subfunctors()[0]);
00423 System.arraycopy(subp, 0, params, size, subp.length);
00424 size += subp.length;
00425 }
00426 break;
00427 case SenseCorba.IOTA:
00428 case SenseCorba.SUMMA:
00429 case SenseCorba.DELTA:
00430 case SenseCorba.BARX:
00431 case SenseCorba.SIGMA:
00432 {
00433 Aggregate s = AggregateHelper.narrow(f);
00434 int subs = s.subfunctors().length;
00435 params[2] = (byte)(subs & 0xff);
00436 size++;
00437 for (int i = 0; i < subs; i++) {
00438 byte[] subp =
00439 compile_params(s.subfunctors()[i].identifier(),
00440 s.subfunctors()[i]);
00441 System.arraycopy(subp, 0, params, size, subp.length);
00442 size += subp.length;
00443 if (i < subs - 1) {
00444 params[size] = SenseCorba.AND;
00445 size++;
00446 }
00447 }
00448 }
00449 break;
00450 case SenseCorba.LAMBDA:
00451 log(Logger.LOG.ERROR,
00452 "Invalid attempt to marshal lambda aggregate");
00454 default:
00455 return null;
00456 }
00457
00458 byte[] retp = new byte[size];
00459 System.arraycopy(params, 0, retp, 0, size);
00460 return retp;
00461 }
00462
00463
00464
00465
00466 public byte parse_packet(byte[] msg, int len) {
00467 byte hdr_size = HEADER_SIZE;
00468 byte what = msg[3];
00469 byte data_size = msg[4];
00470 byte base_len = (byte)(msg[2] & ~LITTLE_ENDIAN);
00471 boolean convert = false;
00472
00473 if (msg[0] != MAGIC || msg[1] != VERSION)
00474 return -1 ;
00475 if ((msg[2] & LITTLE_ENDIAN) != 0)
00476 convert = true;
00477
00478 if (what == METADATA) {
00479 parse_metadata(base_len, data_size, msg, convert);
00480 }
00481 else if (what == DATA) {
00482 parse_data(base_len, data_size, msg, convert);
00483 }
00484 else if (what == CMD) {
00485 Functor f = null;
00486 try {
00487 f = parse_command(base_len, msg, data_size, convert);
00488 } catch (org.omg.CORBA.UserException e) {
00489 }
00490 if (f != null)
00491 client.apply(f, null);
00492 }
00493
00494 return what;
00495 }
00496
00497
00498 public int parse_metadata(int base, int size, byte[] msg,
00499 boolean convert) {
00500 int idx = HEADER_SIZE;
00501 if (msg == null)
00502 return -1;
00503
00504 switch(msg[idx]) {
00505 case SenseCorba.ANCESTORS:
00506 idx++;
00507 byte[] len_bytes = new byte[base];
00508 System.arraycopy(msg, idx, len_bytes, 0, base);
00509 int num_ancestors = 0;
00510 if (convert)
00511 num_ancestors = b2i_le(len_bytes, base);
00512 else
00513 num_ancestors = b2i(len_bytes, base);
00514 idx += base;
00515
00516 for (int i = 0; i < num_ancestors; i++) {
00517 int len = (msg[idx] & 0xff);
00518 idx++;
00519 byte[] id_bytes = new byte[len];
00520 System.arraycopy(msg, idx, id_bytes, 0, len);
00521 idx += len;
00522 int a_id = 0;
00523 if (convert)
00524 a_id = b2i_le(id_bytes, len);
00525 else
00526 a_id = b2i(len_bytes, len);
00527 boolean found = false;
00528 int asize = client.discovery().ancestors_size;
00529 for (int j = 0; j < asize; j++) {
00530 if (client.discovery().ancestors_seen[j] == a_id) {
00531 found = true;
00532 break;
00533 }
00534 }
00535 if (found == false && asize < DiscoveryService.MAX_PARENTS) {
00536 int aidx = client.discovery().ancestors_size;
00537 client.discovery().ancestors_seen[aidx] = a_id;
00538 client.discovery().ancestors_size++;
00539 log(Logger.LOG.DEBUG, "Added ancestor " + a_id);
00540 }
00541 }
00542 xmit_metadata(SenseCorba.DESCENDANTS);
00543 break;
00544
00545 case SenseCorba.SIBLINGS:
00546 {
00547 idx++;
00548 idx++;
00549 byte[] id_bytes = new byte[base];
00550 System.arraycopy(msg, idx, id_bytes, 0, base);
00551 idx += base;
00552 int s_id = 0;
00553 if (convert)
00554 s_id = b2i_le(id_bytes, base);
00555 else
00556 s_id = b2i(id_bytes, base);
00557
00558 boolean found = false;
00559 int ssize = client.discovery().siblings_size;
00560 for (int i = 0; i < ssize; i++) {
00561 if (client.discovery().siblings_seen[i] == s_id) {
00562 found = true;
00563 break;
00564 }
00565 }
00566 if (found == false && ssize < DiscoveryService.MAX_SIBLINGS) {
00567 int sidx = client.discovery().siblings_size;
00568 client.discovery().siblings_seen[sidx] = s_id;
00569 client.discovery().siblings_size++;
00570 log(Logger.LOG.DEBUG, "Added sibling " + s_id);
00571 }
00572
00573 int cap_size = (msg[idx] & 0xff);
00574 idx++;
00575 for (int j = 0; j < cap_size; j++) {
00576 byte s_cap = msg[idx];
00577 idx++;
00578
00579 found = false;
00580 for (int k = 0; k < client.discovery().sib_db_size; k++) {
00581 if (client.discovery().sib_database[k].id == s_id &&
00582 client.discovery().sib_database[k].type == s_cap) {
00583 found = true;
00584 break;
00585 }
00586 }
00587 if (found == false) {
00588 int sidx = client.discovery().sib_db_size;
00589 client.discovery().sib_database[sidx] =
00590 client.discovery().newObjRef();
00591 client.discovery().sib_database[sidx].id = s_id;
00592 client.discovery().sib_database[sidx].type = s_cap;
00593 client.discovery().sib_db_size++;
00594 log(Logger.LOG.DEBUG, "Added sibling capability " +
00595 SenseUtil.capabilityToString(s_cap));
00596 }
00597 }
00598 xmit_metadata(SenseCorba.SIBLINGS);
00599 }
00600 break;
00601
00602 case SenseCorba.DESCENDANTS:
00603 {
00604 idx++;
00605 idx++;
00606 byte[] id_bytes = new byte[base];
00607 System.arraycopy(msg, idx, id_bytes, 0, base);
00608 idx += base;
00609 int d_id = 0;
00610 if (convert)
00611 d_id = b2i_le(id_bytes, base);
00612 else
00613 d_id = b2i(id_bytes, base);
00614
00615 boolean found = false;
00616 int dsize = client.discovery().descendants_size;
00617 for (int i = 0; i < dsize; i++) {
00618 if (client.discovery().descendants_seen[i] == d_id) {
00619 found = true;
00620 break;
00621 }
00622 }
00623 if (found == false && dsize < DiscoveryService.MAX_CHILDREN) {
00624 int didx = client.discovery().descendants_size;
00625 client.discovery().descendants_seen[didx] = d_id;
00626 client.discovery().descendants_size++;
00627 log(Logger.LOG.DEBUG, "Added descendant " + d_id);
00628 }
00629
00630 int cap_size = (msg[idx] & 0xff);
00631 idx++;
00632 for (int j = 0; j < cap_size; j++) {
00633 byte d_cap = msg[idx];
00634 idx++;
00635
00636 found = false;
00637 for (int k = 0; k < client.discovery().desc_db_size; k++) {
00638 if (client.discovery().desc_database[k].id == d_id &&
00639 client.discovery().desc_database[k].type == d_cap) {
00640 found = true;
00641 break;
00642 }
00643 }
00644 if (found == false) {
00645 int didx = client.discovery().desc_db_size;
00646 client.discovery().desc_database[didx] =
00647 client.discovery().newObjRef();
00648 client.discovery().desc_database[didx].id = d_id;
00649 client.discovery().desc_database[didx].type = d_cap;
00650 client.discovery().desc_db_size++;
00651 log(Logger.LOG.DEBUG, "Added descendant capability " +
00652 SenseUtil.capabilityToString(d_cap));
00653 }
00654 }
00655 if (msg[idx] != SenseCorba.ENERGY)
00656 break;
00657 idx++;
00658 byte[] energy_bytes = new byte[base];
00659 System.arraycopy(msg, idx, energy_bytes, 0, base);
00660 double energy = 0;
00661 if (convert)
00662 energy = b2i_le(energy_bytes, base);
00663 else
00664 energy = b2i(energy_bytes, base);
00665 energy = energy / 1000.0;
00666 idx += base;
00668 xmit_metadata(SenseCorba.ANCESTORS);
00669 }
00670 break;
00671 }
00672 return 0;
00673 }
00674
00675
00676 public int parse_data(int base, int size, byte[] msg, boolean convert) {
00677 int idx = HEADER_SIZE;
00678 byte[] id_bytes = new byte[base];
00679 System.arraycopy(msg, idx, id_bytes, 0, base);
00680 int node_id = 0;
00681 if (convert)
00682 node_id = b2i_le(id_bytes, base);
00683 else
00684 node_id = b2i(id_bytes, base);
00685 idx += base;
00686
00687 byte[] to_bytes = new byte[base];
00688 System.arraycopy(msg, idx, to_bytes, 0, base);
00689 int node_to = 0;
00690 if (convert)
00691 node_to = b2i_le(id_bytes, base);
00692 else
00693 node_to = b2i(id_bytes, base);
00694 idx += base;
00695 if (node_to != client.identity())
00696 return 0;
00697
00698 if (msg[idx] != SenseCorba.ENERGY)
00699 return -1;
00700 idx++;
00701 byte[] energy_bytes = new byte[base];
00702 System.arraycopy(msg, idx, energy_bytes, 0, base);
00703 double energy = 0;
00704 if (convert)
00705 energy = b2i_le(energy_bytes, base);
00706 else
00707 energy = b2i(energy_bytes, base);
00708 energy = energy / 1000.0;
00709 idx += base;
00710
00711 if (msg[idx] != SenseCorba.TIME)
00712 return -1;
00713 idx++;
00714 byte[] time_bytes = new byte[base];
00715 System.arraycopy(msg, idx, time_bytes, 0, base);
00716 long time = 0;
00717 if (convert)
00718 time = b2i_le(time_bytes, base);
00719 else
00720 time = b2i(time_bytes, base);
00721 idx += base;
00722
00723 int ftype = (msg[idx] & 0xff);
00724 int fid = (msg[idx+1] & 0xff);
00725 idx += 2;
00726
00727 byte[] count_bytes = new byte[base];
00728 System.arraycopy(msg, idx, count_bytes, 0, base);
00729 int num = 0;
00730 if (convert)
00731 num = b2i_le(count_bytes, base);
00732 else
00733 num = b2i(count_bytes, base);
00734 idx += base;
00735
00736 Data[] array = new Data[num];
00737 for (int i = 0; i < num; i++) {
00738 byte[] res_bytes = new byte[base];
00739 System.arraycopy(msg, idx, res_bytes, 0, base);
00740 int result = 0;
00741 if (convert)
00742 result = b2i_le(res_bytes, base);
00743 else
00744 result = b2i(res_bytes, base);
00745 idx += base;
00746
00747 array[i] = new Data();
00748 array[i].iresult(result);
00749 }
00750
00751 TaskImpl task = client.tracker().tree.get((int)fid);
00752 if (task == null)
00753 return -1;
00754 Functor f = task.f();
00755 if (f == null)
00756 return -1;
00757 Sensory target = SensoryHelper.narrow(f);
00758 target.energyused(energy + target.energyused());
00759 if (time < target.timeused())
00760 time = target.timeused();
00761 target.timeused(time);
00762 target.results(array);
00763 task.callback().aggregate(target);
00764 return 0;
00765 }
00766
00767
00768 public Functor parse_command(int base, byte[] msg, int size,
00769 boolean convert)
00770 throws org.omg.CORBA.UserException {
00771 int idx = 0;
00772
00773 log(Logger.LOG.DEBUG, "parse command");
00774 if (msg == null)
00775 return null;
00776
00777
00778 byte[] requestor = new byte[base];
00779 System.arraycopy(msg, idx, requestor, 0, base);
00780 idx += base;
00781 int bits = 0;
00782 boolean peer_req = (msg[idx] != 0);
00783 idx++;
00784 int priority = (msg[idx] & 0xff);
00785 idx++;
00786 int f_type = (msg[idx] & 0xff);
00787 idx++;
00788
00789 int f_id = (msg[idx] & 0xff);
00790 idx++;
00791 int num_params = (msg[idx] & 0xff);
00792 idx++;
00793 long f_t = System.currentTimeMillis();
00794 int f_e = (int)client.check_energy();
00796
00797 byte sensor = SenseCorba.INVALID;
00798 byte level = SenseCorba.INVALID;
00799 double rate = 0.0;
00800 double hi_threshold = Double.POSITIVE_INFINITY;
00801 double lo_threshold = Double.NEGATIVE_INFINITY;
00802 long duration = 0;
00803 double distance = 0.0;
00804 double angle = 0.0;
00805
00806 gov.lanl.isr.sensix.sensing.Collection[] collectors =
00807 new gov.lanl.isr.sensix.sensing.Collection[num_params];
00808 for (int i = 0; i < num_params; i++)
00809 collectors[i] = null;
00810 int cidx = 0;
00811
00812 for (int i = 0; i < num_params; i++) {
00813 int err;
00814 byte code = msg[idx];
00815 idx++;
00816
00817 switch(code) {
00818 case SenseCorba.AND:
00819 cidx++;
00820 break;
00821 case SenseCorba.LEVEL:
00822 level = msg[idx];
00823 idx++;
00824 break;
00825 case SenseCorba.SENSOR:
00826 sensor = msg[idx];
00827 idx++;
00828 break;
00829 case SenseCorba.FREQ:
00830 {
00831 byte[] rate_bytes = new byte[base];
00832 System.arraycopy(msg, idx, rate_bytes, 0, base);
00833 if (convert)
00834 rate = b2i_le(rate_bytes, base);
00835 else
00836 rate = b2i(rate_bytes, base);
00837 idx += base;
00838 }
00839 break;
00840 case SenseCorba.THR_PLUS:
00841 {
00842 byte[] thresh_bytes = new byte[base];
00843 System.arraycopy(msg, idx, thresh_bytes, 0, base);
00844 if (convert)
00845 hi_threshold = b2i_le(thresh_bytes,
00846 base);
00847 else
00848 hi_threshold = b2i(thresh_bytes, base);
00849 idx += base;
00850 }
00851 break;
00852 case SenseCorba.THR_MINUS:
00853 {
00854 byte[] thresh_bytes = new byte[base];
00855 System.arraycopy(msg, idx, thresh_bytes, 0, base);
00856 if (convert)
00857 lo_threshold = b2i_le(thresh_bytes,
00858 base);
00859 else
00860 lo_threshold = b2i(thresh_bytes, base);
00861 idx += base;
00862 }
00863 break;
00864 case SenseCorba.TIME:
00865 {
00866 byte[] time_bytes = new byte[base];
00867 System.arraycopy(msg, idx, time_bytes, 0, base);
00868 if (convert)
00869 duration = b2i_le(time_bytes, base);
00870 else
00871 duration = b2i(time_bytes, base);
00872 idx += base;
00873 }
00874 break;
00875 case SenseCorba.DIST:
00876 {
00877 byte[] dist_bytes = new byte[base];
00878 System.arraycopy(msg, idx, dist_bytes, 0, base);
00879 if (convert)
00880 distance = b2i_le(dist_bytes, base);
00881 else
00882 distance = b2i(dist_bytes, base);
00883 idx += base;
00884 }
00885 break;
00886 case SenseCorba.ANGLE:
00887 {
00888 byte[] angle_bytes = new byte[base];
00889 System.arraycopy(msg, idx, angle_bytes, 0, base);
00890 if (convert)
00891 angle = b2i_le(angle_bytes, base);
00892 else
00893 angle = b2i(angle_bytes, base);
00894 idx += base;
00895 }
00896 break;
00897 default:
00898 int nsize = size - idx;
00899 byte[] next = new byte[nsize];
00900 System.arraycopy(msg, idx, next, 0, nsize);
00901 Functor sub = parse_command(base, next, nsize, convert);
00902 if (sub == null)
00903 return null;
00904 collectors[cidx] = (gov.lanl.isr.sensix.sensing.Collection)sub;
00905 break;
00906 }
00907 }
00908
00909 Functor f = null;
00910 switch(f_type) {
00911 case SenseCorba.ALPHA:
00912 f = FunctorHelper.narrow(poa.servant_to_reference(
00913 new SenseImpl(level, sensor, rate)));
00914 break;
00915 case SenseCorba.BETA:
00916 f = FunctorHelper.narrow(poa.servant_to_reference(
00917 new PeakSenseImpl(level, sensor, rate,
00918 hi_threshold, lo_threshold)));
00919 break;
00920 case SenseCorba.THETA:
00921 f = FunctorHelper.narrow(poa.servant_to_reference(
00922 new TimeSeriesImpl(level, collectors[0], duration)));
00923 break;
00924 case SenseCorba.PSI:
00925 f = FunctorHelper.narrow(poa.servant_to_reference(
00926 new SpatialSeriesImpl(level, collectors[0],
00927 angle, distance)));
00928 break;
00929 case SenseCorba.IOTA:
00930 f = FunctorHelper.narrow(poa.servant_to_reference(
00931 new ReciteImpl(level, collectors)));
00932 break;
00933 case SenseCorba.SUMMA:
00934 f = FunctorHelper.narrow(poa.servant_to_reference(
00935 new SumImpl(level, collectors)));
00936 break;
00937 case SenseCorba.DELTA:
00938 f = FunctorHelper.narrow(poa.servant_to_reference(
00939 new DeltaImpl(level, collectors)));
00940 break;
00941 case SenseCorba.BARX:
00942 f = FunctorHelper.narrow(poa.servant_to_reference(
00943 new MeanImpl(level, collectors)));
00944 break;
00945 case SenseCorba.SIGMA:
00946 f = FunctorHelper.narrow(poa.servant_to_reference(
00947 new SigmaImpl(level, collectors)));
00948 break;
00949 case SenseCorba.LAMBDA:
00950 f = FunctorHelper.narrow(poa.servant_to_reference(
00951 new LambdaImpl(level, collectors)));
00953
00954
00955 default:
00956 return null;
00957 }
00958 return f;
00959 }
00960 }