summaryrefslogtreecommitdiffstats
path: root/src/OSSupport/SocketThreads.cpp
diff options
context:
space:
mode:
authorandrew <xdotftw@gmail.com>2014-01-21 19:40:41 +0100
committerandrew <xdotftw@gmail.com>2014-01-21 19:40:41 +0100
commit9bd8f74b598866539e3e5133188326bd7d14a827 (patch)
tree15f411bd03371e555250b898f3883c0fa20c2b8b /src/OSSupport/SocketThreads.cpp
parentScoreboard SendTo() (diff)
parentFix a crash but somewhere... (diff)
downloadcuberite-9bd8f74b598866539e3e5133188326bd7d14a827.tar
cuberite-9bd8f74b598866539e3e5133188326bd7d14a827.tar.gz
cuberite-9bd8f74b598866539e3e5133188326bd7d14a827.tar.bz2
cuberite-9bd8f74b598866539e3e5133188326bd7d14a827.tar.lz
cuberite-9bd8f74b598866539e3e5133188326bd7d14a827.tar.xz
cuberite-9bd8f74b598866539e3e5133188326bd7d14a827.tar.zst
cuberite-9bd8f74b598866539e3e5133188326bd7d14a827.zip
Diffstat (limited to '')
-rw-r--r--src/OSSupport/SocketThreads.cpp283
1 files changed, 113 insertions, 170 deletions
diff --git a/src/OSSupport/SocketThreads.cpp b/src/OSSupport/SocketThreads.cpp
index 3e505616c..b8069cf00 100644
--- a/src/OSSupport/SocketThreads.cpp
+++ b/src/OSSupport/SocketThreads.cpp
@@ -71,29 +71,6 @@ bool cSocketThreads::AddClient(const cSocket & a_Socket, cCallback * a_Client)
-/*
-void cSocketThreads::RemoveClient(const cSocket * a_Socket)
-{
- // Remove the socket (and associated client) from processing
-
- cCSLock Lock(m_CS);
- for (cSocketThreadList::iterator itr = m_Threads.begin(); itr != m_Threads.end(); ++itr)
- {
- if ((*itr)->RemoveSocket(a_Socket))
- {
- return;
- }
- } // for itr - m_Threads[]
-
- // Cannot assert here, this may actually happen legally, since cClientHandle has to clean up the socket and it may have already closed in the meantime
- // ASSERT(!"Removing an unknown socket");
-}
-*/
-
-
-
-
-
void cSocketThreads::RemoveClient(const cCallback * a_Client)
{
// Remove the associated socket and the client from processing
@@ -155,47 +132,6 @@ void cSocketThreads::Write(const cCallback * a_Client, const AString & a_Data)
-/// Stops reading from the socket - when this call returns, no more calls to the callbacks are made
-void cSocketThreads::StopReading(const cCallback * a_Client)
-{
- cCSLock Lock(m_CS);
- for (cSocketThreadList::iterator itr = m_Threads.begin(); itr != m_Threads.end(); ++itr)
- {
- if ((*itr)->StopReading(a_Client))
- {
- return;
- }
- } // for itr - m_Threads[]
-
- // Cannot assert, this normally happens if the socket is closed before the client deinitializes
- // ASSERT(!"Stopping reading on an unknown client");
-}
-
-
-
-
-
-/// Queues the socket for closing, as soon as its outgoing data is sent
-void cSocketThreads::QueueClose(const cCallback * a_Client)
-{
- LOGD("QueueClose(client %p)", a_Client);
-
- cCSLock Lock(m_CS);
- for (cSocketThreadList::iterator itr = m_Threads.begin(); itr != m_Threads.end(); ++itr)
- {
- if ((*itr)->QueueClose(a_Client))
- {
- return;
- }
- } // for itr - m_Threads[]
-
- ASSERT(!"Queueing close of an unknown client");
-}
-
-
-
-
-
////////////////////////////////////////////////////////////////////////////////
// cSocketThreads::cSocketThread:
@@ -233,13 +169,13 @@ cSocketThreads::cSocketThread::~cSocketThread()
void cSocketThreads::cSocketThread::AddClient(const cSocket & a_Socket, cCallback * a_Client)
{
+ ASSERT(m_Parent->m_CS.IsLockedByCurrentThread());
ASSERT(m_NumSlots < MAX_SLOTS); // Use HasEmptySlot() to check before adding
m_Slots[m_NumSlots].m_Client = a_Client;
m_Slots[m_NumSlots].m_Socket = a_Socket;
m_Slots[m_NumSlots].m_Outgoing.clear();
- m_Slots[m_NumSlots].m_ShouldClose = false;
- m_Slots[m_NumSlots].m_ShouldCallClient = true;
+ m_Slots[m_NumSlots].m_State = sSlot::ssNormal;
m_NumSlots++;
// Notify the thread of the change:
@@ -253,7 +189,7 @@ void cSocketThreads::cSocketThread::AddClient(const cSocket & a_Socket, cCallbac
bool cSocketThreads::cSocketThread::RemoveClient(const cCallback * a_Client)
{
- // Returns true if removed, false if not found
+ ASSERT(m_Parent->m_CS.IsLockedByCurrentThread());
if (m_NumSlots == 0)
{
@@ -267,36 +203,29 @@ bool cSocketThreads::cSocketThread::RemoveClient(const cCallback * a_Client)
continue;
}
- // Found, remove it:
- m_Slots[i] = m_Slots[--m_NumSlots];
-
- // Notify the thread of the change:
- ASSERT(m_ControlSocket2.IsValid());
- m_ControlSocket2.Send("r", 1);
- return true;
- } // for i - m_Slots[]
-
- // Not found
- return false;
-}
-
-
-
-
-
-bool cSocketThreads::cSocketThread::RemoveSocket(const cSocket * a_Socket)
-{
- // Returns true if removed, false if not found
-
- for (int i = m_NumSlots - 1; i >= 0 ; --i)
- {
- if (m_Slots[i].m_Socket != *a_Socket)
+ // Found the slot:
+ if (m_Slots[i].m_State == sSlot::ssRemoteClosed)
{
- continue;
+ // The remote has already closed the socket, remove the slot altogether:
+ m_Slots[i] = m_Slots[--m_NumSlots];
+ }
+ else
+ {
+ // Query and queue the last batch of outgoing data:
+ m_Slots[i].m_Client->GetOutgoingData(m_Slots[i].m_Outgoing);
+ if (m_Slots[i].m_Outgoing.empty())
+ {
+ // No more outgoing data, shut the socket down immediately:
+ m_Slots[i].m_Socket.ShutdownReadWrite();
+ m_Slots[i].m_State = sSlot::ssShuttingDown;
+ }
+ else
+ {
+ // More data to send, shut down reading and wait for the rest to get sent:
+ m_Slots[i].m_State = sSlot::ssWritingRestOut;
+ }
+ m_Slots[i].m_Client = NULL;
}
-
- // Found, remove it:
- m_Slots[i] = m_Slots[--m_NumSlots];
// Notify the thread of the change:
ASSERT(m_ControlSocket2.IsValid());
@@ -314,6 +243,8 @@ bool cSocketThreads::cSocketThread::RemoveSocket(const cSocket * a_Socket)
bool cSocketThreads::cSocketThread::HasClient(const cCallback * a_Client) const
{
+ ASSERT(m_Parent->m_CS.IsLockedByCurrentThread());
+
for (int i = m_NumSlots - 1; i >= 0; --i)
{
if (m_Slots[i].m_Client == a_Client)
@@ -346,6 +277,8 @@ bool cSocketThreads::cSocketThread::HasSocket(const cSocket * a_Socket) const
bool cSocketThreads::cSocketThread::NotifyWrite(const cCallback * a_Client)
{
+ ASSERT(m_Parent->m_CS.IsLockedByCurrentThread());
+
if (HasClient(a_Client))
{
// Notify the thread that there's another packet in the queue:
@@ -362,7 +295,7 @@ bool cSocketThreads::cSocketThread::NotifyWrite(const cCallback * a_Client)
bool cSocketThreads::cSocketThread::Write(const cCallback * a_Client, const AString & a_Data)
{
- // Returns true if socket handled by this thread
+ ASSERT(m_Parent->m_CS.IsLockedByCurrentThread());
for (int i = m_NumSlots - 1; i >= 0; --i)
{
if (m_Slots[i].m_Client == a_Client)
@@ -383,47 +316,6 @@ bool cSocketThreads::cSocketThread::Write(const cCallback * a_Client, const AStr
-bool cSocketThreads::cSocketThread::StopReading (const cCallback * a_Client)
-{
- // Returns true if client handled by this thread
- for (int i = m_NumSlots - 1; i >= 0; --i)
- {
- if (m_Slots[i].m_Client == a_Client)
- {
- m_Slots[i].m_ShouldCallClient = false;
- return true;
- }
- } // for i - m_Slots[]
- return false;
-}
-
-
-
-
-
-bool cSocketThreads::cSocketThread::QueueClose(const cCallback * a_Client)
-{
- // Returns true if socket handled by this thread
- for (int i = m_NumSlots - 1; i >= 0; --i)
- {
- if (m_Slots[i].m_Client == a_Client)
- {
- m_Slots[i].m_ShouldClose = true;
-
- // Notify the thread that there's a close queued (in case its conditions are already met):
- ASSERT(m_ControlSocket2.IsValid());
- m_ControlSocket2.Send("c", 1);
-
- return true;
- }
- } // for i - m_Slots[]
- return false;
-}
-
-
-
-
-
bool cSocketThreads::cSocketThread::Start(void)
{
// Create the control socket listener
@@ -497,10 +389,13 @@ void cSocketThreads::cSocketThread::Execute(void)
fd_set fdRead;
cSocket::xSocket Highest = m_ControlSocket1.GetSocket();
- PrepareSet(&fdRead, Highest);
+ PrepareSet(&fdRead, Highest, false);
// Wait for the sockets:
- if (select(Highest + 1, &fdRead, NULL, NULL, NULL) == -1)
+ timeval Timeout;
+ Timeout.tv_sec = 5;
+ Timeout.tv_usec = 0;
+ if (select(Highest + 1, &fdRead, NULL, NULL, &Timeout) == -1)
{
LOG("select(R) call failed in cSocketThread: \"%s\"", cSocket::GetLastErrorString().c_str());
continue;
@@ -511,8 +406,7 @@ void cSocketThreads::cSocketThread::Execute(void)
// Test sockets for writing:
fd_set fdWrite;
Highest = m_ControlSocket1.GetSocket();
- PrepareSet(&fdWrite, Highest);
- timeval Timeout;
+ PrepareSet(&fdWrite, Highest, true);
Timeout.tv_sec = 0;
Timeout.tv_usec = 0;
if (select(Highest + 1, NULL, &fdWrite, NULL, &Timeout) == -1)
@@ -522,6 +416,8 @@ void cSocketThreads::cSocketThread::Execute(void)
}
WriteToSockets(&fdWrite);
+
+ CleanUpShutSockets();
} // while (!mShouldTerminate)
}
@@ -529,7 +425,7 @@ void cSocketThreads::cSocketThread::Execute(void)
-void cSocketThreads::cSocketThread::PrepareSet(fd_set * a_Set, cSocket::xSocket & a_Highest)
+void cSocketThreads::cSocketThread::PrepareSet(fd_set * a_Set, cSocket::xSocket & a_Highest, bool a_IsForWriting)
{
FD_ZERO(a_Set);
FD_SET(m_ControlSocket1.GetSocket(), a_Set);
@@ -541,6 +437,11 @@ void cSocketThreads::cSocketThread::PrepareSet(fd_set * a_Set, cSocket::xSocket
{
continue;
}
+ if (m_Slots[i].m_State == sSlot::ssRemoteClosed)
+ {
+ // This socket won't provide nor consume any data anymore, don't put it in the Set
+ continue;
+ }
cSocket::xSocket s = m_Slots[i].m_Socket.GetSocket();
FD_SET(s, a_Set);
if (s > a_Highest)
@@ -576,29 +477,42 @@ void cSocketThreads::cSocketThread::ReadFromSockets(fd_set * a_Read)
}
char Buffer[1024];
int Received = m_Slots[i].m_Socket.Receive(Buffer, ARRAYCOUNT(Buffer), 0);
- if (Received == 0)
- {
- // The socket has been closed by the remote party, close our socket and let it be removed after we process all reading
- m_Slots[i].m_Socket.CloseSocket();
- if (m_Slots[i].m_ShouldCallClient)
- {
- m_Slots[i].m_Client->SocketClosed();
- }
- }
- else if (Received > 0)
+ if (Received <= 0)
{
- if (m_Slots[i].m_ShouldCallClient)
+ // The socket has been closed by the remote party
+ switch (m_Slots[i].m_State)
{
- m_Slots[i].m_Client->DataReceived(Buffer, Received);
- }
+ case sSlot::ssNormal:
+ {
+ // Notify the callback that the remote has closed the socket; keep the slot
+ m_Slots[i].m_Client->SocketClosed();
+ m_Slots[i].m_State = sSlot::ssRemoteClosed;
+ break;
+ }
+ case sSlot::ssWritingRestOut:
+ case sSlot::ssShuttingDown:
+ case sSlot::ssShuttingDown2:
+ {
+ // Force-close the socket and remove the slot:
+ m_Slots[i].m_Socket.CloseSocket();
+ m_Slots[i] = m_Slots[--m_NumSlots];
+ break;
+ }
+ default:
+ {
+ LOG("%s: Unexpected socket state: %d (%s)",
+ __FUNCTION__, m_Slots[i].m_Socket.GetSocket(), m_Slots[i].m_Socket.GetIPString().c_str()
+ );
+ ASSERT(!"Unexpected socket state");
+ break;
+ }
+ } // switch (m_Slots[i].m_State)
}
else
{
- // The socket has encountered an error, close it and let it be removed after we process all reading
- m_Slots[i].m_Socket.CloseSocket();
- if (m_Slots[i].m_ShouldCallClient)
+ if (m_Slots[i].m_Client != NULL)
{
- m_Slots[i].m_Client->SocketClosed();
+ m_Slots[i].m_Client->DataReceived(Buffer, Received);
}
}
} // for i - m_Slots[]
@@ -622,22 +536,17 @@ void cSocketThreads::cSocketThread::WriteToSockets(fd_set * a_Write)
if (m_Slots[i].m_Outgoing.empty())
{
// Request another chunk of outgoing data:
- if (m_Slots[i].m_ShouldCallClient)
+ if (m_Slots[i].m_Client != NULL)
{
m_Slots[i].m_Client->GetOutgoingData(m_Slots[i].m_Outgoing);
}
if (m_Slots[i].m_Outgoing.empty())
{
- // Nothing ready
- if (m_Slots[i].m_ShouldClose)
+ // No outgoing data is ready
+ if (m_Slots[i].m_State == sSlot::ssWritingRestOut)
{
- // Socket was queued for closing and there's no more data to send, close it now:
-
- // DEBUG
- LOGD("Socket was queued for closing, closing now. Slot %d, client %p, socket %d", i, m_Slots[i].m_Client, m_Slots[i].m_Socket.GetSocket());
-
- m_Slots[i].m_Socket.CloseSocket();
- // The slot must be freed actively by the client, using RemoveClient()
+ m_Slots[i].m_State = sSlot::ssShuttingDown;
+ m_Slots[i].m_Socket.ShutdownReadWrite();
}
continue;
}
@@ -649,7 +558,7 @@ void cSocketThreads::cSocketThread::WriteToSockets(fd_set * a_Write)
int Err = cSocket::GetLastError();
LOGWARNING("Error %d while writing to client \"%s\", disconnecting. \"%s\"", Err, m_Slots[i].m_Socket.GetIPString().c_str(), cSocket::GetErrorString(Err).c_str());
m_Slots[i].m_Socket.CloseSocket();
- if (m_Slots[i].m_ShouldCallClient)
+ if (m_Slots[i].m_Client != NULL)
{
m_Slots[i].m_Client->SocketClosed();
}
@@ -657,6 +566,12 @@ void cSocketThreads::cSocketThread::WriteToSockets(fd_set * a_Write)
}
m_Slots[i].m_Outgoing.erase(0, Sent);
+ if (m_Slots[i].m_Outgoing.empty() && (m_Slots[i].m_State == sSlot::ssWritingRestOut))
+ {
+ m_Slots[i].m_State = sSlot::ssShuttingDown;
+ m_Slots[i].m_Socket.ShutdownReadWrite();
+ }
+
// _X: If there's data left, it means the client is not reading fast enough, the server would unnecessarily spin in the main loop with zero actions taken; so signalling is disabled
// This means that if there's data left, it will be sent only when there's incoming data or someone queues another packet (for any socket handled by this thread)
/*
@@ -673,3 +588,31 @@ void cSocketThreads::cSocketThread::WriteToSockets(fd_set * a_Write)
+
+void cSocketThreads::cSocketThread::CleanUpShutSockets(void)
+{
+ for (int i = m_NumSlots - 1; i >= 0; i--)
+ {
+ switch (m_Slots[i].m_State)
+ {
+ case sSlot::ssShuttingDown2:
+ {
+ // The socket has reached the shutdown timeout, close it and clear its slot:
+ m_Slots[i].m_Socket.CloseSocket();
+ m_Slots[i] = m_Slots[--m_NumSlots];
+ break;
+ }
+ case sSlot::ssShuttingDown:
+ {
+ // The socket has been shut down for a single thread loop, let it loop once more before closing:
+ m_Slots[i].m_State = sSlot::ssShuttingDown2;
+ break;
+ }
+ default: break;
+ }
+ } // for i - m_Slots[]
+}
+
+
+
+