00001
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #include <stdint.h>
00036 #include <stdlib.h>
00037 #include <unistd.h>
00038 #include <string.h>
00039 #include <errno.h>
00040 #include <pthread.h>
00041 #include <sys/types.h>
00042 #include <sys/time.h>
00043 #include <sys/socket.h>
00044 #include <netinet/in.h>
00045 #include <netdb.h>
00046 #include <arpa/inet.h>
00047
00048 #include "sensix.h"
00049 #include "sense_impl.h"
00050 #include "stacktrace.h"
00051 #include "sense_endian.h"
00052 #include "discovery_service.h"
00053
00054
/* Larger of x and y.  Each argument appears twice in the expansion, so do not
 * pass expressions with side effects. */
00055 #define max(x, y) (((x) < (y)) ? (y) : (x))
00056
00057
/* Largest body size parseDiscoverHeader() accepts from a received header. */
00058 #define MEMORY_LIMIT 8096
/* Not referenced in the visible part of this file. */
00059 #define MAX_RPT_ITVL 10
00060
/* Capacities of the static tables declared below. */
00061 #define MAX_PARENTS 128
00062 #define MAX_CHILDREN 512
00063 #define MAX_SIBLINGS 128
00064 #define MAX_CAPABLE 32
/* Not referenced in the visible part of this file. */
00065 #define MAX_THREADS (MAX_SIBLINGS + 1)
00066
00067
/* ancestors_running is set to true by discovery_service() and polled by
 * listener_thread(), both under ancestor_mutex.  Neither symbol is static;
 * presumably other translation units clear the flag to stop the listeners. */
00068 pthread_mutex_t ancestor_mutex = PTHREAD_MUTEX_INITIALIZER;
00069 bool_t ancestors_running = false;
00070
/* references_mutex: taken by self_capabilities() and self_register_object().
 * database_mutex:   taken by the *_queryNetwork(), *_queryNode() and
 *                   *_capabilities() functions.
 * peers_mutex:      taken by the *_findNode() and *_nodes() functions. */
00071 static pthread_mutex_t references_mutex = PTHREAD_MUTEX_INITIALIZER;
00072 static pthread_mutex_t database_mutex = PTHREAD_MUTEX_INITIALIZER;
00073 static pthread_mutex_t peers_mutex = PTHREAD_MUTEX_INITIALIZER;
00074
/* references[]:    object references registered by this node
 *                  (self_register_object()).
 * sib_database[]:  references learned from nodes at this node's level.
 * desc_database[]: references learned from nodes one level below.
 * Each *_size counter holds the number of used slots in its array. */
00075 static object_reference_t references[MAX_CAPABLE];
00076 static unsigned int refs_size = 0;
00077 static object_reference_t sib_database[MAX_SIBLINGS];
00078 static unsigned int sib_db_size = 0;
00079 static object_reference_t desc_database[MAX_CHILDREN];
00080 static unsigned int desc_db_size = 0;
00081
/* Node identifiers seen in Announce messages, grouped by the sender's level
 * relative to h_level (higher / equal / exactly one lower). */
00082 static int ancestors_seen[MAX_PARENTS];
00083 static int ancestors_size = 0;
00084 static int siblings_seen[MAX_SIBLINGS];
00085 static int siblings_size = 0;
00086 static int descendants_seen[MAX_CHILDREN];
00087 static int descendants_size = 0;
00088
/* Sockets used for sending, with the destination address paired to each;
 * entries [0, num_sockets) are valid and are accessed under socket_mutex. */
