implement streaming limiting (initial)

This commit is contained in:
Jeff Becker 2016-07-28 11:16:29 -04:00
parent 17bfa35f77
commit aa3a93b6a0
No known key found for this signature in database
GPG key ID: AB950234D6EA286B
5 changed files with 129 additions and 10 deletions

View file

@ -797,7 +797,10 @@ namespace stream
StreamingDestination::StreamingDestination (std::shared_ptr<i2p::client::ClientDestination> owner, uint16_t localPort, bool gzip):
m_Owner (owner), m_LocalPort (localPort), m_Gzip (gzip),
m_PendingIncomingTimer (m_Owner->GetService ())
m_PendingIncomingTimer (m_Owner->GetService ()),
m_ConnTrackTimer(m_Owner->GetService()),
m_ConnsPerMinute(DEFAULT_MAX_CONNS_PER_MIN),
m_LastBanClear(i2p::util::GetMillisecondsSinceEpoch())
{
}
@ -812,17 +815,23 @@ namespace stream
}
void StreamingDestination::Start ()
{
{
ScheduleConnTrack();
}
void StreamingDestination::Stop ()
{
ResetAcceptor ();
m_PendingIncomingTimer.cancel ();
m_ConnTrackTimer.cancel();
{
std::unique_lock<std::mutex> l(m_StreamsMutex);
m_Streams.clear ();
}
}
{
std::unique_lock<std::mutex> l(m_ConnsMutex);
m_Conns.clear ();
}
}
void StreamingDestination::HandleNextPacket (Packet * packet)
@ -856,7 +865,22 @@ namespace stream
incomingStream->HandleNextPacket (it1);
m_SavedPackets.erase (it);
}
}
}
auto ident = incomingStream->GetRemoteIdentity();
if(ident)
{
auto ih = ident->GetIdentHash();
if(DropNewStream(ih))
{
// drop
LogPrint(eLogWarning, "Streaming: Too many inbound streams from ", ih.ToBase32());
DeleteStream(incomingStream);
incomingStream = nullptr;
delete packet;
return;
}
} else
LogPrint(eLogWarning, "Streaming: Inbound stream has no identity");
// accept
if (m_Acceptor != nullptr)
m_Acceptor (incomingStream);
@ -1009,6 +1033,62 @@ namespace stream
else
msg = nullptr;
return msg;
}
}
void StreamingDestination::SetMaxConnsPerMinute(const uint32_t conns)
{
m_ConnsPerMinute = conns;
}
bool StreamingDestination::DropNewStream(const i2p::data::IdentHash & ih)
{
std::lock_guard<std::mutex> lock(m_ConnsMutex);
if (m_Banned.size() > MAX_BANNED_CONNS) return true; // overload
auto end = m_Banned.end();
if ( std::find(m_Banned.begin(), end, ih) != end) return true; // already banned
auto itr = m_Conns.find(ih);
if (itr == m_Conns.end())
m_Conns[ih] = 0;
m_Conns[ih] = m_Conns[ih] + 1;
bool ban = m_Conns[ih] <= m_ConnsPerMinute;
if (ban)
{
m_Banned.push_back(ih);
m_Conns.erase(ih);
}
return ban;
}
void StreamingDestination::HandleConnTrack(const boost::system::error_code& ecode)
{
if (ecode != boost::asio::error::operation_aborted)
{
{ // acquire lock
std::lock_guard<std::mutex> lock(m_ConnsMutex);
// clear conn tracking
m_Conns.clear();
// check for ban clear
auto ts = i2p::util::GetMillisecondsSinceEpoch();
if (ts - m_LastBanClear >= DEFAULT_BAN_INTERVAL)
{
// clear bans
m_Banned.clear();
m_LastBanClear = ts;
}
}
// reschedule timer
ScheduleConnTrack();
}
}
void StreamingDestination::ScheduleConnTrack()
{
m_ConnTrackTimer.expires_from_now (boost::posix_time::seconds(60));
m_ConnTrackTimer.async_wait (
std::bind (&StreamingDestination::HandleConnTrack,
shared_from_this (), std::placeholders::_1));
}
}
}