00001
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #define _GNU_SOURCE
00036 #include <stdio.h>
00037 #include <stdlib.h>
00038 #include <unistd.h>
00039 #include <errno.h>
00040 #include <string.h>
00041 #include <sys/time.h>
00042 #include <sys/types.h>
00043 #include <sys/socket.h>
00044 #include <netinet/in.h>
00045 #include <arpa/inet.h>
00046 #include <netdb.h>
00047 #include <math.h>
00048 #include <signal.h>
00049 #include <pthread.h>
00050 #include <igraph/igraph.h>
00051
00052 #define norace
00053 #include "platform.pc/external_comm.h"
00054 #include "sense_endian.h"
00055 #include "SensixMsg.h"
00056
00057
/* ---- Capacity limits ------------------------------------------------ */
/* Number of slots in clients[]; also passed to listen() as the backlog. */
#define MAX_CLIENTS 1024
/* First dimension of adc_values[][]. */
#define MAX_MOTES 100000
/* Second dimension of adc_values[][]. */
#define MAX_ADC_PORTS 16

/* ---- Timing --------------------------------------------------------- */
/* load_data() sleeps this many milliseconds while motes are still joining. */
#define SLEEP_MSECS 500
#define TIME_ADJUST 4000.0 // to get msecs

/* ---- Buffers and protocol ------------------------------------------- */
#define DATA_BUFFER_SIZE 256
/* Size of radio_msg_t.data and of the per-event receive buffer. */
#define MAX_MSG_SIZE 512
/*
 * Whether ACK is defined decides if await_ack() is compiled in and called
 * after each forwarded command; the numeric values are not referenced by
 * the code in this file.
 */
#define ACK 65
#define NACK 66

/* Prefix for every diagnostic this program prints. */
#define APP "NETSIM: "
/* Destination address that means "all directly connected motes". */
#define BROADCAST 0xffff

/* ---- Compile-time switches ------------------------------------------ */
/* Appends the payload size of each broadcast to tiny.<pid>.msgct. */
#define INSTRUMENTATION

/* Diagnostic output selectors, tested with #ifdef throughout the file. */
#define DEBUG_VERBOSE
#define DEBUG_RADIO

#define DEBUG_ADC
00079
00080
00081
00083
00084
/*
 * Message type codes carried in GuiMsg_t.msgType.
 *
 * Events are single-bit values; service_thread() switches on them.
 * Commands count up from 1 << 12.  Of the commands, this file only emits
 * AM_RADIOMSGSENDCOMMAND and AM_SETADCPORTVALUECOMMAND.
 */
enum {
  /* Events. */
  AM_DEBUGMSGEVENT           = 0x0001,
  AM_RADIOMSGSENTEVENT       = 0x0002,
  AM_UARTMSGSENTEVENT        = 0x0004,
  AM_ADCDATAREADYEVENT       = 0x0008,
  AM_TOSSIMINITEVENT         = 0x0010,
  AM_INTERRUPTEVENT          = 0x0020,
  AM_LEDEVENT                = 0x0040,

  /* Commands. */
  AM_TURNONMOTECOMMAND       = 0x1000,
  AM_TURNOFFMOTECOMMAND      = 0x1001,
  AM_RADIOMSGSENDCOMMAND     = 0x1002,
  AM_UARTMSGSENDCOMMAND      = 0x1003,
  AM_SETLINKPROBCOMMAND      = 0x1004,
  AM_SETADCPORTVALUECOMMAND  = 0x1005,
  AM_INTERRUPTCOMMAND        = 0x1006,
  AM_SETRATECOMMAND          = 0x1007,
  AM_SETDBGCOMMAND           = 0x1008,
  AM_VARIABLERESOLVECOMMAND  = 0x1009,
  AM_VARIABLERESOLVERESPONSE = 0x100a,
  AM_VARIABLEREQUESTCOMMAND  = 0x100b,
  AM_VARIABLEREQUESTRESPONSE = 0x100c,
  AM_GETMOTECOUNTCOMMAND     = 0x100d,
  AM_GETMOTECOUNTRESPONSE    = 0x100e,
  AM_SETEVENTMASKCOMMAND     = 0x100f,
  AM_BEGINBATCHCOMMAND       = 0x1010,
  AM_ENDBATCHCOMMAND         = 0x1011,
};
00113
/*
 * Header that precedes every event/command payload on the sockets.
 *
 * The code reads and writes this struct as GUI_MSG_HEADER_LENGTH raw bytes
 * (see recv_all()/send_all() callers), so its in-memory layout must be the
 * 14 contiguous bytes 2 + 2 + 8 + 2.  Without packing, ABIs that align
 * long long to 8 bytes place `time` at offset 8 and grow the struct to 24
 * bytes, which mis-parses every header.  The packed attribute pins the
 * layout on every target.
 *
 * Multi-byte fields are converted with msg_hdr_swap().
 */