00089 static pthread_mutex_t socket_mutex = PTHREAD_MUTEX_INITIALIZER;
00090 #define MAX_SOCKETS 2
00091 static struct sockaddr_storage saddrs[MAX_SOCKETS];
00092 static int sockets[MAX_SOCKETS];
00093 static int num_sockets = 0;
00094
/* Identity and port of this node; assigned in discovery_service(). */
00095 static uint32_t node_id;
00096 static uint8_t h_level;
00097 static uint32_t service_port;
00098
00099
00102
00103
00104 void announceHeader(unsigned char *hdr) {
00105 int len = ANNOUNCE_SIZE;
00106 hdr[0] = 'D';
00107 hdr[1] = 0x10;
00108 hdr[2] = little_endian();
00109 hdr[3] = 0x01;
00110 network_order(hdr + 4, len);
00111 }
00112
00113 void reportHeader(unsigned char *hdr) {
00114 int len = REPORT_SIZE;
00115 hdr[0] = 'D';
00116 hdr[1] = 0x10;
00117 hdr[2] = little_endian();
00118 hdr[3] = 0x02;
00119 network_order(hdr + 4, len);
00120 }
00121
00122 void requireHeader(unsigned char *hdr) {
00123 int len = REQUIRE_SIZE;
00124 hdr[0] = 'D';
00125 hdr[1] = 0x10;
00126 hdr[2] = little_endian();
00127 hdr[3] = 0x03;
00128 network_order(hdr + 4, len);
00129 }
00130
00131 void shareHeader(unsigned char *hdr, Share *shr) {
00132 uint len = REPORT_SIZE + strlen((char*)shr->ior);
00133 hdr[0] = 'D';
00134 hdr[1] = 0x10;
00135 hdr[2] = little_endian();
00136 hdr[3] = 0x04;
00137 network_order(hdr + 4, len);
00138 }
00139
00140
00141
00142 void loadAnnounce(unsigned char *msg, Announce *ann) {
00143 network_order(msg, ann->nodeIdent);
00144 msg[4] = ann->hLevel;
00145 }
00146
00147 void loadReport(unsigned char *msg, Report *rpt) {
00148 network_order(msg, rpt->nodeIdent);
00149 msg[4] = rpt->hLevel;
00150 msg[5] = rpt->cType;
00151 }
00152
00153 void loadRequire(unsigned char *msg, Require *req) {
00154 network_order(msg, req->nodeIdent);
00155 msg[4] = req->hLevel;
00156 msg[5] = req->cType;
00157 network_order(msg + 6, req->targetIdent);
00158 }
00159
00160 void loadShare(unsigned char *msg, Share *shr) {
00161 network_order(msg, shr->nodeIdent);
00162 msg[4] = shr->hLevel;
00163 msg[5] = shr->cType;
00164 strcpy((char*)(msg + 6), (char*)shr->ior);
00165 }
00166
00167
00168
00169 static void send_discover_packet(unsigned char *hdr,
00170 unsigned char *body, size_t len) {
00171 int i;
00172 pthread_mutex_lock(&socket_mutex);
00173 for (i = 0; i < num_sockets; i++) {
00174 if (sendto(sockets[i], hdr, DISCOVERY_HEADER_SIZE, 0,
00175 (struct sockaddr*)&saddrs[i], sizeof(struct sockaddr)) < 0) {
00176 log_error("%s: discovery header sendto %s\n", APP, strerror(errno));
00177 continue;
00178 }
00179 if (sendto(sockets[i], body, len, 0,
00180 (struct sockaddr*)&saddrs[i], sizeof(struct sockaddr)) < 0)
00181 log_error("%s: discovery message sendto %s\n", APP, strerror(errno));
00182 }
00183 pthread_mutex_unlock(&socket_mutex);
00184 }
00185
00186
00187 static void sendAnnounce(Announce *ann) {
00188 unsigned char hdr[DISCOVERY_HEADER_SIZE];
00189 size_t len = ANNOUNCE_SIZE;
00190 unsigned char msg[len];
00191
00192 announceHeader(hdr);
00193 loadAnnounce(msg, ann);
00194 send_discover_packet(hdr, msg, len);
00195 }
00196
00197 static void sendReport(Report *rpt) {
00198 unsigned char hdr[DISCOVERY_HEADER_SIZE];
00199 size_t len = REPORT_SIZE;
00200 unsigned char msg[len];
00201
00202 reportHeader(hdr);
00203 loadReport(msg, rpt);
00204 send_discover_packet(hdr, msg, len);
00205 }
00206
00207 static void sendRequire(Require *req) {
00208 unsigned char hdr[DISCOVERY_HEADER_SIZE];
00209 size_t len = REQUIRE_SIZE;
00210 unsigned char msg[len];
00211
00212 requireHeader(hdr);
00213 loadRequire(msg, req);
00214 send_discover_packet(hdr, msg, len);
00215 }
00216
00217 static void sendShare(Share *shr) {
00218 unsigned char hdr[DISCOVERY_HEADER_SIZE];
00219 size_t len = REPORT_SIZE + strlen((char*)shr->ior) + 1;
00220 unsigned char msg[len];
00221
00222 shareHeader(hdr, shr);
00223 loadShare(msg, shr);
00224 send_discover_packet(hdr, msg, len);
00225 }
00226
00227
00228
00229 int parseDiscoverHeader(DiscoverHeader *hdr, unsigned char *bytes) {
00230 if (bytes[0] != 'D')
00231 return -1;
00232 if (bytes[1] != 0x10)
00233 return -1;
00234
00235 if (bytes[2] == 0x01)
00236 hdr->little_endian = true;
00237 else
00238 hdr->little_endian = false;
00239
00240 hdr->announceType = bytes[3];
00241 if (hdr->little_endian != little_endian())
00242 host_order(&(hdr->announceSize), bytes + 4, 4, 1, 1);
00243 else
00244 host_order(&(hdr->announceSize), bytes + 4, 4, 0, 1);
00245
00246 if (hdr->announceSize > MEMORY_LIMIT)
00247 return -1;
00248 return 0;
00249 }
00250
00251 int parseDiscoverMessage(DiscoverMessage *dm, DiscoverHeader *hdr,
00252 unsigned char *bytes) {
00253 memset(dm, 0, sizeof(DiscoverMessage));
00254
00255 switch (hdr->announceType) {
00256 case 0x01:
00257 if (hdr->announceSize < 5)
00258 return -1;
00259
00260 if (hdr->little_endian != little_endian())
00261 host_order(&(dm->nodeIdent), bytes, 4, 1, 1);
00262 else
00263 host_order(&(dm->nodeIdent), bytes, 4, 0, 1);
00264
00265 dm->hLevel = bytes[4];
00266 break;
00267
00268 case 0x02:
00269 if (hdr->announceSize < 6)
00270 return -1;
00271
00272 if (hdr->little_endian != little_endian())
00273 host_order(&(dm->nodeIdent), bytes, 4, 1, 1);
00274 else
00275 host_order(&(dm->nodeIdent), bytes, 4, 0, 1);
00276
00277 dm->hLevel = bytes[4];
00278 dm->cType = bytes[5];
00279 break;
00280
00281 case 0x03:
00282 if (hdr->announceSize < 10)
00283 return -1;
00284
00285 if (hdr->little_endian != little_endian())
00286 host_order(&(dm->nodeIdent), bytes, 4, 1, 1);
00287 else
00288 host_order(&(dm->nodeIdent), bytes, 4, 0, 1);
00289
00290 dm->hLevel = bytes[4];
00291 dm->cType = bytes[5];
00292
00293 if (hdr->little_endian != little_endian())
00294 host_order(&(dm->targetIdent), bytes + 6, 4, 1, 1);
00295 else
00296 host_order(&(dm->targetIdent), bytes + 6, 4, 0, 1);
00297 break;
00298
00299 case 0x04:
00300 if (hdr->announceSize < 7)
00301 return -1;
00302
00303 if (hdr->little_endian != little_endian())
00304 host_order(&(dm->nodeIdent), bytes, 4, 1, 1);
00305 else
00306 host_order(&(dm->nodeIdent), bytes, 4, 0, 1);
00307
00308 dm->hLevel = bytes[4];
00309 dm->cType = bytes[5];
00310
00311 strncpy((char*)(dm->ior), (char*)(bytes + 6), max((hdr->announceSize - 6),
00312 OBJ_REF_SIZE));
00313 break;
00314 }
00315 return 0;
00316 }
00317
00318
00320
00321
/*
 * recvfrom() with a timeout.
 *
 * Waits with select() until socket s is readable or `timeout` milliseconds
 * have passed, then performs a single recvfrom() with flags 0.
 *
 * Returns the recvfrom() result when data is available, -2 when the wait
 * timed out, and -1 when select() failed.
 */
