00001 using System;
00002 using System.Collections;
00003 using System.Collections.Specialized;
00004 using System.Threading;
00005 using Common;
00006
00007 namespace ProtocolStack
00008 {
00009
/// <summary>
/// Buffers incoming segments, reassembles them into complete messages on a
/// dedicated worker thread, and forwards the results to the stack's
/// received-message queue (or, for retransmissions, back into the owning
/// remote host's segment intake).
/// </summary>
public class MessageAssembly
{
    // Buffer of segments awaiting reassembly. Its SyncRoot is the monitor used
    // both to guard the buffer and to wake the worker thread.
    AssemblyLine assemblyLine;
    // Destination for assembled messages; set from thisStack.ReceivedMessages.
    protected IncomingMessageQueue messageQueueOutput;
    protected StackInterface thisStack;
    // Recombines MessageType.Chunk messages into the message they were split from.
    ChunkAssembler chunkAssembler;
    // Worker running Run(); null until Start() is first called.
    Thread processingThread;

    /// <summary>
    /// Creates an assembler bound to the stack that <paramref name="remoteHost"/> belongs to.
    /// </summary>
    /// <param name="remoteHost">Remote host whose <c>thisStack</c> supplies the output queue.</param>
    public MessageAssembly(RemoteHostComms remoteHost) {
        thisStack = remoteHost.thisStack;
        assemblyLine = new AssemblyLine();
        chunkAssembler = new ChunkAssembler();
        messageQueueOutput = thisStack.ReceivedMessages;
    }

    /// <summary>
    /// Queues a segment for reassembly and wakes the worker thread.
    /// </summary>
    /// <param name="s">Segment to add to the assembly line.</param>
    public void Process(Segment s) {
        lock (assemblyLine.SyncRoot) {
            assemblyLine.Add(s);
            if (s.IsRetransmission) {
                Console.WriteLine("Just added retransmission {0} to assembly line", s.Headers.SequenceNumber);
            }
            // Wake Run(), which may be waiting for a complete message.
            Monitor.PulseAll(assemblyLine.SyncRoot);
        }
    }

    /// <summary>
    /// Starts the worker thread after discarding any buffered segments and
    /// partially assembled chunked messages. Logs an error and does nothing
    /// if the worker is already running.
    /// </summary>
    public void Start() {
        if (processingThread == null || !processingThread.IsAlive) {
            processingThread = new Thread(new ThreadStart(Run));
            // Process() may be called from another thread at any time, so the
            // buffer is only touched while holding the same lock it uses.
            lock (assemblyLine.SyncRoot) {
                assemblyLine.Clear();
            }
            chunkAssembler.Init();
            processingThread.Start();
        } else {
            Console.Error.WriteLine("*** Error starting Message Assembly thread: Thread already started! ***");
        }
    }

    /// <summary>
    /// Aborts the worker thread, if it is running, and waits for it to exit.
    /// </summary>
    /// <remarks>
    /// NOTE(review): Thread.Abort is documented as unsupported on .NET Core /
    /// .NET 5+ (throws PlatformNotSupportedException); revisit if this code is
    /// ever retargeted.
    /// </remarks>
    public void Stop() {
        if (processingThread != null && processingThread.IsAlive) {
            processingThread.Abort();
            processingThread.Join();
        }
    }

    /// <summary>
    /// Worker loop: repeatedly takes the next complete message off the
    /// assembly line and dispatches it, until the thread is aborted by Stop().
    /// </summary>
    void Run() {
        Console.WriteLine("Started Assembly line assembler");
        try {
            while (true) {
                Message msg = WaitForCompleteMessage();
                Dispatch(msg);
            }
        } catch (ThreadAbortException) {
            Console.WriteLine("Shutting down message assembly");
        }
    }

    /// <summary>
    /// Blocks until the assembly line holds a complete message, then removes
    /// and returns it.
    /// </summary>
    private Message WaitForCompleteMessage() {
        lock (assemblyLine.SyncRoot) {
            // Re-check after every wake-up: a pulse only means "something was added".
            while (!assemblyLine.HoldsCompleteMessage) {
                Monitor.Wait(assemblyLine.SyncRoot);
            }

            Message msg = assemblyLine.ExtractCompleteMessage();
            Monitor.PulseAll(assemblyLine.SyncRoot);
            return msg;
        }
    }

    /// <summary>
    /// Routes one assembled message according to its type: retransmissions are
    /// unwrapped and fed back in as segments, chunks go to the chunk assembler,
    /// and everything else goes straight to the output queue.
    /// </summary>
    private void Dispatch(Message msg) {
        if (msg.Type == MessageType.Retransmission) {
            RetransmissionMessage retrans = (RetransmissionMessage)msg;
            Segment retransSegment = retrans.RawSegment;
            retransSegment.IsRetransmission = true;
            retransSegment.Headers.TimeReceived = DateTime.Now;
            Console.WriteLine("Just got retransmission of {0}. Adding to assembly. Retrans flag {1}", retrans.RawSegment.Headers.SequenceNumber, retrans.RawSegment.IsRetransmission);
            // The wrapped segment is handed to the remote host identified by
            // the segment's own source device id.
            thisStack.GetRemoteHostByID(retransSegment.Headers.SourceDeviceID).AcceptIncomingSegment(retransSegment);
        } else if (msg.Type == MessageType.Chunk) {
            Console.WriteLine(" > Got Chunk");
            chunkAssembler.Add(msg);
            // One new chunk can complete a message; drain everything that is ready.
            while (chunkAssembler.ContainsCompleteMessage()) {
                Message reChunkedMsg = chunkAssembler.ExtractCompleteMessage();
                if (reChunkedMsg == null) {
                    throw new ApplicationException("Extracted a Null chunked message");
                }
                Console.WriteLine(">>> Assembled Chunked Message");
                messageQueueOutput.EnqueueBlocking(reChunkedMsg);
            }
        } else {
            messageQueueOutput.EnqueueBlocking(msg);
        }
    }
}
00112
00113
00114
/// <summary>
/// Buffer of received segments keyed (and therefore sorted) by sequence number.
/// Finds runs of consecutively numbered segments that begin with a
/// first-in-message segment and end with a last-in-message segment, and turns
/// such a run into a typed <c>Message</c>.
/// </summary>
class AssemblyLine : SortedList {

    // Number of buffered segments flagged first-in-message / last-in-message.
    // Used only as a cheap pre-check before scanning the list.
    private int startCount;
    private int finishCount;

    /// <summary>Creates an empty assembly line.</summary>
    public AssemblyLine() : base() {
        startCount = 0;
        finishCount = 0;
    }

    /// <summary>
    /// Adds a segment, keyed by its sequence number, and updates the
    /// first/last counters.
    /// </summary>
    /// <param name="seg">Segment to buffer.</param>
    /// <remarks>
    /// The underlying SortedList.Add rejects a key that is already present
    /// with an ArgumentException; in that case the counters are left untouched.
    /// </remarks>
    public void Add(Segment seg) {
        this.Add(seg.Headers.SequenceNumber, seg);
        if (seg.Headers.IsFirstInMessage) { startCount++; }
        if (seg.Headers.IsLastInMessage) { finishCount++; }
    }

    /// <summary>
    /// Removes every buffered segment and resets the first/last counters so
    /// they keep describing the (now empty) contents.
    /// </summary>
    public override void Clear() {
        base.Clear();
        startCount = 0;
        finishCount = 0;
    }

    /// <summary>
    /// True when the buffer contains at least one complete message, i.e. a
    /// gap-free run of segments from a first-in-message segment to a
    /// last-in-message segment.
    /// </summary>
    public bool HoldsCompleteMessage {
        get {
            // Without at least one start and one finish there is nothing to scan for.
            if (startCount <= 0 || finishCount <= 0) {
                return false;
            }
            int startIndex, finishIndex;
            IdentifyCompleteMessage(out startIndex, out finishIndex);
            return startIndex != -1 && finishIndex != -1;
        }
    }

    /// <summary>
    /// Removes the first complete message's segments from the buffer and
    /// builds the typed message from their data.
    /// </summary>
    /// <returns>The message created from the extracted segments.</returns>
    /// <exception cref="InvalidOperationException">
    /// The buffer does not currently hold a complete message.
    /// </exception>
    public Message ExtractCompleteMessage() {
        int startIndex, finishIndex;
        IdentifyCompleteMessage(out startIndex, out finishIndex);
        if (startIndex == -1 || finishIndex == -1) {
            throw new InvalidOperationException("Assembly line does not hold a complete message");
        }

        int segmentCount = finishIndex - startIndex + 1;
        SegmentData[] segmentData = new SegmentData[segmentCount];
        for (int i = 0; i < segmentCount; i++) {
            // Each removal shifts the next segment of the run into startIndex.
            Segment seg = (Segment)this.GetByIndex(startIndex);
            segmentData[i] = seg.Data;
            // Mirror Add(): keep the counters in step with what is actually buffered.
            if (seg.Headers.IsFirstInMessage) { startCount--; }
            if (seg.Headers.IsLastInMessage) { finishCount--; }
            this.RemoveAt(startIndex);
        }

        // Built last so that a failure in the factory cannot leave the
        // counters out of sync with the already-removed segments.
        return CreateTypedMessage(segmentData);
    }

    /// <summary>
    /// Writes the counters and a one-line summary of every buffered segment
    /// (index, sequence number, F/L flags) to the console.
    /// </summary>
    public void PrintContents() {
        Console.Write("StartCount: {0}, FinishCount: {1}. Contents:", startCount, finishCount);
        for (int i = 0; i < this.Count; i++) {
            Segment s = (Segment)this.GetByIndex(i);
            Console.Write("[{0}] {1} {2}{3},", i, s.Headers.SequenceNumber, s.Headers.IsFirstInMessage ? "F" : "_", s.Headers.IsLastInMessage ? "L" : "_");
        }
        Console.WriteLine();
    }

    /// <summary>
    /// Scans the buffer in sequence order for the first complete message.
    /// </summary>
    /// <param name="startIndex">Index of the message's first segment, or -1 if none was found.</param>
    /// <param name="finishIndex">Index of the message's last segment, or -1 if none was found.</param>
    private void IdentifyCompleteMessage(out int startIndex, out int finishIndex) {
        startIndex = -1;
        finishIndex = -1;

        int index = 0;
        while (index < Count) {
            Segment candidate = (Segment)this.GetByIndex(index);
            if (!candidate.Headers.IsFirstInMessage) {
                index++;
                continue;
            }

            int potentialStartIndex = index;
            if (candidate.Headers.IsLastInMessage) {
                // Single-segment message.
                startIndex = potentialStartIndex;
                finishIndex = index;
                return;
            }

            // Walk forward while sequence numbers stay consecutive.
            uint lastSeqSeen = candidate.Headers.SequenceNumber;
            index++;
            while (index < Count) {
                Segment next = (Segment)this.GetByIndex(index);
                if (next.Headers.SequenceNumber != (lastSeqSeen + 1)) {
                    Console.WriteLine("Gap In sequence detected");
                    // Leave index on this segment: the outer loop re-examines
                    // it as a possible start of a later message.
                    break;
                }
                if (next.Headers.IsLastInMessage) {
                    startIndex = potentialStartIndex;
                    finishIndex = index;
                    return;
                }
                lastSeqSeen = next.Headers.SequenceNumber;
                index++;
            }
            // Falling out with index == Count means the tail of the message
            // has not arrived yet; the outer loop then ends with no match.
        }
    }

    /// <summary>
    /// Instance convenience wrapper around <see cref="CreateTypedMessageStatic"/>.
    /// </summary>
    public Message CreateTypedMessage(SegmentData[] src) {
        return CreateTypedMessageStatic(src);
    }

    /// <summary>
    /// Builds the concrete message object for a message's segment data. The
    /// message type is read from the first byte of the first segment.
    /// </summary>
    /// <param name="src">Segment data of one complete message, in order.</param>
    /// <returns>A message of the class matching the encoded type.</returns>
    /// <exception cref="ArgumentException"><paramref name="src"/> is null or empty.</exception>
    /// <exception cref="NotSupportedException">The encoded message type is not recognised.</exception>
    public static Message CreateTypedMessageStatic(SegmentData[] src) {
        if (src == null || src.Length == 0) {
            throw new ArgumentException("At least one segment is required to create a message", "src");
        }

        byte[] data = SegmentData.ArrayToByteArray(src);
        MessageType msgType = (MessageType)src[0].Data[0];
        switch (msgType) {
            case MessageType.Noop: return new EmptyMessage();
            case MessageType.Retransmission: return new RetransmissionMessage(data);
            case MessageType.CacheIndex: return new CacheIndexMessage(data);
            case MessageType.NoChange: return new NoChangeMessage(data);
            case MessageType.HTTPRequest: return new HTTPRequestMessage(data);
            case MessageType.HTTPResponse: return new HTTPResponseMessage(data);
            case MessageType.CacheUpdateHTTPResponse: return new CacheUpdateHTTPResponseMsg(data);
            case MessageType.CacheUpdateNoChange: return new CacheUpdateNoChangeMsg(data);
            case MessageType.Chunk: return new MessageChunk(data);
            case MessageType.CacheIndexRequest: return new CacheIndexRequestMessage(data);
            default: throw new NotSupportedException(String.Format("Message type of {0} is not recognised", msgType));
        }
    }
}
00293
/// <summary>
/// Reassembles messages that were split into <c>MessageChunk</c>s. Chunks are
/// grouped first by their timestamp and then by the first/last range they
/// declare; once every chunk of a range has arrived the original message can
/// be extracted.
/// </summary>
public class ChunkAssembler {

    /// <summary>
    /// Holds all partially assembled chunked messages that share one timestamp.
    /// </summary>
    class AssemblyContainer {

        /// <summary>
        /// Collects the chunks of a single chunked message, identified by the
        /// first/last values carried by its chunks.
        /// </summary>
        class ChunkedMessageAssembler {
            private int first;
            private int last;
            // Number of slots in chunkArray that have been filled.
            private int count;
            // Slot i holds the chunk whose offset is first + i.
            MessageChunk[] chunkArray;

            /// <summary>Number of chunks still missing from this message.</summary>
            public int ChunksOutstanding {
                get { return ((last - first) + 1) - count; }
            }

            /// <summary>
            /// True when the chunk's offset falls inside this message's first..last range.
            /// </summary>
            public bool BelongsHere(MessageChunk chunk) {
                return (chunk.offset >= first && chunk.offset <= last);
            }

            /// <summary>
            /// Starts a new message using the range declared by
            /// <paramref name="initialChunk"/> and stores that chunk.
            /// </summary>
            /// <exception cref="ApplicationException">
            /// The chunk declares a range whose last value precedes its first,
            /// or its own offset lies outside that range.
            /// </exception>
            public ChunkedMessageAssembler(MessageChunk initialChunk) {
                first = initialChunk.firstInMessage;
                last = initialChunk.lastInMessage;
                if (last < first) {
                    throw new ApplicationException("Chunk declares a message range whose last chunk precedes its first");
                }
                chunkArray = new MessageChunk[(last - first) + 1];
                count = 0;
                Add(initialChunk);
            }

            /// <summary>Stores a chunk in the slot given by its offset.</summary>
            /// <exception cref="ApplicationException">
            /// The chunk's declared range differs from this message's, its
            /// offset is outside the range, or its slot is already occupied.
            /// </exception>
            public void Add(MessageChunk chunk) {
                if (chunk.firstInMessage != first || chunk.lastInMessage != last) {
                    throw new ApplicationException("Attempting to add a chunk to a ChunkedMessageAssember that doesn't match first and last");
                }
                // Guard the array access below against an offset outside first..last.
                if (!BelongsHere(chunk)) {
                    throw new ApplicationException("Attempting to add a chunk whose offset is outside its message range");
                }
                if (chunkArray[chunk.offset - first] != null) {
                    throw new ApplicationException("Attempting to assign chunk to somewhere full");
                }
                chunkArray[chunk.offset - first] = chunk;
                count++;
            }

            /// <summary>Builds the original message from the collected chunks.</summary>
            /// <exception cref="ApplicationException">Some chunks have not arrived yet.</exception>
            public Message ExtractMessage() {
                if (ChunksOutstanding != 0) {
                    throw new ApplicationException("Still chunks outstanding");
                }
                return AssemblyLine.CreateTypedMessageStatic(MessageChunk.GetMessageFromChunks(chunkArray));
            }

        }

        public DateTime timestamp;
        private ArrayList chunkedMessageAssemblers;

        /// <summary>Creates an empty container for chunks carrying <paramref name="timestamp"/>.</summary>
        public AssemblyContainer(DateTime timestamp) {
            this.timestamp = timestamp;
            chunkedMessageAssemblers = new ArrayList();
        }

        /// <summary>True when no partially or fully assembled message remains in this container.</summary>
        public bool IsEmpty {
            get { return chunkedMessageAssemblers.Count == 0; }
        }

        /// <summary>
        /// Adds the chunk to the message whose range covers its offset, or
        /// starts a new message if no existing one does.
        /// </summary>
        public void AddChunk(MessageChunk chunk) {
            foreach (ChunkedMessageAssembler assembler in chunkedMessageAssemblers) {
                // Only stop at an assembler that actually covers the chunk;
                // otherwise keep looking at the remaining ones.
                if (assembler.BelongsHere(chunk)) {
                    assembler.Add(chunk);
                    return;
                }
            }
            chunkedMessageAssemblers.Add(new ChunkedMessageAssembler(chunk));
        }

        /// <summary>True when at least one message in this container has all of its chunks.</summary>
        public bool ContainsCompleteMessage() {
            foreach (ChunkedMessageAssembler chunkedAssembler in chunkedMessageAssemblers) {
                if (chunkedAssembler.ChunksOutstanding == 0) {
                    return true;
                }
            }
            return false;
        }

        /// <summary>
        /// Removes and returns the first fully received message, or null when
        /// none is complete.
        /// </summary>
        public Message ExtractCompleteMessage() {
            // Indexed loop: the list is modified when a match is found.
            for (int i = 0; i < chunkedMessageAssemblers.Count; i++) {
                ChunkedMessageAssembler chunkedAssembler = (ChunkedMessageAssembler)chunkedMessageAssemblers[i];
                if (chunkedAssembler.ChunksOutstanding == 0) {
                    Message result = chunkedAssembler.ExtractMessage();
                    chunkedMessageAssemblers.RemoveAt(i);
                    return result;
                }
            }
            return null;
        }
    }

    // Maps a chunk timestamp to the AssemblyContainer for that timestamp.
    private ListDictionary assemblyContainers;

    /// <summary>Creates an assembler with no pending chunks.</summary>
    public ChunkAssembler() {
        assemblyContainers = new ListDictionary();
    }

    /// <summary>
    /// Adds one chunk to the pending message it belongs to, creating the
    /// per-timestamp container on first use.
    /// </summary>
    /// <param name="msg">A message whose Type is MessageType.Chunk.</param>
    /// <exception cref="ApplicationException"><paramref name="msg"/> is not a chunk.</exception>
    public void Add(Message msg) {
        if (msg.Type != MessageType.Chunk) {
            throw new ApplicationException("Unable to assemble non-chunked message in message chunk assembly");
        }

        MessageChunk chunk = (MessageChunk)msg;
        // The indexer returns null for an unknown key, so one lookup suffices.
        AssemblyContainer currentContainer = (AssemblyContainer)assemblyContainers[chunk.timestamp];
        if (currentContainer == null) {
            currentContainer = new AssemblyContainer(chunk.timestamp);
            assemblyContainers.Add(chunk.timestamp, currentContainer);
        }
        currentContainer.AddChunk(chunk);
    }

    /// <summary>Discards every pending chunk.</summary>
    public void Init() {
        assemblyContainers.Clear();
    }

    /// <summary>True when any pending message has received all of its chunks.</summary>
    public bool ContainsCompleteMessage() {
        foreach (AssemblyContainer container in assemblyContainers.Values) {
            if (container.ContainsCompleteMessage()) {
                return true;
            }
        }
        return false;
    }

    /// <summary>
    /// Removes and returns one fully received message, or null when none is
    /// complete. A container left empty by the extraction is dropped so the
    /// dictionary does not accumulate one entry per timestamp ever seen.
    /// </summary>
    public Message ExtractCompleteMessage() {
        Message result = null;
        object emptiedKey = null;
        foreach (DictionaryEntry entry in assemblyContainers) {
            AssemblyContainer container = (AssemblyContainer)entry.Value;
            result = container.ExtractCompleteMessage();
            if (result != null) {
                if (container.IsEmpty) {
                    emptiedKey = entry.Key;
                }
                break;
            }
        }
        // Removed after the loop: the dictionary must not change while it is enumerated.
        if (emptiedKey != null) {
            assemblyContainers.Remove(emptiedKey);
        }
        return result;
    }

}
00441 }
00442
00443