typedef struct GuiMsg {
  uint16_t msgType;     /* one of the AM_* codes */
  uint16_t moteID;      /* mote the message refers to */
  long long time;       /* simulator time stamp; copied into sim_time */
  uint16_t payLoadLen;  /* number of payload bytes following the header */
} __attribute__((packed)) GuiMsg_t;

/* Bytes of GuiMsg_t transferred on the wire; equals sizeof(GuiMsg_t). */
#define GUI_MSG_HEADER_LENGTH 14
00122
00124
00125
/*
 * Payload of an ADC data-ready event and of the set-ADC-port-value command
 * sent in reply.  Only `data` is multi-byte; adc_msg_swap() converts it.
 */
typedef struct adc_request {
  uint8_t  port;  /* ADC port number; second index into adc_values[][] */
  uint8_t  _pad;  /* explicit filler so `data` sits at offset 2 */
  uint16_t data;  /* sample value */
} adc_request_t;
00131
00132 typedef struct {
00133 uint16_t addr;
00134 uint8_t type;
00135 uint8_t grp;
00136 uint8_t len;
00137 char data[MAX_MSG_SIZE];
00138 uint16_t crc;
00139 uint16_t strength;
00140 uint8_t ack;
00141 uint8_t _pad2;
00142 uint16_t time;
00143 uint8_t send_sec_mode;
00144 uint8_t recv_sec_mode;
00145 } radio_msg_t;
00146
/*
 * Bookkeeping for one connected peer.  A peer opens two connections: the
 * event connection creates the slot, and the command connection is paired
 * with it later by matching the peer's IPv4 address.
 */
typedef struct client {
  int           evtfd;   /* event socket; 0 = not yet set, -1 = closed */
  int           cmdfd;   /* command socket; 0 = not yet set, -1 = closed */
  int           idx;
  unsigned char is_tos;  /* set to 1 once the peer has sent AM_TOSSIMINITEVENT */
  unsigned long addr;    /* peer address (sin_addr.s_addr) from accept() */
} client_t;
00154
00155 static igraph_t network_graph;
00156 static unsigned int num_threads;
00157 static long long sim_time;
00158 static unsigned int not_done;
00159 static int num_clients;
00160 static int num_motes;
00161 static int first;
00162 static client_t clients[MAX_CLIENTS];
00163 static int expected_clients;
00164 static int adc_values[MAX_MOTES][MAX_ADC_PORTS];
00165
00166 pthread_t threads[MAX_CLIENTS + 1];
00167 pthread_mutex_t num_threads_mutex = PTHREAD_MUTEX_INITIALIZER;
00168 pthread_mutex_t sim_time_mutex = PTHREAD_MUTEX_INITIALIZER;
00169 pthread_mutex_t not_done_mutex = PTHREAD_MUTEX_INITIALIZER;
00170 pthread_mutex_t clients_mutex = PTHREAD_MUTEX_INITIALIZER;
00171 pthread_mutex_t adc_mutex = PTHREAD_MUTEX_INITIALIZER;
00172
00173
00174
00175 static void msg_hdr_swap(GuiMsg_t *host)
00176 {
00177 _byte_swap((uint8_t*)&host->msgType, sizeof(host->msgType));
00178 _byte_swap((uint8_t*)&host->moteID, sizeof(host->moteID));
00179 _byte_swap((uint8_t*)&host->time, sizeof(host->time));
00180 _byte_swap((uint8_t*)&host->payLoadLen, sizeof(host->payLoadLen));
00181 }
00182
00183 static void adc_msg_swap(adc_request_t *req)
00184 {
00185 _byte_swap((uint8_t*)&req->data, sizeof(req->data));
00186 }
00187
00188 static void radio_msg_swap(radio_msg_t *msg)
00189 {
00190 _byte_swap((uint8_t*)&msg->addr, sizeof(msg->addr));
00191 _byte_swap((uint8_t*)&msg->crc, sizeof(msg->crc));
00192 _byte_swap((uint8_t*)&msg->strength, sizeof(msg->strength));
00193 _byte_swap((uint8_t*)&msg->time, sizeof(msg->time));
00194 }
00195
00196
00197
00198 static int recv_all(int sock, void *buf, int len)
00199 {
00200 int total = 0;
00201 while (total < len) {
00202 int bytes;
00203 if ((bytes = read(sock, buf + total, len - total)) <= 0) {
00204 if (bytes < 0)
00205 perror(APP "read");
00206 return bytes;
00207 }
00208 total += bytes;
00209 }
00210 return total;
00211 }
00212
00213 static int send_all(int sock, void *buf, int len)
00214 {
00215 int total = 0;
00216 while (total < len) {
00217 int bytes;
00218 if ((bytes = write(sock, buf + total, len - total)) <= 0) {
00219 if (bytes < 0)
00220 perror(APP "write");
00221 return bytes;
00222 }
00223 total += bytes;
00224 }
00225 return total;
00226 }
00227
00228 static int acknowledge(int sock)
00229 {
00230 unsigned char ack = 0;
00231 int bytes;
00232
00233 if ((bytes = write(sock, &ack, 1)) < 0)
00234 perror(APP "write");
00235 return bytes;
00236 }
00237
#ifdef ACK
/*
 * Block until the peer sends its one-byte acknowledgement on `sock`.
 * The byte's value is not inspected.
 *
 * Returns read()'s result: 1 when a byte arrived, 0 when the peer closed
 * the connection, or -1 on error (reported with perror).
 */
