00001 using System;
00002 using System.Collections;
00003 using System.Threading;
00004 using Common;
00005
00006 namespace ProtocolStack {
/// <summary>
/// Two-level transmission queue. Normal-priority entries are stored in the
/// inherited <see cref="Queue"/>; low-priority entries are stored in a private
/// second queue and are only handed out by <see cref="DequeueBlocking"/> when
/// no normal-priority entry is pending.
/// </summary>
/// <remarks>
/// All synchronized members take <c>SyncRoot</c> and signal state changes with
/// <c>Monitor.PulseAll(SyncRoot)</c>. The low-priority queue is private, so
/// <c>SyncRoot</c> is the only lock needed to guard both queues.
/// </remarks>
public class TransmissionQueue : Queue {
    // Low-priority entries. Only accessed while holding this.SyncRoot
    // (except from EnqueueUnprotected, which leaves locking to its caller).
    private readonly Queue lowPriorityQueue;

    /// <summary>
    /// Creates an empty transmission queue with an empty low-priority queue.
    /// </summary>
    public TransmissionQueue() : base() {
        lowPriorityQueue = new Queue();
    }

    /// <summary>
    /// Discards every pending entry, normal and low priority.
    /// </summary>
    public void ResetConnection() {
        // Both queues are cleared inside ONE critical section so that no other
        // thread can enqueue or dequeue between the two Clear() calls and see
        // a half-reset queue.
        lock (this.SyncRoot) {
            Clear();
            lowPriorityQueue.Clear();
            // Every other state change pulses SyncRoot; do the same here so
            // any thread waiting on SyncRoot re-evaluates its condition.
            // DequeueBlocking re-checks in a loop and simply waits again.
            Monitor.PulseAll(this.SyncRoot);
        }
    }

    /// <summary>
    /// Appends a single entry and wakes all threads waiting on <c>SyncRoot</c>.
    /// </summary>
    /// <param name="tqe">Entry to append.</param>
    /// <param name="isLowPriority">
    /// <c>true</c> to append to the low-priority queue, <c>false</c> to append
    /// to the normal-priority (inherited) queue.
    /// </param>
    public void Enqueue(TransmissionQueueEntry tqe, bool isLowPriority) {
        lock (this.SyncRoot) {
            EnqueueLocked(tqe, isLowPriority);
            Monitor.PulseAll(this.SyncRoot);
        }
    }

    /// <summary>
    /// Appends all entries of an array, in array order, as one atomic
    /// operation, then wakes all threads waiting on <c>SyncRoot</c>.
    /// </summary>
    /// <param name="tqes">Entries to append.</param>
    /// <param name="isLowPriority">
    /// <c>true</c> to append to the low-priority queue, <c>false</c> to append
    /// to the normal-priority (inherited) queue.
    /// </param>
    public void Enqueue(TransmissionQueueEntry[] tqes, bool isLowPriority) {
        lock (this.SyncRoot) {
            foreach (TransmissionQueueEntry tqe in tqes) {
                EnqueueLocked(tqe, isLowPriority);
            }
            Monitor.PulseAll(this.SyncRoot);
        }
    }

    /// <summary>
    /// Appends all entries of an array without taking any lock and without
    /// pulsing <c>SyncRoot</c>.
    /// </summary>
    /// <remarks>
    /// NOTE(review): presumably meant for callers that already hold
    /// <c>SyncRoot</c> and signal waiters themselves - confirm against callers.
    /// </remarks>
    /// <param name="tqes">Entries to append.</param>
    /// <param name="isLowPriority">
    /// <c>true</c> to append to the low-priority queue, <c>false</c> to append
    /// to the normal-priority (inherited) queue.
    /// </param>
    public void EnqueueUnprotected(TransmissionQueueEntry[] tqes, bool isLowPriority) {
        Queue target = (isLowPriority ? lowPriorityQueue : this);
        foreach (TransmissionQueueEntry tqe in tqes) {
            target.Enqueue(tqe);
        }
    }

    /// <summary>
    /// Removes and returns the next entry, blocking the calling thread until
    /// one is available. Normal-priority entries are always returned before
    /// low-priority ones.
    /// </summary>
    /// <returns>The dequeued entry.</returns>
    public TransmissionQueueEntry DequeueBlocking() {
        lock (this.SyncRoot) {
            // Loop (not "if"): the condition must be re-checked after every
            // wake-up because another thread may have emptied the queues again.
            while (Count == 0 && lowPriorityQueue.Count == 0) {
                Monitor.Wait(this.SyncRoot);
            }

            TransmissionQueueEntry result = (Count != 0)
                ? (TransmissionQueueEntry)base.Dequeue()
                : (TransmissionQueueEntry)lowPriorityQueue.Dequeue();

            // Signal the removal to any thread waiting on SyncRoot.
            Monitor.PulseAll(this.SyncRoot);
            return result;
        }
    }

    /// <summary>
    /// <c>true</c> when the normal-priority queue holds no entries.
    /// </summary>
    /// <remarks>
    /// NOTE(review): low-priority entries are not considered here, although
    /// <see cref="DequeueBlocking"/> does deliver them - confirm this is the
    /// intended meaning for callers. The value is read without locking.
    /// </remarks>
    public bool IsEmpty {
        get {
            return (Count == 0);
        }
    }

    // Routes one entry to the queue selected by isLowPriority.
    // Caller must hold this.SyncRoot.
    private void EnqueueLocked(TransmissionQueueEntry tqe, bool isLowPriority) {
        if (isLowPriority) {
            lowPriorityQueue.Enqueue(tqe);
        } else {
            base.Enqueue(tqe);
        }
    }
}
00102
00103
00104
/// <summary>
/// A single element of the transmission queue: one segment together with its
/// destination device ID and flags marking whether it is the first and/or
/// last segment of its message.
/// </summary>
public class TransmissionQueueEntry {
    /// <summary>The segment carried by this entry.</summary>
    public SegmentData segmentData;

    /// <summary>ID of the device this entry is addressed to.</summary>
    public ushort destinationDeviceID;

    /// <summary><c>true</c> if this entry is the first segment of its message.</summary>
    public bool isMessageStart;

    /// <summary><c>true</c> if this entry is the last segment of its message.</summary>
    public bool isMessageEnd;

    /// <summary>
    /// Creates an entry with all fields left at their default values.
    /// </summary>
    public TransmissionQueueEntry() {
    }

    /// <summary>
    /// Creates a fully initialised entry.
    /// </summary>
    /// <param name="segmentData">Segment carried by the entry.</param>
    /// <param name="destinationDeviceID">ID of the destination device.</param>
    /// <param name="isMessageStart">Whether this is the first segment of the message.</param>
    /// <param name="isMessageEnd">Whether this is the last segment of the message.</param>
    public TransmissionQueueEntry(SegmentData segmentData, ushort destinationDeviceID, bool isMessageStart, bool isMessageEnd) {
        this.isMessageStart = isMessageStart;
        this.isMessageEnd = isMessageEnd;
        this.destinationDeviceID = destinationDeviceID;
        this.segmentData = segmentData;
    }

    /// <summary>
    /// Converts a message into queue entries, allowing any number of segments.
    /// </summary>
    /// <param name="msg">Message to convert.</param>
    /// <returns>One entry per segment, in segment order.</returns>
    public static TransmissionQueueEntry[] FromMessage(Message msg) {
        return FromMessage(msg, false);
    }

    /// <summary>
    /// Converts a message into queue entries, one per segment produced by
    /// <c>SegmentData.FromMessage</c>. The first entry is flagged as message
    /// start and the last as message end (a single segment carries both flags).
    /// Every entry is addressed to <c>msg.Destination</c>.
    /// </summary>
    /// <param name="msg">Message to convert.</param>
    /// <param name="singleSegmentOnly">
    /// When <c>true</c>, the message must yield exactly one segment.
    /// </param>
    /// <returns>One entry per segment, in segment order.</returns>
    /// <exception cref="ApplicationException">
    /// <paramref name="singleSegmentOnly"/> is <c>true</c> and the message did
    /// not yield exactly one segment.
    /// </exception>
    public static TransmissionQueueEntry[] FromMessage(Message msg, bool singleSegmentOnly) {
        SegmentData[] segments = SegmentData.FromMessage(msg);
        int segmentCount = segments.Length;

        if (singleSegmentOnly && segmentCount != 1) {
            throw new ApplicationException("Single segment only was specified, and we have " + segmentCount + " segments");
        }

        TransmissionQueueEntry[] entries = new TransmissionQueueEntry[segmentCount];
        int lastIndex = segmentCount - 1;
        int index = 0;
        foreach (SegmentData segment in segments) {
            bool startsMessage = (index == 0);
            bool endsMessage = (index == lastIndex);
            entries[index] = new TransmissionQueueEntry(segment, msg.Destination, startsMessage, endsMessage);
            index++;
        }
        return entries;
    }
}
00148 }