00001 using System;
00002 using System.Net;
00003 using System.Net.Sockets;
00004 using System.Collections;
00005 using System.Threading;
00006
00007 namespace ProtocolStack {
00008
00014 public class SegmentTransmitter {
00015 RemoteHostComms thisRemoteHost;
00016 StackInterface thisStack;
00017 UdpClient myUDP;
00018
00019 Segment workingSegment;
00020 IPAddress currentDestination;
00021 Thread processingThread;
00022
00027 public void Start() {
00028 if (processingThread == null || !processingThread.IsAlive) {
00029 Console.WriteLine("Starting Segment Transmitter...");
00030 try {
00031 StackSettings s = thisStack.Settings;
00032 myUDP = new UdpClient(new IPEndPoint(thisStack.ipMappings.Lookup(thisStack.settings.MyDeviceID), s.MyDeviceID == s.SERVER_DEVICE_ID ? s.ServerSendFromPortNumber : s.ClientSendFromPortNumber));
00033 } catch (ApplicationException appEx) {
00034 throw new ApplicationException(String.Format("Unable to create transmission socket", thisStack.ipMappings.Lookup(thisStack.settings.MyDeviceID)), appEx);
00035 } catch (SocketException sockEx) {
00036 throw new ApplicationException(String.Format("Unable to create transmission socket for IP Address [{0}]", thisStack.ipMappings.Lookup(thisStack.settings.MyDeviceID)), sockEx);
00037 }
00038 processingThread = new Thread(new ThreadStart(Run));
00039 processingThread.Start();
00040 Console.WriteLine("...Segment Transmitter Started.");
00041 }
00042 }
00043
00048 public void Stop() {
00049 if (processingThread != null && processingThread.IsAlive) {
00050 processingThread.Abort();
00051 processingThread.Join();
00052 myUDP.Close();
00053 }
00054 }
00055
00061 public SegmentTransmitter(RemoteHostComms thisRemoteHost) {
00062 this.thisRemoteHost = thisRemoteHost;
00063 this.thisStack = thisRemoteHost.thisStack;
00064 }
00065
00070 public void Run() {
00071
00072
00073
00074
00075
00076 try {
00077 while (true) {
00078
00079 TransmissionQueueEntry currentQueueEntry;
00080 if (thisRemoteHost.readyToSendUserData) {
00081 currentQueueEntry = thisRemoteHost.TxQueue.DequeueBlocking();
00082 } else {
00083 Console.WriteLine("### Not ready to send ###");
00084 lock(thisRemoteHost.AckManager) {
00085 Console.WriteLine("### Waiting for ACKable to be ready ###");
00086 while (!thisRemoteHost.AckManager.IsAckableWaiting()) {
00087 Monitor.Wait(thisRemoteHost.AckManager);
00088 Console.WriteLine("Woke up");
00089 }
00090 Console.WriteLine("### ACKable ready, creating empty message carrier");
00091 currentQueueEntry = TransmissionQueueEntry.FromMessage(new EmptyMessage(thisRemoteHost.RemoteDeviceID), true)[0];
00092 }
00093 }
00094 BlockForCreditAndSend(currentQueueEntry);
00095 }
00096 } catch (ThreadAbortException) {
00097
00098 }
00099 }
00100
00101 private void BlockForCreditAndSend(TransmissionQueueEntry currentQueueEntry) {
00102 bool originalUnsent = true;
00103 while (originalUnsent) {
00104 if (thisRemoteHost.RateManager.WaitForSend(currentQueueEntry.segmentData.DataLength, 10)) {
00105 SendTQE(currentQueueEntry);
00106 originalUnsent = false;
00107 } else {
00108
00109 EmptyMessage msg = new EmptyMessage();
00110 msg.Destination = currentQueueEntry.destinationDeviceID;
00111 TransmissionQueueEntry[] entryArray = TransmissionQueueEntry.FromMessage(msg, true);
00112 Console.WriteLine(" 10 seconds elapsed without credit to send - sending credit update message to try and free deadlock *");
00113 SendTQE(entryArray[0]);
00114 originalUnsent = true;
00115 }
00116 }
00117 }
00118
00119 private void SendTQE(TransmissionQueueEntry currentQueueEntry) {
00121 workingSegment = new Segment();
00122 uint sequenceOffset = 0;
00123 RateInfo rateInfo = null;
00124
00125 bool isAckable = thisRemoteHost.AckManager.IsAckableWaiting();
00126 AckInfo ackInfo = thisRemoteHost.AckManager.GetNextAck();
00127 workingSegment.Headers = new SegmentHeaders();
00128 NackInfo nackInfo = thisRemoteHost.NackManager.GetNextNack();
00129 uint seqNum = thisRemoteHost.SeqManager.GetNextSequenceNumber();
00130 bool isResynch = thisRemoteHost.SeqManager.IsResynching;
00131 if (isResynch) {
00132 sequenceOffset = thisRemoteHost.SeqManager.GetSequenceOffset(seqNum);
00133 } else {
00134 rateInfo = thisRemoteHost.RateManager.GetRateInfoForTx(isAckable);
00135 }
00136
00137 currentDestination = thisStack.ipMappings.Lookup(currentQueueEntry.destinationDeviceID);
00138
00139
00140 workingSegment.Data = currentQueueEntry.segmentData;
00141
00142
00143
00144 workingSegment.Headers.IsAckable = isAckable;
00145 workingSegment.Headers.IsResynch = isResynch;
00146 workingSegment.Headers.IsFirstInMessage = currentQueueEntry.isMessageStart;
00147 workingSegment.Headers.IsLastInMessage = currentQueueEntry.isMessageEnd;
00148
00149
00150 workingSegment.Headers.SourceDeviceID = thisStack.settings.MyDeviceID;
00151 workingSegment.Headers.SequenceNumber = seqNum;
00152
00153
00154 if (ackInfo == null) {
00155 workingSegment.Headers.ClearAck();
00156 } else {
00157 workingSegment.Headers.SetAck(ackInfo);
00158 }
00159 if (nackInfo == null) {
00160 workingSegment.Headers.ClearNACKStatus();
00161 } else {
00162 workingSegment.Headers.SetNACKStatus(nackInfo);
00163 Console.WriteLine("Adding NACK on outgoing segment #{0} for retrans of seg #{1}", workingSegment.Headers.SequenceNumber, nackInfo.SequenceNumber);
00164 }
00165
00166
00167 if (isResynch) {
00168 workingSegment.Headers.SynchOrRate = sequenceOffset;
00169 } else {
00170 workingSegment.Headers.SynchOrRate = rateInfo.data;
00171 }
00172
00173
00174 workingSegment.Headers.TimeSent = DateTime.Now.ToUniversalTime();
00175 tempSend(workingSegment, currentDestination);
00176 thisRemoteHost.RetransmissionBuffer.PutSegment(workingSegment);
00177 thisRemoteHost.RateManager.DecreaseSendCredit(workingSegment.Data.DataLength);
00178
00179
00180 if (isAckable) {
00181 thisRemoteHost.AckManager.SentAckableSegment(workingSegment.Headers.SequenceNumber);
00182 }
00183 }
00184
00185 public void tempSend(Segment targetSegment, IPAddress destination) {
00186 byte[] datagram = targetSegment.ToByteArray();
00187 StackSettings s = thisStack.Settings;
00188 myUDP.Send(datagram, datagram.Length, new IPEndPoint(destination, (s.MyDeviceID == 0) ? s.ClientReceivePortNumber : s.ServerReceivePortNumber));
00189 Console.WriteLine(" > Sent #{0}, Credit Onboard: {1}, ACKable: {2}, Size: {3}", targetSegment.Headers.SequenceNumber, targetSegment.Headers.SynchOrRate ,targetSegment.Headers.IsAckable, targetSegment.Data.DataLength);
00190
00191 }
00192 }
00193
00194
00195
00196 public class SegmentReceiver {
00197
00198
00199
00200
00202 Thread processingThread;
00203 StackInterface thisStack;
00204 UdpClient udpListener;
00205
00207 public ReceiveQueue destinationQueue;
00208
00210 public SegmentReceiver(StackInterface thisStack) {
00211 this.thisStack = thisStack;
00212 }
00213
00214 public void Start() {
00215 Console.WriteLine("Starting Segment Receiver...");
00216 if (processingThread == null || !processingThread.IsAlive) {
00217 processingThread = new Thread(new ThreadStart(Listen));
00218 processingThread.Start();
00219 Console.WriteLine("... Segment Receiver Started.");
00220 } else {
00221 Console.WriteLine("... Segment Receiver Already Started!");
00222 }
00223
00224 }
00225
00226 public void Stop() {
00227 Console.WriteLine("Stopping Segment Receiver...");
00228 if (processingThread != null && processingThread.IsAlive) {
00229 processingThread.Abort();
00230 if (udpListener != null) { udpListener.Close(); }
00231 processingThread.Join();
00232 Console.WriteLine("... Segment Receiver Stopped.");
00233 } else {
00234 Console.WriteLine("... Segment Receiver Already Stopped!");
00235 }
00236 }
00237
00239 private void Listen() {
00240 IPEndPoint currentEndPoint;
00241 Segment receivedSegment;
00242 int localPortNumber = -1;
00243 if (thisStack.settings.MyDeviceID == thisStack.Settings.SERVER_DEVICE_ID) {
00244 localPortNumber = thisStack.Settings.ServerReceivePortNumber;
00245 } else {
00246 localPortNumber = thisStack.Settings.ClientReceivePortNumber;
00247 }
00248
00249 udpListener = new UdpClient(new IPEndPoint(IPAddress.Any, localPortNumber));
00250 try {
00251 while(true) {
00252 try {
00253
00254 currentEndPoint = new IPEndPoint(IPAddress.Any, localPortNumber);
00255 receivedSegment = new Segment(udpListener.Receive(ref currentEndPoint));
00256 receivedSegment.Headers.TimeReceived = DateTime.Now.ToUniversalTime();
00257 Console.WriteLine(" < Got #{0}", receivedSegment.Headers.SequenceNumber);
00258 thisStack.GetRemoteHostByID(receivedSegment.Headers.SourceDeviceID).AcceptIncomingSegment(receivedSegment);
00259 } catch (ThreadAbortException) {
00260 } catch (Exception ex) {
00261 Console.Error.WriteLine("\n *** Error in Protocol Stack ***\nUnable to correctly receive segment.\nSkipping over and continuting. Details:\n\n(" + ex + ")");
00262 }
00263 }
00264 } catch (ThreadAbortException) {
00265 Console.WriteLine("Segment Receive Listen Thread Aborted");
00266 } finally {
00267 if (udpListener != null) { udpListener.Close(); }
00268 }
00269 }
00270 }
00271
00272
00276 public class ConnectionReset : ApplicationException {}
00277
00281 public delegate void ConnectionResetDelegate();
00282
00283 }