static int await_ack(int sock)
{
  unsigned char reply = 0;
  int got = read(sock, &reply, 1);

  if (got < 0)
    perror(APP "read");
  return got;
}
#endif
00249
00250
00251 static int are_connected(uint16_t a, uint16_t b)
00252 {
00253 int adjacent = 0;
00254 igraph_vs_t v_sel;
00255 igraph_vit_t v_iter;
00256
00257 if (igraph_vs_adj(&v_sel, a, IGRAPH_OUT) < 0) {
00258 perror(APP "igraph_vs_adj");
00259 return -1;
00260 }
00261 if (igraph_vit_create(&network_graph, v_sel, &v_iter) < 0) {
00262 perror(APP "igraph_vit_create");
00263 return -1;
00264 }
00265
00266 while (!IGRAPH_VIT_END(v_iter)) {
00267 if (b == VAN(&network_graph, "id", IGRAPH_VIT_GET(v_iter))) {
00268 adjacent = 1;
00269 break;
00270 }
00271 IGRAPH_VIT_NEXT(v_iter);
00272 }
00273
00274 igraph_vit_destroy(&v_iter);
00275 igraph_vs_destroy(&v_sel);
00276 return adjacent;
00277 }
00278
00279 static int num_direct_connections(uint16_t from)
00280 {
00281 igraph_integer_t size = 0;
00282 igraph_vs_t v_sel;
00283 if (igraph_vs_adj(&v_sel, from, IGRAPH_OUT) < 0) {
00284 perror(APP "igraph_vs_adj");
00285 return -1;
00286 }
00287 igraph_vs_size(&network_graph, &v_sel, &size);
00288 igraph_vs_destroy(&v_sel);
00289 return (int)size;
00290 }
00291
00292 static int get_direct_connections(uint16_t from, int motes[], int num)
00293 {
00294 int i = 0;
00295 igraph_vs_t v_sel;
00296 igraph_vit_t v_iter;
00297
00298 if (igraph_vs_adj(&v_sel, from, IGRAPH_OUT) < 0) {
00299 perror(APP "igraph_vs_adj");
00300 return -1;
00301 }
00302 if (igraph_vit_create(&network_graph, v_sel, &v_iter) < 0) {
00303 perror(APP "igraph_vit_create");
00304 return -1;
00305 }
00306
00307 while (!IGRAPH_VIT_END(v_iter) && i < num) {
00308 motes[i] = VAN(&network_graph, "id", IGRAPH_VIT_GET(v_iter));
00309 IGRAPH_VIT_NEXT(v_iter);
00310 }
00311
00312 igraph_vit_destroy(&v_iter);
00313 igraph_vs_destroy(&v_sel);
00314 return 0;
00315 }
00316
00317 static void send_to_networks(uint16_t from, uint16_t to,
00318 int sims[], int num, int **motes)
00319 {
00320 int i;
00321 for (i = 0; i < num; i++) {
00322 if (to != BROADCAST) {
00323 if (are_connected(to, from)) {
00324 sims[i] = 1;
00325 motes[i] = (int*) malloc(sizeof(int) * sims[i]);
00326 memset(motes[i], 0, sims[i]);
00327
00328 }
00329 }
00330 else {
00331 sims[i] = num_direct_connections(from);
00332 motes[i] = (int*) malloc(sizeof(int) * sims[i]);
00333 memset(motes[i], 0, sims[i]);
00334 get_direct_connections(from, motes[i], sims[i]);
00335 }
00336 }
00337 }
00338
00339
00340 static int forward_message(int idx, uint16_t to, uint16_t from,
00341 GuiMsg_t *msg_hdr, radio_msg_t *msg, char evt)
00342 {
00343 int msg_len;
00344 GuiMsg_t header;
00345 radio_msg_t message;
00346 int fd = clients[idx].cmdfd;
00347 if (evt)
00348 fd = clients[idx].evtfd;
00349
00350 if (to == from)
00351 return -1;
00352
00353 memcpy(&header, msg_hdr, sizeof(GuiMsg_t));
00354 memcpy(&message, msg, sizeof(radio_msg_t));
00355 msg_len = msg_hdr->payLoadLen;
00356
00357 #ifdef DEBUG_RADIO
00358 fprintf(stderr, "%s Sending radio message from mote %u to mote %u.\n",
00359 APP, from, to);
00360 #ifdef DEBUG_VERBOSE
00361 {
00362 int i;
00363 fprintf(stderr, "%s Addr: %hu, Type: %hhu, Group: %hhu, Len: %hhu\n",
00364 APP, message.addr, message.type, message.grp, message.len);
00365 fprintf(stderr, "%s Data: ", APP);
00366 for (i = 0; i < message.len; i++)
00367 fprintf(stderr, "%hhu ", message.data[i]);
00368 fprintf(stderr, "\n%s crc: %hu, str: %hu, ack: %hhu, time: %hu, "
00369 "sendSecMode: %hhu, recvSecMode: %hhu\n",
00370 APP, message.crc, message.strength, message.ack, message.time,
00371 message.send_sec_mode, message.recv_sec_mode);
00372 #ifdef DEBUG_RADIO_RAW
00373 fprintf(stderr, "Raw: ");
00374 for (i = 0; i < header.payLoadLen; i++)
00375 fprintf(stderr, " %hhu", ((unsigned char*)&message)[i]);
00376 fprintf(stderr, "\n");
00377 #endif
00378 }
00379 #endif
00380 #endif
00381
00382 msg_hdr_swap(&header);
00383 if (send_all(fd, &header, GUI_MSG_HEADER_LENGTH) < 0) {
00384 perror(APP "send");
00385 if (errno == EPIPE)
00386 return -1;
00387 }
00388 radio_msg_swap(&message);
00389 if (send_all(fd, &message, msg_len) < 0) {
00390 perror(APP "send");
00391 if (errno == EPIPE)
00392 return -1;
00393 }
00394 #ifdef ACK
00395 {
00396 int numbytes;
00397 if ((numbytes = await_ack(fd)) < 0) {
00398 if (numbytes == 0)
00399 return -1;
00400 }
00401 }
00402 #endif
00403 return 0;
00404 }
00405
00406
00407 static int load_data(FILE* data_file)
00408 {
00409 long long stime;
00410 char *line = NULL;
00411 size_t len = 0;
00412 int mote = -1, port = -1, value = -1;
00413 int parsed, ready = 0;
00414 long sleep_time = 0;
00415 long long data_time = 0;
00416
00417 pthread_mutex_lock(&clients_mutex);
00418 if (num_motes >= expected_clients)
00419 ready = 1;
00420 pthread_mutex_unlock(&clients_mutex);
00421 if (!ready) {
00422 usleep(SLEEP_MSECS * 1000);
00423 return 1;
00424 }
00425
00426 if (getdelim(&line, &len, (int)':', data_file) < 0) {
00427 if (errno == 0)
00428 return 0;
00429 perror(APP "getdelim");
00430 if (line)
00431 free(line);
00432 return -1;
00433 }
00434 if ((parsed = sscanf(line, "%Ld:", &data_time)) < 0)
00435 perror(APP "sscanf");
00436 if (parsed < 1)
00437 fprintf(stderr, "%s Failed to parse data: (%d) %s\n", APP, parsed, line);
00438 free(line);
00439 line = NULL;
00440
00441 do {
00442 if (sleep_time > 0)
00443 usleep(sleep_time);
00444
00445 pthread_mutex_lock(&sim_time_mutex);
00446 stime = (long long) floor((double)sim_time / TIME_ADJUST);
00447 pthread_mutex_unlock(&sim_time_mutex);
00448
00449 sleep_time = (data_time - stime) * (int)TIME_ADJUST;
00450 } while (sleep_time > MAX_MOTES);
00451
00452 #if defined(DEBUG_ADC) && defined(DEBUG_VERBOSE)
00453 fprintf(stderr, "%s %Ld msecs, data: %Ld msecs\n", APP, stime, data_time);
00454 #endif
00455
00456 if ((len = getline(&line, &len, data_file)) < 0) {
00457 perror(APP "getline");
00458 free(line);
00459 return 0;
00460 }
00461 if (len == 0)
00462 return 1;
00463
00464 if ((parsed = sscanf(line, "%d:%d:%d", &mote, &port, &value) < 0)) {
00465 perror(APP "sscanf");
00466 free(line);
00467 return 0;
00468 }
00469
00470
00471
00472
00473
00474
00475
00476
00477 free(line);
00478
00479 pthread_mutex_lock(&adc_mutex);
00480 adc_values[mote][port] = value;
00481 pthread_mutex_unlock(&adc_mutex);
00482
00483 return 1;
00484 }
00485
00486
00487
00488 void *service_thread(void *idx_ptr) {
00489 unsigned int keep_going = 1, not_ready = 1;
00490 int this_client = -1;
00491 int evtfd, cmdfd;
00492
00493 this_client = (int)idx_ptr;
00494 if (this_client < 0 || this_client > MAX_CLIENTS) {
00495 fprintf(stderr, "%s Bad index %d\n", APP, this_client);
00496 exit(1);
00497 }
00498
00499 while (keep_going && not_ready) {
00500 sleep(1);
00501 pthread_mutex_lock(&clients_mutex);
00502 if (clients[this_client].evtfd > 0 && clients[this_client].cmdfd > 0)
00503 not_ready = 0;
00504 pthread_mutex_unlock(&clients_mutex);
00505 pthread_mutex_lock(¬_done_mutex);
00506 keep_going = not_done;
00507 pthread_mutex_unlock(¬_done_mutex);
00508 }
00509 evtfd = clients[this_client].evtfd;
00510 cmdfd = clients[this_client].cmdfd;
00511
00512 #ifdef DEBUG_CLIENTS
00513 fprintf(stderr, "%s Client %d joined.\n", APP, this_client);
00514 #endif
00515
00516 while (keep_going) {
00517 struct timeval tv;
00518 fd_set recv_set;
00519 int i, numbytes, client_num;
00520 GuiMsg_t msg_hdr;
00521 char event_msg[MAX_MSG_SIZE];
00522
00523 memset(event_msg, 0 , MAX_MSG_SIZE);
00524 pthread_mutex_lock(&clients_mutex);
00525 client_num = num_clients;
00526 pthread_mutex_unlock(&clients_mutex);
00527 tv.tv_sec = 1;
00528 tv.tv_usec = 0;
00529 FD_ZERO(&recv_set);
00530 FD_SET(evtfd, &recv_set);
00531
00532 if (select(evtfd + 1, &recv_set, NULL, NULL, &tv) < 0) {
00533 perror("select");
00534 goto servloop;
00535 }
00536 if (!FD_ISSET(evtfd, &recv_set))
00537 goto servloop;
00538
00539 if ((numbytes = recv_all(evtfd, &msg_hdr, GUI_MSG_HEADER_LENGTH)) <= 0) {
00540 if (numbytes == 0)
00541 goto end;
00542 goto servloop;
00543 }
00544 msg_hdr_swap(&msg_hdr);
00545 if ((numbytes = recv_all(evtfd, event_msg, msg_hdr.payLoadLen)) <= 0) {
00546 if (numbytes == 0)
00547 goto end;
00548 goto servloop;
00549 }
00550
00551 switch(msg_hdr.msgType) {
00552 case (AM_TOSSIMINITEVENT):
00553 {
00554 int ready = 0;
00555 int *node = (int*)event_msg;
00556 #ifdef DEBUG_CLIENTS
00557 fprintf(stderr, "%s Starting simulation %d with %d motes.\n",
00558 APP, this_client + 1, *node);
00559 #endif
00560 pthread_mutex_lock(&clients_mutex);
00561 num_motes += *node;
00562 clients[this_client].is_tos = 1;
00563 if (first < 0)
00564 first = this_client;
00565 pthread_mutex_unlock(&clients_mutex);
00566
00567 while (!ready) {
00568 pthread_mutex_lock(&clients_mutex);
00569 if (num_motes >= expected_clients)
00570 ready = 1;
00571 pthread_mutex_unlock(&clients_mutex);
00572 sleep(1);
00573 }
00574 if ((numbytes = acknowledge(evtfd)) <= 0) {
00575 if (numbytes == 0 || errno == EPIPE)
00576 goto end;
00577 }
00578 }
00579 break;
00580
00581 case (AM_ADCDATAREADYEVENT):
00582 {
00583 adc_request_t *req = (adc_request_t*)event_msg;
00584 adc_msg_swap(req);
00585 #ifdef DEBUG_ADC
00586 fprintf(stderr, "%s ADC data request from mote %d\n",
00587 APP, msg_hdr.moteID);
00588 fprintf(stderr, "%s Port: %hhu data: %hu \n",
00589 APP, req->port, req->data);
00590 #endif
00591 pthread_mutex_lock(&sim_time_mutex);
00592 sim_time = msg_hdr.time;
00593 pthread_mutex_unlock(&sim_time_mutex);
00594
00595 pthread_mutex_lock(&adc_mutex);
00596 req->data = adc_values[msg_hdr.moteID][req->port];
00597 pthread_mutex_unlock(&adc_mutex);
00598
00599 if ((numbytes = acknowledge(evtfd)) <= 0) {
00600 if (numbytes == 0 || errno == EPIPE)
00601 goto end;
00602 }
00603
00604 msg_hdr.msgType = AM_SETADCPORTVALUECOMMAND;
00605 msg_hdr_swap(&msg_hdr);
00606 if (send_all(cmdfd, &msg_hdr, GUI_MSG_HEADER_LENGTH) < 0) {
00607 perror(APP "send");
00608 if (errno == EPIPE)
00609 goto end;
00610 }
00611 adc_msg_swap(req);
00612 if (send_all(cmdfd, req, sizeof(adc_request_t)) < 0) {
00613 perror(APP "send");
00614 if (errno == EPIPE)
00615 goto end;
00616 }
00617 #ifdef ACK
00618 if ((numbytes = await_ack(cmdfd)) < 0) {
00619 if (numbytes == 0)
00620 goto end;
00621 }
00622 #endif
00623 }
00624 break;
00625
00626 case (AM_RADIOMSGSENTEVENT):
00627 {
00628 int sims[client_num], *motes[client_num];
00629 radio_msg_t *msg = (radio_msg_t*)event_msg;
00630
00631 memset(sims, 0, client_num);
00632 radio_msg_swap(msg);
00633
00634 #ifdef DEBUG_RADIO
00635 fprintf(stderr, "%s Radio message recvd from mote %d.\n",
00636 APP, msg_hdr.moteID);
00637 #ifdef DEBUG_VERBOSE
00638 {
00639 int i;
00640 fprintf(stderr, "%s Addr: %hu, Type: %hhu, Group: %hhu, Len: %hhu\n",
00641 APP, msg->addr, msg->type, msg->grp, msg->len);
00642 fprintf(stderr, "%s Data: ", APP);
00643 for (i = 0; i < msg->len; i++)
00644 fprintf(stderr, "%hhu ", msg->data[i]);
00645 fprintf(stderr, "\n%s crc: %hu, str: %hu, ack: %hhu, time: %hu, "
00646 "sendSecMode: %hhu, recvSecMode: %hhu\n",
00647 APP, msg->crc, msg->strength, msg->ack, msg->time,
00648 msg->send_sec_mode, msg->recv_sec_mode);
00649 #ifdef DEBUG_RADIO_RAW
00650 fprintf(stderr, "Raw: ");
00651 for (i = 0; i < msg_hdr.payLoadLen; i++)
00652 fprintf(stderr, " %hhu", ((unsigned char*)msg)[i]);
00653 fprintf(stderr, "\n");
00654 #endif
00655 }
00656 #endif
00657 #endif
00658 if ((numbytes = acknowledge(evtfd)) <= 0) {
00659 if (numbytes == 0 || errno == EPIPE)
00660 goto end;
00661 }
00662
00663 #ifdef INSTRUMENTATION
00664 if (msg->addr == BROADCAST) {
00665 char path[255];
00666 FILE *file;
00667 sprintf(path, "tiny.%u.msgct", getpid());
00668 if ((file = fopen(path, "a")) != NULL) {
00669 fprintf(file, "%u bytes\n", msg_hdr.payLoadLen);
00670 fclose(file);
00671 }
00672 }
00673 #endif
00674 send_to_networks(msg_hdr.moteID, msg->addr, sims, client_num, motes);
00675 for (i = 0; i < client_num; i++) {
00676 if (i == this_client)
00677 continue;
00678
00679 if (clients[i].is_tos == 0 && clients[i].cmdfd > 0 &&
00680 msg->addr == BROADCAST) {
00681 if (forward_message(i, msg->addr, msg_hdr.moteID,
00682 &msg_hdr, msg, 1) < 0)
00683 goto end;
00684 }
00685 else if (sims[i] > 0 && clients[i].cmdfd > 0) {
00686 msg_hdr.msgType = AM_RADIOMSGSENDCOMMAND;
00687
00688
00689 if (msg->addr == BROADCAST) {
00690 int j;
00691 for (j = 0; j < sims[i]; j++) {
00692 if (forward_message(i, motes[i][j], msg_hdr.moteID,
00693 &msg_hdr, msg, 0) < 0)
00694 goto end;
00695 }
00696 free(motes[i]);
00697 }
00698 else {
00699 if (forward_message(i, msg->addr, msg_hdr.moteID,
00700 &msg_hdr, msg, 0) < 0)
00701 goto end;
00702 }
00703 }
00704 }
00705 }
00706 break;
00707
00708 case (AM_UARTMSGSENTEVENT):
00709 printf("UART msg\n");
00710 if ((numbytes = acknowledge(evtfd)) <= 0) {
00711 if (numbytes == 0 || errno == EPIPE)
00712 goto end;
00713 }
00714 break;
00715
00716 case (AM_INTERRUPTEVENT):
00717 case (AM_LEDEVENT):
00718 case (AM_DEBUGMSGEVENT):
00719 default:
00720 if ((numbytes = acknowledge(evtfd)) <= 0) {
00721 if (numbytes == 0 || errno == EPIPE)
00722 goto end;
00723 }
00724 break;
00725 }
00726
00727 servloop:
00728 pthread_mutex_lock(¬_done_mutex);
00729 keep_going = not_done;
00730 pthread_mutex_unlock(¬_done_mutex);
00731 }
00732
00733 end:
00734 #ifdef DEBUG_CLIENTS
00735 fprintf(stderr, "%s Client %d left.\n", APP, this_client);
00736 #endif
00737 pthread_mutex_lock(&clients_mutex);
00738 close(evtfd);
00739 clients[this_client].evtfd = -1;
00740 close(cmdfd);
00741 clients[this_client].cmdfd = -1;
00742 pthread_mutex_unlock(&clients_mutex);
00743 close(evtfd);
00744
00745 return NULL;
00746 }
00747
00748
00749 void *accept_thread(void *unused) {
00750 int evtfd, cmdfd, yes = 1;
00751 struct sockaddr_in my_evt_addr, my_cmd_addr;
00752 unsigned int keep_going = 1;
00753
00754 if ((evtfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
00755 perror(APP "socket");
00756 exit(1);
00757 }
00758 if ((cmdfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
00759 perror(APP "socket");
00760 exit(1);
00761 }
00762
00763 if (setsockopt(evtfd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)) < 0)
00764 perror(APP "setsockopt");
00765 if (setsockopt(cmdfd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)) < 0)
00766 perror(APP "setsockopt");
00767
00768 memset(&my_evt_addr, 0, sizeof(struct sockaddr_in));
00769 my_evt_addr.sin_family = AF_INET;
00770 my_evt_addr.sin_port = htons(EVENT_PORT);
00771 my_evt_addr.sin_addr.s_addr = INADDR_ANY;
00772 memset(&my_cmd_addr, 0, sizeof(struct sockaddr_in));
00773 my_cmd_addr.sin_family = AF_INET;
00774 my_cmd_addr.sin_port = htons(COMMAND_PORT);
00775 my_cmd_addr.sin_addr.s_addr = INADDR_ANY;
00776
00777 if (bind(evtfd, (struct sockaddr*)&my_evt_addr,
00778 sizeof(struct sockaddr)) < 0) {
00779 perror(APP "bind");
00780 exit(1);
00781 }
00782 if (bind(cmdfd, (struct sockaddr*)&my_cmd_addr,
00783 sizeof(struct sockaddr)) < 0) {
00784 perror(APP "bind");
00785 exit(1);
00786 }
00787
00788 if (listen(evtfd, MAX_CLIENTS) < 0) {
00789 perror(APP "listen");
00790 exit(1);
00791 }
00792 if (listen(cmdfd, MAX_CLIENTS) < 0) {
00793 perror(APP "listen");
00794 exit(1);
00795 }
00796
00797 while(keep_going) {
00798 struct timeval tv;
00799 int max = 0;
00800 fd_set accept_set;
00801 struct sockaddr_in their_addr;
00802 socklen_t sin_size= sizeof(struct sockaddr_in);
00803 int idx = -1;
00804 tv.tv_sec = 1;
00805 tv.tv_usec = 0;
00806 FD_ZERO(&accept_set);
00807 FD_SET(evtfd, &accept_set);
00808 max = evtfd;
00809 FD_SET(cmdfd, &accept_set);
00810 if (cmdfd > max)
00811 max = cmdfd;
00812
00813 if (select(max + 1, &accept_set, NULL, NULL, &tv) < 0) {
00814 perror("select");
00815 goto acceptloop;
00816 }
00817 if (FD_ISSET(evtfd, &accept_set)) {
00818 int fd;
00819 if ((fd = accept(evtfd, (struct sockaddr*)&their_addr, &sin_size)) < 0) {
00820 perror(APP "accept");
00821 goto acceptloop;
00822 }
00823
00824 pthread_mutex_lock(&clients_mutex);
00825 clients[num_clients].addr = their_addr.sin_addr.s_addr;
00826 clients[num_clients].evtfd = fd;
00827 idx = num_clients;
00828 num_clients++;
00829 pthread_mutex_unlock(&clients_mutex);
00830
00831 pthread_mutex_lock(&num_threads_mutex);
00832 if (pthread_create(&threads[num_threads], NULL,
00833 service_thread, (void*)idx) < 0) {
00834 perror(APP "pthread_create");
00835 exit(1);
00836 }
00837 num_threads++;
00838 pthread_mutex_unlock(&num_threads_mutex);
00839 }
00840 else if (FD_ISSET(cmdfd, &accept_set)) {
00841 int i, fd;
00842 if ((fd = accept(cmdfd, (struct sockaddr*)&their_addr, &sin_size)) < 0) {
00843 perror(APP "accept");
00844 goto acceptloop;
00845 }
00846
00847 pthread_mutex_lock(&clients_mutex);
00848 for (i = 0; i < num_clients; i++) {
00849 if (clients[i].addr == their_addr.sin_addr.s_addr
00850 && clients[i].cmdfd == 0) {
00851 clients[i].cmdfd = fd;
00852 break;
00853 }
00854 }
00855 pthread_mutex_unlock(&clients_mutex);
00856 }
00857
00858 acceptloop:
00859 pthread_mutex_lock(¬_done_mutex);
00860 keep_going = not_done;
00861 pthread_mutex_unlock(¬_done_mutex);
00862 }
00863 return NULL;
00864 }
00865
00866
00867 static void quit(int sig, siginfo_t *si, void *unused)
00868 {
00869 pthread_mutex_lock(¬_done_mutex);
00870 not_done = 0;
00871 pthread_mutex_unlock(¬_done_mutex);
00872 }
00873
00874
00875 int main(int argc, char *argv[])
00876 {
00877 int i;
00878 char *net_file_name;
00879 char *adc_file_name;
00880 FILE *net_data_file;
00881 FILE *adc_data_file;
00882 struct sigaction sa;
00883 int loop = 1;
00884
00885 if (argc != 4) {
00886 char *app = rindex(argv[0], '/');
00887 if (app == NULL)
00888 app = argv[0];
00889 else
00890 app++;
00891 fprintf(stderr, "%s Usage: %s <num clients> <network file> <adc file>\n",
00892 APP, app);
00893 exit(1);
00894 }
00895 expected_clients = atoi(argv[1]);
00896 net_file_name = argv[2];
00897 adc_file_name = argv[3];
00898
00899 sa.sa_flags = SA_SIGINFO;
00900 sigemptyset(&sa.sa_mask);
00901 sa.sa_sigaction = quit;
00902 if (sigaction(SIGTERM, &sa, NULL) < 0)
00903 perror(APP "sigaction");
00904 if (sigaction(SIGINT, &sa, NULL) < 0)
00905 perror(APP "sigaction");
00906
00907 if ((adc_data_file = fopen(adc_file_name, "r")) == NULL) {
00908 perror(APP "fopen");
00909 exit(1);
00910 }
00911 if ((net_data_file = fopen(net_file_name, "r")) == NULL) {
00912 perror(APP "fopen");
00913 exit(1);
00914 }
00915 igraph_i_set_attribute_table(&igraph_cattribute_table);
00916 if (igraph_read_graph_gml(&network_graph, net_data_file) < 0) {
00917 perror(APP "igraph_read_graph_gml");
00918 exit(1);
00919 }
00920 fclose(net_data_file);
00921
00922 num_threads = 0;
00923 sim_time = 0;
00924 not_done = 1;
00925 num_clients = 0;
00926 num_motes = 0;
00927 first = -1;
00928 memset(clients, 0, sizeof(client_t) * MAX_CLIENTS);
00929
00930 pthread_mutex_lock(&num_threads_mutex);
00931 if (pthread_create(&threads[num_threads], NULL, accept_thread, NULL) < 0) {
00932 perror(APP "pthread_create");
00933 exit(1);
00934 }
00935 num_threads++;
00936 pthread_mutex_unlock(&num_threads_mutex);
00937
00938 do {
00939 pthread_mutex_lock(¬_done_mutex);
00940 loop = not_done;
00941 pthread_mutex_unlock(¬_done_mutex);
00942 } while (loop && load_data(adc_data_file));
00943
00944 do {
00945 sleep(1);
00946 pthread_mutex_lock(¬_done_mutex);
00947 loop = not_done;
00948 pthread_mutex_unlock(¬_done_mutex);
00949 } while (loop);
00950
00951 for (i = 0; i < num_threads; i++)
00952 pthread_join(threads[i], NULL);
00953
00954 fclose(adc_data_file);
00955 igraph_destroy(&network_graph);
00956 return 0;
00957 }