static int timed_recvfrom(int s, void *buf, size_t len, struct sockaddr *from,
                          socklen_t *fromlen, long timeout) {
  struct timeval wait;
  fd_set readable;
  int ready;

  wait.tv_sec = timeout / 1000;
  wait.tv_usec = (timeout % 1000) * 1000;

  FD_ZERO(&readable);
  FD_SET(s, &readable);

  ready = select(s + 1, &readable, NULL, NULL, &wait);
  switch (ready) {
  case 0:
    return -2;
  case -1:
    return -1;
  default:
    break;
  }

  return recvfrom(s, buf, len, 0, from, fromlen);
}
00342
00343
/* Start-up arguments handed to listener_thread() by discovery_service(). */
00344 struct listen_arguments {
00345 int sfd; /* socket the listener receives on and closes when it exits */
00346 struct sockaddr_storage addr; /* filled in by discovery_service(); not read by listener_thread() */
00347 };
00348
00349
00350 static void *listener_thread(void *args) {
00351 struct listen_arguments *la;
00352
00353 if (args == NULL)
00354 return NULL;
00355 la = (struct listen_arguments*)args;
00356
00357 while(true) {
00358 DiscoverMessage in;
00359 DiscoverHeader hdr;
00360 struct sockaddr_storage remoteaddr;
00361 socklen_t sin_size = sizeof(remoteaddr);
00362 int err;
00363 unsigned char raw_hdr[DISCOVERY_HEADER_SIZE];
00364 unsigned char *msg = NULL;
00365 bool_t go = false;
00366
00367 pthread_mutex_lock(&ancestor_mutex);
00368 go = ancestors_running;
00369 pthread_mutex_unlock(&ancestor_mutex);
00370 if (go == false)
00371 break;
00372
00373 memset(&in, 0, sizeof(in));
00374 if ((err = timed_recvfrom(la->sfd, raw_hdr, DISCOVERY_HEADER_SIZE,
00375 (struct sockaddr*)&remoteaddr, &sin_size,
00376 EMPTY_TIMEOUT)) < 0) {
00377 if (err != -2)
00378 log_error("%s: recvfrom %s\n", APP, strerror(errno));
00379 continue;
00380 }
00381 if (parseDiscoverHeader(&hdr, raw_hdr) < 0) {
00382 continue;
00383 }
00384
00385 msg = malloc(hdr.announceSize);
00386 if ((err = timed_recvfrom(la->sfd, msg, hdr.announceSize,
00387 (struct sockaddr*)&remoteaddr, &sin_size,
00388 TX_TIMEOUT)) < 0) {
00389 if (err == -2)
00390 log_error("%s: recvfrom timeout on DiscoveryMessage\n", APP);
00391 else
00392 log_error("%s: recvfrom %s\n", APP, strerror(errno));
00393 free(msg);
00394 continue;
00395 }
00396
00397 parseDiscoverMessage(&in, &hdr, msg);
00398 free(msg);
00399
00400 if (in.nodeIdent == node_id)
00401 continue;
00402 switch (hdr.announceType) {
00403 case 1:
00404 if (in.hLevel >= h_level) {
00405 int i;
00406 if (in.hLevel == h_level) {
00407 bool_t found = false;
00408 for (i = 0; i < siblings_size; i++) {
00409 if (siblings_seen[i] == in.nodeIdent) {
00410 found = true;
00411 break;
00412 }
00413 }
00414 if (found == false && siblings_size < MAX_SIBLINGS) {
00415 siblings_seen[siblings_size] = in.nodeIdent;
00416 siblings_size++;
00417 log_info("%s: Added sibling\n", APP);
00418 }
00419 }
00420 else {
00421 bool_t found = false;
00422 for (i = 0; i < ancestors_size; i++) {
00423 if (ancestors_seen[i] == in.nodeIdent) {
00424 found = true;
00425 break;
00426 }
00427 }
00428 if (found == false && ancestors_size < MAX_PARENTS) {
00429 ancestors_seen[ancestors_size] = in.nodeIdent;
00430 ancestors_size++;
00431 log_info("%s: Added ancestor\n", APP);
00432 }
00433 }
00434
00435
00436 for (i = 0; i < refs_size; i++) {
00437 Report out;
00438 out.nodeIdent = node_id;
00439 out.hLevel = h_level;
00440 out.cType = references[i].type;
00441 sendReport(&out);
00442 }
00443 } else if (in.hLevel == (h_level - 1)) {
00444 bool_t found = false;
00445 int i;
00446 for (i = 0; i < descendants_size; i++) {
00447 if (descendants_seen[i] == in.nodeIdent) {
00448 found = true;
00449 break;
00450 }
00451 }
00452 if (found == false && descendants_size < MAX_CHILDREN) {
00453 descendants_seen[descendants_size] = in.nodeIdent;
00454 descendants_size++;
00455 log_info("%s: Added descendant\n", APP);
00456 }
00457
00458
00459 Announce out;
00460 out.nodeIdent = node_id;
00461 out.hLevel = h_level;
00462 sendAnnounce(&out);
00463 }
00464 break;
00465 case 2:
00466 {
00467 if (in.hLevel == (h_level - 1)) {
00468 bool_t found = false;
00469 int i;
00470 for (i = 0; i < desc_db_size; i++) {
00471 if (desc_database[i].id == in.nodeIdent) {
00472 found = true;
00473 break;
00474 }
00475 }
00476 if (found == false) {
00477 int idx = desc_db_size;
00478 char cap[MAX_CAPABILITY_STRLEN];
00479 capabilityToString(cap, in.cType);
00480
00481
00482 desc_database[idx].id = in.nodeIdent;
00483 desc_database[idx].type = in.cType;
00484 memset(desc_database[idx].ior, 0, OBJ_REF_SIZE);
00485 desc_database[idx].valid = false;
00486 desc_db_size++;
00487 log_info("%s: Added descendant capability %s\n", APP, cap);
00488 }
00489 }
00490 else if (in.hLevel == h_level) {
00491 bool_t found = false;
00492 int i;
00493 for (i = 0; i < sib_db_size; i++) {
00494 if (sib_database[i].id == in.nodeIdent) {
00495 found = true;
00496 break;
00497 }
00498 }
00499 if (found == false) {
00500 int idx = sib_db_size;
00501 char cap[MAX_CAPABILITY_STRLEN];
00502 capabilityToString(cap, in.cType);
00503
00504
00505 sib_database[idx].id = in.nodeIdent;
00506 sib_database[idx].type = in.cType;
00507 memset(sib_database[idx].ior, 0, OBJ_REF_SIZE);
00508 sib_database[idx].valid = false;
00509 sib_db_size++;
00510 log_info("%s: Added sibling capability %s\n", APP, cap);
00511 }
00512 }
00513
00514 Require out;
00515 out.nodeIdent = node_id;
00516 out.hLevel = h_level;
00517 out.cType = in.cType;
00518 out.targetIdent = in.nodeIdent;
00519 sendRequire(&out);
00520 }
00521 break;
00522 case 3:
00523 if (in.targetIdent == node_id) {
00524 int i;
00525 for (i = 0; i < refs_size; i++) {
00526 if (references[i].type == in.cType) {
00527 Share out;
00528 out.nodeIdent = node_id;
00529 out.hLevel = h_level;
00530 out.cType = in.cType;
00531 strcpy((char*)out.ior, references[i].ior);
00532 sendShare(&out);
00533 }
00534 }
00535 }
00536 break;
00537 case 4:
00538 if (in.hLevel == (h_level - 1)) {
00539 bool_t found = false;
00540 int i;
00541 for (i = 0; i < desc_db_size; i++) {
00542 if (desc_database[i].id == in.nodeIdent) {
00543 found = true;
00544 if (desc_database[i].type == in.cType &&
00545 desc_database[i].ior[0] == 0) {
00546 strcpy(desc_database[i].ior, (char*)in.ior);
00547 desc_database[i].valid = true;
00548 }
00549 break;
00550 }
00551 }
00552 if (found == false) {
00553 int idx = desc_db_size;
00554 char cap[MAX_CAPABILITY_STRLEN];
00555 capabilityToString(cap, in.cType);
00556
00557 desc_database[idx].id = in.nodeIdent;
00558 desc_database[idx].type = in.cType;
00559 strcpy(desc_database[idx].ior, (char*)in.ior);
00560 desc_database[idx].valid = true;
00561 desc_db_size++;
00562 log_info("%s: Added descendant IOR for %s\n", APP, cap);
00563 }
00564 }
00565 else if (in.hLevel == h_level) {
00566 bool_t found = false;
00567 int i;
00568 for (i = 0; i < sib_db_size; i++) {
00569 if (sib_database[i].id == in.nodeIdent) {
00570 found = true;
00571 if (sib_database[i].type == in.cType &&
00572 sib_database[i].ior[0] == 0) {
00573 strcpy(sib_database[i].ior, (char*)in.ior);
00574 sib_database[i].valid = true;
00575 }
00576 break;
00577 }
00578 }
00579 if (found == false) {
00580 int idx = sib_db_size;
00581 char cap[MAX_CAPABILITY_STRLEN];
00582 capabilityToString(cap, in.cType);
00583
00584 sib_database[idx].id = in.nodeIdent;
00585 sib_database[idx].type = in.cType;
00586 strcpy(sib_database[idx].ior, (char*)in.ior);
00587 sib_database[idx].valid = true;
00588 sib_db_size++;
00589 log_info("%s: Added sibling IOR for %s\n", APP, cap);
00590 }
00591 }
00592 break;
00593 default:
00594 break;
00595 }
00596 }
00597 close(la->sfd);
00598
00599 return NULL;
00600 }
00601
00602
00605
00606
00607 uint8_t self_level() {
00608 return h_level;
00609 }
00610
00611
00612 uint32_t self_identifier() {
00613 return node_id;
00614 }
00615
00616
00617 int self_capabilities(uint8_t *caps, size_t *size) {
00618 int i;
00619 pthread_mutex_lock(&references_mutex);
00620 if (*size < refs_size) {
00621 *size = refs_size;
00622 pthread_mutex_unlock(&references_mutex);
00623 return TOO_SMALL;
00624 }
00625 if (*size != refs_size)
00626 *size = refs_size;
00627
00628 for (i = 0; i < refs_size; i++)
00629 caps[i] = references[i].type;
00630 pthread_mutex_unlock(&references_mutex);
00631
00632 return 0;
00633 }
00634
00635
00636 int self_register_object(uint8_t capability, object_reference_t *objref,
00637 bool_t override)
00638 {
00639 bool_t found = false;
00640 int i;
00641
00642 if (refs_size > MAX_CAPABLE)
00643 return REGISTRY_FULL;
00644
00645 pthread_mutex_lock(&references_mutex);
00646 for (i = 0; i < refs_size; i++) {
00647 if (references[i].type == capability) {
00648 found = true;
00649 if (override == true)
00650 strcpy(references[i].ior, objref->ior);
00651 else {
00652 pthread_mutex_unlock(&references_mutex);
00653 return ALREADY_REGISTERED;
00654 }
00655 }
00656 }
00657 if (found == false) {
00658 strcpy(references[refs_size].ior, objref->ior);
00659 references[refs_size].type = capability;
00660 references[refs_size].id = node_id;
00661 refs_size++;
00662 }
00663 pthread_mutex_unlock(&references_mutex);
00664
00665 return 0;
00666 }
00667
00668
00670
00671
00672 bool_t descendants_findNode(uint32_t id)
00673 {
00674 int i;
00675
00676 pthread_mutex_lock(&peers_mutex);
00677 for (i = 0; i < descendants_size; i++) {
00678 if (descendants_seen[i] == id)
00679 return true;
00680 }
00681 pthread_mutex_unlock(&peers_mutex);
00682
00683 return false;
00684 }
00685
00686
00687 int descendants_queryNetwork(object_reference_t *objrefs, size_t *num,
00688 uint8_t capability)
00689 {
00690 int i, j, count = 0;
00691
00692 pthread_mutex_lock(&database_mutex);
00693 for (i = 0; i < desc_db_size; i++) {
00694 if (desc_database[i].type == capability && desc_database[i].valid)
00695 count++;
00696 }
00697 pthread_mutex_unlock(&database_mutex);
00698 if (count == 0) {
00699 *num = 0;
00700 return NO_CAPABILITY;
00701 }
00702
00703 if (*num < count) {
00704 *num = count;
00705 return TOO_SMALL;
00706 }
00707 *num = count;
00708
00709 pthread_mutex_lock(&database_mutex);
00710 for (i = 0, j = 0; i < desc_db_size; i++) {
00711 if (desc_database[i].type == capability && desc_database[i].valid) {
00712 memcpy(&objrefs[j], &desc_database[i], sizeof(object_reference_t));
00713 j++;
00714 break;
00715 }
00716 }
00717 pthread_mutex_unlock(&database_mutex);
00718
00719 return 0;
00720 }
00721
00722
00723 int descendants_queryNode(object_reference_t *objref,
00724 uint32_t id, uint8_t capability)
00725 {
00726 int i;
00727 bool_t found = false;
00728 bool_t id_found = false;
00729 bool_t cap_found = false;
00730
00731 pthread_mutex_lock(&database_mutex);
00732 for (i = 0; i < desc_db_size; i++) {
00733 if (desc_database[i].type == capability)
00734 cap_found = true;
00735 if (desc_database[i].id == id)
00736 id_found = true;
00737
00738 if (id_found && cap_found && desc_database[i].valid) {
00739 found = true;
00740 memcpy(objref, &desc_database[i], sizeof(object_reference_t));
00741 break;
00742 }
00743 }
00744 pthread_mutex_unlock(&database_mutex);
00745 if (cap_found == false)
00746 return NO_CAPABILITY;
00747 if (id_found == false)
00748 return NO_NODE;
00749 if (found == false)
00750 return NO_CAPABILITY;
00751
00752 return 0;
00753 }
00754
00755
00756 int descendants_capabilities(uint8_t *caps, size_t *num)
00757 {
00758 int i;
00759
00760 pthread_mutex_lock(&database_mutex);
00761 if (*num < desc_db_size) {
00762 *num = desc_db_size;
00763 pthread_mutex_unlock(&database_mutex);
00764 return TOO_SMALL;
00765 }
00766 *num = desc_db_size;
00767
00768 for (i = 0; i < desc_db_size; i++)
00769 caps[i] = desc_database[i].type;
00770 pthread_mutex_unlock(&database_mutex);
00771
00772 return 0;
00773 }
00774
00775
00776 int descendants_nodes(uint32_t *ids, size_t *num)
00777 {
00778 int i;
00779
00780 pthread_mutex_lock(&peers_mutex);
00781 if (*num < descendants_size) {
00782 *num = descendants_size;
00783 pthread_mutex_unlock(&peers_mutex);
00784 return TOO_SMALL;
00785 }
00786 *num = descendants_size;
00787
00788 for (i = 0; i < descendants_size; i++)
00789 ids[i] = descendants_seen[i];
00790 pthread_mutex_unlock(&peers_mutex);
00791
00792 return 0;
00793 }
00794
00795
00797
00798
00799 bool_t siblings_findNode(uint32_t id)
00800 {
00801 int i;
00802
00803 pthread_mutex_lock(&peers_mutex);
00804 for (i = 0; i < siblings_size; i++) {
00805 if (siblings_seen[i] == id)
00806 return true;
00807 }
00808 pthread_mutex_unlock(&peers_mutex);
00809
00810 return false;
00811 }
00812
00813
00814 int siblings_queryNetwork(object_reference_t *objrefs, size_t *num,
00815 uint8_t capability)
00816 {
00817 int i, j, count = 0;
00818
00819 pthread_mutex_lock(&database_mutex);
00820 for (i = 0; i < sib_db_size; i++) {
00821 if (sib_database[i].type == capability && sib_database[i].valid)
00822 count++;
00823 }
00824 pthread_mutex_unlock(&database_mutex);
00825 if (count == 0) {
00826 *num = 0;
00827 return NO_CAPABILITY;
00828 }
00829
00830 if (*num < count) {
00831 *num = count;
00832 return TOO_SMALL;
00833 }
00834 *num = count;
00835
00836 pthread_mutex_lock(&database_mutex);
00837 for (i = 0, j = 0; i < sib_db_size; i++) {
00838 if (sib_database[i].type == capability && sib_database[i].valid) {
00839 memcpy(&objrefs[j], &sib_database[i], sizeof(object_reference_t));
00840 j++;
00841 break;
00842 }
00843 }
00844 pthread_mutex_unlock(&database_mutex);
00845
00846 return 0;
00847 }
00848
00849
00850 int siblings_queryNode(object_reference_t *objref,
00851 uint32_t id, uint8_t capability)
00852 {
00853 int i;
00854 bool_t found = false;
00855 bool_t id_found = false;
00856 bool_t cap_found = false;
00857
00858 pthread_mutex_lock(&database_mutex);
00859 for (i = 0; i < sib_db_size; i++) {
00860 if (sib_database[i].type == capability)
00861 cap_found = true;
00862 if (sib_database[i].id == id)
00863 id_found = true;
00864
00865 if (id_found && cap_found && sib_database[i].valid) {
00866 found = true;
00867 memcpy(objref, &sib_database[i], sizeof(object_reference_t));
00868 break;
00869 }
00870 }
00871 pthread_mutex_unlock(&database_mutex);
00872 if (cap_found == false)
00873 return NO_CAPABILITY;
00874 if (id_found == false)
00875 return NO_NODE;
00876 if (found == false)
00877 return NO_CAPABILITY;
00878
00879 return 0;
00880 }
00881
00882
00883 int siblings_capabilities(uint8_t *caps, size_t *num)
00884 {
00885 int i;
00886
00887 pthread_mutex_lock(&database_mutex);
00888 if (*num < sib_db_size) {
00889 *num = sib_db_size;
00890 pthread_mutex_unlock(&database_mutex);
00891 return TOO_SMALL;
00892 }
00893 *num = sib_db_size;
00894
00895 for (i = 0; i < sib_db_size; i++)
00896 caps[i] = sib_database[i].type;
00897 pthread_mutex_unlock(&database_mutex);
00898
00899 return 0;
00900 }
00901
00902
00903 int siblings_nodes(uint32_t *ids, size_t *num)
00904 {
00905 int i;
00906
00907 pthread_mutex_lock(&peers_mutex);
00908 if (*num < siblings_size) {
00909 *num = siblings_size;
00910 pthread_mutex_unlock(&peers_mutex);
00911 return TOO_SMALL;
00912 }
00913 *num = siblings_size;
00914
00915 for (i = 0; i < siblings_size; i++)
00916 ids[i] = siblings_seen[i];
00917 pthread_mutex_unlock(&peers_mutex);
00918
00919 return 0;
00920 }
00921
00922
00924
00925
00926 bool_t ancestors_findNode(uint32_t id)
00927 {
00928 int i;
00929
00930 pthread_mutex_lock(&peers_mutex);
00931 for (i = 0; i < ancestors_size; i++) {
00932 if (ancestors_seen[i] == id)
00933 return true;
00934 }
00935 pthread_mutex_unlock(&peers_mutex);
00936
00937 return false;
00938 }
00939
00940
00941 int ancestors_nodes(uint32_t *ids, size_t *num)
00942 {
00943 int i;
00944
00945 pthread_mutex_lock(&peers_mutex);
00946 if (*num < ancestors_size) {
00947 *num = ancestors_size;
00948 pthread_mutex_unlock(&peers_mutex);
00949 return TOO_SMALL;
00950 }
00951 if (*num != ancestors_size)
00952 *num = ancestors_size;
00953
00954 for (i = 0; i < ancestors_size; i++)
00955 ids[i] = ancestors_seen[i];
00956 pthread_mutex_unlock(&peers_mutex);
00957
00958 return 0;
00959 }
00960
00961
00963
00964
00965 bool_t family_findNode(uint32_t id)
00966 {
00967 return ancestors_findNode(id) || siblings_findNode(id) ||
00968 descendants_findNode(id);
00969 }
00970
00971
00972 int family_queryNetwork(object_reference_t *objrefs, size_t *num,
00973 uint8_t capability)
00974 {
00975 int err_d = 0, err_s = 0;
00976 size_t second, first = *num;
00977
00978 err_d = descendants_queryNetwork(objrefs, &first, capability);
00979
00980 second = *num - first;
00981 err_s = siblings_queryNetwork(objrefs + first, &second, capability);
00982
00983 *num = first + second;
00984 if (err_d == TOO_SMALL || err_s == TOO_SMALL)
00985 return TOO_SMALL;
00986
00987 if (err_d == NO_CAPABILITY && err_s == NO_CAPABILITY)
00988 return NO_CAPABILITY;
00989
00990 return 0;
00991 }
00992
00993
00994 int family_queryNode(object_reference_t *objref,
00995 uint32_t id, uint8_t capability)
00996 {
00997 int err = 0;
00998
00999 memset(objref, 0, sizeof(objref));
01000 if ((err = descendants_queryNode(objref, id, capability)) < 0)
01001 return siblings_queryNode(objref, id, capability);
01002
01003 return err;
01004 }
01005
01006
01009
01010
/* Forward declarations of helpers defined after discovery_service(). */

