Fix delete oldest logic.

This commit is contained in:
Adam Ierymenko 2015-11-10 09:46:14 -08:00
parent b171c9a0db
commit 4328c6c3bc

View file

@ -112,33 +112,31 @@ class _ClusterSendQueue
{ {
public: public:
_ClusterSendQueue() : _ClusterSendQueue() :
_poolCount(0) _poolCount(0) {}
{
}
~_ClusterSendQueue() {} // memory is automatically freed when _chunks is destroyed ~_ClusterSendQueue() {} // memory is automatically freed when _chunks is destroyed
inline void enqueue(uint64_t ts,const Address &from,const Address &to,const void *data,unsigned int len,bool unite) inline void enqueue(uint64_t now,const Address &from,const Address &to,const void *data,unsigned int len,bool unite)
{ {
if (len > ZT_CLUSTER_SEND_QUEUE_DATA_MAX) if (len > ZT_CLUSTER_SEND_QUEUE_DATA_MAX)
return; return;
Mutex::Lock _l(_lock); Mutex::Lock _l(_lock);
// Delete oldest queue entry if sender has too many queued packets // Delete oldest queue entry for this sender if this enqueue() would take them over the per-sender limit
{ {
std::set< std::pair<Address,_ClusterSendQueueEntry *> >::iterator qi(_bySrc.lower_bound(std::pair<Address,_ClusterSendQueueEntry *>(from,(_ClusterSendQueueEntry *)0))); std::set< std::pair<Address,_ClusterSendQueueEntry *> >::iterator qi(_bySrc.lower_bound(std::pair<Address,_ClusterSendQueueEntry *>(from,(_ClusterSendQueueEntry *)0)));
std::set< std::pair<Address,_ClusterSendQueueEntry *> >::iterator oldest(_bySrc.end()); std::set< std::pair<Address,_ClusterSendQueueEntry *> >::iterator oldest(qi);
unsigned long countForSender = 0; unsigned long countForSender = 0;
while ((qi != _bySrc.end())&&(qi->first == from)) { while ((qi != _bySrc.end())&&(qi->first == from)) {
if (++countForSender > ZT_CLUSTER_MAX_QUEUE_PER_SENDER) { if (qi->second->timestamp < oldest->second->timestamp)
oldest = qi;
++countForSender;
++qi;
}
if (countForSender >= ZT_CLUSTER_MAX_QUEUE_PER_SENDER) {
_byDest.erase(std::pair<Address,_ClusterSendQueueEntry *>(oldest->second->toPeerAddress,oldest->second)); _byDest.erase(std::pair<Address,_ClusterSendQueueEntry *>(oldest->second->toPeerAddress,oldest->second));
_pool[_poolCount++] = oldest->second; _pool[_poolCount++] = oldest->second;
_bySrc.erase(oldest); _bySrc.erase(oldest);
break;
} else if (oldest == _bySrc.end())
oldest = qi;
++qi;
} }
} }
@ -154,7 +152,7 @@ public:
_pool[_poolCount++] = &(_chunks.back().data[i]); _pool[_poolCount++] = &(_chunks.back().data[i]);
} }
e->timestamp = ts; e->timestamp = now;
e->fromPeerAddress = from; e->fromPeerAddress = from;
e->toPeerAddress = to; e->toPeerAddress = to;
memcpy(e->data,data,len); memcpy(e->data,data,len);