00001
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 package gov.lanl.isr.sensix;
00036
00037 import java.net.*;
00038 import java.io.*;
00039 import java.util.*;
00040 import gov.lanl.isr.sensix.discovery.*;
00041 import gov.lanl.isr.sensix.sensing.*;
00042 import org.omg.CORBA.*;
00043 import org.omg.PortableServer.*;
00044
00045
00046 public class TinyOSClient extends RequestPOA
00047 implements Runnable, LevelOneClient, net.tinyos.message.MessageListener
00048 {
00049 class TosMessenger implements net.tinyos.util.Messenger {
00050 public void message(String message) {
00051 log(Logger.LOG.DEBUG, message);
00052 }
00053 }
00054
00055 class TosProcessing extends Thread {
00056 private SensixMsg msg;
00057 private TinyOSClient tos;
00058
00059 public TosProcessing(TinyOSClient t, SensixMsg m) {
00060 tos = t;
00061 msg = m;
00062 }
00063
00064 public void run() {
00065 byte[] data = new byte[SensixMsg.DEFAULT_MESSAGE_SIZE];
00066 for (int i = 0; i < data.length; i++)
00067 data[i] = (byte)msg.getElement_bytes(i);
00068
00070
00071
00072 boolean little_endian = false;
00073 if (data[0] != SensixMarshalling.MAGIC)
00074 return;
00075 if (data[1] != SensixMarshalling.VERSION)
00076 return;
00077 int wordsize = (data[2] & 0xff);
00078 if (wordsize >= SensixMarshalling.LITTLE_ENDIAN) {
00079 little_endian = true;
00080 wordsize &= (~SensixMarshalling.LITTLE_ENDIAN & 0xff);
00081 }
00082 int what = (data[3] & 0xff);
00083 int data_size = (data[4] & 0xff);
00084
00085
00087
00088 if (what == SensixMarshalling.METADATA) {
00089 int idx = SensixMarshalling.HEADER_SIZE;
00090 if (data[idx] != SenseCorba.DESCENDANTS)
00091 return;
00092 idx += 2;
00093
00094 byte[] id_bytes = new byte[wordsize];
00095 System.arraycopy(data, idx, id_bytes, 0, wordsize);
00096 int node_id = 0;
00097 if (little_endian)
00098 node_id = DiscoveryService.b2i_le(id_bytes, wordsize);
00099 else
00100 node_id = DiscoveryService.b2i(id_bytes, wordsize);
00101 idx += wordsize;
00102 System.err.println("Metadata from " + node_id);
00103
00104 int csize = (data[idx] & 0xff);
00105 idx++;
00106 for (int i = 0; i < csize; i++) {
00107 try {
00108 org.omg.CORBA.Object ref =
00109 tos.poa.servant_to_reference(tos);
00110 gov.lanl.isr.sensix.Request req =
00111 RequestHelper.narrow(ref);
00112 Share out = new Share(node_id, (byte)1,
00113 (byte)(data[idx + i] & 0xff),
00114 tos.orb.object_to_string(req));
00115 discover.sendDiscovery(out);
00116 }
00117 catch (org.omg.CORBA.UserException e) {
00118 log(Logger.LOG.DEBUG, "CORBA exception on share");
00119 }
00120 catch (java.lang.ArrayIndexOutOfBoundsException ae) {
00121 log(Logger.LOG.DEBUG, "Oversized capability array: " +
00122 csize);
00123 return;
00124 }
00125 }
00126 idx += csize;
00127
00128 if (data[idx] != SenseCorba.ENERGY)
00129 return;
00130 idx++;
00131 byte[] energy_bytes = new byte[wordsize];
00132 System.arraycopy(data, idx, energy_bytes, 0, wordsize);
00133 int energy = 0;
00134 if (little_endian)
00135 energy = DiscoveryService.b2i_le(energy_bytes, wordsize);
00136 else
00137 energy = DiscoveryService.b2i(energy_bytes, wordsize);
00138 idx += wordsize;
00140 marshal.xmit_metadata(SenseCorba.ANCESTORS);
00141 }
00143
00144 else if (what == SensixMarshalling.DATA) {
00145 log(Logger.LOG.DEBUG, "Processing data message");
00146 int idx = SensixMarshalling.HEADER_SIZE;
00147 byte[] id_bytes = new byte[wordsize];
00148 System.arraycopy(data, idx, id_bytes, 0, wordsize);
00149 int node_id = 0;
00150 if (little_endian)
00151 node_id = DiscoveryService.b2i_le(id_bytes, wordsize);
00152 else
00153 node_id = DiscoveryService.b2i(id_bytes, wordsize);
00154 idx += wordsize;
00155
00156 if (data[idx] != SenseCorba.ENERGY)
00157 return;
00158 idx++;
00159 byte[] energy_bytes = new byte[wordsize];
00160 System.arraycopy(data, idx, energy_bytes, 0, wordsize);
00161 double energy = 0;
00162 if (little_endian)
00163 energy = DiscoveryService.b2i_le(energy_bytes, wordsize);
00164 else
00165 energy = DiscoveryService.b2i(energy_bytes, wordsize);
00166 idx += wordsize;
00167
00168 if (data[idx] != SenseCorba.TIME)
00169 return;
00170 idx++;
00171 byte[] time_bytes = new byte[wordsize];
00172 System.arraycopy(data, idx, time_bytes, 0, wordsize);
00173 long time = 0;
00174 if (little_endian)
00175 time = DiscoveryService.b2i_le(time_bytes, wordsize);
00176 else
00177 time = DiscoveryService.b2i(time_bytes, wordsize);
00178 idx += wordsize;
00179
00180 byte ftype = (byte)(data[idx] & 0xff);
00181 byte fid = (byte)(data[idx+1] & 0xff);
00182 idx += 2;
00183
00184 byte[] count_bytes = new byte[wordsize];
00185 System.arraycopy(data, idx, count_bytes, 0, wordsize);
00186 int num = 0;
00187 if (little_endian)
00188 num = DiscoveryService.b2i_le(count_bytes, wordsize);
00189 else
00190 num = DiscoveryService.b2i(count_bytes, wordsize);
00191 idx += wordsize;
00192
00193 Data[] array = new Data[num];
00194 for (int i = 0; i < num; i++) {
00195 byte[] res_bytes = new byte[wordsize];
00196 System.arraycopy(data, idx, res_bytes, 0, wordsize);
00197 idx += wordsize;
00198 int result = 0;
00199 if (little_endian)
00200 result = DiscoveryService.b2i_le(res_bytes, wordsize);
00201 else
00202 result = DiscoveryService.b2i(res_bytes, wordsize);
00203 array[i] = new Data();
00204 array[i].iresult(result);
00205 }
00206
00207 TaskImpl task = track.tree.get((int)fid);
00208 if (task == null)
00209 return;
00210 Functor f = task.f();
00211 if (f == null)
00212 return;
00213 Sensory target = SensoryHelper.narrow(f);
00214 target.energyused(energy + target.energyused());
00215 if (time < target.timeused())
00216 time = target.timeused();
00217 target.timeused(time);
00218 target.results(array);
00219 task.callback().aggregate(target);
00220 }
00221 else
00222 log(Logger.LOG.DEBUG, "Dropping command message");
00223
00224 }
00225 }
00226
00227 protected Logger logger;
00228 protected DiscoveryService discover;
00229 protected SensixMarshalling marshal;
00230 protected ThreadGroup my_grp;
00231 protected SenseServer parent;
00232 protected String serial_port;
00233 protected Thread t;
00234 protected TaskTracking track;
00235 protected ORB orb;
00236 protected POA poa;
00237 protected String simulator;
00238 protected net.tinyos.message.MoteIF mote;
00239
00240
00241 public TinyOSClient(SenseServer serv, String app, byte debug, String sim) {
00242 this(serv, "/dev/ttyS0", app, debug, sim);
00243 }
00244
00245 public TinyOSClient(SenseServer serv, String serial, String app,
00246 byte debug, String sim) {
00247 t = new Thread(new ThreadGroup("tinyos"), this, "tinyos_proc");
00248 parent = serv;
00249 marshal = new SensixMarshalling(this, app, debug);
00250 discover = serv.discover;
00251 serial_port = serial;
00252 initLogger(app, debug);
00253 track = new TaskTracking();
00254 orb = serv.orb;
00255 poa = serv.poa;
00256 simulator = sim;
00257
00258 t.start();
00259 }
00260
00261
00262 public void run() {
00263 my_grp = t.getThreadGroup();
00264 startLevelOneInterface();
00265 }
00266
00267 public void startLevelOneInterface() {
00268 String default_baud = ":38400";
00269 String portname = "serial@" + serial_port;
00270
00271 if (portname.indexOf(':') == -1)
00272 portname += default_baud;
00273 if (simulator != "")
00274 portname = "tossim-radio@" + simulator;
00275
00276 int group_id = 125;
00277 TosMessenger msgr = new TosMessenger();
00278 net.tinyos.packet.PhoenixSource source =
00279 net.tinyos.packet.BuildSource.makePhoenix(portname, msgr);
00280 source.setResurrection();
00281 mote = new net.tinyos.message.MoteIF(source, group_id);
00282 mote.registerListener(new SensixMsg(), this);
00283 mote.start();
00284
00285 log(Logger.LOG.DEBUG, "Spawning TinyOS interface at " + portname);
00286 }
00287
00288
00289 public void messageReceived(int dest, net.tinyos.message.Message m) {
00290 if (!(m instanceof SensixMsg))
00291 return;
00292 SensixMsg msg = (SensixMsg)m;
00293 TosProcessing p = new TosProcessing(this, msg);
00294 p.start();
00295 }
00296
00297
00298 public void initLogger(String app, byte debug_level) {
00299 logger = new Logger(app, debug_level);
00300 }
00301
00302 public void log(Logger.LOG type, String msg) {
00303 logger.log(type, msg);
00304 }
00305
00306 public void shutdown() {
00307 log(Logger.LOG.DEBUG, "TinyOS shutdown");
00308 my_grp.interrupt();
00309 }
00310
00311
00312
00313 public int identity() {
00314 return discover.identifier();
00315 }
00316
00317 public DiscoveryService discovery() {
00318 return discover;
00319 }
00320
00321 public double check_energy() {
00322 return 1.0;
00323 }
00324
00325 public TaskTracking tracker() {
00326 return track;
00327 }
00328
00329 public SortedMap<Byte,Capability> capabilities() {
00330 return parent.local_capabilities;
00331 }
00332
00333 public org.omg.CORBA.ORB orb() {
00334 return orb;
00335 }
00336
00337 public org.omg.PortableServer.POA poa() {
00338 return poa;
00339 }
00340
00341 public void send_up(int what, int size, byte msg[]) {
00342 log(Logger.LOG.ERROR, "Interface error: invalid call to send_up");
00343 }
00344
00345 public void bcast(int what, int size, byte msg[]) {
00346 log(Logger.LOG.ERROR, "Interface error: invalid call to bcast");
00347 }
00348
00349 public void send_dn(int what, int size, byte msg[]) {
00350 byte[] hdr = marshal.load_header((byte)(what & 0xff),
00351 (byte)(size & 0xff));
00352 short[] total = new short[size + SensixMarshalling.HEADER_SIZE];
00353 SensixMarshalling.byte2short_arraycopy(hdr, 0, total, 0,
00354 SensixMarshalling.HEADER_SIZE);
00355 SensixMarshalling.byte2short_arraycopy(msg, 0, total,
00356 SensixMarshalling.HEADER_SIZE,
00357 size);
00358 if (mote != null) {
00359 try {
00360 SensixMsg sm = new SensixMsg();
00361 for (int i = 0; i < total.length; i++)
00362 sm.setElement_bytes(i, total[i]);
00363 mote.send(net.tinyos.message.MoteIF.TOS_BCAST_ADDR,
00364 (net.tinyos.message.Message)sm);
00365 } catch (java.io.IOException e) {
00366 log(Logger.LOG.ERROR, "Error sending to TinyOS nodes: " + e);
00367 }
00368 }
00369 }
00370
00371
00372
00373 public void addCapability(byte cap, Capability obj) {
00374 log(Logger.LOG.ERROR, "Invalid attempt to add " +
00375 SenseUtil.capabilityToString(cap) + " capability to TinyOS mote");
00376 }
00377
00378 public void apply(Functor funct, Response callback) {
00379 if (funct == null) {
00380 log(Logger.LOG.ERROR, "Apply on a null functor");
00381 return;
00382 }
00383 TaskImpl task = new TaskImpl(null, funct, callback);
00384
00385 log(Logger.LOG.DEBUG, "apply: " + funct.asString());
00386 synchronized(this) {
00387 track.add(task);
00388 track.num_peers(funct, 1);
00389 marshal.xmit_command(funct);
00390 }
00391 }
00392
00393 public synchronized void cancel(Functor funct) {
00394 if (funct == null) {
00395 log(Logger.LOG.ERROR, "Cancel on a null functor");
00396 return;
00397 }
00398 log(Logger.LOG.DEBUG, "cancel: " + funct.asString());
00399 track.cancelled(funct, true);
00400 track.remove(funct);
00401 }
00402
00403 public void dataready(Functor funct) {}
00404 }