/* Configures sockfd for the discovery multicast group matching the address
 * family found in ss_addr; returns 0 or -1 (see the definition below). */
01011 static int join_mcast_group(int sockfd, struct sockaddr_storage *ss_addr,
01012 int service_port);
/* Returns a pointer to the IPv4 or IPv6 address member inside sa. */
01013 static void *get_in_addr(struct sockaddr *sa);
01014
01015
/*
 * Thread entry point of the discovery service.
 *
 * args points to a discovery_args_t giving node_id, h_level and service_port
 * (DISCOVERY_PORT is used when the port is 0).  For the IPv4 and then the
 * IPv6 multicast address the function creates a datagram socket, binds it,
 * joins the group and starts a listener_thread() on it.  It then sends one
 * Announce and blocks until all listener threads have ended.
 *
 * Returns NULL in every case; failure to start any listener is logged.
 *
 * Changes from the previous implementation:
 *  - the multicast address is copied from info->ai_addr (the socket address)
 *    rather than from &info->ai_addr (the address of the pointer), both for
 *    join_mcast_group() and for the destination table used when sending;
 *  - every getaddrinfo() result is released exactly once, and
 *    getaddrinfo() is tested against non-zero as POSIX specifies;
 *  - each listener gets its own argument slot that lives until the threads
 *    are joined, instead of a variable local to one loop iteration;
 *  - a socket whose listener could not be started is closed and not
 *    registered, so no never-created thread is joined;
 *  - socket_mutex is not held while joining, so listeners can still send;
 *  - the socket table is emptied once the listeners (which close their
 *    sockets) have ended.
 */
