00001
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 package sensix;
00036
00037 import java.util.*;
00038 import java.lang.Math;
00039 import sensix.sensing.*;
00040
00041
00042 public class SensixMarshalling
00043 {
/** Marker written to header byte 0 by load_header; parse_packet rejects packets that lack it. */
public static final byte MAGIC = (byte)0xff;
/** Protocol version written to header byte 1 and checked by parse_packet. */
public static final byte VERSION = (byte)0x01;
/** Number of bytes in the header built by load_header; parse_metadata and parse_data start reading after it. */
public static final byte HEADER_SIZE = 5;
/** Width in bytes of every integer field this class writes; also stored in header byte 2. */
public static final byte WORDSIZE = (byte)4;
/** Flag bit of header byte 2; when set, parse_packet has integers decoded with b2i_le. */
public static final byte LITTLE_ENDIAN = (byte)0x80;

/* Packet kinds: passed to the client's send calls and compared with header byte 3 in parse_packet. */
public static final byte METADATA = (byte)1;
public static final byte DATA = (byte)2;
public static final byte CMD = (byte)3;

/** Size of the scratch buffer allocated by compile_params. */
public static final int MAX_CMD_SIZE = 4096;
/** Only used here to derive REDISCOVER_RESET. NOTE(review): presumably milliseconds - confirm. */
public static final int REDISCOVER_TIME = 100000;
/** Minimum System.currentTimeMillis() gap xmit_metadata requires between two announcements of one kind. */
public static final int REDISCOVER_RESET = (3 * REDISCOVER_TIME / 4);

/** Capacity of ancestors_seen. */
public static final int MAX_PARENTS = 128;
/** Capacity of descendants_seen and desc_database. */
public static final int MAX_CHILDREN = 512;
/** Capacity of siblings_seen and sib_database. */
public static final int MAX_SIBLINGS = 128;
/** Not referenced elsewhere in this class. */
public static final int MAX_PEERS = (MAX_CHILDREN + MAX_SIBLINGS);
/** Capacity of the references array. */
public static final int MAX_CAPABLE = 32;
00063
00064
00065 public static byte[] i2b(int i) {
00066 return i2b(i, 4);
00067 }
00068
00069 public static byte[] i2b(int i, int size) {
00070 byte[] b = new byte[size];
00071 for (int j = 0; j < size; j++)
00072 b[j] = (byte)(i >> (8 * ((size - 1) - j)));
00073 return b;
00074 }
00075
00076 public static int b2i(byte b[]) {
00077 return b2i(b, 4);
00078 }
00079
00080 public static int b2i(byte b[], int size) {
00081 int ret = 0;
00082 for (int j = 0; j < size; j++)
00083 ret += ((b[j] & 0x00000000ff) << (8 * ((size - 1) - j)));
00084 return ret;
00085 }
00086
00087 public static int b2i_le(byte b[]) {
00088 return b2i_le(b, 4);
00089 }
00090
00091 public static int b2i_le(byte b[], int size) {
00092 int ret = 0;
00093 for (int j = 0; j < size; j++)
00094 ret += ((b[j] & 0x00000000ff) << (8 * j));
00095 return ret;
00096 }
00097
00098
00099 public static void short2byte_arraycopy(short[] shorts, int sidx,
00100 byte[] bytes, int bidx, int len) {
00101 for (int i = 0; i < len; i++)
00102 bytes[bidx + i] = (byte)(shorts[sidx + i] & 0xff);
00103 }
00104
00105 public static void byte2short_arraycopy(byte[] bytes, int bidx,
00106 short[] shorts, int sidx, int len) {
00107 for (int i = 0; i < len; i++)
00108 shorts[sidx + i] = (short)(bytes[bidx + i] & 0xff);
00109 }
00110
00111
00112
/**
 * Pairs a node identifier with a one-byte type. parse_metadata stores one
 * of these per (node id, capability code) it learns, in sib_database and
 * desc_database.
 */
public class ObjectReference
{
    // Not referenced in this file. NOTE(review): purpose unclear - confirm.
    public static final int SIZE = 1024;

    // Identifier of the node the entry describes.
    public int id;
    // Capability code as received in a metadata packet.
    public byte type;
}
00120
00121
// Allocated with MAX_CAPABLE slots by the constructor; not otherwise used in this file.
public ObjectReference[] references;
public int refs_size;
// (sibling id, capability) pairs learned by parse_metadata; sib_db_size slots are filled.
public ObjectReference[] sib_database;
public int sib_db_size;
// (descendant id, capability) pairs learned by parse_metadata; desc_db_size slots are filled.
public ObjectReference[] desc_database;
public int desc_db_size;
// Ancestor ids learned from ANCESTORS metadata and re-announced by xmit_metadata.
public int[] ancestors_seen;
public int ancestors_size;
// Sibling ids learned from SIBLINGS metadata.
public int[] siblings_seen;
public int siblings_size;
// Descendant ids learned from DESCENDANTS metadata.
public int[] descendants_seen;
public int descendants_size;
// Supplies this node's identity, capabilities and energy, and performs the actual sends.
private Client client;
// System.currentTimeMillis() of the last announcement of each metadata kind (0 = never).
private long last_ancestor, last_sibling, last_descendant;
private Logger logger;
// Values handed to the constructor and returned by identifier() and level().
private int identifier;
private byte level;
00139
00140
/**
 * Creates a marshaller with empty neighbour tables.
 *
 * @param id    value later returned by identifier()
 * @param lvl   value later returned by level()
 * @param c     client used for sending and for identity/capability queries
 * @param app   passed to the Logger constructor
 * @param debug passed to the Logger constructor
 */
public SensixMarshalling(int id, byte lvl, Client c,
                         String app, byte debug) {
    this.identifier = id;
    this.level = lvl;
    this.client = c;
    this.logger = new Logger(app, debug);

    // No announcement has been sent yet.
    this.last_ancestor = 0;
    this.last_sibling = 0;
    this.last_descendant = 0;

    // Capability tables, all initially empty.
    this.references = new ObjectReference[MAX_CAPABLE];
    this.sib_database = new ObjectReference[MAX_SIBLINGS];
    this.desc_database = new ObjectReference[MAX_CHILDREN];
    this.refs_size = 0;
    this.sib_db_size = 0;
    this.desc_db_size = 0;

    // Known-neighbour id lists, all initially empty.
    this.ancestors_seen = new int[MAX_PARENTS];
    this.siblings_seen = new int[MAX_SIBLINGS];
    this.descendants_seen = new int[MAX_CHILDREN];
    this.ancestors_size = 0;
    this.siblings_size = 0;
    this.descendants_size = 0;
}
00165
00166 public void log(Logger.LOG type, String msg) {
00167 logger.log(type, msg);
00168 }
00169
00170 public int identifier() {
00171 return identifier;
00172 }
00173
00174 public byte level() {
00175 return level;
00176 }
00177
00178
00179 public byte[] load_header(byte what, byte data_size) {
00180 byte hdr[] = new byte[HEADER_SIZE];
00181 hdr[0]= MAGIC;
00182 hdr[1] = VERSION;
00183 hdr[2] = WORDSIZE;
00184
00185 hdr[3] = what;
00186 hdr[4] = data_size;
00187 return hdr;
00188 }
00189
00190
00191 public void xmit_data(Functor f) {
00192 xmit_data(f, false);
00193 }
00194
00195 public void xmit_data(Functor f, boolean peer) {
00196 String res_str = "";
00197 Data[] results = f.results();
00198 int fsize = results.length;
00199 for (int i = 0; i < fsize; i++ ) {
00200 char d = results[i].discriminator();
00201 if (d == 'd' || d == 'i' || d == 'u' || d == 'x')
00202 res_str += results[i].iresult() + " ";
00203 else
00204 res_str += results[i].fresult() + " ";
00205 }
00206 log(Logger.LOG.DEBUG, "xmit data: " + res_str);
00207
00208 Sensory s = (Sensory)f;
00209 byte msg_size = (byte)((fsize + 2) * WORDSIZE + 1);
00210 byte[] msg = new byte[msg_size];
00211 int idx = 0;
00212
00213 byte[] id = i2b(client.identity());
00214 System.arraycopy(id, 0, msg, idx, WORDSIZE);
00215 idx += WORDSIZE;
00216
00217
00218
00219
00220 idx += WORDSIZE;
00221
00222 msg[idx] = Sensix.ENERGY;
00223 idx++;
00224
00225 byte[] e = i2b((int)(s.energyused() * 1000));
00226 System.arraycopy(e, 0, msg, idx, WORDSIZE);
00227 idx += WORDSIZE;
00228
00229 msg[idx] = Sensix.TIME;
00230 idx++;
00231 byte[] t = i2b((int)s.timeused());
00232 System.arraycopy(t, 0, msg, idx, WORDSIZE);
00233 idx += WORDSIZE;
00234
00235 msg[idx] = s.identifier();
00236 idx++;
00237 msg[idx] = (byte)s.sequencer();
00238 idx++;
00239 byte[] s_bytes = i2b(fsize);
00240 System.arraycopy(s_bytes, 0, msg, idx, WORDSIZE);
00241 idx += WORDSIZE;
00242 for (int i = 0; i < fsize; i++) {
00243 byte[] d_bytes = null;
00244 char d = results[i].discriminator();
00245 if (d == 'd' || d == 'i' || d == 'u' || d == 'x')
00246 d_bytes = i2b((int)results[i].iresult());
00247 else
00248 d_bytes = i2b((int)results[i].fresult());
00249 System.arraycopy(d_bytes, 0, msg, idx, WORDSIZE);
00250 idx += WORDSIZE;
00251 }
00252
00253 if (peer)
00254 client.bcast(DATA, msg_size, msg);
00255 else
00256 client.send_up(DATA, msg_size, msg);
00257 }
00258
00259
00260 public void xmit_metadata(byte meta) {
00261 if (meta == Sensix.ANCESTORS) {
00262 long now = System.currentTimeMillis();
00263 if ((now - last_ancestor) > REDISCOVER_RESET) {
00264 byte asize = (byte)(((ancestors_size + 1)
00265 * (WORDSIZE + 1)) + 1 + WORDSIZE);
00266 byte[] anc_msg = new byte[asize];
00267
00268 anc_msg[0] = meta;
00269 int idx = 1;
00270 int anc_size = ancestors_size + 1;
00271 byte[] sizebytes = i2b(anc_size);
00272 System.arraycopy(sizebytes, 0, anc_msg, idx, WORDSIZE);
00273 idx += WORDSIZE;
00274
00275
00276 anc_msg[idx] = WORDSIZE;
00277 idx++;
00278 byte[] id = i2b(client.identity());
00279 System.arraycopy(id, 0, anc_msg, idx, WORDSIZE);
00280 idx += WORDSIZE;
00281
00282
00283 for (int i = 0; i < ancestors_size; i++) {
00284 anc_msg[idx] = WORDSIZE;
00285 idx++;
00286 byte[] o_id = i2b(ancestors_seen[i]);
00287 System.arraycopy(o_id, 0, anc_msg, idx, WORDSIZE);
00288 idx += WORDSIZE;
00289 }
00290 client.send_dn(METADATA, asize, anc_msg);
00291 last_ancestor = now;
00292 }
00293 }
00294 else if (meta == Sensix.SIBLINGS) {
00295 long now = System.currentTimeMillis();
00296 if ((now - last_sibling) > REDISCOVER_RESET) {
00297 int cap_size = 0;
00298 SortedMap<Byte,Capability> capabilities =
00299 client.capabilities();
00300 if (capabilities != null)
00301 cap_size = capabilities.size();
00302 byte ssize = (byte)(WORDSIZE + 3 + cap_size);
00303 byte[] sib_msg = new byte[ssize];
00304
00305 sib_msg[0] = meta;
00306 sib_msg[1] = (byte)1;
00307 int idx = 2;
00308 byte[] id = i2b(client.identity());
00309 System.arraycopy(id, 0, sib_msg, idx, WORDSIZE);
00310 idx += WORDSIZE;
00311 sib_msg[idx] = (byte)cap_size;
00312 idx++;
00313 if (capabilities != null) {
00314 Iterator iter = capabilities.keySet().iterator();
00315 while (iter.hasNext()) {
00316 sib_msg[idx] = ((Byte)iter.next()).byteValue();
00317 idx++;
00318 }
00319 }
00320 client.bcast(METADATA, ssize, sib_msg);
00321 last_sibling = now;
00322 }
00323 }
00324 else if (meta == Sensix.DESCENDANTS) {
00325 long now = System.currentTimeMillis();
00326 if ((now - last_descendant) > REDISCOVER_RESET) {
00327 int cap_size = 0;
00328 SortedMap<Byte,Capability> capabilities =
00329 client.capabilities();
00330 if (capabilities != null)
00331 cap_size = capabilities.size();
00332 int dsize = (3 * WORDSIZE) + 3 + cap_size;
00333 byte desc_msg[] = new byte[dsize];
00334
00335 desc_msg[0] = meta;
00336 desc_msg[1] = (byte)1;
00337 int idx = 2;
00338 byte[] id = i2b(client.identity());
00339 System.arraycopy(id, 0, desc_msg, idx, WORDSIZE);
00340 idx += WORDSIZE;
00341 desc_msg[idx] = (byte)cap_size;
00342 idx++;
00343 if (capabilities != null) {
00344 Iterator iter = capabilities.keySet().iterator();
00345 while (iter.hasNext()) {
00346 desc_msg[idx] = ((Byte)iter.next()).byteValue();
00347 idx++;
00348 }
00349 }
00350 desc_msg[idx] = Sensix.ENERGY;
00351 idx++;
00352 byte[] en = i2b((int)(client.check_energy() * 1000));
00353 System.arraycopy(en, 0, desc_msg, idx, WORDSIZE);
00354 idx += WORDSIZE;
00355
00356 client.send_up(METADATA, dsize, desc_msg);
00357 last_descendant = now;
00358 }
00359 }
00360 }
00361
00362
00363 public void xmit_command(Functor f) {
00364 xmit_command(f, false);
00365 }
00366
00367 public void xmit_command(Functor f, boolean peer) {
00368 int hdr_size = WORDSIZE + 2;
00369 byte[] params = compile_params(f.identifier(), f);
00370 int msg_size = params.length + hdr_size;
00371 int idx = 0;
00372 byte[] msg = new byte[msg_size];
00373
00374 log(Logger.LOG.DEBUG, "xmit command");
00375 byte[] id = i2b(client.identity());
00376 System.arraycopy(id, 0, msg, idx, WORDSIZE);
00377 idx += WORDSIZE;
00378 msg[idx] = 0;
00379 idx++;
00380 msg[idx] = f.priority();
00381 idx++;
00382 System.arraycopy(params, 0, msg, hdr_size, params.length);
00383 if (peer)
00384 client.bcast(CMD, msg_size, msg);
00385 else
00386 client.send_dn(CMD, msg_size, msg);
00387 }
00388
00389 private byte[] compile_params(byte cmd, Functor f) {
00390 int size = 2;
00391 byte params[] = new byte[MAX_CMD_SIZE];
00392
00393 log(Logger.LOG.DEBUG, "compile params for " + f.asString());
00394 params[0] = cmd;
00395 params[1] = (byte)f.sequencer();
00396
00397 switch(cmd) {
00398 case Sensix.ALPHA:
00399 {
00400 Sense s = (Sense)f;
00401 params[2] = 2;
00402 params[3] = Sensix.SENSOR;
00403 params[4] = s.sensor();
00404 params[5] = Sensix.FREQ;
00405 size += 4;
00406 int rt = (int)s.rate();
00407 if (rt == 0)
00408 rt = 1;
00409 byte[] rt_bytes = i2b(rt);
00410 System.arraycopy(rt_bytes, 0, params, size, WORDSIZE);
00411 size += WORDSIZE;
00412 }
00413 break;
00414 case Sensix.BETA:
00415 {
00416 PeakSense s = (PeakSense)f;
00417 params[2] = 4;
00418 params[3] = Sensix.SENSOR;
00419 params[4] = s.sensor();
00420 params[5] = Sensix.FREQ;
00421 size += 4;
00422 int rt = (int)s.rate();
00423 if (rt == 0)
00424 rt = 1;
00425 byte[] rt_bytes = i2b(rt);
00426 System.arraycopy(rt_bytes, 0, params, size, WORDSIZE);
00427 size += WORDSIZE;
00428
00429 params[size] = Sensix.THR_PLUS;
00430 size++;
00431 byte[] ht = i2b((int)s.highthreshold());
00432 System.arraycopy(ht, 0, params, size, WORDSIZE);
00433 size += WORDSIZE;
00434 params[size] = Sensix.THR_MINUS;
00435 size++;
00436 byte[] lt = i2b((int)s.lowthreshold());
00437 System.arraycopy(lt, 0, params, size, WORDSIZE);
00438 size += WORDSIZE;
00439 }
00440 break;
00441 case Sensix.THETA:
00442 {
00443 TimeSeries s = (TimeSeries)f;
00444 params[2] = 2;
00445 params[3] = Sensix.TIME;
00446 size += 2;
00447 byte[] dur = i2b((int)s.duration());
00448 System.arraycopy(dur, 0, params, size, WORDSIZE);
00449 size += WORDSIZE;
00450 byte[] subp = compile_params(s.subfunctors()[0].identifier(),
00451 s.subfunctors()[0]);
00452 System.arraycopy(subp, 0, params, size, subp.length);
00453 size += subp.length;
00454 }
00455 break;
00456 case Sensix.PSI:
00457 {
00458 SpatialSeries s = (SpatialSeries)f;
00459 params[2] = 3;
00460 size++;
00461 params[size] = Sensix.DIST;
00462 size++;
00463 byte[] dist = i2b((int)s.distance());
00464 System.arraycopy(dist, 0, params, size, WORDSIZE);
00465 size += WORDSIZE;
00466 params[size] = Sensix.ANGLE;
00467 size++;
00468 byte[] angle = i2b((int)s.angle());
00469 System.arraycopy(angle, 0, params, size, WORDSIZE);
00470 size += WORDSIZE;
00471 byte[] subp = compile_params(s.subfunctors()[0].identifier(),
00472 s.subfunctors()[0]);
00473 System.arraycopy(subp, 0, params, size, subp.length);
00474 size += subp.length;
00475 }
00476 break;
00477 case Sensix.IOTA:
00478 case Sensix.SUMMA:
00479 case Sensix.DELTA:
00480 case Sensix.BARX:
00481 case Sensix.SIGMA:
00482 {
00483 Aggregate s = (Aggregate)f;
00484 int subs = s.subfunctors().length;
00485 params[2] = (byte)subs;
00486 size++;
00487 for (int i = 0; i < subs; i++) {
00488 byte[] subp =
00489 compile_params(s.subfunctors()[i].identifier(),
00490 s.subfunctors()[i]);
00491 System.arraycopy(subp, 0, params, size, subp.length);
00492 size += subp.length;
00493 if (i < subs - 1) {
00494 params[size] = Sensix.AND;
00495 size++;
00496 }
00497 }
00498 }
00499 break;
00500 case Sensix.LAMBDA:
00501 log(Logger.LOG.ERROR,
00502 "Invalid attempt to marshal lambda aggregate");
00504 default:
00505 return null;
00506 }
00507 return params;
00508 }
00509
00510
00511
00512
00513 public byte parse_packet(byte[] msg, int len) {
00514 byte hdr_size = HEADER_SIZE;
00515 byte what = msg[3];
00516 byte data_size = msg[4];
00517 byte base_len = (byte)(msg[2] & ~LITTLE_ENDIAN);
00518 boolean convert = false;
00519
00520 log(Logger.LOG.DEBUG, "parse packet" );
00521 if (msg[0] != MAGIC || msg[1] != VERSION)
00522 return -1 ;
00523 if ((msg[2] & LITTLE_ENDIAN) != 0)
00524 convert = true;
00525
00526 if (what == METADATA) {
00527 parse_metadata(base_len, data_size, msg, convert);
00528 }
00529 else if (what == DATA) {
00530 parse_data(base_len, data_size, msg, convert);
00531 }
00532 else if (what == CMD) {
00533 Functor f = parse_command(base_len, msg, data_size, convert);
00534 if (f != null)
00535 client.apply(f);
00536 }
00537 return what;
00538 }
00539
00540
00541 private int parse_metadata(int base, int size, byte[] msg,
00542 boolean convert) {
00543 int idx = HEADER_SIZE;
00544 if (msg == null)
00545 return -1;
00546
00547 switch(msg[idx]) {
00548 case Sensix.ANCESTORS:
00549 idx++;
00550 byte[] len_bytes = new byte[base];
00551 System.arraycopy(msg, idx, len_bytes, 0, base);
00552 int num_ancestors = 0;
00553 if (convert)
00554 num_ancestors = b2i_le(len_bytes, base);
00555 else
00556 num_ancestors = b2i(len_bytes, base);
00557 idx += base;
00558
00559 for (int i = 0; i < num_ancestors; i++) {
00560 int len = (msg[idx] & 0xff);
00561 idx++;
00562 byte[] id_bytes = new byte[len];
00563 System.arraycopy(msg, idx, id_bytes, 0, len);
00564 idx += len;
00565 int a_id = 0;
00566 if (convert)
00567 a_id = b2i_le(id_bytes, len);
00568 else
00569 a_id = b2i(len_bytes, len);
00570 boolean found = false;
00571 int asize = ancestors_size;
00572 for (int j = 0; j < asize; j++) {
00573 if (ancestors_seen[j] == a_id) {
00574 found = true;
00575 break;
00576 }
00577 }
00578 if (found == false && asize < MAX_PARENTS) {
00579 int aidx = ancestors_size;
00580 ancestors_seen[aidx] = a_id;
00581 ancestors_size++;
00582 log(Logger.LOG.DEBUG, "Added ancestor " + a_id);
00583 }
00584 }
00585 xmit_metadata(Sensix.DESCENDANTS);
00586 break;
00587
00588 case Sensix.SIBLINGS:
00589 {
00590 idx++;
00591 idx++;
00592 byte[] id_bytes = new byte[base];
00593 System.arraycopy(msg, idx, id_bytes, 0, base);
00594 idx += base;
00595 int s_id = 0;
00596 if (convert)
00597 s_id = b2i_le(id_bytes, base);
00598 else
00599 s_id = b2i(id_bytes, base);
00600
00601 boolean found = false;
00602 int ssize = siblings_size;
00603 for (int i = 0; i < ssize; i++) {
00604 if (siblings_seen[i] == s_id) {
00605 found = true;
00606 break;
00607 }
00608 }
00609 if (found == false && ssize < MAX_SIBLINGS) {
00610 int sidx = siblings_size;
00611 siblings_seen[sidx] = s_id;
00612 siblings_size++;
00613 log(Logger.LOG.DEBUG, "Added sibling " + s_id);
00614 }
00615
00616 int cap_size = (msg[idx] & 0xff);
00617 idx++;
00618 for (int j = 0; j < cap_size; j++) {
00619 byte s_cap = msg[idx];
00620 idx++;
00621
00622 found = false;
00623 for (int k = 0; k < sib_db_size; k++) {
00624 if (sib_database[k].id == s_id &&
00625 sib_database[k].type == s_cap) {
00626 found = true;
00627 break;
00628 }
00629 }
00630 if (found == false) {
00631 int sidx = sib_db_size;
00632 sib_database[sidx] =
00633 new ObjectReference();
00634 sib_database[sidx].id = s_id;
00635 sib_database[sidx].type = s_cap;
00636 sib_db_size++;
00637 log(Logger.LOG.DEBUG, "Added sibling capability " +
00638 SenseUtil.capabilityToString(s_cap));
00639 }
00640 }
00641 xmit_metadata(Sensix.SIBLINGS);
00642 }
00643 break;
00644
00645 case Sensix.DESCENDANTS:
00646 {
00647 idx++;
00648 idx++;
00649 byte[] id_bytes = new byte[base];
00650 System.arraycopy(msg, idx, id_bytes, 0, base);
00651 idx += base;
00652 int d_id = 0;
00653 if (convert)
00654 d_id = b2i_le(id_bytes, base);
00655 else
00656 d_id = b2i(id_bytes, base);
00657
00658 boolean found = false;
00659 int dsize = descendants_size;
00660 for (int i = 0; i < dsize; i++) {
00661 if (descendants_seen[i] == d_id) {
00662 found = true;
00663 break;
00664 }
00665 }
00666 if (found == false && dsize < MAX_CHILDREN) {
00667 int didx = descendants_size;
00668 descendants_seen[didx] = d_id;
00669 descendants_size++;
00670 log(Logger.LOG.DEBUG, "Added descendant " + d_id);
00671 }
00672
00673 int cap_size = (msg[idx] & 0xff);
00674 idx++;
00675 for (int j = 0; j < cap_size; j++) {
00676 byte d_cap = msg[idx];
00677 idx++;
00678
00679 found = false;
00680 for (int k = 0; k < desc_db_size; k++) {
00681 if (desc_database[k].id == d_id &&
00682 desc_database[k].type == d_cap) {
00683 found = true;
00684 break;
00685 }
00686 }
00687 if (found == false) {
00688 int didx = desc_db_size;
00689 desc_database[didx] =
00690 new ObjectReference();
00691 desc_database[didx].id = d_id;
00692 desc_database[didx].type = d_cap;
00693 desc_db_size++;
00694 log(Logger.LOG.DEBUG, "Added descendant capability " +
00695 SenseUtil.capabilityToString(d_cap));
00696 }
00697 }
00698
00699 if (msg[idx] != Sensix.ENERGY)
00700 break;
00701 idx++;
00702 byte[] energy_bytes = new byte[base];
00703 System.arraycopy(msg, idx, energy_bytes, 0, base);
00704 double energy = 0;
00705 if (convert)
00706 energy = b2i_le(energy_bytes, base);
00707 else
00708 energy = b2i(energy_bytes, base);
00709 energy = energy / 1000.0;
00710 idx += base;
00712 xmit_metadata(Sensix.ANCESTORS);
00713 }
00714 break;
00715 }
00716 return 0;
00717 }
00718
00719
00720 private int parse_data(int base, int size, byte[] msg, boolean convert) {
00721 int idx = HEADER_SIZE;
00722 byte[] id_bytes = new byte[base];
00723 System.arraycopy(msg, idx, id_bytes, 0, base);
00724 int node_id = 0;
00725 if (convert)
00726 node_id = b2i_le(id_bytes, base);
00727 else
00728 node_id = b2i(id_bytes, base);
00729 idx += base;
00730
00731 byte[] to_bytes = new byte[base];
00732 System.arraycopy(msg, idx, to_bytes, 0, base);
00733 int node_to = 0;
00734 if (convert)
00735 node_to = b2i_le(id_bytes, base);
00736 else
00737 node_to = b2i(id_bytes, base);
00738 idx += base;
00739 if (node_to != client.identity())
00740 return 0;
00741
00742 if (msg[idx] != Sensix.ENERGY)
00743 return -1;
00744 idx++;
00745 byte[] energy_bytes = new byte[base];
00746 System.arraycopy(msg, idx, energy_bytes, 0, base);
00747 double energy = 0;
00748 if (convert)
00749 energy = b2i_le(energy_bytes, base);
00750 else
00751 energy = b2i(energy_bytes, base);
00752 energy = energy / 1000.0;
00753 idx += base;
00754
00755 if (msg[idx] != Sensix.TIME)
00756 return -1;
00757 idx++;
00758 byte[] time_bytes = new byte[base];
00759 System.arraycopy(msg, idx, time_bytes, 0, base);
00760 long time = 0;
00761 if (convert)
00762 time = b2i_le(time_bytes, base);
00763 else
00764 time = b2i(time_bytes, base);
00765 idx += base;
00766
00767 int ftype = (msg[idx] & 0xff);
00768 int fid = (msg[idx+1] & 0xff);
00769 idx += 2;
00770
00771 byte[] count_bytes = new byte[base];
00772 System.arraycopy(msg, idx, count_bytes, 0, base);
00773 int num = 0;
00774 if (convert)
00775 num = b2i_le(count_bytes, base);
00776 else
00777 num = b2i(count_bytes, base);
00778 idx += base;
00779
00780 Data[] array = new Data[num];
00781 for (int i = 0; i < num; i++) {
00782 byte[] res_bytes = new byte[base];
00783 System.arraycopy(msg, idx, res_bytes, 0, base);
00784 int result = 0;
00785 if (convert)
00786 result = b2i_le(res_bytes, base);
00787 else
00788 result = b2i(res_bytes, base);
00789 idx += base;
00790
00791 array[i] = new Data();
00792 array[i].iresult(result);
00793 }
00794
00795 Task task = client.tracker().tree.get((int)fid);
00796 if (task == null)
00797 return -1;
00798 Sensory target = (Sensory)task.f();
00799 if (target == null)
00800 return -1;
00801 target.energyused(energy + target.energyused());
00802 if (time < target.timeused())
00803 time = target.timeused();
00804 target.timeused(time);
00805 target.results(array);
00806 task.callback().aggregate(target);
00807 return 0;
00808 }
00809
00810
00811 private Functor parse_command(int base, byte[] msg, int size,
00812 boolean convert) {
00813 int idx = 0;
00814 log(Logger.LOG.DEBUG, "parse command");
00815 if (msg == null)
00816 return null;
00817
00818 byte req_octet = (byte)base;
00819 byte[] requestor = new byte[base];
00820 System.arraycopy(msg, idx, requestor, 0, base);
00821 idx += base;
00822 byte bits = 0;
00823 boolean peer_req = (msg[idx] != 0);
00824 idx++;
00825 int priority = (msg[idx] & 0xff);
00826 idx++;
00827 int f_type = (msg[idx] & 0xff);
00828 idx++;
00829
00830 int f_id = (msg[idx] & 0xff);
00831 idx++;
00832 int num_params = (msg[idx] & 0xff);
00833 idx++;
00834 long f_t = System.currentTimeMillis();
00835 int f_e = (int)client.check_energy();
00837
00838 byte sensor = Sensix.INVALID;
00839 byte level = Sensix.INVALID;
00840 double rate = 0.0;
00841 double hi_threshold = Double.POSITIVE_INFINITY;
00842 double lo_threshold = Double.NEGATIVE_INFINITY;
00843 long duration = 0;
00844 double distance = 0.0;
00845 double angle = 0.0;
00846
00847 sensix.sensing.Collection[] collectors =
00848 new sensix.sensing.Collection[num_params];
00849 for (int i = 0; i < num_params; i++)
00850 collectors[i] = null;
00851 int cidx = 0;
00852
00853 for (int i = 0; i < num_params; i++) {
00854 int err;
00855 byte code = msg[idx];
00856 idx++;
00857
00858 switch(code) {
00859 case Sensix.AND:
00860 cidx++;
00861 break;
00862 case Sensix.LEVEL:
00863 level = msg[idx];
00864 idx++;
00865 break;
00866 case Sensix.SENSOR:
00867 sensor = msg[idx];
00868 idx++;
00869 break;
00870 case Sensix.FREQ:
00871 {
00872 byte[] rate_bytes = new byte[base];
00873 System.arraycopy(msg, idx, rate_bytes, 0, base);
00874 if (convert)
00875 rate = b2i_le(rate_bytes, base);
00876 else
00877 rate = b2i(rate_bytes, base);
00878 idx += base;
00879 }
00880 break;
00881 case Sensix.THR_PLUS:
00882 {
00883 byte[] thresh_bytes = new byte[base];
00884 System.arraycopy(msg, idx, thresh_bytes, 0, base);
00885 if (convert)
00886 hi_threshold = b2i_le(thresh_bytes, base);
00887 else
00888 hi_threshold = b2i(thresh_bytes, base);
00889 idx += base;
00890 }
00891 break;
00892 case Sensix.THR_MINUS:
00893 {
00894 byte[] thresh_bytes = new byte[base];
00895 System.arraycopy(msg, idx, thresh_bytes, 0, base);
00896 if (convert)
00897 lo_threshold = b2i_le(thresh_bytes, base);
00898 else
00899 lo_threshold = b2i(thresh_bytes, base);
00900 idx += base;
00901 }
00902 break;
00903 case Sensix.TIME:
00904 {
00905 byte[] time_bytes = new byte[base];
00906 System.arraycopy(msg, idx, time_bytes, 0, base);
00907 if (convert)
00908 duration = b2i_le(time_bytes, base);
00909 else
00910 duration = b2i(time_bytes, base);
00911 idx += base;
00912 }
00913 break;
00914 case Sensix.DIST:
00915 {
00916 byte[] dist_bytes = new byte[base];
00917 System.arraycopy(msg, idx, dist_bytes, 0, base);
00918 if (convert)
00919 distance = b2i_le(dist_bytes, base);
00920 else
00921 distance = b2i(dist_bytes, base);
00922 idx += base;
00923 }
00924 break;
00925 case Sensix.ANGLE:
00926 {
00927 byte[] angle_bytes = new byte[base];
00928 System.arraycopy(msg, idx, angle_bytes, 0, base);
00929 if (convert)
00930 angle = b2i_le(angle_bytes, base);
00931 else
00932 angle = b2i(angle_bytes, base);
00933 idx += base;
00934 }
00935 break;
00936 default:
00937 int nsize = size - idx;
00938 byte[] next = new byte[nsize];
00939 System.arraycopy(msg, idx, next, 0, nsize);
00940 Sensory sub =
00941 (Sensory)parse_command(base, next, nsize, convert);
00942 if (sub == null)
00943 return null;
00944 collectors[cidx] = (sensix.sensing.Collection)sub;
00945 break;
00946 }
00947 }
00948
00949 Sensory f = null;
00950 switch(f_type) {
00951 case Sensix.ALPHA:
00952 f = new Sense(level, sensor, rate);
00953 break;
00954 case Sensix.BETA:
00955 f = new PeakSense(level, sensor, rate, hi_threshold, lo_threshold);
00956 break;
00957 case Sensix.THETA:
00958 f = new TimeSeries(level, collectors[0], duration);
00959 break;
00960 case Sensix.PSI:
00961 f = new SpatialSeries(level, collectors[0], angle, distance);
00962 break;
00963 case Sensix.IOTA:
00964 f = new Recite(level, collectors);
00965 break;
00966 case Sensix.SUMMA:
00967 f = new Sum(level, collectors);
00968 break;
00969 case Sensix.DELTA:
00970 f = new Delta(level, collectors);
00971 break;
00972 case Sensix.BARX:
00973 f = new Mean(level, collectors);
00974 break;
00975 case Sensix.SIGMA:
00976 f = new Sigma(level, collectors);
00977 break;
00978 case Sensix.LAMBDA:
00979 f = new Lambda(level, collectors);
00981
00982
00983 default:
00984 return null;
00985 }
00986 return f;
00987 }
00988 }