From 7a43f6be09cfe74dc4a757e949ba88ce033ac419 Mon Sep 17 00:00:00 2001 From: "madmaxoft@gmail.com" Date: Tue, 25 Sep 2012 08:23:19 +0000 Subject: Sockets are now owned by SocketThreads instead of ClientHandle (FS #246 and FS #247) git-svn-id: http://mc-server.googlecode.com/svn/trunk@891 0a769ca7-a7f5-676a-18bf-c427514a06d6 --- source/OSSupport/File.h | 6 +++ source/OSSupport/SocketThreads.cpp | 99 +++++++++++++++++--------------------- source/OSSupport/SocketThreads.h | 26 +++++----- 3 files changed, 62 insertions(+), 69 deletions(-) (limited to 'source/OSSupport') diff --git a/source/OSSupport/File.h b/source/OSSupport/File.h index d16784236..f3d21d450 100644 --- a/source/OSSupport/File.h +++ b/source/OSSupport/File.h @@ -46,6 +46,12 @@ Usage: class cFile { public: + #ifdef _WIN32 + static const char PathSeparator = '\\'; + #else + static const char PathSeparator = '/'; + #endif + /// The mode in which to open the file enum EMode { diff --git a/source/OSSupport/SocketThreads.cpp b/source/OSSupport/SocketThreads.cpp index ae23b4496..dd961770d 100644 --- a/source/OSSupport/SocketThreads.cpp +++ b/source/OSSupport/SocketThreads.cpp @@ -37,7 +37,7 @@ cSocketThreads::~cSocketThreads() -bool cSocketThreads::AddClient(cSocket * a_Socket, cCallback * a_Client) +bool cSocketThreads::AddClient(const cSocket & a_Socket, cCallback * a_Client) { // Add a (socket, client) pair for processing, data from a_Socket is to be sent to a_Client @@ -71,6 +71,7 @@ bool cSocketThreads::AddClient(cSocket * a_Socket, cCallback * a_Client) +/* void cSocketThreads::RemoveClient(const cSocket * a_Socket) { // Remove the socket (and associated client) from processing @@ -87,6 +88,7 @@ void cSocketThreads::RemoveClient(const cSocket * a_Socket) // Cannot assert here, this may actually happen legally, since cClientHandle has to clean up the socket and it may have already closed in the meantime // ASSERT(!"Removing an unknown socket"); } +*/ @@ -133,20 +135,13 @@ void cSocketThreads::NotifyWrite(const cCallback * a_Client) -void cSocketThreads::Write(const cSocket * a_Socket, const AString & a_Data) +void cSocketThreads::Write(const cCallback * a_Client, const AString & a_Data) { - // Puts a_Data into outgoing data queue for a_Socket - - if (!a_Socket->IsValid()) - { - // Socket already closed, ignore the request - return; - } - + // Puts a_Data into outgoing data queue for a_Client cCSLock Lock(m_CS); for (cSocketThreadList::iterator itr = m_Threads.begin(); itr != m_Threads.end(); ++itr) { - if ((*itr)->Write(a_Socket, a_Data)) + if ((*itr)->Write(a_Client, a_Data)) { return; } @@ -181,24 +176,20 @@ void cSocketThreads::StopReading(const cCallback * a_Client) /// Queues the socket for closing, as soon as its outgoing data is sent -void cSocketThreads::QueueClose(const cSocket * a_Socket) +void cSocketThreads::QueueClose(const cCallback * a_Client) { - if (!a_Socket->IsValid()) - { - // Already closed, ignore the request - return; - } + LOGD("QueueClose(client %p)", a_Client); cCSLock Lock(m_CS); for (cSocketThreadList::iterator itr = m_Threads.begin(); itr != m_Threads.end(); ++itr) { - if ((*itr)->QueueClose(a_Socket)) + if ((*itr)->QueueClose(a_Client)) { return; } } // for itr - m_Threads[] - ASSERT(!"Queueing close of an unknown socket"); + ASSERT(!"Queueing close of an unknown client"); } @@ -240,13 +231,15 @@ cSocketThreads::cSocketThread::~cSocketThread() -void cSocketThreads::cSocketThread::AddClient(cSocket * a_Socket, cCallback * a_Client) +void cSocketThreads::cSocketThread::AddClient(const cSocket & a_Socket, cCallback * a_Client) { ASSERT(m_NumSlots < MAX_SLOTS); // Use HasEmptySlot() to check before adding m_Slots[m_NumSlots].m_Client = a_Client; m_Slots[m_NumSlots].m_Socket = a_Socket; m_Slots[m_NumSlots].m_Outgoing.clear(); + m_Slots[m_NumSlots].m_ShouldClose = false; + m_Slots[m_NumSlots].m_ShouldCallClient = true; m_NumSlots++; // Notify the thread of the change: @@ -297,7 +290,7 @@ bool cSocketThreads::cSocketThread::RemoveSocket(const cSocket * a_Socket) for (int i = m_NumSlots - 1; i >= 0 ; --i) { - if (m_Slots[i].m_Socket != a_Socket) + if (m_Slots[i].m_Socket != *a_Socket) { continue; } @@ -339,7 +332,7 @@ bool cSocketThreads::cSocketThread::HasSocket(const cSocket * a_Socket) const { for (int i = m_NumSlots - 1; i >= 0; --i) { - if (m_Slots[i].m_Socket->GetSocket() == a_Socket->GetSocket()) + if (m_Slots[i].m_Socket == *a_Socket) { return true; } @@ -367,12 +360,12 @@ bool cSocketThreads::cSocketThread::NotifyWrite(const cCallback * a_Client) -bool cSocketThreads::cSocketThread::Write(const cSocket * a_Socket, const AString & a_Data) +bool cSocketThreads::cSocketThread::Write(const cCallback * a_Client, const AString & a_Data) { // Returns true if socket handled by this thread for (int i = m_NumSlots - 1; i >= 0; --i) { - if (m_Slots[i].m_Socket == a_Socket) + if (m_Slots[i].m_Client == a_Client) { m_Slots[i].m_Outgoing.append(a_Data); @@ -397,13 +390,7 @@ bool cSocketThreads::cSocketThread::StopReading (const cCallback * a_Client) { if (m_Slots[i].m_Client == a_Client) { - m_Slots[i].m_Client = NULL; - m_Slots[i].m_ShouldClose = false; - - // Notify the thread that there's a stop reading request: - ASSERT(m_ControlSocket2.IsValid()); - m_ControlSocket2.Send("s", 1); - + m_Slots[i].m_ShouldCallClient = false; return true; } } // for i - m_Slots[] @@ -414,14 +401,13 @@ bool cSocketThreads::cSocketThread::StopReading (const cCallback * a_Client) -bool cSocketThreads::cSocketThread::QueueClose(const cSocket * a_Socket) +bool cSocketThreads::cSocketThread::QueueClose(const cCallback * a_Client) { // Returns true if socket handled by this thread for (int i = m_NumSlots - 1; i >= 0; --i) { - if (m_Slots[i].m_Socket == a_Socket) + if (m_Slots[i].m_Client == a_Client) { - ASSERT(m_Slots[i].m_Client == NULL); // Should have stopped reading first m_Slots[i].m_ShouldClose = true; // Notify the thread that there's a close queued (in case its conditions are already met): @@ -561,11 +547,11 @@ void cSocketThreads::cSocketThread::PrepareSet(fd_set * a_Set, cSocket::xSocket cCSLock Lock(m_Parent->m_CS); for (int i = m_NumSlots - 1; i >= 0; --i) { - if (!m_Slots[i].m_Socket->IsValid()) + if (!m_Slots[i].m_Socket.IsValid()) { continue; } - cSocket::xSocket s = m_Slots[i].m_Socket->GetSocket(); + cSocket::xSocket s = m_Slots[i].m_Socket.GetSocket(); FD_SET(s, a_Set); if (s > a_Highest) { @@ -593,24 +579,24 @@ void cSocketThreads::cSocketThread::ReadFromSockets(fd_set * a_Read) cCSLock Lock(m_Parent->m_CS); for (int i = m_NumSlots - 1; i >= 0; --i) { - if (!FD_ISSET(m_Slots[i].m_Socket->GetSocket(), a_Read)) + if (!FD_ISSET(m_Slots[i].m_Socket.GetSocket(), a_Read)) { continue; } char Buffer[1024]; - int Received = m_Slots[i].m_Socket->Receive(Buffer, ARRAYCOUNT(Buffer), 0); + int Received = m_Slots[i].m_Socket.Receive(Buffer, ARRAYCOUNT(Buffer), 0); if (Received == 0) { // The socket has been closed by the remote party, close our socket and let it be removed after we process all reading - m_Slots[i].m_Socket->CloseSocket(); - if (m_Slots[i].m_Client != NULL) + m_Slots[i].m_Socket.CloseSocket(); + if (m_Slots[i].m_ShouldCallClient) { m_Slots[i].m_Client->SocketClosed(); } } else if (Received > 0) { - if (m_Slots[i].m_Client != NULL) + if (m_Slots[i].m_ShouldCallClient) { m_Slots[i].m_Client->DataReceived(Buffer, Received); } @@ -618,8 +604,8 @@ void cSocketThreads::cSocketThread::ReadFromSockets(fd_set * a_Read) else { // The socket has encountered an error, close it and let it be removed after we process all reading - m_Slots[i].m_Socket->CloseSocket(); - if (m_Slots[i].m_Client != NULL) + m_Slots[i].m_Socket.CloseSocket(); + if (m_Slots[i].m_ShouldCallClient) { m_Slots[i].m_Client->SocketClosed(); } @@ -637,38 +623,41 @@ void cSocketThreads::cSocketThread::WriteToSockets(fd_set * a_Write) cCSLock Lock(m_Parent->m_CS); for (int i = m_NumSlots - 1; i >= 0; --i) { - cSocket Socket(*(m_Slots[i].m_Socket)); - if (!Socket.IsValid() || !FD_ISSET(Socket.GetSocket(), a_Write)) + if (!m_Slots[i].m_Socket.IsValid() || !FD_ISSET(m_Slots[i].m_Socket.GetSocket(), a_Write)) { continue; } if (m_Slots[i].m_Outgoing.empty()) { // Request another chunk of outgoing data: - if (m_Slots[i].m_Client != NULL) + if (m_Slots[i].m_ShouldCallClient) { m_Slots[i].m_Client->GetOutgoingData(m_Slots[i].m_Outgoing); } if (m_Slots[i].m_Outgoing.empty()) { // Nothing ready - if ((m_Slots[i].m_Client == NULL) && m_Slots[i].m_ShouldClose) + if (m_Slots[i].m_ShouldClose) { // Socket was queued for closing and there's no more data to send, close it now: - m_Slots[i].m_Socket->CloseSocket(); - m_Slots[i] = m_Slots[--m_NumSlots]; + + // DEBUG + LOGD("Socket was queued for closing, closing now. Slot %d, client %p, socket %d", i, m_Slots[i].m_Client, m_Slots[i].m_Socket.GetSocket()); + + m_Slots[i].m_Socket.CloseSocket(); + // The slot must be freed actively by the client, using RemoveClient() } continue; } } // if (outgoing data is empty) - int Sent = m_Slots[i].m_Socket->Send(m_Slots[i].m_Outgoing.data(), m_Slots[i].m_Outgoing.size()); + int Sent = m_Slots[i].m_Socket.Send(m_Slots[i].m_Outgoing.data(), m_Slots[i].m_Outgoing.size()); if (Sent < 0) { int Err = cSocket::GetLastError(); - LOGWARNING("Error %d while writing to client \"%s\", disconnecting. \"%s\"", Err, m_Slots[i].m_Socket->GetIPString().c_str(), cSocket::GetErrorString(Err).c_str()); - m_Slots[i].m_Socket->CloseSocket(); - if (m_Slots[i].m_Client != NULL) + LOGWARNING("Error %d while writing to client \"%s\", disconnecting. \"%s\"", Err, m_Slots[i].m_Socket.GetIPString().c_str(), cSocket::GetErrorString(Err).c_str()); + m_Slots[i].m_Socket.CloseSocket(); + if (m_Slots[i].m_ShouldCallClient) { m_Slots[i].m_Client->SocketClosed(); } @@ -695,12 +684,12 @@ void cSocketThreads::cSocketThread::WriteToSockets(fd_set * a_Write) void cSocketThreads::cSocketThread::RemoveClosedSockets(void) { - // Removes sockets that have closed from m_Slots[] + // Removes sockets that have been queued for closing from m_Slots[] cCSLock Lock(m_Parent->m_CS); for (int i = m_NumSlots - 1; i >= 0; --i) { - if (m_Slots[i].m_Socket->IsValid()) + if (!m_Slots[i].m_ShouldClose || !m_Slots[i].m_Outgoing.empty()) { continue; } diff --git a/source/OSSupport/SocketThreads.h b/source/OSSupport/SocketThreads.h index 4683e0e4d..974270a11 100644 --- a/source/OSSupport/SocketThreads.h +++ b/source/OSSupport/SocketThreads.h @@ -76,25 +76,22 @@ public: ~cSocketThreads(); /// Add a (socket, client) pair for processing, data from a_Socket is to be sent to a_Client; returns true if successful - bool AddClient(cSocket * a_Socket, cCallback * a_Client); + bool AddClient(const cSocket & a_Socket, cCallback * a_Client); - /// Remove the socket (and associated client) from processing - void RemoveClient(const cSocket * a_Socket); - - /// Remove the associated socket and the client from processing + /// Remove the associated socket and the client from processing. The socket is left to send its data and is removed only after all its m_OutgoingData is sent void RemoveClient(const cCallback * a_Client); /// Notify the thread responsible for a_Client that the client has something to write void NotifyWrite(const cCallback * a_Client); - /// Puts a_Data into outgoing data queue for a_Socket - void Write(const cSocket * a_Socket, const AString & a_Data); + /// Puts a_Data into outgoing data queue for a_Client + void Write(const cCallback * a_Client, const AString & a_Data); - /// Stops reading from the socket - when this call returns, no more calls to the callbacks are made + /// Stops reading from the client - when this call returns, no more calls to the callbacks are made void StopReading(const cCallback * a_Client); - /// Queues the socket for closing, as soon as its outgoing data is sent - void QueueClose(const cSocket * a_Socket); + /// Queues the client for closing, as soon as its outgoing data is sent + void QueueClose(const cCallback * a_Client); private: @@ -112,15 +109,15 @@ private: bool HasEmptySlot(void) const {return m_NumSlots < MAX_SLOTS; } bool IsEmpty (void) const {return m_NumSlots == 0; } - void AddClient (cSocket * a_Socket, cCallback * a_Client); + void AddClient (const cSocket & a_Socket, cCallback * a_Client); // Takes ownership of the socket bool RemoveClient(const cCallback * a_Client); // Returns true if removed, false if not found bool RemoveSocket(const cSocket * a_Socket); // Returns true if removed, false if not found bool HasClient (const cCallback * a_Client) const; bool HasSocket (const cSocket * a_Socket) const; bool NotifyWrite (const cCallback * a_Client); // Returns true if client handled by this thread - bool Write (const cSocket * a_Socket, const AString & a_Data); // Returns true if socket handled by this thread + bool Write (const cCallback * a_Client, const AString & a_Data); // Returns true if client handled by this thread bool StopReading (const cCallback * a_Client); // Returns true if client handled by this thread - bool QueueClose (const cSocket * a_Socket); // Returns true if socket handled by this thread + bool QueueClose (const cCallback * a_Client); // Returns true if client handled by this thread bool Start(void); // Hide the cIsThread's Start method, we need to provide our own startup to create the control socket @@ -138,10 +135,11 @@ private: // Manipulation with these assumes that the parent's m_CS is locked struct sSlot { - cSocket * m_Socket; + cSocket m_Socket; // The socket is primarily owned by this cCallback * m_Client; AString m_Outgoing; // If sending writes only partial data, the rest is stored here for another send bool m_ShouldClose; // If true, the socket is to be closed after sending all outgoing data + bool m_ShouldCallClient; // If true, the client callbacks are called. Set to false in StopReading() } ; sSlot m_Slots[MAX_SLOTS]; int m_NumSlots; // Number of slots actually used -- cgit v1.2.3