void *discovery_service(void *args) {
  discovery_args_t *da = (discovery_args_t*)args;
  struct listen_arguments largs[MAX_SOCKETS];
  pthread_t listening_threads[MAX_SOCKETS];
  struct addrinfo hints;
  char addr_str[INET6_ADDRSTRLEN];
  char serv_port_str[32];
  Announce ann;
  int started = 0;
  int i, err, yes = 1;

  if (da == NULL)
    return NULL;

  node_id = da->node_id;
  h_level = da->h_level;
  service_port = da->service_port;
  if (service_port <= 0)
    service_port = DISCOVERY_PORT;
  memset(serv_port_str, 0, 32);
  sprintf(serv_port_str, "%u", service_port);

  memset(&hints, 0, sizeof(hints));
  hints.ai_family = AF_UNSPEC;
  hints.ai_socktype = SOCK_DGRAM;
  hints.ai_flags = AI_PASSIVE;

  pthread_mutex_lock(&ancestor_mutex);
  ancestors_running = true;
  pthread_mutex_unlock(&ancestor_mutex);

  for (i = 0; i < MAX_SOCKETS; i++) {
    const char *mcast_addr = (i == 0) ? MCAST_ADDR_IPv4 : MCAST_ADDR_IPv6;
    struct addrinfo *info = NULL;
    struct sockaddr_storage group;
    int sfd;

    if ((err = getaddrinfo(mcast_addr, serv_port_str, &hints, &info)) != 0) {
      log_error("%s: getaddrinfo %s\n", APP, gai_strerror(err));
      continue;
    }
    if (info == NULL || info->ai_addrlen > sizeof(group)) {
      if (info != NULL)
        freeaddrinfo(info);
      continue;
    }

    /* Private copy of the group address; info is released below. */
    memset(&group, 0, sizeof(group));
    memcpy(&group, info->ai_addr, info->ai_addrlen);

    if ((sfd = socket(info->ai_family, info->ai_socktype,
                      info->ai_protocol)) < 0) {
      log_error("%s: socket %s\n", APP, strerror(errno));
      freeaddrinfo(info);
      continue;
    }

    addr_str[0] = '\0';
    inet_ntop(info->ai_family, get_in_addr((struct sockaddr *)info->ai_addr),
              addr_str, sizeof(addr_str));

    if (setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)) < 0) {
      log_error("%s: setsockopt %s %s\n", APP, "SO_REUSEADDR", strerror(errno));
      close(sfd);
      freeaddrinfo(info);
      continue;
    }

    if (bind(sfd, info->ai_addr, info->ai_addrlen) < 0) {
      log_error("%s: bind %s\n", APP, strerror(errno));
      close(sfd);
      freeaddrinfo(info);
      continue;
    }
    freeaddrinfo(info);

    /* join_mcast_group() closes sfd itself on each of its failure paths. */
    if (join_mcast_group(sfd, &group, service_port) < 0)
      continue;

    pthread_mutex_lock(&socket_mutex);
    if (num_sockets >= MAX_SOCKETS) {
      pthread_mutex_unlock(&socket_mutex);
      close(sfd);
      continue;
    }
    largs[started].sfd = sfd;
    memcpy(&(largs[started].addr), &group, sizeof(group));
    if (pthread_create(&listening_threads[started], NULL,
                       listener_thread, &largs[started]) != 0) {
      log_error("%s: Failed to start discovery for interface %s\n",
                APP, addr_str);
      close(sfd);
    }
    else {
      memcpy(&(saddrs[num_sockets]), &group, sizeof(saddrs[num_sockets]));
      sockets[num_sockets] = sfd;
      num_sockets++;
      started++;
      log_info("%s: Discovery service on interface %s\n", APP, addr_str);
    }
    pthread_mutex_unlock(&socket_mutex);
  }

  if (started == 0) {
    log_error("%s: Failed to bind discovery services\n", APP);
    return NULL;
  }

  ann.nodeIdent = node_id;
  ann.hLevel = h_level;
  sendAnnounce(&ann);

  /* No lock here: the listeners take socket_mutex whenever they send. */
  for (i = 0; i < started; i++) {
    pthread_join(listening_threads[i], NULL);
  }

  /* Each listener closed its socket on exit; forget the stale descriptors. */
  pthread_mutex_lock(&socket_mutex);
  num_sockets = 0;
  pthread_mutex_unlock(&socket_mutex);

  return NULL;
}
01121
01122
01124
01125
01126 static int join_mcast_group(int sockfd, struct sockaddr_storage *ss_addr,
01127 int service_port) {
01128 int loopback = 0, ttl = 1;
01129
01130 if (ss_addr->ss_family == AF_INET) {
01131 struct in_addr test;
01132 struct sockaddr_in *addr = (struct sockaddr_in*)ss_addr;
01133 struct ip_mreq mreq;
01134
01135 if (inet_pton(AF_INET, MCAST_ADDR_IPv4, &test) &&
01136 !IN_MULTICAST(ntohl(test.s_addr))) {
01137
01138 }
01139
01140 memset(addr->sin_zero, '\0', sizeof(addr->sin_zero));
01141 addr->sin_family = AF_INET;
01142 addr->sin_port = htons(service_port);
01143 inet_pton(AF_INET, MCAST_ADDR_IPv4, &(addr->sin_addr));
01144
01145 mreq.imr_multiaddr.s_addr = ((struct sockaddr_in*)addr)->sin_addr.s_addr;
01146 mreq.imr_interface.s_addr = INADDR_ANY;
01147
01148 if (setsockopt(sockfd, IPPROTO_IP, IP_MULTICAST_LOOP,
01149 &loopback, sizeof(int)) < 0) {
01150 log_error("%s: setsockopt %s %s\n", APP, "IP_MULTICAST_LOOP",
01151 strerror(errno));
01152 close(sockfd);
01153 return -1;
01154 }
01155 if (setsockopt(sockfd, IPPROTO_IP, IP_MULTICAST_TTL,
01156 &ttl, sizeof(int)) < 0) {
01157 log_error("%s: setsockopt %s %s\n", APP, "IP_MULTICAST_TTL",
01158 strerror(errno));
01159 close(sockfd);
01160 return -1;
01161 }
01162 if (setsockopt(sockfd, IPPROTO_IP, IP_ADD_MEMBERSHIP,
01163 &mreq, sizeof(int)) < 0) {
01164 log_error("%s: setsockopt %s %s\n", APP, "IP_ADD_MEMBERSHIP",
01165 strerror(errno));
01166 close(sockfd);
01167 return -1;
01168 }
01169 }
01170 else if (ss_addr->ss_family == AF_INET6) {
01171 struct ipv6_mreq mreq;
01172 struct in6_addr test;
01173 struct sockaddr_in6 *addr = (struct sockaddr_in6*)ss_addr;
01174
01175 if (inet_pton(AF_INET6, MCAST_ADDR_IPv6, &test) &&
01176 !IN6_IS_ADDR_MULTICAST(&test)) {
01177
01178 }
01179
01180 addr->sin6_family = AF_INET6;
01181 addr->sin6_port = htons(service_port);
01182 inet_pton(AF_INET6, MCAST_ADDR_IPv6, &(addr->sin6_addr));
01183
01184 memcpy(&mreq.ipv6mr_multiaddr, &(((struct sockaddr_in6*)addr)->sin6_addr),
01185 sizeof(struct in6_addr));
01186 mreq.ipv6mr_interface = 0;
01187
01188 if (setsockopt(sockfd, IPPROTO_IPV6, IPV6_MULTICAST_LOOP,
01189 &loopback, sizeof(int)) < 0) {
01190 log_error("%s: setsockopt %s %s\n", APP, "IPV6_MULTICAST_LOOP",
01191 strerror(errno));
01192 close(sockfd);
01193 return -1;
01194 }
01195 if (setsockopt(sockfd, IPPROTO_IPV6, IPV6_MULTICAST_HOPS,
01196 &ttl, sizeof(int)) < 0) {
01197 log_error("%s: setsockopt %s %s\n", APP, "IPV6_MULTICAST_HOPS",
01198 strerror(errno));
01199 close(sockfd);
01200 return -1;
01201 }
01202 if (setsockopt(sockfd, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP,
01203 &mreq, sizeof(int)) < 0) {
01204 log_error("%s: setsockopt %s %s\n", APP, "IPV6_ADD_MEMBERSHIP",
01205 strerror(errno));
01206 close(sockfd);
01207 return -1;
01208 }
01209 }
01210 return 0;
01211 }
01212
01213
/*
 * Return a pointer to the network address stored inside sa: the sin_addr
 * member for AF_INET, the sin6_addr member for every other family (the
 * structure is then treated as a struct sockaddr_in6).
 */
static void *get_in_addr(struct sockaddr *sa) {
  switch (sa->sa_family) {
  case AF_INET:
    return &(((struct sockaddr_in*)sa)->sin_addr);
  default:
    return &(((struct sockaddr_in6*)sa)->sin6_addr);
  }
}