00001
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #include <string.h>
00036
00037 #define MOTE
00038 #include "sensix.h"
00039 #include "sense_impl.h"
00040 #include "sense_endian.h"
00041 #include "mote_impl.h"
00042 #include "mote_sensix.h"
00043 #include "mote_networking.h"
00044 #include "mote_state.h"
00045
00046
00047 extern mote_state _node_state[];
00048
00049
00050 byte_t load_header(byte_t *hdr, byte_t what, byte_t data_size)
00051 {
00052 hdr[0] = PROTOCOL;
00053 hdr[1] = VERSION;
00054 hdr[2] = WORDSIZE;
00055 if (little_endian())
00056 hdr[2] |= 0x80;
00057 hdr[3] = what;
00058 hdr[4] = data_size;
00059
00060 return HEADER_SIZE;
00061 }
00062
00063
00064 void xmit_data(functor *f)
00065 {
00066 uint_t i;
00067 log_info("xmit data: ");
00068 for (i = 0; i < f->size; i++)
00069 log_info("%d ", f->results[i].data);
00070 log_info("\n");
00071
00072 if (!f->bits & functor_pass_thru) {
00073 uint_t my_identity = _node_state[f->node_idx].ident;
00074 unsigned int msg_size = (f->size + 2) * WORDSIZE + 1;
00075 byte_t msg[msg_size], *ptr;
00076
00077 ptr = msg;
00078 network_order(ptr, my_identity);
00079 ptr += WORDSIZE;
00080 memcpy(ptr, f->requestor, 4);
00081 ptr += 4;
00082
00083 ptr[0] = ENERGY;
00084 ptr++;
00085 network_order(ptr, energy_diff(f->f_e, check_energy()));
00086 ptr += WORDSIZE;
00087 ptr[0] = TIME;
00088 ptr++;
00089 network_order(ptr, time_diff(f->f_t, check_time()));
00090 ptr += WORDSIZE;
00091
00092 ptr[0] = f->f_type;
00093 if (f->bits & functor_peer_req)
00094 ptr[1] = f->f_id;
00095 else
00096 ptr[1] = f->task_id;
00097 ptr += 2;
00098
00099 network_order(ptr, f->size);
00100 ptr += WORDSIZE;
00101 for (i = 0; i < f->size; i++) {
00102 network_order(ptr, f->results[i].data);
00103 ptr += WORDSIZE;
00104 }
00105
00106 if (f->bits & functor_peer_req)
00107 bcast(DATA, msg_size, msg);
00108 else
00109 send_up(DATA, msg_size, msg);
00110 }
00111 }
00112
00113
00114 void xmit_datum(functor *f)
00115 {
00116 log_info("xmit datum: %d\n", f->result);
00117
00118 if (!f->bits & functor_pass_thru) {
00119 uint_t my_identity = _node_state[f->node_idx].ident;
00120 unsigned int msg_size = 2 * WORDSIZE + 2;
00121 byte_t msg[msg_size], *ptr;
00122 uint_t one = 1;
00123
00124 ptr = msg;
00125 network_order(ptr, my_identity);
00126 ptr += WORDSIZE;
00127 memcpy(ptr, f->requestor, 4);
00128 ptr += 4;
00129
00130 ptr[0] = ENERGY;
00131 ptr++;
00132 network_order(ptr, energy_diff(f->f_e, check_energy()));
00133 ptr += WORDSIZE;
00134 ptr[0] = TIME;
00135 ptr++;
00136 network_order(ptr, time_diff(f->f_t, check_time()));
00137 ptr += WORDSIZE;
00138
00139 ptr[0] = f->f_type;
00140 if (f->bits & functor_peer_req)
00141 ptr[1] = f->f_id;
00142 else
00143 ptr[1] = f->task_id;
00144 ptr += 2;
00145
00146 network_order(ptr, one);
00147 ptr += WORDSIZE;
00148 network_order(ptr, f->result);
00149 ptr += WORDSIZE;
00150
00151 if (f->bits & functor_peer_req)
00152 bcast(DATA, msg_size, msg);
00153 else
00154 send_up(DATA, msg_size, msg);
00155 }
00156 }
00157
00158
00159 void xmit_metadata(byte_t meta, uint_t node)
00160 {
00161 if (node > MAX_NODES) {
00162 log_error("Pointing beyond valid state\n");
00163 return;
00164 }
00165 if (meta == ANCESTORS) {
00166 uint_t my_identity = _node_state[node].ident;
00167 uint_t i, size = _node_state[node].ancestors.size + 1;
00168 unsigned int msg_size = ((size + 1) * WORDSIZE) + 1;
00169 byte_t msg[msg_size], *ptr;
00170
00171 msg[0] = meta;
00172 ptr = msg + 1;
00173 network_order(ptr, size);
00174 ptr += WORDSIZE;
00175 *ptr = WORDSIZE;
00176 ptr++;
00177 network_order(ptr, my_identity);
00178 ptr += WORDSIZE;
00179
00180 for (i = 0; i < _node_state[node].ancestors.size; i++) {
00181 byte_t len = _node_state[node].ancestors.results[i].octet;
00182 *ptr = len;
00183 ptr++;
00184 memcpy(ptr, _node_state[node].ancestors.results[i].id, len);
00186 ptr += len;
00187 }
00188
00189 _node_state[node].ancestors_sent++;
00190 send_dn(METADATA, msg_size, msg);
00191 }
00192 else if (meta == SIBLINGS) {
00193 uint_t my_identity = _node_state[node].ident;
00194 unsigned int msg_size = WORDSIZE + 3 + _node_state[node].cap_size;
00195 byte_t msg[msg_size], *ptr;
00196
00197 msg[0] = meta;
00198 msg[1] = 1;
00199 ptr = msg + 2;
00200 network_order(ptr, my_identity);
00201 ptr += WORDSIZE;
00202
00203 *ptr = _node_state[node].cap_size;
00204 ptr++;
00205 memcpy(ptr, _node_state[node].capabilities, _node_state[node].cap_size);
00206
00207 _node_state[node].siblings_sent++;
00208 bcast(METADATA, msg_size, msg);
00209 }
00210 else if (meta == DESCENDANTS) {
00211 uint_t my_identity = _node_state[node].ident;
00212 unsigned int msg_size = (3 * WORDSIZE) + 3 + _node_state[node].cap_size;
00213 byte_t msg[msg_size], *ptr;
00214
00215 msg[0] = meta;
00216 msg[1] = 1;
00217 ptr = msg + 2;
00218 network_order(ptr, my_identity);
00220 ptr += WORDSIZE;
00221
00222 *ptr = _node_state[node].cap_size;
00223 ptr++;
00224 memcpy(ptr, _node_state[node].capabilities, _node_state[node].cap_size);
00225 ptr += _node_state[node].cap_size;
00226
00227 *ptr = ENERGY;
00228 ptr++;
00229 network_order(ptr, check_energy());
00230 ptr += WORDSIZE;
00231
00232 _node_state[node].descendants_sent++;
00233 send_up(METADATA, msg_size, msg);
00234 }
00235 }
00236
00237
00238 static unsigned int _compile_params(byte_t cmd, byte_t *params, functor *f)
00239 {
00240 unsigned int size = 2;
00241 log_info("compile params\n");
00242 params[0] = cmd;
00243 params[1] = f->f_id;
00244
00245 switch(cmd) {
00246 case ALPHA:
00247 log_info(" params for sense\n");
00248 params[2] = 2;
00249 params[3] = SENSOR;
00250 params[4] = f->sensor;
00251 params[5] = FREQ;
00252 network_order(params + 5, f->rate);
00253 size += 4 + WORDSIZE;
00254 break;
00255 case BETA:
00256 log_info(" params for peak sense\n");
00257 if (f->bits & functor_hi_thresh && f->bits & functor_lo_thresh)
00258 params[2] = 4;
00259 else
00260 params[2] = 3;
00261 params[3] = SENSOR;
00262 params[4] = f->sensor;
00263 params[5] = FREQ;
00264 network_order(params + 6, f->rate);
00265 size += 4 + WORDSIZE;
00266
00267 if (f->bits & functor_hi_thresh && f->bits & functor_lo_thresh) {
00268 params[size + 2] = THR_PLUS;
00269 network_order(params + size + 3, f->threshold);
00270 params[size + 3 + WORDSIZE] = THR_MINUS;
00271 network_order(params + size + 4 + WORDSIZE, f->alt_threshold);
00272 size += 4 + (2 * WORDSIZE);
00273 }
00274 else {
00275 if (f->bits & functor_hi_thresh)
00276 params[size + 2] = THR_PLUS;
00277 else
00278 params[size + 2] = THR_MINUS;
00279 network_order(params + size + 3, f->threshold);
00280 size += 3 + WORDSIZE;
00281 }
00282 break;
00283 case THETA:
00284 log_info(" params for time series\n");
00285 params[2] = 2;
00286 params[3] = TIME;
00287 network_order(params + 4, f->duration);
00288 size += 2 + WORDSIZE;
00289 size += _compile_params(f->collector->f_type, params + size, f->collector);
00290 break;
00291 case PSI:
00292 log_info(" params for spatial series\n");
00293 params[2] = 1;
00294 size++;
00295 size += _compile_params(f->collector->f_type, params + size, f->collector);
00296 break;
00297 case IOTA:
00298 case SUMMA:
00299 case DELTA:
00300 case BARX:
00301 case SIGMA:
00302 log_info(" params for aggregate\n");
00303 params[2] = 1;
00304 size++;
00305 size += _compile_params(f->collector->f_type, params + size, f->collector);
00306 break;
00307 case LAMBDA:
00308 log_error(" params for lambda aggregate\n");
00310 default:
00311 return 0;
00312 }
00313 return size;
00314 }
00315
00316
00317 void xmit_command(functor *f)
00318 {
00319 uint_t my_identity = _node_state[f->node_idx].ident;
00320 uint_t hdr_size = WORDSIZE + 2;
00321 byte_t params[MAX_CMD_SIZE];
00322 unsigned int msg_size = _compile_params(f->f_type, params, f) + hdr_size;
00323 byte_t msg[msg_size], *ptr;
00324 log_info("xmit command\n");
00325
00326 ptr = msg;
00327 network_order(ptr, my_identity);
00328 ptr += WORDSIZE;
00329 ptr[0] = 1;
00330 ptr[1] = f->priority;
00331 ptr += 2;
00332 memcpy(msg + hdr_size, params, msg_size - hdr_size);
00333
00334 bcast(CMD, msg_size, msg);
00335 }
00336
00337
00338 static bool_t equal_net_ids(byte_t oct, byte_t *stored,
00339 byte_t len, byte_t *check)
00340 {
00341 uint_t i;
00342 bool_t same = true;
00343
00344 if (oct == len) {
00345 for (i = 0; i < len; i++) {
00346 if (stored[i] != check[i]) {
00347 same = false;
00348 break;
00349 }
00350 }
00351 }
00352 return same;
00353 }
00354
00355
00356
00357 static int_t _parse_command(uint_t base, uint_t size, byte_t *msg,
00358 functor **f_ptr, functor *p, bool_t convert,
00359 uint_t node)
00360 {
00361 uint_t i, hdr_size, num_params;
00362 byte_t *ptr = msg;
00363 functor *f = NULL;
00364
00365 log_info("parse command\n");
00366
00367 if (node > MAX_NODES) {
00368 log_error("Pointing beyond valid state\n");
00369 return -1;
00370 }
00371 if (msg == NULL)
00372 return -1;
00373 if (*f_ptr == NULL) {
00374 if ((*f_ptr = allocate_functor(node)) == NULL)
00375 return -2;
00376 }
00377 f = *f_ptr;
00378
00379 f->parent = p;
00380 f->req_octet = base;
00381 memcpy(f->requestor, ptr, base);
00382 ptr += base;
00383 if (ptr[0] != 0)
00384 f->bits |= functor_peer_req;
00385 else
00386 f->bits &= ~functor_peer_req;
00387 f->priority = ptr[1];
00388 f->f_type = ptr[2];
00389 ptr += 3;
00390
00391 switch(f->f_type) {
00392 case ALPHA:
00393 f->function = &f_sense;
00394 break;
00395 case BETA:
00396 f->function = &f_peak_sense;
00397 break;
00398 case THETA:
00399 f->function = &f_time_series;
00400 break;
00401 case PSI:
00402 f->function = &f_spatial_series;
00403 break;
00404 case IOTA:
00405 f->function = &f_recite;
00406 break;
00407 case SUMMA:
00408 f->function = &f_sum;
00409 break;
00410 case DELTA:
00411 f->function = &f_delta;
00412 break;
00413 case BARX:
00414 f->function = &f_mean;
00415 break;
00416 case SIGMA:
00417 f->function = &f_sigma;
00418 break;
00419 case LAMBDA:
00421
00422 default:
00423 return -1;
00424 }
00425
00426 if (f->bits & functor_peer_req)
00427 f->f_id = ptr[0];
00428 else
00429 f->task_id = ptr[0];
00430 num_params = ptr[1];
00431 ptr += 2;
00432 f->f_t = check_time();
00433 f->f_e = check_energy();
00435
00436 hdr_size = 4;
00437 for (i = 0; i < num_params; i++) {
00438 int_t err;
00439 byte_t code = *ptr;
00440 ptr++;
00441
00442 switch(code) {
00443 case SENSOR:
00444 f->sensor = *ptr;
00445 ptr++;
00446 break;
00447 case FREQ:
00448 host_order((WORD*)&(f->rate), ptr, base, convert, false);
00449 ptr += base;
00450 break;
00451 case THR_PLUS:
00452 f->bits |= functor_hi_thresh;
00453 if (f->bits & functor_lo_thresh)
00454 f->alt_threshold = f->threshold;
00455 host_order((WORD*)&(f->threshold), ptr, base, convert, true);
00456 ptr += base;
00457 break;
00458 case THR_MINUS:
00459 f->bits |= functor_lo_thresh;
00460 if (f->bits & functor_hi_thresh)
00461 host_order((WORD*)&(f->alt_threshold), ptr, base, convert, true);
00462 else
00463 host_order((WORD*)&(f->threshold), ptr, base, convert, true);
00464 ptr += base;
00465 break;
00466 case TIME:
00467 host_order((WORD*)&(f->duration), ptr, base, convert, false);
00468 ptr += base;
00469 break;
00470 case DIST:
00471 case ANGLE:
00472 ptr += base;
00473 break;
00474 default:
00475 if ((err = _parse_command(base, ptr - msg, ptr,
00476 &f->collector, f, convert, node)) < 0) {
00477 free_functor(f);
00478 return err;
00479 }
00480 f->rate = f->collector->rate;
00481 break;
00482 }
00483 }
00484
00485 log_info(" end parse command\n");
00486 return 0;
00487 }
00488
00489
00490 static int_t _parse_data(uint_t base, uint_t size, byte_t *msg,
00491 functor *f, bool_t convert, uint_t node)
00492 {
00493 uint_t my_identity = _node_state[node].ident;
00494 byte_t cmd, idx, num_results, node_from[4], *ptr;
00495 uint_t node_to, energy_used, time_used;
00496 functor *target;
00497 #if 0
00498 uint_t i;
00499 #endif
00500
00501 if (node > MAX_NODES) {
00502 log_error("Pointing beyond valid state\n");
00503 return -1;
00504 }
00505 if (msg == NULL)
00506 return -1;
00507
00508 ptr = msg;
00509 memcpy(node_from, ptr, base);
00510 ptr += base;
00511 node_to = *((uint_t*)ptr);
00512 ptr += WORDSIZE;
00513 if (node_to != my_identity)
00514 return 0;
00515
00516 cmd = ptr[0];
00517 idx = ptr[1];
00518 ptr += 2;
00519
00520 if (*ptr == ENERGY) {
00521 ptr++;
00522 host_order((WORD*)&energy_used, ptr, base, convert, false);
00523 ptr += base;
00524 }
00525 if (*ptr == TIME) {
00526 ptr++;
00527 host_order((WORD*)&time_used, ptr, base, convert, false);
00528 ptr += base;
00529 }
00530 num_results = ptr[0];
00531 ptr++;
00532
00533 target = &(_node_state[node].functor_pool[idx]);
00534 if (!(target->bits & functor_used && target->bits & functor_running))
00535 return -1;
00536
00537 target->bits |= functor_data_ready;
00538 target->f_e += energy_used;
00539 target->f_t += time_used;
00540 if (num_results == 1) {
00541 uint_t pos = f->size++;
00542
00543 host_order((WORD*)&(f->results[pos].data), ptr, base, convert, true);
00544 f->results[pos].octet = base;
00545 memcpy(f->results[pos].id, node_from, base);
00546 }
00547 #if 0
00548 else {
00549 for (i = 0; i < num_results; i++) {
00550 host_order((WORD*)&(f->results[i].data), ptr, base, convert, true);
00551 ptr += base;
00552 }
00553 f->size = num_results;
00554 }
00555 #endif
00556
00557 f_dataready(target);
00558 return 0;
00559 }
00560
00561
00562 void rediscovery(uint_t node) {
00563 if (node > MAX_NODES) {
00564 log_error("Pointing beyond valid state\n");
00565 return;
00566 }
00567 _node_state[node].ancestors_sent = 0;
00568 _node_state[node].siblings_sent = 0;
00569 _node_state[node].descendants_sent = 0;
00570 }
00571
00572
00573 static int_t _find_family(uint_t base, byte_t *o_id, functor *f)
00574 {
00575 uint_t my_identity = _node_state[f->node_idx].ident;
00576 bool_t found = false;
00577 uint_t i;
00578
00579 if (equal_net_ids(WORDSIZE, (byte_t*)&my_identity, base, o_id))
00580 return -1;
00581
00582 for (i = 0; i < f->size; i++) {
00583 if (equal_net_ids(f->results[i].octet, f->results[i].id, base, o_id)) {
00584 found = true;
00585 break;
00586 }
00587 }
00588 if (!found) {
00589 memset(f->results[f->size].id, 0, ID_BYTE_LEN);
00590 memcpy(f->results[f->size].id, o_id, base);
00591 f->results[f->size].octet = base;
00592 f->size++;
00593 }
00594
00595 return i;
00596 }
00597
00598
00599 static int_t _parse_metadata(uint_t base, uint_t size,
00600 byte_t *msg, bool_t convert, uint_t node)
00601 {
00602 byte_t *ptr = msg + 1;
00603
00604 if (node > MAX_NODES) {
00605 log_error("Pointing beyond valid state\n");
00606 return -1;
00607 }
00608 if (msg == NULL)
00609 return -1;
00610
00611 switch(msg[0]) {
00612 case ANCESTORS:
00613 {
00614 uint_t i;
00615 host_order((WORD*)&(_node_state[node].ancestors.size),
00616 ptr, base, convert, false);
00617 ptr += base;
00618
00619 for (i = 0; i < _node_state[node].ancestors.size; i++) {
00620 byte_t len = *ptr;
00621 ptr++;
00622 _node_state[node].ancestors.results[i].octet = len;
00623 memcpy(_node_state[node].ancestors.results[i].id, ptr, len);
00624 ptr += len;
00625 }
00626
00627 if (_node_state[node].descendants_sent == 0)
00628 xmit_metadata(DESCENDANTS, node);
00629 }
00630 break;
00631
00632 case SIBLINGS:
00633 {
00634 uint_t i, num_caps;
00635 int_t idx;
00636 byte_t o_id[base];
00637
00638 ptr += 1;
00639 memcpy(o_id, ptr, base);
00640 ptr += base;
00641
00642 if ((idx = _find_family(base, o_id,
00643 &(_node_state[node].siblings))) >= 0) {
00644 num_caps = *ptr;
00645 ptr++;
00646 if (num_caps > MAX_CAPABILITIES)
00647 num_caps = MAX_CAPABILITIES;
00648
00649 for (i = 0; i < num_caps; i++)
00650 _node_state[node].sib_caps[idx][i] = ptr[i];
00651 _node_state[node].sib_cap_size = num_caps;
00652 }
00653
00654 if (_node_state[node].siblings_sent == 0)
00655 xmit_metadata(SIBLINGS, node);
00656 }
00657 break;
00658
00659 case DESCENDANTS:
00660
00661 break;
00662 }
00663 return 0;
00664 }
00665
00666
00667
00668 byte_t parse_packet(byte_t *msg, uint_t len, uint_t node)
00669 {
00670 byte_t hdr_size = HEADER_SIZE;
00671 byte_t type = msg[3], size = msg[4], base_len = (msg[2] & ~0x80);
00672 bool_t convert = false;
00673 if (node > MAX_NODES) {
00674 log_error("Pointing beyond valid state\n");
00675 return -1;
00676 }
00677
00678 if (msg[0] != PROTOCOL || msg[1] != VERSION)
00679 return -1;
00680 if (little_endian() && (msg[2] & 0x80))
00681 convert = true;
00682
00683 if (type == CMD) {
00684 functor *f = NULL;
00685 if (_parse_command(base_len, size, msg + hdr_size, &f, NULL,
00686 convert, node) < 0) {
00687 free_functor(f);
00688 f = NULL;
00689 }
00690 else
00691 f_apply(f);
00692 }
00693 else if (type == DATA) {
00694 functor *f = NULL;
00695 _parse_data(base_len, size, msg + hdr_size, f, convert, node);
00696 }
00697 else if (type == METADATA)
00698 _parse_metadata(base_len, size, msg + hdr_size, convert, node);
00699
00700 return type;
00701 }