diff options
Diffstat (limited to 'src/OSSupport')
-rw-r--r-- | src/OSSupport/BlockingTCPLink.cpp | 4 | ||||
-rw-r--r-- | src/OSSupport/Errors.cpp | 53 | ||||
-rw-r--r-- | src/OSSupport/Errors.h | 5 | ||||
-rw-r--r-- | src/OSSupport/Event.cpp | 17 | ||||
-rw-r--r-- | src/OSSupport/File.cpp | 9 | ||||
-rw-r--r-- | src/OSSupport/File.h | 45 | ||||
-rw-r--r-- | src/OSSupport/Socket.cpp | 51 | ||||
-rw-r--r-- | src/OSSupport/Socket.h | 13 | ||||
-rw-r--r-- | src/OSSupport/SocketThreads.cpp | 286 | ||||
-rw-r--r-- | src/OSSupport/SocketThreads.h | 82 |
10 files changed, 282 insertions, 283 deletions
diff --git a/src/OSSupport/BlockingTCPLink.cpp b/src/OSSupport/BlockingTCPLink.cpp index 08aec0c65..af50eda5d 100644 --- a/src/OSSupport/BlockingTCPLink.cpp +++ b/src/OSSupport/BlockingTCPLink.cpp @@ -2,7 +2,7 @@ #include "Globals.h" // NOTE: MSVC stupidness requires this to be the same across all modules #include "BlockingTCPLink.h" - +#include "Errors.h" @@ -75,7 +75,7 @@ bool cBlockingTCPLink::Connect(const char * iAddress, unsigned int iPort) server.sin_port = htons( (unsigned short)iPort); if (connect(m_Socket, (struct sockaddr *)&server, sizeof(server))) { - LOGWARN("cTCPLink: Connection to \"%s:%d\" failed (%s)", iAddress, iPort, cSocket::GetErrorString( cSocket::GetLastError() ).c_str() ); + LOGWARN("cTCPLink: Connection to \"%s:%d\" failed (%s)", iAddress, iPort,GetOSErrorString( cSocket::GetLastError() ).c_str() ); CloseSocket(); return false; } diff --git a/src/OSSupport/Errors.cpp b/src/OSSupport/Errors.cpp new file mode 100644 index 000000000..2e05f1df1 --- /dev/null +++ b/src/OSSupport/Errors.cpp @@ -0,0 +1,53 @@ + +#include "Globals.h" + +#include "Errors.h" + +AString GetOSErrorString( int a_ErrNo ) +{ + char buffer[ 1024 ]; + AString Out; + + #ifdef _WIN32 + + FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM, NULL, a_ErrNo, 0, buffer, ARRAYCOUNT(buffer), NULL); + Printf(Out, "%d: %s", a_ErrNo, buffer); + if (!Out.empty() && (Out[Out.length() - 1] == '\n')) + { + Out.erase(Out.length() - 2); + } + return Out; + + #else // _WIN32 + + // According to http://linux.die.net/man/3/strerror_r there are two versions of strerror_r(): + + #if ( _GNU_SOURCE ) && !defined(ANDROID_NDK) // GNU version of strerror_r() + + char * res = strerror_r( errno, buffer, ARRAYCOUNT(buffer) ); + if( res != NULL ) + { + Printf(Out, "%d: %s", a_ErrNo, res); + return Out; + } + + #else // XSI version of strerror_r(): + + int res = strerror_r( errno, buffer, ARRAYCOUNT(buffer) ); + if( res == 0 ) + { + Printf(Out, "%d: %s", a_ErrNo, buffer); + return Out; + } + + #endif // strerror_r() version + + else + { + Printf(Out, "Error %d while getting error string for error #%d!", errno, a_ErrNo); + return Out; + } + + #endif // else _WIN32 +} + diff --git a/src/OSSupport/Errors.h b/src/OSSupport/Errors.h new file mode 100644 index 000000000..8ce9deb10 --- /dev/null +++ b/src/OSSupport/Errors.h @@ -0,0 +1,5 @@ + +#pragma once + +AString GetOSErrorString(int a_ErrNo); + diff --git a/src/OSSupport/Event.cpp b/src/OSSupport/Event.cpp index cbacbba17..649a0a3cf 100644 --- a/src/OSSupport/Event.cpp +++ b/src/OSSupport/Event.cpp @@ -7,7 +7,7 @@ #include "Globals.h" // NOTE: MSVC stupidness requires this to be the same across all modules #include "Event.h" - +#include "Errors.h" @@ -35,14 +35,16 @@ cEvent::cEvent(void) m_Event = sem_open(EventName.c_str(), O_CREAT, 777, 0 ); if (m_Event == SEM_FAILED) { - LOGERROR("cEvent: Cannot create event, errno = %i. Aborting server.", errno); + AString error = GetOSErrorString(errno); + LOGERROR("cEvent: Cannot create event, err = %s. Aborting server.", error.c_str()); abort(); } // Unlink the semaphore immediately - it will continue to function but will not pollute the namespace // We don't store the name, so can't call this in the destructor if (sem_unlink(EventName.c_str()) != 0) { - LOGWARN("ERROR: Could not unlink cEvent. (%i)", errno); + AString error = GetOSErrorString(errno); + LOGWARN("ERROR: Could not unlink cEvent. (%s)", error.c_str()); } } #endif // *nix @@ -61,7 +63,8 @@ cEvent::~cEvent() { if (sem_close(m_Event) != 0) { - LOGERROR("ERROR: Could not close cEvent. (%i)", errno); + AString error = GetOSErrorString(errno); + LOGERROR("ERROR: Could not close cEvent. (%s)", error.c_str()); } } else @@ -88,7 +91,8 @@ void cEvent::Wait(void) int res = sem_wait(m_Event); if (res != 0 ) { - LOGWARN("cEvent: waiting for the event failed: %i, errno = %i. Continuing, but server may be unstable.", res, errno); + AString error = GetOSErrorString(errno); + LOGWARN("cEvent: waiting for the event failed: %i, err = %s. Continuing, but server may be unstable.", res, error.c_str()); } #endif } @@ -108,7 +112,8 @@ void cEvent::Set(void) int res = sem_post(m_Event); if (res != 0) { - LOGWARN("cEvent: Could not set cEvent: %i, errno = %d", res, errno); + AString error = GetOSErrorString(errno); + LOGWARN("cEvent: Could not set cEvent: %i, err = %s", res, error.c_str()); } #endif } diff --git a/src/OSSupport/File.cpp b/src/OSSupport/File.cpp index 9f7c0d439..0ebd04915 100644 --- a/src/OSSupport/File.cpp +++ b/src/OSSupport/File.cpp @@ -450,3 +450,12 @@ int cFile::Printf(const char * a_Fmt, ...) + +void cFile::Flush(void) +{ + fflush(m_File); +} + + + + diff --git a/src/OSSupport/File.h b/src/OSSupport/File.h index 01663a229..07fce6661 100644 --- a/src/OSSupport/File.h +++ b/src/OSSupport/File.h @@ -18,6 +18,8 @@ Usage: 2, Check if the file was opened using IsOpen() 3, Read / write 4, Destroy the instance + +For reading entire files into memory, just use the static cFile::ReadWholeFile() */ @@ -55,7 +57,7 @@ public: static const char PathSeparator = '/'; #endif - /// The mode in which to open the file + /** The mode in which to open the file */ enum eMode { fmRead, // Read-only. If the file doesn't exist, object will not be valid @@ -63,13 +65,13 @@ public: fmReadWrite // Read/write. If the file already exists, it will be left intact; writing will overwrite the data from the beginning } ; - /// Simple constructor - creates an unopened file object, use Open() to open / create a real file + /** Simple constructor - creates an unopened file object, use Open() to open / create a real file */ cFile(void); - /// Constructs and opens / creates the file specified, use IsOpen() to check for success + /** Constructs and opens / creates the file specified, use IsOpen() to check for success */ cFile(const AString & iFileName, eMode iMode); - /// Auto-closes the file, if open + /** Auto-closes the file, if open */ ~cFile(); bool Open(const AString & iFileName, eMode iMode); @@ -77,60 +79,63 @@ public: bool IsOpen(void) const; bool IsEOF(void) const; - /// Reads up to iNumBytes bytes into iBuffer, returns the number of bytes actually read, or -1 on failure; asserts if not open + /** Reads up to iNumBytes bytes into iBuffer, returns the number of bytes actually read, or -1 on failure; asserts if not open */ int Read (void * iBuffer, int iNumBytes); - /// Writes up to iNumBytes bytes from iBuffer, returns the number of bytes actually written, or -1 on failure; asserts if not open + /** Writes up to iNumBytes bytes from iBuffer, returns the number of bytes actually written, or -1 on failure; asserts if not open */ int Write(const void * iBuffer, int iNumBytes); - /// Seeks to iPosition bytes from file start, returns old position or -1 for failure; asserts if not open + /** Seeks to iPosition bytes from file start, returns old position or -1 for failure; asserts if not open */ int Seek (int iPosition); - /// Returns the current position (bytes from file start) or -1 for failure; asserts if not open + /** Returns the current position (bytes from file start) or -1 for failure; asserts if not open */ int Tell (void) const; - /// Returns the size of file, in bytes, or -1 for failure; asserts if not open + /** Returns the size of file, in bytes, or -1 for failure; asserts if not open */ int GetSize(void) const; - /// Reads the file from current position till EOF into an AString; returns the number of bytes read or -1 for error + /** Reads the file from current position till EOF into an AString; returns the number of bytes read or -1 for error */ int ReadRestOfFile(AString & a_Contents); // tolua_begin - /// Returns true if the file specified exists + /** Returns true if the file specified exists */ static bool Exists(const AString & a_FileName); - /// Deletes a file, returns true if successful + /** Deletes a file, returns true if successful */ static bool Delete(const AString & a_FileName); - /// Renames a file or folder, returns true if successful. May fail if dest already exists (libc-dependant)! + /** Renames a file or folder, returns true if successful. May fail if dest already exists (libc-dependant)! */ static bool Rename(const AString & a_OrigPath, const AString & a_NewPath); - /// Copies a file, returns true if successful. + /** Copies a file, returns true if successful. */ static bool Copy(const AString & a_SrcFileName, const AString & a_DstFileName); - /// Returns true if the specified path is a folder + /** Returns true if the specified path is a folder */ static bool IsFolder(const AString & a_Path); - /// Returns true if the specified path is a regular file + /** Returns true if the specified path is a regular file */ static bool IsFile(const AString & a_Path); - /// Returns the size of the file, or a negative number on error + /** Returns the size of the file, or a negative number on error */ static int GetSize(const AString & a_FileName); - /// Creates a new folder with the specified name. Returns true if successful. Path may be relative or absolute + /** Creates a new folder with the specified name. Returns true if successful. Path may be relative or absolute */ static bool CreateFolder(const AString & a_FolderPath); - /// Returns the entire contents of the specified file as a string. Returns empty string on error. + /** Returns the entire contents of the specified file as a string. Returns empty string on error. */ static AString ReadWholeFile(const AString & a_FileName); // tolua_end - /// Returns the list of all items in the specified folder (files, folders, nix pipes, whatever's there). + /** Returns the list of all items in the specified folder (files, folders, nix pipes, whatever's there). */ static AStringVector GetFolderContents(const AString & a_Folder); // Exported in ManualBindings.cpp int Printf(const char * a_Fmt, ...); + /** Flushes all the bufferef output into the file (only when writing) */ + void Flush(void); + private: #ifdef USE_STDIO_FILE FILE * m_File; diff --git a/src/OSSupport/Socket.cpp b/src/OSSupport/Socket.cpp index 8ea5d8320..4226a7535 100644 --- a/src/OSSupport/Socket.cpp +++ b/src/OSSupport/Socket.cpp @@ -87,52 +87,19 @@ void cSocket::CloseSocket() -AString cSocket::GetErrorString( int a_ErrNo ) +void cSocket::ShutdownReadWrite(void) { - char buffer[ 1024 ]; - AString Out; - #ifdef _WIN32 - - FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM, NULL, a_ErrNo, 0, buffer, ARRAYCOUNT(buffer), NULL); - Printf(Out, "%d: %s", a_ErrNo, buffer); - if (!Out.empty() && (Out[Out.length() - 1] == '\n')) - { - Out.erase(Out.length() - 2); - } - return Out; - - #else // _WIN32 - - // According to http://linux.die.net/man/3/strerror_r there are two versions of strerror_r(): - - #if ( _GNU_SOURCE ) && !defined(ANDROID_NDK) // GNU version of strerror_r() - - char * res = strerror_r( errno, buffer, ARRAYCOUNT(buffer) ); - if( res != NULL ) - { - Printf(Out, "%d: %s", a_ErrNo, res); - return Out; - } - - #else // XSI version of strerror_r(): - - int res = strerror_r( errno, buffer, ARRAYCOUNT(buffer) ); - if( res == 0 ) - { - Printf(Out, "%d: %s", a_ErrNo, buffer); - return Out; - } - - #endif // strerror_r() version - - else + int res = shutdown(m_Socket, SD_BOTH); + #else + int res = shutdown(m_Socket, SHUT_RDWR); + #endif + if (res != 0) { - Printf(Out, "Error %d while getting error string for error #%d!", errno, a_ErrNo); - return Out; + LOGWARN("%s: Error shutting down socket %d (%s): %d (%s)", + __FUNCTION__, m_Socket, m_IPString.c_str(), this->GetLastError(), GetLastErrorString().c_str() + ); } - - #endif // else _WIN32 } diff --git a/src/OSSupport/Socket.h b/src/OSSupport/Socket.h index b86560de8..4ca3d61f4 100644 --- a/src/OSSupport/Socket.h +++ b/src/OSSupport/Socket.h @@ -14,7 +14,7 @@ #endif - +#include "Errors.h" class cSocket @@ -39,7 +39,11 @@ public: bool IsValid(void) const { return IsValidSocket(m_Socket); } void CloseSocket(void); - + + /** Notifies the socket that we don't expect any more reads nor writes on it. + Most TCPIP implementations use this to send the FIN flag in a packet */ + void ShutdownReadWrite(void); + operator xSocket(void) const; xSocket GetSocket(void) const; @@ -53,11 +57,10 @@ public: /// Initializes the network stack. Returns 0 on success, or another number as an error code. static int WSAStartup(void); - static AString GetErrorString(int a_ErrNo); static int GetLastError(); static AString GetLastErrorString(void) { - return GetErrorString(GetLastError()); + return GetOSErrorString(GetLastError()); } /// Creates a new socket of the specified address family @@ -111,4 +114,4 @@ public: private: xSocket m_Socket; AString m_IPString; -};
\ No newline at end of file +}; diff --git a/src/OSSupport/SocketThreads.cpp b/src/OSSupport/SocketThreads.cpp index 3e505616c..74932daf8 100644 --- a/src/OSSupport/SocketThreads.cpp +++ b/src/OSSupport/SocketThreads.cpp @@ -7,6 +7,7 @@ #include "Globals.h" #include "SocketThreads.h" +#include "Errors.h" @@ -71,29 +72,6 @@ bool cSocketThreads::AddClient(const cSocket & a_Socket, cCallback * a_Client) -/* -void cSocketThreads::RemoveClient(const cSocket * a_Socket) -{ - // Remove the socket (and associated client) from processing - - cCSLock Lock(m_CS); - for (cSocketThreadList::iterator itr = m_Threads.begin(); itr != m_Threads.end(); ++itr) - { - if ((*itr)->RemoveSocket(a_Socket)) - { - return; - } - } // for itr - m_Threads[] - - // Cannot assert here, this may actually happen legally, since cClientHandle has to clean up the socket and it may have already closed in the meantime - // ASSERT(!"Removing an unknown socket"); -} -*/ - - - - - void cSocketThreads::RemoveClient(const cCallback * a_Client) { // Remove the associated socket and the client from processing @@ -155,47 +133,6 @@ void cSocketThreads::Write(const cCallback * a_Client, const AString & a_Data) -/// Stops reading from the socket - when this call returns, no more calls to the callbacks are made -void cSocketThreads::StopReading(const cCallback * a_Client) -{ - cCSLock Lock(m_CS); - for (cSocketThreadList::iterator itr = m_Threads.begin(); itr != m_Threads.end(); ++itr) - { - if ((*itr)->StopReading(a_Client)) - { - return; - } - } // for itr - m_Threads[] - - // Cannot assert, this normally happens if the socket is closed before the client deinitializes - // ASSERT(!"Stopping reading on an unknown client"); -} - - - - - -/// Queues the socket for closing, as soon as its outgoing data is sent -void cSocketThreads::QueueClose(const cCallback * a_Client) -{ - LOGD("QueueClose(client %p)", a_Client); - - cCSLock Lock(m_CS); - for (cSocketThreadList::iterator itr = m_Threads.begin(); itr != m_Threads.end(); ++itr) - { - if ((*itr)->QueueClose(a_Client)) - { - return; - } - } // for itr - m_Threads[] - - ASSERT(!"Queueing close of an unknown client"); -} - - - - - //////////////////////////////////////////////////////////////////////////////// // cSocketThreads::cSocketThread: @@ -233,13 +170,13 @@ cSocketThreads::cSocketThread::~cSocketThread() void cSocketThreads::cSocketThread::AddClient(const cSocket & a_Socket, cCallback * a_Client) { + ASSERT(m_Parent->m_CS.IsLockedByCurrentThread()); ASSERT(m_NumSlots < MAX_SLOTS); // Use HasEmptySlot() to check before adding m_Slots[m_NumSlots].m_Client = a_Client; m_Slots[m_NumSlots].m_Socket = a_Socket; m_Slots[m_NumSlots].m_Outgoing.clear(); - m_Slots[m_NumSlots].m_ShouldClose = false; - m_Slots[m_NumSlots].m_ShouldCallClient = true; + m_Slots[m_NumSlots].m_State = sSlot::ssNormal; m_NumSlots++; // Notify the thread of the change: @@ -253,7 +190,7 @@ void cSocketThreads::cSocketThread::AddClient(const cSocket & a_Socket, cCallbac bool cSocketThreads::cSocketThread::RemoveClient(const cCallback * a_Client) { - // Returns true if removed, false if not found + ASSERT(m_Parent->m_CS.IsLockedByCurrentThread()); if (m_NumSlots == 0) { @@ -267,36 +204,29 @@ bool cSocketThreads::cSocketThread::RemoveClient(const cCallback * a_Client) continue; } - // Found, remove it: - m_Slots[i] = m_Slots[--m_NumSlots]; - - // Notify the thread of the change: - ASSERT(m_ControlSocket2.IsValid()); - m_ControlSocket2.Send("r", 1); - return true; - } // for i - m_Slots[] - - // Not found - return false; -} - - - - - -bool cSocketThreads::cSocketThread::RemoveSocket(const cSocket * a_Socket) -{ - // Returns true if removed, false if not found - - for (int i = m_NumSlots - 1; i >= 0 ; --i) - { - if (m_Slots[i].m_Socket != *a_Socket) + // Found the slot: + if (m_Slots[i].m_State == sSlot::ssRemoteClosed) { - continue; + // The remote has already closed the socket, remove the slot altogether: + m_Slots[i] = m_Slots[--m_NumSlots]; + } + else + { + // Query and queue the last batch of outgoing data: + m_Slots[i].m_Client->GetOutgoingData(m_Slots[i].m_Outgoing); + if (m_Slots[i].m_Outgoing.empty()) + { + // No more outgoing data, shut the socket down immediately: + m_Slots[i].m_Socket.ShutdownReadWrite(); + m_Slots[i].m_State = sSlot::ssShuttingDown; + } + else + { + // More data to send, shut down reading and wait for the rest to get sent: + m_Slots[i].m_State = sSlot::ssWritingRestOut; + } + m_Slots[i].m_Client = NULL; } - - // Found, remove it: - m_Slots[i] = m_Slots[--m_NumSlots]; // Notify the thread of the change: ASSERT(m_ControlSocket2.IsValid()); @@ -314,6 +244,8 @@ bool cSocketThreads::cSocketThread::RemoveSocket(const cSocket * a_Socket) bool cSocketThreads::cSocketThread::HasClient(const cCallback * a_Client) const { + ASSERT(m_Parent->m_CS.IsLockedByCurrentThread()); + for (int i = m_NumSlots - 1; i >= 0; --i) { if (m_Slots[i].m_Client == a_Client) @@ -346,6 +278,8 @@ bool cSocketThreads::cSocketThread::HasSocket(const cSocket * a_Socket) const bool cSocketThreads::cSocketThread::NotifyWrite(const cCallback * a_Client) { + ASSERT(m_Parent->m_CS.IsLockedByCurrentThread()); + if (HasClient(a_Client)) { // Notify the thread that there's another packet in the queue: @@ -362,7 +296,7 @@ bool cSocketThreads::cSocketThread::NotifyWrite(const cCallback * a_Client) bool cSocketThreads::cSocketThread::Write(const cCallback * a_Client, const AString & a_Data) { - // Returns true if socket handled by this thread + ASSERT(m_Parent->m_CS.IsLockedByCurrentThread()); for (int i = m_NumSlots - 1; i >= 0; --i) { if (m_Slots[i].m_Client == a_Client) @@ -383,47 +317,6 @@ bool cSocketThreads::cSocketThread::Write(const cCallback * a_Client, const AStr -bool cSocketThreads::cSocketThread::StopReading (const cCallback * a_Client) -{ - // Returns true if client handled by this thread - for (int i = m_NumSlots - 1; i >= 0; --i) - { - if (m_Slots[i].m_Client == a_Client) - { - m_Slots[i].m_ShouldCallClient = false; - return true; - } - } // for i - m_Slots[] - return false; -} - - - - - -bool cSocketThreads::cSocketThread::QueueClose(const cCallback * a_Client) -{ - // Returns true if socket handled by this thread - for (int i = m_NumSlots - 1; i >= 0; --i) - { - if (m_Slots[i].m_Client == a_Client) - { - m_Slots[i].m_ShouldClose = true; - - // Notify the thread that there's a close queued (in case its conditions are already met): - ASSERT(m_ControlSocket2.IsValid()); - m_ControlSocket2.Send("c", 1); - - return true; - } - } // for i - m_Slots[] - return false; -} - - - - - bool cSocketThreads::cSocketThread::Start(void) { // Create the control socket listener @@ -497,10 +390,13 @@ void cSocketThreads::cSocketThread::Execute(void) fd_set fdRead; cSocket::xSocket Highest = m_ControlSocket1.GetSocket(); - PrepareSet(&fdRead, Highest); + PrepareSet(&fdRead, Highest, false); // Wait for the sockets: - if (select(Highest + 1, &fdRead, NULL, NULL, NULL) == -1) + timeval Timeout; + Timeout.tv_sec = 5; + Timeout.tv_usec = 0; + if (select(Highest + 1, &fdRead, NULL, NULL, &Timeout) == -1) { LOG("select(R) call failed in cSocketThread: \"%s\"", cSocket::GetLastErrorString().c_str()); continue; @@ -511,8 +407,7 @@ void cSocketThreads::cSocketThread::Execute(void) // Test sockets for writing: fd_set fdWrite; Highest = m_ControlSocket1.GetSocket(); - PrepareSet(&fdWrite, Highest); - timeval Timeout; + PrepareSet(&fdWrite, Highest, true); Timeout.tv_sec = 0; Timeout.tv_usec = 0; if (select(Highest + 1, NULL, &fdWrite, NULL, &Timeout) == -1) @@ -522,6 +417,8 @@ void cSocketThreads::cSocketThread::Execute(void) } WriteToSockets(&fdWrite); + + CleanUpShutSockets(); } // while (!mShouldTerminate) } @@ -529,7 +426,7 @@ void cSocketThreads::cSocketThread::Execute(void) -void cSocketThreads::cSocketThread::PrepareSet(fd_set * a_Set, cSocket::xSocket & a_Highest) +void cSocketThreads::cSocketThread::PrepareSet(fd_set * a_Set, cSocket::xSocket & a_Highest, bool a_IsForWriting) { FD_ZERO(a_Set); FD_SET(m_ControlSocket1.GetSocket(), a_Set); @@ -541,6 +438,11 @@ void cSocketThreads::cSocketThread::PrepareSet(fd_set * a_Set, cSocket::xSocket { continue; } + if (m_Slots[i].m_State == sSlot::ssRemoteClosed) + { + // This socket won't provide nor consume any data anymore, don't put it in the Set + continue; + } cSocket::xSocket s = m_Slots[i].m_Socket.GetSocket(); FD_SET(s, a_Set); if (s > a_Highest) @@ -576,29 +478,42 @@ void cSocketThreads::cSocketThread::ReadFromSockets(fd_set * a_Read) } char Buffer[1024]; int Received = m_Slots[i].m_Socket.Receive(Buffer, ARRAYCOUNT(Buffer), 0); - if (Received == 0) - { - // The socket has been closed by the remote party, close our socket and let it be removed after we process all reading - m_Slots[i].m_Socket.CloseSocket(); - if (m_Slots[i].m_ShouldCallClient) - { - m_Slots[i].m_Client->SocketClosed(); - } - } - else if (Received > 0) + if (Received <= 0) { - if (m_Slots[i].m_ShouldCallClient) + // The socket has been closed by the remote party + switch (m_Slots[i].m_State) { - m_Slots[i].m_Client->DataReceived(Buffer, Received); - } + case sSlot::ssNormal: + { + // Notify the callback that the remote has closed the socket; keep the slot + m_Slots[i].m_Client->SocketClosed(); + m_Slots[i].m_State = sSlot::ssRemoteClosed; + break; + } + case sSlot::ssWritingRestOut: + case sSlot::ssShuttingDown: + case sSlot::ssShuttingDown2: + { + // Force-close the socket and remove the slot: + m_Slots[i].m_Socket.CloseSocket(); + m_Slots[i] = m_Slots[--m_NumSlots]; + break; + } + default: + { + LOG("%s: Unexpected socket state: %d (%s)", + __FUNCTION__, m_Slots[i].m_Socket.GetSocket(), m_Slots[i].m_Socket.GetIPString().c_str() + ); + ASSERT(!"Unexpected socket state"); + break; + } + } // switch (m_Slots[i].m_State) } else { - // The socket has encountered an error, close it and let it be removed after we process all reading - m_Slots[i].m_Socket.CloseSocket(); - if (m_Slots[i].m_ShouldCallClient) + if (m_Slots[i].m_Client != NULL) { - m_Slots[i].m_Client->SocketClosed(); + m_Slots[i].m_Client->DataReceived(Buffer, Received); } } } // for i - m_Slots[] @@ -622,22 +537,17 @@ void cSocketThreads::cSocketThread::WriteToSockets(fd_set * a_Write) if (m_Slots[i].m_Outgoing.empty()) { // Request another chunk of outgoing data: - if (m_Slots[i].m_ShouldCallClient) + if (m_Slots[i].m_Client != NULL) { m_Slots[i].m_Client->GetOutgoingData(m_Slots[i].m_Outgoing); } if (m_Slots[i].m_Outgoing.empty()) { - // Nothing ready - if (m_Slots[i].m_ShouldClose) + // No outgoing data is ready + if (m_Slots[i].m_State == sSlot::ssWritingRestOut) { - // Socket was queued for closing and there's no more data to send, close it now: - - // DEBUG - LOGD("Socket was queued for closing, closing now. Slot %d, client %p, socket %d", i, m_Slots[i].m_Client, m_Slots[i].m_Socket.GetSocket()); - - m_Slots[i].m_Socket.CloseSocket(); - // The slot must be freed actively by the client, using RemoveClient() + m_Slots[i].m_State = sSlot::ssShuttingDown; + m_Slots[i].m_Socket.ShutdownReadWrite(); } continue; } @@ -647,9 +557,9 @@ void cSocketThreads::cSocketThread::WriteToSockets(fd_set * a_Write) if (Sent < 0) { int Err = cSocket::GetLastError(); - LOGWARNING("Error %d while writing to client \"%s\", disconnecting. \"%s\"", Err, m_Slots[i].m_Socket.GetIPString().c_str(), cSocket::GetErrorString(Err).c_str()); + LOGWARNING("Error %d while writing to client \"%s\", disconnecting. \"%s\"", Err, m_Slots[i].m_Socket.GetIPString().c_str(), GetOSErrorString(Err).c_str()); m_Slots[i].m_Socket.CloseSocket(); - if (m_Slots[i].m_ShouldCallClient) + if (m_Slots[i].m_Client != NULL) { m_Slots[i].m_Client->SocketClosed(); } @@ -657,6 +567,12 @@ void cSocketThreads::cSocketThread::WriteToSockets(fd_set * a_Write) } m_Slots[i].m_Outgoing.erase(0, Sent); + if (m_Slots[i].m_Outgoing.empty() && (m_Slots[i].m_State == sSlot::ssWritingRestOut)) + { + m_Slots[i].m_State = sSlot::ssShuttingDown; + m_Slots[i].m_Socket.ShutdownReadWrite(); + } + // _X: If there's data left, it means the client is not reading fast enough, the server would unnecessarily spin in the main loop with zero actions taken; so signalling is disabled // This means that if there's data left, it will be sent only when there's incoming data or someone queues another packet (for any socket handled by this thread) /* @@ -673,3 +589,31 @@ void cSocketThreads::cSocketThread::WriteToSockets(fd_set * a_Write) + +void cSocketThreads::cSocketThread::CleanUpShutSockets(void) +{ + for (int i = m_NumSlots - 1; i >= 0; i--) + { + switch (m_Slots[i].m_State) + { + case sSlot::ssShuttingDown2: + { + // The socket has reached the shutdown timeout, close it and clear its slot: + m_Slots[i].m_Socket.CloseSocket(); + m_Slots[i] = m_Slots[--m_NumSlots]; + break; + } + case sSlot::ssShuttingDown: + { + // The socket has been shut down for a single thread loop, let it loop once more before closing: + m_Slots[i].m_State = sSlot::ssShuttingDown2; + break; + } + default: break; + } + } // for i - m_Slots[] +} + + + + diff --git a/src/OSSupport/SocketThreads.h b/src/OSSupport/SocketThreads.h index 858729c49..9e1947ab6 100644 --- a/src/OSSupport/SocketThreads.h +++ b/src/OSSupport/SocketThreads.h @@ -7,19 +7,20 @@ /* Additional details: -When a client is terminating a connection: -- they call the StopReading() method to disable callbacks for the incoming data -- they call the Write() method to queue any outstanding outgoing data -- they call the QueueClose() method to queue the socket to close after outgoing data has been sent. -When a socket slot is marked as having no callback, it is kept alive until its outgoing data queue is empty and its m_ShouldClose flag is set. -This means that the socket can be written to several times before finally closing it via QueueClose() +When a client wants to terminate the connection, they call the RemoveClient() function. This calls the +callback one last time to read all the available outgoing data, putting it in the slot's m_OutgoingData +buffer. Then it marks the slot as having no callback. The socket is kept alive until its outgoing data +queue is empty, then shutdown is called on it and finally the socket is closed after a timeout. +If at any time within this the remote end closes the socket, then the socket is closed directly. +As soon as the socket is closed, the slot is finally removed from the SocketThread. +The graph in $/docs/SocketThreads States.gv shows the state-machine transitions of the slot. */ -/// How many clients should one thread handle? (must be less than FD_SETSIZE for your platform) +/** How many clients should one thread handle? (must be less than FD_SETSIZE for your platform) */ #define MAX_SLOTS 63 @@ -27,8 +28,6 @@ This means that the socket can be written to several times before finally closin #pragma once -#ifndef CSOCKETTHREADS_H_INCLUDED -#define CSOCKETTHREADS_H_INCLUDED #include "Socket.h" #include "IsThread.h" @@ -64,13 +63,13 @@ public: // Force a virtual destructor in all subclasses: virtual ~cCallback() {} - /// Called when data is received from the remote party + /** Called when data is received from the remote party */ virtual void DataReceived(const char * a_Data, int a_Size) = 0; - /// Called when data can be sent to remote party; the function is supposed to append outgoing data to a_Data + /** Called when data can be sent to remote party; the function is supposed to *append* outgoing data to a_Data */ virtual void GetOutgoingData(AString & a_Data) = 0; - /// Called when the socket has been closed for any reason + /** Called when the socket has been closed for any reason */ virtual void SocketClosed(void) = 0; } ; @@ -78,24 +77,21 @@ public: cSocketThreads(void); ~cSocketThreads(); - /// Add a (socket, client) pair for processing, data from a_Socket is to be sent to a_Client; returns true if successful + /** Add a (socket, client) pair for processing, data from a_Socket is to be sent to a_Client; returns true if successful */ bool AddClient(const cSocket & a_Socket, cCallback * a_Client); - /// Remove the associated socket and the client from processing. The socket is left to send its data and is removed only after all its m_OutgoingData is sent + /** Remove the associated socket and the client from processing. + The socket is left to send its last outgoing data and is removed only after all its m_Outgoing is sent + and after the socket is properly shutdown (unless the remote disconnects before that) + */ void RemoveClient(const cCallback * a_Client); - /// Notify the thread responsible for a_Client that the client has something to write + /** Notify the thread responsible for a_Client that the client has something to write */ void NotifyWrite(const cCallback * a_Client); - /// Puts a_Data into outgoing data queue for a_Client + /** Puts a_Data into outgoing data queue for a_Client */ void Write(const cCallback * a_Client, const AString & a_Data); - /// Stops reading from the client - when this call returns, no more calls to the callbacks are made - void StopReading(const cCallback * a_Client); - - /// Queues the client for closing, as soon as its outgoing data is sent - void QueueClose(const cCallback * a_Client); - private: class cSocketThread : @@ -114,13 +110,10 @@ private: void AddClient (const cSocket & a_Socket, cCallback * a_Client); // Takes ownership of the socket bool RemoveClient(const cCallback * a_Client); // Returns true if removed, false if not found - bool RemoveSocket(const cSocket * a_Socket); // Returns true if removed, false if not found bool HasClient (const cCallback * a_Client) const; bool HasSocket (const cSocket * a_Socket) const; bool NotifyWrite (const cCallback * a_Client); // Returns true if client handled by this thread bool Write (const cCallback * a_Client, const AString & a_Data); // Returns true if client handled by this thread - bool StopReading (const cCallback * a_Client); // Returns true if client handled by this thread - bool QueueClose (const cCallback * a_Client); // Returns true if client handled by this thread bool Start(void); // Hide the cIsThread's Start method, we need to provide our own startup to create the control socket @@ -134,24 +127,45 @@ private: cSocket m_ControlSocket1; cSocket m_ControlSocket2; - // Socket-client-packetqueues triplets. + // Socket-client-dataqueues-state quadruplets. // Manipulation with these assumes that the parent's m_CS is locked struct sSlot { - cSocket m_Socket; // The socket is primarily owned by this + /** The socket is primarily owned by this object */ + cSocket m_Socket; + + /** The callback to call for events. May be NULL */ cCallback * m_Client; - AString m_Outgoing; // If sending writes only partial data, the rest is stored here for another send - bool m_ShouldClose; // If true, the socket is to be closed after sending all outgoing data - bool m_ShouldCallClient; // If true, the client callbacks are called. Set to false in StopReading() + + /** If sending writes only partial data, the rest is stored here for another send. + Also used when the slot is being removed to store the last batch of outgoing data. */ + AString m_Outgoing; + + enum eState + { + ssNormal, ///< Normal read / write operations + ssWritingRestOut, ///< The client callback was removed, continue to send outgoing data + ssShuttingDown, ///< The last outgoing data has been sent, the socket has called shutdown() + ssShuttingDown2, ///< The shutdown has been done at least 1 thread loop ago (timeout detection) + ssRemoteClosed, ///< The remote end has closed the connection (and we still have a client callback) + } m_State; } ; + sSlot m_Slots[MAX_SLOTS]; int m_NumSlots; // Number of slots actually used virtual void Execute(void) override; - void PrepareSet (fd_set * a_Set, cSocket::xSocket & a_Highest); // Puts all sockets into the set, along with m_ControlSocket1 + /** Puts all sockets into the set, along with m_ControlSocket1. + Only sockets that are able to send and receive data are put in the Set. + Is a_IsForWriting is true, the ssWritingRestOut sockets are added as well. */ + void PrepareSet(fd_set * a_Set, cSocket::xSocket & a_Highest, bool a_IsForWriting); + void ReadFromSockets(fd_set * a_Read); // Reads from sockets indicated in a_Read void WriteToSockets (fd_set * a_Write); // Writes to sockets indicated in a_Write + + /** Removes those slots in ssShuttingDown2 state, sets those with ssShuttingDown state to ssShuttingDown2 */ + void CleanUpShutSockets(void); } ; typedef std::list<cSocketThread *> cSocketThreadList; @@ -164,9 +178,3 @@ private: - -#endif // CSOCKETTHREADS_H_INCLUDED - - - - |