00001 using System;
00002 using System.Collections;
00003 using System.Collections.Specialized;
00004 using System.Threading;
00005 using GPRSWeb.SmartDeviceClient.Common;
00006
00007 namespace GPRSWeb.SmartDeviceClient.SmartProtocolStack.RemoteHost
00008 {
00009
00014 public class MessageAssembly
00015 {
00017 AssemblyLine assemblyLine;
00018 protected MessageQueue messageQueueOutput;
00019 protected StackInterface thisStack;
00020 ChunkAssembler chunkAssembler;
00021 Thread processingThread;
00022
00024
00025
00026
00027
00028
00029 public MessageAssembly(RemoteHostComms remoteHost) {
00030 thisStack = remoteHost.thisStack;
00031 assemblyLine = new AssemblyLine();
00032 chunkAssembler = new ChunkAssembler();
00033 messageQueueOutput = thisStack.ReceivedMessages;
00034 }
00035
00037 public void Process(Segment s) {
00038
00039 lock(assemblyLine.SyncRoot) {
00040 assemblyLine.Add(s);
00041 if (s.IsRetransmission) {
00042 Console.WriteLine("Just added retransmissoin {0} to assembly line", s.Headers.SequenceNumber);
00043 }
00044 Monitor.PulseAll(assemblyLine.SyncRoot);
00045 }
00046 }
00047
00048 public void Start() {
00049 if (processingThread == null || !processingThread.IsAlive) {
00050 processingThread = new Thread(new ThreadStart(Run));
00051 assemblyLine.Clear();
00052 chunkAssembler.Init();
00053 processingThread.Start();
00054 } else {
00055 Console.Error.WriteLine("*** Error starting Message Assembly thread: Thread already started! ***");
00056 }
00057 }
00058
00059 public void Stop() {
00060 if (processingThread != null && processingThread.IsAlive) {
00061 processingThread.Abort();
00062 processingThread.Join();
00063 }
00064 }
00065
00066 void Run() {
00067 Console.WriteLine("Started Assembly line assembler");
00068 Message msg = null;
00069 try {
00070 while(true) {
00071
00072 msg = null;
00073 lock (assemblyLine.SyncRoot) {
00074 while (!assemblyLine.HoldsCompleteMessage) {
00075 Monitor.Wait(assemblyLine.SyncRoot);
00076 }
00077
00078 msg = assemblyLine.ExtractCompleteMessage();
00079 Monitor.PulseAll(assemblyLine.SyncRoot);
00080 }
00081
00082
00083 if (msg.Type == MessageType.Retransmission) {
00084 RetransmissionMessage retrans = (RetransmissionMessage)msg;
00085 Segment retransSegment = retrans.RawSegment;
00086 retransSegment.IsRetransmission = true;
00087 Console.WriteLine("Just got retransmission of {0}. Adding to assembly. Retrans flag {1}", retrans.RawSegment.Headers.SequenceNumber, retrans.RawSegment.IsRetransmission);
00088 thisStack.GetRemoteHostByID(retransSegment.Headers.SourceDeviceID).AcceptIncomingSegment(retransSegment);
00089 } else if (msg.Type == MessageType.Chunk) {
00090 Console.WriteLine("> Got Message Chunk");
00091 chunkAssembler.Add(msg);
00092 while (chunkAssembler.ContainsCompleteMessage()) {
00093 Message reChunkedMsg = chunkAssembler.ExtractCompleteMessage();
00094 if (reChunkedMsg != null) {
00095 Console.WriteLine("> Got a re-constituted chunked message");
00096 messageQueueOutput.EnqueueBlocking(reChunkedMsg);
00097 } else {
00098 throw new ApplicationException("Extracted a Null chunked message");
00099 }
00100 }
00101 } else {
00102 messageQueueOutput.EnqueueBlocking(msg);
00103 }
00104 }
00105 } catch (ThreadAbortException) {
00106 Console.WriteLine("Shutting down message assembly");
00107 }
00108
00109 }
00110 }
00111
00112 class SortedList : ListDictionary {
00113 public new void Add(object key, object value) {
00114 base.Add(key, value);
00115
00116 }
00117
00118 public object GetByIndex(int i) {
00119 return base[i];
00120 }
00121
00122 public new void Remove(object key) {
00123 }
00124 }
00125
00126
00131 class AssemblyLine : SortedList {
00132
00133
00135
00136
00137
00138 private int startCount;
00142 private int finishCount;
00143
00145
00146
00147
00148 public AssemblyLine() : base() {
00149 startCount = 0;
00150 finishCount = 0;
00151 }
00152
00155
00156
00157
00158
00159 public void Add(Segment seg) {
00160 this.Add(seg.Headers.SequenceNumber, seg);
00161 if (seg.Headers.IsFirstInMessage) { startCount++; }
00162 if (seg.Headers.IsLastInMessage) { finishCount++; }
00163
00164 }
00165
00166
00171 public bool HoldsCompleteMessage {
00172 get {
00173 int i, j = -1;
00174 bool result = false;
00175 if (startCount > 0 && finishCount > 0) {
00176
00177
00178 this.IdentifyCompleteMessage(out i, out j);
00179 if (i == -1 || j == -1) { result = false; } else { result = true; }
00180 }
00181 return result;
00182 }
00183 }
00184
00189 public Message ExtractCompleteMessage() {
00190 Message result;
00191 int startIndex, finishIndex;
00192 SegmentData[] segmentData;
00193 uint[] seqNumbers;
00194
00195 IdentifyCompleteMessage(out startIndex, out finishIndex);
00196 segmentData = new SegmentData[finishIndex - startIndex + 1];
00197 seqNumbers = new uint[finishIndex - startIndex + 1];
00198 for (int i = startIndex; i <= finishIndex; i++) {
00199 segmentData[i-startIndex] = ((Segment)this.GetByIndex(i)).Data;
00200 seqNumbers[i-startIndex] = ((Segment)this.GetByIndex(i)).Headers.SequenceNumber;
00201 }
00202 foreach(uint seqNumber in seqNumbers) {
00203 Remove(seqNumber);
00204 }
00205 result = CreateTypedMessage(segmentData);
00206 startCount--;
00207 finishCount--;
00208 return result;
00209 }
00210
00211 public void PrintContents() {
00212 Console.Write("StartCount: {0}, FinishCouunt: {1}. Contents:", startCount, finishCount);
00213 for (int i = 0; i < this.Count; i++) {
00214 Segment s = (Segment)this.GetByIndex(i);
00215 Console.Write("[{0}] {1} {2}{3},", i, s.Headers.SequenceNumber, s.Headers.IsFirstInMessage ? "F" : "_", s.Headers.IsLastInMessage ? "L" : "_");
00216 }
00217 Console.WriteLine();
00218 }
00224 private void IdentifyCompleteMessage(out int startIndex, out int finishIndex) {
00225
00226 startIndex = -1;
00227 finishIndex = -1;
00228 bool foundIt = false;
00229 int index = 0;
00230 int potentialStartIndex;
00231 uint lastSeqSeen;
00232 Segment currentSegment;
00233
00234
00235
00236
00237
00238 while (!foundIt && index < Count) {
00239 currentSegment = (Segment)this.GetByIndex(index);
00240 if (!currentSegment.Headers.IsFirstInMessage) {
00241 index++;
00242 } else {
00243 potentialStartIndex = index;
00244
00245 if (!currentSegment.Headers.IsLastInMessage) {
00246
00247 lastSeqSeen = currentSegment.Headers.SequenceNumber;
00248 index++;
00249 if (index >= Count) { break; }
00250 currentSegment = (Segment)this.GetByIndex(index);
00251 while ((!currentSegment.Headers.IsLastInMessage) && (currentSegment.Headers.SequenceNumber == (lastSeqSeen +1))) {
00252
00253 lastSeqSeen = currentSegment.Headers.SequenceNumber;
00254 index++;
00255 if (index >= Count) { break; }
00256 currentSegment = (Segment)this.GetByIndex(index);
00257
00258 }
00259
00260
00261
00262 if (currentSegment.Headers.SequenceNumber == (lastSeqSeen + 1)) {
00263
00264 startIndex = potentialStartIndex;
00265 finishIndex = index;
00266 foundIt = true;
00267 } else {
00268 Console.WriteLine("Gap In sequence detected");
00269
00270
00271 }
00272 } else {
00273 startIndex = potentialStartIndex;
00274 finishIndex = index;
00275 foundIt = true;
00276 }
00277 }
00278 }
00279 }
00280
00281 public Message CreateTypedMessage(SegmentData[] src) {
00282 return CreateTypedMessageStatic(src);
00283 }
00284
00285 public static Message CreateTypedMessageStatic(SegmentData[] src) {
00286 byte[] data = SegmentData.ArrayToByteArray(src);
00287 MessageType msgType = (MessageType)src[0].Data[0];
00288 Message result;
00289 switch(msgType) {
00290 case MessageType.Noop: { result = new EmptyMessage(); break; }
00291 case MessageType.Retransmission: { result = new RetransmissionMessage(data); break; }
00292 case MessageType.CacheIndex: { result = new CacheIndexMessage(data); break; }
00293 case MessageType.NoChange: { result = new NoChangeMessage(data); break; }
00294 case MessageType.HTTPRequest: { result = new HTTPRequestMessage(data); break; }
00295 case MessageType.HTTPResponse: { result = new HTTPResponseMessage(data); break; }
00296 case MessageType.CacheUpdateHTTPResponse: { result = new CacheUpdateHTTPResponseMsg(data); break; }
00297 case MessageType.CacheUpdateNoChange: {result = new CacheUpdateNoChangeMsg(data); break; }
00298 case MessageType.Chunk: { result = new MessageChunk(data); break; }
00299 case MessageType.CacheIndexRequest : { result = new CacheIndexRequestMessage(data); break; }
00300 default: { throw new NotSupportedException(String.Format("Message type of {0} is not recognised", msgType)); }
00301 }
00302 return result;
00303 }
00304 }
00305
00306 public class ChunkAssembler {
00307
00311 class AssemblyContainer {
00312
00316 class ChunkedMessageAssembler {
00317 private int first;
00318 private int last;
00319 private int count;
00320 MessageChunk[] chunkArray;
00321
00322 public int ChunksOutstanding {
00323 get { return ((last - first) + 1) - count; }
00324 }
00325
00326 public bool BelongsHere(MessageChunk chunk) {
00327 return (chunk.offset >= first && chunk.offset <= last);
00328 }
00329
00330 public ChunkedMessageAssembler(MessageChunk initialChunk) {
00331 first = initialChunk.firstInMessage;
00332 last = initialChunk.lastInMessage;
00333 chunkArray = new MessageChunk[(last - first) + 1];
00334 count = 0;
00335 Add(initialChunk);
00336 }
00337
00338 public void Add(MessageChunk chunk) {
00339 if (chunk.firstInMessage == first && chunk.lastInMessage == last) {
00340 if (chunkArray[chunk.offset - first] == null) {
00341 chunkArray[chunk.offset - first] = chunk;
00342 count++;
00343 } else {
00344 throw new ApplicationException("Attempting to assign chunk to somewhere full");
00345 }
00346 } else {
00347 throw new ApplicationException("Attempting to add a chunk to a ChunkedMessageAssember that doesn't match first and last");
00348 }
00349 }
00350
00351 public Message ExtractMessage() {
00352 if (ChunksOutstanding == 0) {
00353 return AssemblyLine.CreateTypedMessageStatic(MessageChunk.GetMessageFromChunks(chunkArray));
00354 } else {
00355 throw new ApplicationException("Still chunks outstanding");
00356 }
00357 }
00358
00359 }
00360
00361 public DateTime timestamp;
00362 private ArrayList chunkedMessageAssemblers;
00363
00364 public AssemblyContainer(DateTime timestamp) {
00365 this.timestamp = timestamp;
00366 chunkedMessageAssemblers = new ArrayList();
00367 }
00368
00369 public void AddChunk(MessageChunk chunk) {
00370 bool foundHome = false;
00371 foreach(ChunkedMessageAssembler assembler in chunkedMessageAssemblers) {
00372 if (assembler.BelongsHere(chunk)) {
00373 assembler.Add(chunk);
00374 }
00375 foundHome = true;
00376 break;
00377 }
00378 if (!foundHome) {
00379 chunkedMessageAssemblers.Add(new ChunkedMessageAssembler(chunk));
00380 }
00381 }
00382
00383 public bool ContainsCompleteMessage() {
00384 bool result = false;
00385 foreach (ChunkedMessageAssembler chunkedAssembler in chunkedMessageAssemblers) {
00386 result = (chunkedAssembler.ChunksOutstanding == 0);
00387 if (result) { break; }
00388 }
00389 return result;
00390 }
00391
00392 public Message ExtractCompleteMessage() {
00393 Message result = null;
00394 foreach (ChunkedMessageAssembler chunkedAssembler in chunkedMessageAssemblers) {
00395 if (chunkedAssembler.ChunksOutstanding == 0) {
00396 result = chunkedAssembler.ExtractMessage();
00397 }
00398 if (result != null) {
00399 chunkedMessageAssemblers.Remove(chunkedAssembler);
00400 break; }
00401 }
00402 return result;
00403 }
00404 }
00405
00406 private ListDictionary assemblyContainers;
00407
00408 public ChunkAssembler() {
00409 assemblyContainers = new ListDictionary();
00410 }
00411
00412
00413 public void Add(Message msg) {
00414 if (msg.Type != MessageType.Chunk) {
00415 throw new ApplicationException("Unable to assemble non-chunked message in message chunk assembly");
00416 } else {
00417 MessageChunk chunk = (MessageChunk)msg;
00418 AssemblyContainer currentContainer;
00419 if (assemblyContainers.Contains(chunk.timestamp)) {
00420 currentContainer = (AssemblyContainer)assemblyContainers[chunk.timestamp];
00421 } else {
00422 currentContainer = new AssemblyContainer(chunk.timestamp);
00423 assemblyContainers.Add(chunk.timestamp, currentContainer);
00424 }
00425 currentContainer.AddChunk(chunk);
00426 }
00427 }
00428
00429 public void Init() {
00430 assemblyContainers.Clear();
00431 }
00432
00433 public bool ContainsCompleteMessage() {
00434 bool result = false;
00435 foreach (AssemblyContainer container in assemblyContainers.Values) {
00436 result = container.ContainsCompleteMessage();
00437 if (result) {break;}
00438 }
00439 return result;
00440 }
00441
00442 public Message ExtractCompleteMessage() {
00443 Message result = null;
00444 foreach (AssemblyContainer container in assemblyContainers.Values) {
00445 result = container.ExtractCompleteMessage();
00446 if (result != null) { break; }
00447 }
00448 return result;
00449 }
00450
00451
00452 }
00453 }
00454
00455