00001 using System;
00002 using System.Threading;
00003 using System.Collections;
00004
namespace ProtocolStack
{
    /// <summary>
    /// A queue of received <see cref="Segment"/> objects belonging to one
    /// <see cref="RemoteHostComms"/> instance, together with a worker thread
    /// that dequeues segments and hands them to that host's managers.
    /// </summary>
    /// <remarks>
    /// The blocking members synchronize on the queue instance itself
    /// (<c>lock(this)</c>). The members inherited from <see cref="Queue"/>
    /// do not take that lock, so callers that use them directly must
    /// synchronize on the instance themselves.
    /// </remarks>
    public class ReceiveQueue : Queue
    {
        /// <summary>Remote host whose managers process the dequeued segments.</summary>
        private readonly RemoteHostComms thisRemoteHost;

        /// <summary>Segment most recently dequeued by the processing thread.</summary>
        private Segment workingSegment;

        /// <summary>Thread running <see cref="ProcessQueue"/>; null until <see cref="Start"/> is called.</summary>
        private Thread processingThread;

        /// <summary>
        /// Creates an empty receive queue bound to the given remote host.
        /// The processing thread is not started until <see cref="Start"/> is called.
        /// </summary>
        /// <param name="thisRemoteHost">Remote host whose managers will receive the segments.</param>
        public ReceiveQueue(RemoteHostComms thisRemoteHost) : base() {
            this.thisRemoteHost = thisRemoteHost;
        }

        /// <summary>
        /// Appends a segment to the queue while holding the instance lock and
        /// pulses the monitor so that one thread waiting in
        /// <see cref="DequeueBlocking"/> can wake up.
        /// </summary>
        /// <param name="seg">Segment to append.</param>
        public void EnqueueBlocking(Segment seg) {
            lock (this) {
                Enqueue(seg);
                Monitor.Pulse(this);
            }
        }

        /// <summary>
        /// Removes and returns the segment at the head of the queue, waiting on
        /// the instance monitor for as long as the queue is empty.
        /// </summary>
        /// <returns>The dequeued segment.</returns>
        /// <exception cref="InvalidCastException">
        /// The head element is not a <see cref="Segment"/> (possible only if
        /// something else was added through the inherited <c>Enqueue</c>).
        /// </exception>
        public Segment DequeueBlocking() {
            Segment result;
            lock (this) {
                // Re-check the condition after every wake-up: another consumer
                // may have taken the element before this thread got the lock.
                while (Count == 0) {
                    Monitor.Wait(this);
                }
                result = (Segment)Dequeue();
                Monitor.Pulse(this);
            }
            return result;
        }

        /// <summary>
        /// Starts the processing thread. Does nothing when a processing thread
        /// created by an earlier call is still alive.
        /// </summary>
        public void Start() {
            if (processingThread == null || !processingThread.IsAlive) {
                processingThread = new Thread(new ThreadStart(ProcessQueue));
                processingThread.Start();
            }
        }

        /// <summary>
        /// Stops the processing thread by aborting it and waiting for it to
        /// terminate. Does nothing when the thread was never started or has
        /// already ended.
        /// </summary>
        /// <remarks>
        /// NOTE(review): <see cref="Thread.Abort()"/> is kept to preserve the
        /// existing shutdown behavior; it is not supported on .NET Core / .NET 5+,
        /// where it throws <see cref="PlatformNotSupportedException"/>.
        /// </remarks>
        public void Stop() {
            // Guard against Stop() being called before Start(): the field is
            // still null then and the previous code threw NullReferenceException.
            if (processingThread != null && processingThread.IsAlive) {
                processingThread.Abort();
                processingThread.Join();
            }
        }

        /// <summary>
        /// Body of the processing thread: loops forever, taking one segment at a
        /// time from the queue. A segment that the sequence manager reports as
        /// processable has its headers dispatched to the managers and is then
        /// passed to the message assembler; any other segment is discarded.
        /// </summary>
        private void ProcessQueue() {
            while (true) {
                workingSegment = this.DequeueBlocking();

                if (thisRemoteHost.SeqManager.IsProcessable(workingSegment.Headers)) {
                    ProcessSegmentHeaders(workingSegment.Headers, workingSegment.Data.DataLength);

                    thisRemoteHost.MessageAssembler.Process(workingSegment);
                }
            }
        }

        /// <summary>
        /// Passes the headers of a received segment, by reference, to the
        /// sequence, acknowledgement, negative-acknowledgement and rate managers
        /// of the remote host, in that order.
        /// </summary>
        /// <param name="myHeaders">Headers of the received segment.</param>
        /// <param name="dataLength">Data length of the segment; forwarded to the rate manager only.</param>
        private void ProcessSegmentHeaders(SegmentHeaders myHeaders, uint dataLength) {
            thisRemoteHost.SeqManager.ProcessReceivedSegmentHeaders(ref myHeaders);

            thisRemoteHost.AckManager.ProcessReceivedSegmentHeaders(ref myHeaders);

            thisRemoteHost.NackManager.ProcessReceivedSegmentHeaders(ref myHeaders);

            thisRemoteHost.RateManager.ProcessReceivedSegmentHeaders(ref myHeaders, dataLength);
        }
    }
}