00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046 #include <stdio.h>
00047 #include <stddef.h>
00048
00049 #include "net/rime.h"
00050 #include "net/rime/rudolph1.h"
00051 #include "cfs/cfs.h"
00052
00053 #define DEFAULT_SEND_INTERVAL CLOCK_SECOND * 2
00054 #define TRICKLE_INTERVAL CLOCK_SECOND / 2
00055 #define NACK_TIMEOUT CLOCK_SECOND / 4
00056 #define REPAIR_TIMEOUT CLOCK_SECOND / 4
00057
00058 struct rudolph1_hdr {
00059 uint8_t type;
00060 uint8_t version;
00061 uint16_t chunk;
00062 };
00063
00064 #define RUDOLPH1_DATASIZE 64
00065
00066 struct rudolph1_datapacket {
00067 struct rudolph1_hdr h;
00068 uint8_t datalen;
00069 uint8_t data[RUDOLPH1_DATASIZE];
00070 };
00071
00072 enum {
00073 TYPE_DATA,
00074 TYPE_NACK,
00075 };
00076
00077 #define DEBUG 0
00078 #if DEBUG
00079 #include <stdio.h>
00080 #define PRINTF(...) printf(__VA_ARGS__)
00081 #else
00082 #define PRINTF(...)
00083 #endif
00084
00085 #define LT(a, b) ((signed char)((a) - (b)) < 0)
00086
00087
00088 static int
00089 read_data(struct rudolph1_conn *c, uint8_t *dataptr, int chunk)
00090 {
00091 int len = 0;
00092
00093 if(c->cb->read_chunk) {
00094 len = c->cb->read_chunk(c, chunk * RUDOLPH1_DATASIZE,
00095 dataptr, RUDOLPH1_DATASIZE);
00096 }
00097 return len;
00098 }
00099
00100 static int
00101 format_data(struct rudolph1_conn *c, int chunk)
00102 {
00103 struct rudolph1_datapacket *p;
00104
00105 packetbuf_clear();
00106 p = packetbuf_dataptr();
00107 p->h.type = TYPE_DATA;
00108 p->h.version = c->version;
00109 p->h.chunk = chunk;
00110 p->datalen = read_data(c, p->data, chunk);
00111 packetbuf_set_datalen(sizeof(struct rudolph1_datapacket) -
00112 (RUDOLPH1_DATASIZE - p->datalen));
00113
00114 return p->datalen;
00115 }
00116
00117 static void
00118 write_data(struct rudolph1_conn *c, int chunk, uint8_t *data, int datalen)
00119 {
00120 if(chunk == 0) {
00121 c->cb->write_chunk(c, 0, RUDOLPH1_FLAG_NEWFILE, data, 0);
00122 }
00123
00124 if(datalen < RUDOLPH1_DATASIZE) {
00125 PRINTF("%d.%d: get %d bytes, file complete\n",
00126 rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1],
00127 datalen);
00128 c->cb->write_chunk(c, chunk * RUDOLPH1_DATASIZE,
00129 RUDOLPH1_FLAG_LASTCHUNK, data, datalen);
00130 } else {
00131 c->cb->write_chunk(c, chunk * RUDOLPH1_DATASIZE,
00132 RUDOLPH1_FLAG_NONE, data, datalen);
00133 }
00134 }
00135
00136 static void
00137 send_nack(struct rudolph1_conn *c)
00138 {
00139 struct rudolph1_hdr *hdr;
00140 packetbuf_clear();
00141 packetbuf_hdralloc(sizeof(struct rudolph1_hdr));
00142 hdr = packetbuf_hdrptr();
00143
00144 hdr->type = TYPE_NACK;
00145 hdr->version = c->version;
00146 hdr->chunk = c->chunk;
00147
00148 PRINTF("%d.%d: Sending nack for %d:%d\n",
00149 rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1],
00150 hdr->version, hdr->chunk);
00151 ipolite_send(&c->ipolite, NACK_TIMEOUT, sizeof(struct rudolph1_hdr));
00152 }
00153
00154 static void
00155 handle_data(struct rudolph1_conn *c, struct rudolph1_datapacket *p)
00156 {
00157 if(LT(c->version, p->h.version)) {
00158 PRINTF("%d.%d: rudolph1 new version %d, chunk %d\n",
00159 rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1],
00160 p->h.version, p->h.chunk);
00161 c->version = p->h.version;
00162 c->highest_chunk_heard = c->chunk = 0;
00163 if(p->h.chunk != 0) {
00164 send_nack(c);
00165 } else {
00166 write_data(c, 0, p->data, p->datalen);
00167 c->chunk = 1;
00168 }
00169
00170 } else if(p->h.version == c->version) {
00171 PRINTF("%d.%d: got chunk %d (%d) highest heard %d\n",
00172 rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1],
00173 p->h.chunk, c->chunk, c->highest_chunk_heard);
00174
00175 if(p->h.chunk == c->chunk) {
00176 PRINTF("%d.%d: received chunk %d\n",
00177 rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1],
00178 p->h.chunk);
00179 write_data(c, p->h.chunk, p->data, p->datalen);
00180 if(c->highest_chunk_heard < c->chunk) {
00181 c->highest_chunk_heard = c->chunk;
00182 }
00183 c->chunk++;
00184 } else if(p->h.chunk > c->chunk) {
00185 PRINTF("%d.%d: received chunk %d > %d, sending NACK\n",
00186 rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1],
00187 p->h.chunk, c->chunk);
00188 send_nack(c);
00189 c->highest_chunk_heard = p->h.chunk;
00190 } else if(p->h.chunk < c->chunk) {
00191
00192 }
00193
00194
00195
00196
00197 if(c->highest_chunk_heard > p->h.chunk) {
00198 send_nack(c);
00199 }
00200 } else {
00201
00202 }
00203
00204 }
00205
00206 static void
00207 recv_trickle(struct trickle_conn *trickle)
00208 {
00209 struct rudolph1_conn *c = (struct rudolph1_conn *)trickle;
00210 struct rudolph1_datapacket *p = packetbuf_dataptr();
00211
00212 if(p->h.type == TYPE_DATA) {
00213 PRINTF("%d.%d: received trickle with chunk %d\n",
00214 rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1],
00215 p->h.chunk);
00216 handle_data(c, p);
00217 }
00218 }
00219
00220 static void
00221 sent_ipolite(struct ipolite_conn *ipolite)
00222 {
00223 PRINTF("%d.%d: Sent ipolite\n",
00224 rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1]);
00225 }
00226
00227 static void
00228 dropped_ipolite(struct ipolite_conn *ipolite)
00229 {
00230 PRINTF("%d.%d: dropped ipolite\n",
00231 rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1]);
00232 }
00233
00234 static void
00235 recv_ipolite(struct ipolite_conn *ipolite, const rimeaddr_t *from)
00236 {
00237 struct rudolph1_conn *c = (struct rudolph1_conn *)
00238 ((char *)ipolite - offsetof(struct rudolph1_conn, ipolite));
00239 struct rudolph1_datapacket *p = packetbuf_dataptr();
00240
00241 PRINTF("%d.%d: Got ipolite type %d\n",
00242 rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1],
00243 p->h.type);
00244
00245 c->nacks++;
00246
00247 if(p->h.type == TYPE_NACK) {
00248 PRINTF("%d.%d: Got NACK for %d:%d (%d:%d)\n",
00249 rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1],
00250 p->h.version, p->h.chunk,
00251 c->version, c->chunk);
00252 if(p->h.version == c->version) {
00253 if(p->h.chunk < c->chunk) {
00254
00255 PRINTF("%d.%d: sending repair for chunk %d\n",
00256 rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1],
00257 p->h.chunk);
00258 format_data(c, p->h.chunk);
00259 ipolite_send(&c->ipolite, REPAIR_TIMEOUT, sizeof(struct rudolph1_hdr));
00260 }
00261 } else if(LT(p->h.version, c->version)) {
00262 format_data(c, 0);
00263 ipolite_send(&c->ipolite, c->send_interval / 2, sizeof(struct rudolph1_hdr));
00264 }
00265 } else if(p->h.type == TYPE_DATA) {
00266
00267 PRINTF("%d.%d: got repair for chunk %d\n",
00268 rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1],
00269 p->h.chunk);
00270 handle_data(c, p);
00271 }
00272 }
00273
00274 static void
00275 send_next_packet(void *ptr)
00276 {
00277 struct rudolph1_conn *c = ptr;
00278 int len;
00279 if(c->nacks == 0) {
00280 len = format_data(c, c->chunk);
00281 trickle_send(&c->trickle);
00282 if(len == RUDOLPH1_DATASIZE) {
00283 ctimer_set(&c->t, c->send_interval, send_next_packet, c);
00284 }
00285 PRINTF("%d.%d: send_next_packet chunk %d, next %d\n",
00286 rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1],
00287 c->chunk, c->chunk + 1);
00288
00289 c->highest_chunk_heard = c->chunk;
00290 c->chunk++;
00291
00292 } else {
00293 ctimer_set(&c->t, c->send_interval, send_next_packet, c);
00294 }
00295 c->nacks = 0;
00296 }
00297
00298 static const struct ipolite_callbacks ipolite = { recv_ipolite, sent_ipolite,
00299 dropped_ipolite };
00300 static const struct trickle_callbacks trickle = { recv_trickle };
00301
00302 void
00303 rudolph1_open(struct rudolph1_conn *c, uint16_t channel,
00304 const struct rudolph1_callbacks *cb)
00305 {
00306 trickle_open(&c->trickle, TRICKLE_INTERVAL, channel, &trickle);
00307 ipolite_open(&c->ipolite, channel + 1, 1, &ipolite);
00308 c->cb = cb;
00309 c->version = 0;
00310 c->send_interval = DEFAULT_SEND_INTERVAL;
00311 }
00312
00313 void
00314 rudolph1_close(struct rudolph1_conn *c)
00315 {
00316 trickle_close(&c->trickle);
00317 ipolite_close(&c->ipolite);
00318 }
00319
00320 void
00321 rudolph1_send(struct rudolph1_conn *c, clock_time_t send_interval)
00322 {
00323 c->version++;
00324 c->chunk = c->highest_chunk_heard = 0;
00325
00326 format_data(c, 0);
00327 trickle_send(&c->trickle);
00328 c->chunk++;
00329 c->send_interval = send_interval;
00330 ctimer_set(&c->t, send_interval, send_next_packet, c);
00331 }
00332
00333 void
00334 rudolph1_stop(struct rudolph1_conn *c)
00335 {
00336 ctimer_stop(&c->t);
00337 }
00338
00339