From 3c0e8c8da0e397f3fda6b1565e7b2e9eeb4a747b Mon Sep 17 00:00:00 2001 From: madmaxoft Date: Sun, 19 Jan 2014 19:31:43 +0100 Subject: Rewritten SocketThreads for proper shutdown scenario. This fixes #560 and #390. --- src/OSSupport/Socket.cpp | 19 ++++ src/OSSupport/Socket.h | 6 +- src/OSSupport/SocketThreads.cpp | 236 ++++++++++++++++++++-------------------- src/OSSupport/SocketThreads.h | 79 ++++++++------ 4 files changed, 182 insertions(+), 158 deletions(-) (limited to 'src/OSSupport') diff --git a/src/OSSupport/Socket.cpp b/src/OSSupport/Socket.cpp index 8ea5d8320..d80c9bb3d 100644 --- a/src/OSSupport/Socket.cpp +++ b/src/OSSupport/Socket.cpp @@ -87,6 +87,25 @@ void cSocket::CloseSocket() +void cSocket::ShutdownReadWrite(void) +{ + #ifdef _WIN32 + int res = shutdown(m_Socket, SD_BOTH); + #else + int res = shutdown(m_Socket, SHUT_RDWR); + #endif + if (res != 0) + { + LOGWARN("%s: Error shutting down socket %d (%s): %d (%s)", + __FUNCTION__, m_Socket, m_IPString.c_str(), this->GetLastError(), GetLastErrorString().c_str() + ); + } +} + + + + + AString cSocket::GetErrorString( int a_ErrNo ) { char buffer[ 1024 ]; diff --git a/src/OSSupport/Socket.h b/src/OSSupport/Socket.h index b86560de8..91c9ca5fd 100644 --- a/src/OSSupport/Socket.h +++ b/src/OSSupport/Socket.h @@ -39,7 +39,11 @@ public: bool IsValid(void) const { return IsValidSocket(m_Socket); } void CloseSocket(void); - + + /** Notifies the socket that we don't expect any more reads nor writes on it. + Most TCPIP implementations use this to send the FIN flag in a packet */ + void ShutdownReadWrite(void); + operator xSocket(void) const; xSocket GetSocket(void) const; diff --git a/src/OSSupport/SocketThreads.cpp b/src/OSSupport/SocketThreads.cpp index b222a2e4e..b8069cf00 100644 --- a/src/OSSupport/SocketThreads.cpp +++ b/src/OSSupport/SocketThreads.cpp @@ -132,47 +132,6 @@ void cSocketThreads::Write(const cCallback * a_Client, const AString & a_Data) -/// Stops reading from the socket - when this call returns, no more calls to the callbacks are made -void cSocketThreads::StopReading(const cCallback * a_Client) -{ - cCSLock Lock(m_CS); - for (cSocketThreadList::iterator itr = m_Threads.begin(); itr != m_Threads.end(); ++itr) - { - if ((*itr)->StopReading(a_Client)) - { - return; - } - } // for itr - m_Threads[] - - // Cannot assert, this normally happens if the socket is closed before the client deinitializes - // ASSERT(!"Stopping reading on an unknown client"); -} - - - - - -/// Queues the socket for closing, as soon as its outgoing data is sent -void cSocketThreads::QueueClose(const cCallback * a_Client) -{ - LOGD("QueueClose(client %p)", a_Client); - - cCSLock Lock(m_CS); - for (cSocketThreadList::iterator itr = m_Threads.begin(); itr != m_Threads.end(); ++itr) - { - if ((*itr)->QueueClose(a_Client)) - { - return; - } - } // for itr - m_Threads[] - - ASSERT(!"Queueing close of an unknown client"); -} - - - - - //////////////////////////////////////////////////////////////////////////////// // cSocketThreads::cSocketThread: @@ -210,13 +169,13 @@ cSocketThreads::cSocketThread::~cSocketThread() void cSocketThreads::cSocketThread::AddClient(const cSocket & a_Socket, cCallback * a_Client) { + ASSERT(m_Parent->m_CS.IsLockedByCurrentThread()); ASSERT(m_NumSlots < MAX_SLOTS); // Use HasEmptySlot() to check before adding m_Slots[m_NumSlots].m_Client = a_Client; m_Slots[m_NumSlots].m_Socket = a_Socket; m_Slots[m_NumSlots].m_Outgoing.clear(); - m_Slots[m_NumSlots].m_ShouldClose = false; - m_Slots[m_NumSlots].m_ShouldCallClient = true; + m_Slots[m_NumSlots].m_State = sSlot::ssNormal; m_NumSlots++; // Notify the thread of the change: @@ -230,7 +189,7 @@ void cSocketThreads::cSocketThread::AddClient(const cSocket & a_Socket, cCallbac bool cSocketThreads::cSocketThread::RemoveClient(const cCallback * a_Client) { - // Returns true if removed, false if not found + ASSERT(m_Parent->m_CS.IsLockedByCurrentThread()); if (m_NumSlots == 0) { @@ -244,8 +203,29 @@ bool cSocketThreads::cSocketThread::RemoveClient(const cCallback * a_Client) continue; } - // Found, remove it: - m_Slots[i] = m_Slots[--m_NumSlots]; + // Found the slot: + if (m_Slots[i].m_State == sSlot::ssRemoteClosed) + { + // The remote has already closed the socket, remove the slot altogether: + m_Slots[i] = m_Slots[--m_NumSlots]; + } + else + { + // Query and queue the last batch of outgoing data: + m_Slots[i].m_Client->GetOutgoingData(m_Slots[i].m_Outgoing); + if (m_Slots[i].m_Outgoing.empty()) + { + // No more outgoing data, shut the socket down immediately: + m_Slots[i].m_Socket.ShutdownReadWrite(); + m_Slots[i].m_State = sSlot::ssShuttingDown; + } + else + { + // More data to send, shut down reading and wait for the rest to get sent: + m_Slots[i].m_State = sSlot::ssWritingRestOut; + } + m_Slots[i].m_Client = NULL; + } // Notify the thread of the change: ASSERT(m_ControlSocket2.IsValid()); @@ -263,6 +243,8 @@ bool cSocketThreads::cSocketThread::RemoveClient(const cCallback * a_Client) bool cSocketThreads::cSocketThread::HasClient(const cCallback * a_Client) const { + ASSERT(m_Parent->m_CS.IsLockedByCurrentThread()); + for (int i = m_NumSlots - 1; i >= 0; --i) { if (m_Slots[i].m_Client == a_Client) @@ -295,6 +277,8 @@ bool cSocketThreads::cSocketThread::HasSocket(const cSocket * a_Socket) const bool cSocketThreads::cSocketThread::NotifyWrite(const cCallback * a_Client) { + ASSERT(m_Parent->m_CS.IsLockedByCurrentThread()); + if (HasClient(a_Client)) { // Notify the thread that there's another packet in the queue: @@ -311,7 +295,7 @@ bool cSocketThreads::cSocketThread::NotifyWrite(const cCallback * a_Client) bool cSocketThreads::cSocketThread::Write(const cCallback * a_Client, const AString & a_Data) { - // Returns true if socket handled by this thread + ASSERT(m_Parent->m_CS.IsLockedByCurrentThread()); for (int i = m_NumSlots - 1; i >= 0; --i) { if (m_Slots[i].m_Client == a_Client) @@ -332,47 +316,6 @@ bool cSocketThreads::cSocketThread::Write(const cCallback * a_Client, const AStr -bool cSocketThreads::cSocketThread::StopReading (const cCallback * a_Client) -{ - // Returns true if client handled by this thread - for (int i = m_NumSlots - 1; i >= 0; --i) - { - if (m_Slots[i].m_Client == a_Client) - { - m_Slots[i].m_ShouldCallClient = false; - return true; - } - } // for i - m_Slots[] - return false; -} - - - - - -bool cSocketThreads::cSocketThread::QueueClose(const cCallback * a_Client) -{ - // Returns true if socket handled by this thread - for (int i = m_NumSlots - 1; i >= 0; --i) - { - if (m_Slots[i].m_Client == a_Client) - { - m_Slots[i].m_ShouldClose = true; - - // Notify the thread that there's a close queued (in case its conditions are already met): - ASSERT(m_ControlSocket2.IsValid()); - m_ControlSocket2.Send("c", 1); - - return true; - } - } // for i - m_Slots[] - return false; -} - - - - - bool cSocketThreads::cSocketThread::Start(void) { // Create the control socket listener @@ -446,10 +389,13 @@ void cSocketThreads::cSocketThread::Execute(void) fd_set fdRead; cSocket::xSocket Highest = m_ControlSocket1.GetSocket(); - PrepareSet(&fdRead, Highest); + PrepareSet(&fdRead, Highest, false); // Wait for the sockets: - if (select(Highest + 1, &fdRead, NULL, NULL, NULL) == -1) + timeval Timeout; + Timeout.tv_sec = 5; + Timeout.tv_usec = 0; + if (select(Highest + 1, &fdRead, NULL, NULL, &Timeout) == -1) { LOG("select(R) call failed in cSocketThread: \"%s\"", cSocket::GetLastErrorString().c_str()); continue; @@ -460,8 +406,7 @@ void cSocketThreads::cSocketThread::Execute(void) // Test sockets for writing: fd_set fdWrite; Highest = m_ControlSocket1.GetSocket(); - PrepareSet(&fdWrite, Highest); - timeval Timeout; + PrepareSet(&fdWrite, Highest, true); Timeout.tv_sec = 0; Timeout.tv_usec = 0; if (select(Highest + 1, NULL, &fdWrite, NULL, &Timeout) == -1) @@ -471,6 +416,8 @@ void cSocketThreads::cSocketThread::Execute(void) } WriteToSockets(&fdWrite); + + CleanUpShutSockets(); } // while (!mShouldTerminate) } @@ -478,7 +425,7 @@ void cSocketThreads::cSocketThread::Execute(void) -void cSocketThreads::cSocketThread::PrepareSet(fd_set * a_Set, cSocket::xSocket & a_Highest) +void cSocketThreads::cSocketThread::PrepareSet(fd_set * a_Set, cSocket::xSocket & a_Highest, bool a_IsForWriting) { FD_ZERO(a_Set); FD_SET(m_ControlSocket1.GetSocket(), a_Set); @@ -490,6 +437,11 @@ void cSocketThreads::cSocketThread::PrepareSet(fd_set * a_Set, cSocket::xSocket { continue; } + if (m_Slots[i].m_State == sSlot::ssRemoteClosed) + { + // This socket won't provide nor consume any data anymore, don't put it in the Set + continue; + } cSocket::xSocket s = m_Slots[i].m_Socket.GetSocket(); FD_SET(s, a_Set); if (s > a_Highest) @@ -525,29 +477,42 @@ void cSocketThreads::cSocketThread::ReadFromSockets(fd_set * a_Read) } char Buffer[1024]; int Received = m_Slots[i].m_Socket.Receive(Buffer, ARRAYCOUNT(Buffer), 0); - if (Received == 0) + if (Received <= 0) { - // The socket has been closed by the remote party, close our socket and let it be removed after we process all reading - m_Slots[i].m_Socket.CloseSocket(); - if (m_Slots[i].m_ShouldCallClient) + // The socket has been closed by the remote party + switch (m_Slots[i].m_State) { - m_Slots[i].m_Client->SocketClosed(); - } - } - else if (Received > 0) - { - if (m_Slots[i].m_ShouldCallClient) - { - m_Slots[i].m_Client->DataReceived(Buffer, Received); - } + case sSlot::ssNormal: + { + // Notify the callback that the remote has closed the socket; keep the slot + m_Slots[i].m_Client->SocketClosed(); + m_Slots[i].m_State = sSlot::ssRemoteClosed; + break; + } + case sSlot::ssWritingRestOut: + case sSlot::ssShuttingDown: + case sSlot::ssShuttingDown2: + { + // Force-close the socket and remove the slot: + m_Slots[i].m_Socket.CloseSocket(); + m_Slots[i] = m_Slots[--m_NumSlots]; + break; + } + default: + { + LOG("%s: Unexpected socket state: %d (%s)", + __FUNCTION__, m_Slots[i].m_Socket.GetSocket(), m_Slots[i].m_Socket.GetIPString().c_str() + ); + ASSERT(!"Unexpected socket state"); + break; + } + } // switch (m_Slots[i].m_State) } else { - // The socket has encountered an error, close it and let it be removed after we process all reading - m_Slots[i].m_Socket.CloseSocket(); - if (m_Slots[i].m_ShouldCallClient) + if (m_Slots[i].m_Client != NULL) { - m_Slots[i].m_Client->SocketClosed(); + m_Slots[i].m_Client->DataReceived(Buffer, Received); } } } // for i - m_Slots[] @@ -571,22 +536,17 @@ void cSocketThreads::cSocketThread::WriteToSockets(fd_set * a_Write) if (m_Slots[i].m_Outgoing.empty()) { // Request another chunk of outgoing data: - if (m_Slots[i].m_ShouldCallClient) + if (m_Slots[i].m_Client != NULL) { m_Slots[i].m_Client->GetOutgoingData(m_Slots[i].m_Outgoing); } if (m_Slots[i].m_Outgoing.empty()) { - // Nothing ready - if (m_Slots[i].m_ShouldClose) + // No outgoing data is ready + if (m_Slots[i].m_State == sSlot::ssWritingRestOut) { - // Socket was queued for closing and there's no more data to send, close it now: - - // DEBUG - LOGD("Socket was queued for closing, closing now. Slot %d, client %p, socket %d", i, m_Slots[i].m_Client, m_Slots[i].m_Socket.GetSocket()); - - m_Slots[i].m_Socket.CloseSocket(); - // The slot must be freed actively by the client, using RemoveClient() + m_Slots[i].m_State = sSlot::ssShuttingDown; + m_Slots[i].m_Socket.ShutdownReadWrite(); } continue; } @@ -598,7 +558,7 @@ void cSocketThreads::cSocketThread::WriteToSockets(fd_set * a_Write) int Err = cSocket::GetLastError(); LOGWARNING("Error %d while writing to client \"%s\", disconnecting. \"%s\"", Err, m_Slots[i].m_Socket.GetIPString().c_str(), cSocket::GetErrorString(Err).c_str()); m_Slots[i].m_Socket.CloseSocket(); - if (m_Slots[i].m_ShouldCallClient) + if (m_Slots[i].m_Client != NULL) { m_Slots[i].m_Client->SocketClosed(); } @@ -606,6 +566,12 @@ void cSocketThreads::cSocketThread::WriteToSockets(fd_set * a_Write) } m_Slots[i].m_Outgoing.erase(0, Sent); + if (m_Slots[i].m_Outgoing.empty() && (m_Slots[i].m_State == sSlot::ssWritingRestOut)) + { + m_Slots[i].m_State = sSlot::ssShuttingDown; + m_Slots[i].m_Socket.ShutdownReadWrite(); + } + // _X: If there's data left, it means the client is not reading fast enough, the server would unnecessarily spin in the main loop with zero actions taken; so signalling is disabled // This means that if there's data left, it will be sent only when there's incoming data or someone queues another packet (for any socket handled by this thread) /* @@ -622,3 +588,31 @@ void cSocketThreads::cSocketThread::WriteToSockets(fd_set * a_Write) + +void cSocketThreads::cSocketThread::CleanUpShutSockets(void) +{ + for (int i = m_NumSlots - 1; i >= 0; i--) + { + switch (m_Slots[i].m_State) + { + case sSlot::ssShuttingDown2: + { + // The socket has reached the shutdown timeout, close it and clear its slot: + m_Slots[i].m_Socket.CloseSocket(); + m_Slots[i] = m_Slots[--m_NumSlots]; + break; + } + case sSlot::ssShuttingDown: + { + // The socket has been shut down for a single thread loop, let it loop once more before closing: + m_Slots[i].m_State = sSlot::ssShuttingDown2; + break; + } + default: break; + } + } // for i - m_Slots[] +} + + + + diff --git a/src/OSSupport/SocketThreads.h b/src/OSSupport/SocketThreads.h index e16ae69fb..9e1947ab6 100644 --- a/src/OSSupport/SocketThreads.h +++ b/src/OSSupport/SocketThreads.h @@ -7,19 +7,20 @@ /* Additional details: -When a client is terminating a connection: -- they call the StopReading() method to disable callbacks for the incoming data -- they call the Write() method to queue any outstanding outgoing data -- they call the QueueClose() method to queue the socket to close after outgoing data has been sent. -When a socket slot is marked as having no callback, it is kept alive until its outgoing data queue is empty and its m_ShouldClose flag is set. -This means that the socket can be written to several times before finally closing it via QueueClose() +When a client wants to terminate the connection, they call the RemoveClient() function. This calls the +callback one last time to read all the available outgoing data, putting it in the slot's m_OutgoingData +buffer. Then it marks the slot as having no callback. The socket is kept alive until its outgoing data +queue is empty, then shutdown is called on it and finally the socket is closed after a timeout. +If at any time within this the remote end closes the socket, then the socket is closed directly. +As soon as the socket is closed, the slot is finally removed from the SocketThread. +The graph in $/docs/SocketThreads States.gv shows the state-machine transitions of the slot. */ -/// How many clients should one thread handle? (must be less than FD_SETSIZE for your platform) +/** How many clients should one thread handle? (must be less than FD_SETSIZE for your platform) */ #define MAX_SLOTS 63 @@ -27,8 +28,6 @@ This means that the socket can be written to several times before finally closin #pragma once -#ifndef CSOCKETTHREADS_H_INCLUDED -#define CSOCKETTHREADS_H_INCLUDED #include "Socket.h" #include "IsThread.h" @@ -64,13 +63,13 @@ public: // Force a virtual destructor in all subclasses: virtual ~cCallback() {} - /// Called when data is received from the remote party + /** Called when data is received from the remote party */ virtual void DataReceived(const char * a_Data, int a_Size) = 0; - /// Called when data can be sent to remote party; the function is supposed to append outgoing data to a_Data + /** Called when data can be sent to remote party; the function is supposed to *append* outgoing data to a_Data */ virtual void GetOutgoingData(AString & a_Data) = 0; - /// Called when the socket has been closed for any reason + /** Called when the socket has been closed for any reason */ virtual void SocketClosed(void) = 0; } ; @@ -78,26 +77,21 @@ public: cSocketThreads(void); ~cSocketThreads(); - /// Add a (socket, client) pair for processing, data from a_Socket is to be sent to a_Client; returns true if successful + /** Add a (socket, client) pair for processing, data from a_Socket is to be sent to a_Client; returns true if successful */ bool AddClient(const cSocket & a_Socket, cCallback * a_Client); /** Remove the associated socket and the client from processing. - The socket is left to send its data and is removed only after all its m_OutgoingData is sent + The socket is left to send its last outgoing data and is removed only after all its m_Outgoing is sent + and after the socket is properly shutdown (unless the remote disconnects before that) */ void RemoveClient(const cCallback * a_Client); - /// Notify the thread responsible for a_Client that the client has something to write + /** Notify the thread responsible for a_Client that the client has something to write */ void NotifyWrite(const cCallback * a_Client); - /// Puts a_Data into outgoing data queue for a_Client + /** Puts a_Data into outgoing data queue for a_Client */ void Write(const cCallback * a_Client, const AString & a_Data); - /// Stops reading from the client - when this call returns, no more calls to the callbacks are made - void StopReading(const cCallback * a_Client); - - /// Queues the client for closing, as soon as its outgoing data is sent - void QueueClose(const cCallback * a_Client); - private: class cSocketThread : @@ -120,8 +114,6 @@ private: bool HasSocket (const cSocket * a_Socket) const; bool NotifyWrite (const cCallback * a_Client); // Returns true if client handled by this thread bool Write (const cCallback * a_Client, const AString & a_Data); // Returns true if client handled by this thread - bool StopReading (const cCallback * a_Client); // Returns true if client handled by this thread - bool QueueClose (const cCallback * a_Client); // Returns true if client handled by this thread bool Start(void); // Hide the cIsThread's Start method, we need to provide our own startup to create the control socket @@ -135,24 +127,45 @@ private: cSocket m_ControlSocket1; cSocket m_ControlSocket2; - // Socket-client-packetqueues triplets. + // Socket-client-dataqueues-state quadruplets. // Manipulation with these assumes that the parent's m_CS is locked struct sSlot { - cSocket m_Socket; // The socket is primarily owned by this + /** The socket is primarily owned by this object */ + cSocket m_Socket; + + /** The callback to call for events. May be NULL */ cCallback * m_Client; - AString m_Outgoing; // If sending writes only partial data, the rest is stored here for another send - bool m_ShouldClose; // If true, the socket is to be closed after sending all outgoing data - bool m_ShouldCallClient; // If true, the client callbacks are called. Set to false in StopReading() + + /** If sending writes only partial data, the rest is stored here for another send. + Also used when the slot is being removed to store the last batch of outgoing data. */ + AString m_Outgoing; + + enum eState + { + ssNormal, ///< Normal read / write operations + ssWritingRestOut, ///< The client callback was removed, continue to send outgoing data + ssShuttingDown, ///< The last outgoing data has been sent, the socket has called shutdown() + ssShuttingDown2, ///< The shutdown has been done at least 1 thread loop ago (timeout detection) + ssRemoteClosed, ///< The remote end has closed the connection (and we still have a client callback) + } m_State; } ; + sSlot m_Slots[MAX_SLOTS]; int m_NumSlots; // Number of slots actually used virtual void Execute(void) override; - void PrepareSet (fd_set * a_Set, cSocket::xSocket & a_Highest); // Puts all sockets into the set, along with m_ControlSocket1 + /** Puts all sockets into the set, along with m_ControlSocket1. + Only sockets that are able to send and receive data are put in the Set. + Is a_IsForWriting is true, the ssWritingRestOut sockets are added as well. */ + void PrepareSet(fd_set * a_Set, cSocket::xSocket & a_Highest, bool a_IsForWriting); + void ReadFromSockets(fd_set * a_Read); // Reads from sockets indicated in a_Read void WriteToSockets (fd_set * a_Write); // Writes to sockets indicated in a_Write + + /** Removes those slots in ssShuttingDown2 state, sets those with ssShuttingDown state to ssShuttingDown2 */ + void CleanUpShutSockets(void); } ; typedef std::list cSocketThreadList; @@ -165,9 +178,3 @@ private: - -#endif // CSOCKETTHREADS_H_INCLUDED - - - - -- cgit v1.2.3