00001
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 package sensix;
00036
00037 import java.util.*;
00038 import sensix.sensing.*;
00039
00040
00041 public class SenseClient implements Response
00042 {
00043 protected SensixNetworking net;
00044 protected SensixMarshalling marshal;
00045 protected TaskTracking track;
00046 protected SenseServer local_server;
00047 protected Logger logger;
00048 protected boolean own_net;
00049
00050
00051 public SenseClient(SensixNetworking n, String app, byte debug_level) {
00052 logger = new Logger(app, debug_level);
00053 net = n;
00054 own_net = false;
00055 local_server = null;
00056 track = new TaskTracking();
00057 }
00058
00059
00060 public SenseClient(int id, int level, String app, byte debug_level)
00061 throws Exception {
00062 logger = new Logger(app, debug_level);
00063
00064 local_server = null;
00065 track = new TaskTracking();
00066
00067
00068
00069
00070
00071
00072 own_net = true;
00073 }
00074
00075
00076 public void log(Logger.LOG type, String msg) {
00077 logger.log(type, msg);
00078 }
00079
00080 public synchronized void linkServer(SenseServer serv) {
00081 local_server = serv;
00082 }
00083
00084 public synchronized void shutdown() {
00085 if (own_net)
00086 net.shutdown();
00087
00088 log(Logger.LOG.INFO, "Shutdown");
00089 }
00090
00091
00092
00097 public void aggregate(Functor funct) {
00098 if (funct == null) {
00099 log(Logger.LOG.ERROR, "Aggregate on a null functor");
00100 return;
00101 }
00102 Functor f = re_evolve(funct);
00103 if (f != null)
00104 f = null;
00105 }
00106
00107
00108
00109
00110 protected synchronized Functor re_evolve(Functor funct) {
00111 log(Logger.LOG.DEBUG, "aggregate: " + funct.asString());
00112 Task task = track.get(funct);
00113 Functor tf = null;
00114
00115 if (task != null) {
00116 tf = task.f();
00117
00118 Task ti = track.get(tf);
00119 if (ti != null) {
00120 Functor tf_parent = ti.superf();
00121 if (tf_parent != null &&
00122 (tf_parent.identifier() == Sensix.THETA ||
00123 tf_parent.identifier() == Sensix.PSI))
00124 tf_parent.results(funct.results());
00125 }
00126
00127 track.num_peers(tf, track.num_peers(tf) - 1);
00128 if (track.num_peers(tf) <= 0) {
00129 Request[] array = task.subtasks();
00130 for (int i = 0; i < array.length; i++)
00131 array[i] = null;
00132
00133 Functor f_prime = tf;
00134 Functor f_prev = null;
00135 while (f_prime != null) {
00136 f_prev = f_prime;
00137 Task t = track.get(f_prime);
00138 track.remove(f_prime);
00139 if (t != null) {
00140 f_prime = t.superf();
00141 if (f_prime != null)
00142 log(Logger.LOG.DEBUG, "aggregate: " +
00143 f_prime.asString());
00144 }
00145 else
00146 f_prime = null;
00147 }
00148
00149 if (local_server != null)
00150 local_server.dataready(f_prev);
00151 tf = f_prev;
00152 }
00153 }
00154
00155 funct = null;
00156 return tf;
00157 }
00158
00159
00160
00161 protected boolean reduction_test(Sensory f_prime, Functor[] array) {
00162 if (array != null && array.length > 0 &&
00163 (f_prime.identifier() == Sensix.IOTA ||
00164 (net.level() == 2 && (f_prime.level() == Sensix.INVALID ||
00165 f_prime.level() >= net.level())) ||
00166 (f_prime.level() != Sensix.INVALID &&
00167 f_prime.level() >= net.level())))
00168 return true;
00169 return false;
00170 }
00171
00172
00178 public synchronized int devolve(Functor funct) {
00179 if (funct == null) {
00180 log(Logger.LOG.ERROR, "Devolve on a null functor");
00181 return -1;
00182 }
00183 return select(null, funct);
00184 }
00185
00186
00187 protected synchronized int select(Functor prev, Functor funct) {
00188 log(Logger.LOG.DEBUG, "devolve: " + funct.asString());
00189
00190 Task task = new Task(prev, funct, null);
00191 track.add(task);
00192
00193 if (net == null)
00194 return -2;
00195
00196 Sensory f_prime = (Sensory)funct;
00197 Functor[] array = f_prime.subfunctors();
00198 if (reduction_test(f_prime, array)) {
00199 int rv = 0;
00200 for (int i = 0; i < array.length; i++)
00201 rv += select(f_prime, array[i]);
00202 if ((rv / array.length) > -1)
00203 return 0;
00204 else
00205 return -1;
00206 }
00207
00208
00209
00210
00211
00212
00213
00214
00215
00216
00217
00218
00219
00220
00221
00222
00223
00224
00225
00226
00227
00228
00229 return 0;
00230 }
00231
00232
00233
00239 public synchronized int detask(Functor funct) {
00240 if (funct == null) {
00241 log(Logger.LOG.ERROR, "Detask on a null functor");
00242 return -1;
00243 }
00244 log(Logger.LOG.DEBUG, "detask: " + funct.asString());
00245 Task task = track.get(funct);
00246 if (task == null)
00247 return -1;
00248
00249 if (net == null)
00250 return -2;
00251
00252 Sensory f_prime = (Sensory)(task.f());
00253 Functor[] array = f_prime.subfunctors();
00254 if (reduction_test(f_prime, array)) {
00255 int rv = 0;
00256 for (int i = 0; i < array.length; i++)
00257 rv += detask(array[i]);
00258 if ((rv / array.length) > -1)
00259 return 0;
00260 else
00261 return -1;
00262 }
00263
00264 Request[] subs = task.subtasks();
00265 for (int i = 0; i < subs.length; i++)
00266 subs[i].cancel((Sensory)(task.f()));
00267 track.remove(task.f());
00268 task.f().results(new Data[0]);
00269 return 0;
00270 }
00271 }