00001
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #include "sensingS.h"
00036 #include "sensix.h"
00037 #include "sense_client.h"
00038 #include "sense_server.h"
00039 #include "tracking.h"
00040 #include "stacktrace.h"
00041 using namespace sensix;
00042
00043
00044 SenseClient::SenseClient(CORBA::ORB_var orb_val,
00045 PortableServer::POA_var poa_val,
00046 sensix::discovery::DiscoveryService *ds) : Response() {
00047 orb = orb_val;
00048 poa = poa_val;
00049 discover = ds;
00050 own_discover = false;
00051 local_server = NULL;
00052
00053 CORBA::Object_var ref =
00054 poa->servant_to_reference((PortableServer::ServantBase*)this);
00055 resp = Response::_narrow(ref);
00056 }
00057
00058
00059 SenseClient::SenseClient(int id, int level,
00060 int argc, char *argv[]) : Response() {
00061 orb = CORBA::ORB_init(argc, argv, "SenseClient");
00062 CORBA::Object_var poa_obj = orb->resolve_initial_references("RootPOA");
00063 poa = PortableServer::POA::_narrow(poa_obj.in());
00064 poa->the_POAManager()->activate();
00065
00066 local_server = NULL;
00067
00068 if (EXTERNAL_DISCOVERY) {
00069 CORBA::Object_var objref =
00070 orb->resolve_initial_references("DiscoveryService");
00071 discover = (sensix::discovery::DiscoveryService*)
00072 sensix::discovery::Self::_narrow(objref);
00073 own_discover = false;
00074 }
00075 else {
00076 discover = new sensix::discovery::DiscoveryService(orb, id, level);
00077 own_discover = true;
00078 }
00079
00080 CORBA::Object_var ref =
00081 poa->servant_to_reference((PortableServer::ServantBase*)this);
00082 resp = Response::_narrow(ref);
00083 }
00084
00085
00086 SenseClient::~SenseClient() {
00087 shutdown();
00088 }
00089
00090
00091 void SenseClient::shutdown() {
00092 if (own_discover)
00093 discover->shutdown();
00094
00095 if (local_server != NULL)
00096 local_server->shutdown();
00097 else {
00098 try {
00099 poa->the_POAManager()->deactivate(false, false);
00100 } catch (CORBA::Exception &e) {}
00101 orb->shutdown(false);
00102 orb->destroy();
00103 }
00104 }
00105
00106 void SenseClient::aggregate(Functor_ptr funct) {
00107 if (funct == NULL) {
00108 log_error("%s: Aggregate on a null functor.\n", APP);
00109 return;
00110 }
00111 Functor_ptr f = re_evolve(funct);
00112 if (f != NULL)
00113 Functor::_tao_release(f);
00114 }
00115
00116
00117
00118
00119 Functor_ptr SenseClient::re_evolve(Functor_ptr funct) {
00120 log_info("%s: aggregate %s.\n", APP, funct->asString());
00121
00122 TaskImpl *task = track.getTask(funct);
00123 Functor_ptr tf = NULL;
00124
00125 if (task != NULL) {
00126 tf = task->f();
00127
00128 TaskImpl *ti = track.getTask(tf);
00129 if (ti != NULL) {
00130 Functor_ptr tf_parent = ti->superf();
00131 if (tf_parent != NULL &&
00132 (tf_parent->identifier() == THETA ||
00133 tf_parent->identifier() == PSI))
00134 tf_parent->results(*(funct->results()));
00135 }
00136
00137 track.numTaskPeers(tf, track.numTaskPeers(tf) - 1);
00138 if (track.numTaskPeers(tf) <= 0) {
00139 ReqList *array = task->subtasks();
00140 for (unsigned int i = 0; i < array->length(); i++)
00141 Request::_tao_release((*array)[i]);
00142
00143 Functor_ptr f_prime = tf;
00144 Functor_ptr f_prev = NULL;
00145 while (f_prime != NULL) {
00146 f_prev = f_prime;
00147 TaskImpl *t = track.getTask(f_prime);
00148 track.removeTask(f_prime);
00149 if (t != NULL) {
00150 f_prime = t->superf();
00151 if (f_prime != NULL)
00152 log_info("%s: aggregate %s.\n", APP, f_prime->asString());
00153 }
00154 else
00155 f_prime = NULL;
00156 }
00157
00158 if (local_server != NULL)
00159 local_server->dataready(f_prev);
00160 tf = f_prev;
00161 }
00162 }
00163
00164 Functor::_tao_release(funct);
00165 return tf;
00166 }
00167
00168
00169
00170 bool SenseClient::reduction_test(sensix::sensing::Sensory_ptr f_prime,
00171 FunctorList *array) {
00172 if (array != NULL && array->length() > 0 &&
00173 (f_prime->identifier() == IOTA ||
00174 (discover->level() == 2 && (f_prime->level() == INVALID ||
00175 f_prime->level() >= discover->level())) ||
00176 (f_prime->level() != INVALID &&
00177 f_prime->level() >= discover->level())))
00178 return true;
00179 return false;
00180 }
00181
00182
00183 int SenseClient::devolve(Functor_ptr funct) {
00184 if (funct == NULL) {
00185 log_error("%s: Devolve on a null functor.\n", APP);
00186 return -1;
00187 }
00188 return select(NULL, funct);
00189 }
00190
00191
00192 int SenseClient::select(Functor_ptr prev, Functor_ptr funct) {
00193 log_info("%s: devolve %s.\n", APP, funct->asString());
00194
00195 TaskImpl *task = new TaskImpl(prev, funct, NULL);
00196 track.addTask(task);
00197
00198 if (discover == NULL)
00199 return -2;
00200
00201 sensix::sensing::Sensory_ptr f_prime =
00202 sensix::sensing::Sensory::_narrow(funct);
00203 FunctorList *array = f_prime->subfunctors();
00204 if (reduction_test(f_prime, array)) {
00205 int rv = 0;
00206 for (unsigned int i = 0; i < array->length(); i++)
00207 rv += select(f_prime, (*array)[i]);
00208 if ((rv / (int)array->length()) > -1)
00209 return 0;
00210 else
00211 return -1;
00212 }
00213
00214 sensix::discovery::RequestList *reqs = NULL;
00215 try {
00216 reqs = discover->queryFamilyNetwork(f_prime->sensor());
00217 } catch (sensix::discovery::DiscoveryException &de) {
00218 char cap[512], descr[512];
00219 capabilityToString(cap, f_prime->sensor());
00220 strcpy(descr, de.description);
00221 log_error("%s: Discovery exception on queryNetwork for %s: %s.\n",
00222 APP, cap, descr);
00223 return -1;
00224 }
00225
00226 for (unsigned int i = 0; i < reqs->length(); i++) {
00227 try {
00228 task->addSubtask((*reqs)[i], f_prime);
00229 track.numTaskPeers(funct, track.numTaskPeers(funct) + 1);
00230 (*reqs)[i]->apply(f_prime, resp);
00231 } catch (CORBA::Exception &e) {
00232 log_error("%s: Bad Param: %s %s.\n", APP, f_prime->asString(),
00233 e._info().c_str());
00234 printStackTrace("CORBA::", __FILE__, __LINE__);
00235 }
00236 }
00237 return 0;
00238 }
00239
00240
00241
00242 int SenseClient::detask(Functor_ptr funct) {
00243 if (funct == NULL) {
00244 log_error("%s: Detask on a null functor.\n", APP);
00245 return -1;
00246 }
00247 log_info("%s: detask %s.\n", APP, funct->asString());
00248
00249 TaskImpl *task = track.getTask(funct);
00250 if (task == NULL)
00251 return -1;
00252
00253 if (discover == NULL)
00254 return -2;
00255
00256 sensix::sensing::Sensory_ptr f_prime =
00257 sensix::sensing::Sensory::_narrow(task->f());
00258 FunctorList *array = f_prime->subfunctors();
00259 if (reduction_test(f_prime, array)) {
00260 int rv = 0;
00261 for (unsigned int i = 0; i < array->length(); i++)
00262 rv += detask((*array)[i]);
00263 if ((rv / (int)array->length()) > -1)
00264 return 0;
00265 else
00266 return -1;
00267 }
00268
00269 ReqList *subs = task->subtasks();
00270 for (unsigned int i = 0; i < subs->length(); i++)
00271 (*subs)[i]->cancel(sensix::sensing::Sensory::_narrow(task->f()));
00272 track.removeTask(task->f());
00273 task->f()->results(NULL);
00274 return 0;
00275 }