00001
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #include <pthread.h>
00036
00037 #include "sensingS.h"
00038 #include "sensix.h"
00039 #include "sense_server.h"
00040 #include "sense_client.h"
00041 #include "discovery.h"
00042 #include "tracking.h"
00043 #include "stacktrace.h"
00044 using namespace sensix;
00045
00046
// Process-wide locks used by SenseServer in this file:
//  - capabilities_mutex is held around local_capabilities.insertCapability()
//    in SenseServer::addCapability();
//  - tracker_mutex is held around the task-tracker bookkeeping in
//    SenseServer::apply().
00047 pthread_mutex_t capabilities_mutex = PTHREAD_MUTEX_INITIALIZER;
00048 pthread_mutex_t tracker_mutex = PTHREAD_MUTEX_INITIALIZER;
00049
00050
00051
00052 SenseServer::SenseServer(int id, int level,
00053 int argc, char *argv[]) : Request() {
00054 orb = CORBA::ORB_init(argc, argv, "SenseServer");
00055 CORBA::Object_var poa_obj = orb->resolve_initial_references("RootPOA");
00056 poa = PortableServer::POA::_narrow(poa_obj.in());
00057 poa->the_POAManager()->activate();
00058
00059 local_client = NULL;
00060
00061 if (EXTERNAL_DISCOVERY) {
00062 CORBA::Object_var objref =
00063 orb->resolve_initial_references("DiscoveryService");
00064 discover = (sensix::discovery::DiscoveryService*)
00065 sensix::discovery::Self::_narrow(objref);
00066 own_discover = false;
00067 }
00068 else {
00069 discover = new sensix::discovery::DiscoveryService(orb, id, level);
00070 own_discover = true;
00071 }
00072
00073 if (level > 1) {
00074 local_client = new SenseClient(orb, poa, discover);
00075 local_client->linkServer(this);
00076 }
00077 }
00078
00079
00080 SenseServer::~SenseServer() {
00081 shutdown();
00082 delete discover;
00083 delete local_client;
00084 }
00085
00086 void SenseServer::shutdown() {
00087 log_info("Shutting down the ORB\n");
00088 if (own_discover)
00089 discover->shutdown();
00090
00091 try {
00092 poa->the_POAManager()->deactivate(false, false);
00093 } catch (CORBA::Exception &e) {}
00094 orb->shutdown(true);
00095 orb->destroy();
00096
00097 log_info("Shutdown\n");
00098 }
00099
00100
00101 void SenseServer::addCapability(uint8_t cap, Capability_ptr obj) {
00102 try {
00103 CORBA::Object_var ref =
00104 poa->servant_to_reference((PortableServer::ServantBase*)this);
00105 Request_ptr req = Request::_narrow(ref);
00106
00107 discover->registerObject(cap, req);
00108 pthread_mutex_lock(&capabilities_mutex);
00109 local_capabilities.insertCapability(cap, obj);
00110 pthread_mutex_unlock(&capabilities_mutex);
00111
00112 char cap_str[512];
00113 capabilityToString(cap_str, cap);
00114 log_info("%s: Added %s.\n", APP, cap_str);
00115 } catch (sensix::discovery::DiscoveryException &de) {
00116 char descr[512];
00117 strcpy(descr, de.description);
00118 log_error("%s: Discovery exception while registering capability: %s.\n",
00119 APP, descr);
00120 } catch (CORBA::Exception &ue) {
00121 log_error("%s: CORBA POA exception while registering capability - %s.\n",
00122 APP, ue._info().c_str());
00123 printStackTrace("CORBA::", __FILE__, __LINE__);
00124 }
00125 }
00126
00127
00128 void SenseServer::apply(Functor_ptr funct, Response_ptr callback) {
00129 if (funct == NULL) {
00130 log_error("%s: Apply on a null functor.\n", APP);
00131 return;
00132 }
00133 TaskImpl *task = new TaskImpl(NULL, funct, callback);
00134 bool run;
00135
00136 log_info("%s apply %s.\n", APP, funct->asString());
00137 pthread_mutex_lock(&tracker_mutex);
00138 track.addTask(task);
00139 track.numTaskPeers(funct, 1);
00140 if (local_client != NULL)
00141 local_client->devolve(funct);
00142
00143 sensix::sensing::Sensory_ptr f_s = sensix::sensing::Sensory::_narrow(funct);
00144 uint8_t key = f_s->sensor();
00145 Capability *cap = NULL;
00146 if ((cap = local_capabilities.getCapability(key)) != NULL)
00147 cap->acquire(funct);
00148
00149 run = (track.numTaskPeers(funct) > 0 &&
00150 track.cancelledTask(funct) == false);
00151 pthread_mutex_unlock(&tracker_mutex);
00152
00153 if (local_client == NULL) {
00154 if (callback != NULL)
00155 callback->aggregate(funct);
00156 track.removeTask(funct);
00157 }
00158 }
00159
00160
00161 void SenseServer::cancel(Functor_ptr funct) {
00162 if (funct == NULL) {
00163 log_error("%s: Cancel on a null functor.\n", APP);
00164 return;
00165 }
00166 log_info("%s: cancel %s.\n", APP, funct->asString());
00167 track.cancelledTask(funct, true);
00168 if (local_client != NULL)
00169 local_client->detask(funct);
00170 track.removeTask(funct);
00171 }
00172
00173
00174 void SenseServer::dataready(Functor_ptr f) {
00175 if (f == NULL) {
00176 log_error("%s: Dataready on a null functor.\n", APP);
00177 return;
00178 }
00179 sensix::sensing::Sensory_ptr funct = sensix::sensing::Sensory::_narrow(f);
00180 log_info("%s: dataready %s.\n", APP, funct->asString());
00181 TaskImpl *task = track.getTask(funct);
00182 if (task == NULL || track.cancelledTask(funct))
00183 return;
00184
00185 track.numTaskPeers(funct, track.numTaskPeers(funct) - 1);
00186 if (track.numTaskPeers(funct) <= 0) {
00187 if (task->callback() != NULL)
00188 task->callback()->aggregate(funct);
00189 track.removeTask(funct);
00190 }
00191 }
00192
00193
00194
00195
// Run flag for the ancestors server thread and the mutex under which this
// file reads and writes it; both are declared here and defined elsewhere.
00196 extern pthread_mutex_t ancestor_mutex;
00197 extern bool_t ancestors_running;
00198
00199
00200 void ancestors_shutdown(int sig)
00201 {
00202 pthread_mutex_lock(&ancestor_mutex);
00203 ancestors_running = false;
00204 pthread_mutex_unlock(&ancestor_mutex);
00205 }
00206
00207
00208 static void *run_service(void *argument)
00209 {
00210 SenseServer *server;
00211
00212 if (argument == NULL)
00213 die("Failed to start", "server is null");
00214 server = (SenseServer*)argument;
00215
00216 server->run();
00217
00218 return NULL;
00219 }
00220
00221
00222 void *ancestors_server(void *arguments)
00223 {
00224 struct args *a;
00225 SenseServer *server;
00226 pthread_t service;
00227
00228 if (arguments == NULL)
00229 die("Failed to start", "null thread arguments");
00230 a = (struct args*)arguments;
00231
00232 try {
00233 server = new SenseServer(a->id, a->level, a->argc, a->argv);
00234 }
00235 catch (CORBA::Exception &ex) {
00236 log_error("%s: service exception - %s\n", APP, ex._info().c_str());
00237 printStackTrace("CORBA::", __FILE__, __LINE__);
00238 return NULL;
00239 }
00240
00241 log_info("%s: service started\n", APP);
00242 pthread_mutex_lock(&ancestor_mutex);
00243 ancestors_running = true;
00244 pthread_mutex_unlock(&ancestor_mutex);
00245
00246 if (pthread_create(&service, NULL, run_service, server) != 0)
00247 die("Failed to start", "server subthread");
00248
00249 while (1) {
00250 sleep(1);
00251 pthread_mutex_lock(&ancestor_mutex);
00252 if (ancestors_running == false)
00253 break;
00254 pthread_mutex_unlock(&ancestor_mutex);
00255 }
00256 server->shutdown();
00257 pthread_join(service, NULL);
00258
00259 return NULL;
00260 }
00261
00262
00263
00264 #ifndef DESCENDANT_SERVER
00265
// Run flag for the descendants client thread and the mutex under which
// this file reads and writes it.
00266 pthread_mutex_t descendant_mutex = PTHREAD_MUTEX_INITIALIZER;
00267 bool_t descendants_running = false;
00268
00269
00270 void descendants_shutdown(int sig)
00271 {
00272 pthread_mutex_lock(&descendant_mutex);
00273 descendants_running = false;
00274 pthread_mutex_unlock(&descendant_mutex);
00275 }
00276
00277
00278 void *descendants_client(void *arguments)
00279 {
00280 pthread_mutex_lock(&descendant_mutex);
00281 descendants_running = true;
00282 pthread_mutex_unlock(&descendant_mutex);
00283
00284 while (1) {
00285 sleep(1);
00286 pthread_mutex_lock(&descendant_mutex);
00287 if (descendants_running == false)
00288 break;
00289 pthread_mutex_unlock(&descendant_mutex);
00290 }
00291
00292 return NULL;
00293 }
00294
00295 #endif