[squid-dev] [PATCH] Bug 2907: high CPU usage on CONNECT when using delay pools

Amos Jeffries squid3 at treenet.co.nz
Mon Feb 23 10:53:38 UTC 2015


On 21/02/2015 7:20 p.m., Amos Jeffries wrote:
> When delay pools are active on a CONNECT tunnel and the pool is drained
> the I/O loop cycles very often transferring 1 byte until the pool is
> topped-up at the end of the second.
> 
> Instead of looping constantly trying to read 1 byte at a time, add an
> asynchronous event to wait for a few I/O cycles or until more bytes can
> be read.
> 
> To protect against infinite loops of waiting when many tunnels are
> competing for the pool allowance we only delay for a limited number of
> loops before allowing at least 1 byte through. Also, the amount of time
> waited is an odd fraction of 1 second so re-tries naturally spread
> across any given second fairly, with connections rotating closer or
> further from the time when pool topup happens.

Updated patch attached.

The initial version used generic_cbdata wrappers needlessly, and at
high speeds the event queue could schedule a double-read on an FD.
Using the TunnelStateData directly as the CBDATA parameter and evicting
events from the queue when the state object is destructed prevents the
bad event occurrences.

This has now had some (about an hour) production use in a high volume
high performance environment. The initial re-try behaviour still allows
some variance in service times, but overall the CPU consumption and (as
a result) total proxy speed appear to be much improved.

If there are no objections (and no problems in the testing environment)
I will apply this in approx. 3 days.

Amos


-------------- next part --------------
=== modified file 'src/tunnel.cc'
--- src/tunnel.cc	2015-01-16 16:18:05 +0000
+++ src/tunnel.cc	2015-02-23 07:48:39 +0000
@@ -1,34 +1,35 @@
 /*
  * Copyright (C) 1996-2015 The Squid Software Foundation and contributors
  *
  * Squid software is distributed under GPLv2+ license and includes
  * contributions from numerous individuals and organizations.
  * Please see the COPYING and CONTRIBUTORS files for details.
  */
 
 /* DEBUG: section 26    Secure Sockets Layer Proxy */
 
 #include "squid.h"
 #include "acl/FilledChecklist.h"
 #include "base/CbcPointer.h"
 #include "CachePeer.h"
+#include "cbdata.h"
 #include "client_side.h"
 #include "client_side_request.h"
 #include "comm.h"
 #include "comm/Connection.h"
 #include "comm/ConnOpener.h"
 #include "comm/Read.h"
 #include "comm/Write.h"
 #include "errorpage.h"
 #include "fde.h"
 #include "FwdState.h"
 #include "globals.h"
 #include "http.h"
 #include "HttpRequest.h"
 #include "HttpStateFlags.h"
 #include "ip/QosConfig.h"
 #include "LogTags.h"
 #include "MemBuf.h"
 #include "PeerSelectState.h"
 #include "SBuf.h"
 #include "SquidConfig.h"
@@ -98,63 +99,68 @@
     /// Whether we are reading a CONNECT response from a peer.
     bool waitingForConnectResponse() const { return connectRespBuf; }
     /// Whether we are waiting for the CONNECT request/response exchange with the peer.
     bool waitingForConnectExchange() const { return waitingForConnectRequest() || waitingForConnectResponse(); }
 
     /// Whether the client sent a CONNECT request to us.
     bool clientExpectsConnectResponse() const {
 #if USE_OPENSSL
         // We are bumping and we had already send "OK CONNECTED"
         if (http.valid() && http->getConn() && http->getConn()->serverBump() && http->getConn()->serverBump()->step > Ssl::bumpStep1)
             return false;
 #endif
         return !(request != NULL &&
                  (request->flags.interceptTproxy || request->flags.intercepted));
     }
 
     class Connection
     {
 
     public:
-        Connection() : len (0), buf ((char *)xmalloc(SQUID_TCP_SO_RCVBUF)), size_ptr(NULL) {}
+        Connection() : len (0), buf ((char *)xmalloc(SQUID_TCP_SO_RCVBUF)), size_ptr(NULL), delayedLoops(0),
+                       readPending(NULL), readPendingFunc(NULL) {}
 
         ~Connection();
 
         int bytesWanted(int lower=0, int upper = INT_MAX) const;
         void bytesIn(int const &);
 #if USE_DELAY_POOLS
 
         void setDelayId(DelayId const &);
 #endif
 
         void error(int const xerrno);
         int debugLevelForError(int const xerrno) const;
         /// handles a non-I/O error associated with this Connection
         void logicError(const char *errMsg);
         void closeIfOpen();
         void dataSent (size_t amount);
         int len;
         char *buf;
         int64_t *size_ptr;      /* pointer to size in an ConnStateData for logging */
 
         Comm::ConnectionPointer conn;    ///< The currently connected connection.
+        uint8_t delayedLoops; ///< how many times a read on this connection has been postponed.
 
+        // XXX: make these an AsyncCall when event API can handle them
+        TunnelStateData *readPending;
+        EVH *readPendingFunc;
     private:
 #if USE_DELAY_POOLS
 
         DelayId delayId;
 #endif
 
     };
 
     Connection client, server;
     int *status_ptr;        ///< pointer for logging HTTP status
     LogTags *logTag_ptr;    ///< pointer for logging Squid processing code
     MemBuf *connectRespBuf; ///< accumulates peer CONNECT response when we need it
     bool connectReqWriting; ///< whether we are writing a CONNECT request to a peer
     SBuf preReadClientData;
 
     void copyRead(Connection &from, IOCB *completion);
 
     /// continue to set up connection to a peer, going async for SSL peers
     void connectToPeer();
 
@@ -193,40 +199,42 @@
     void copy(size_t len, Connection &from, Connection &to, IOCB *);
     void handleConnectResponse(const size_t chunkSize);
     void readServer(char *buf, size_t len, Comm::Flag errcode, int xerrno);
     void readClient(char *buf, size_t len, Comm::Flag errcode, int xerrno);
     void writeClientDone(char *buf, size_t len, Comm::Flag flag, int xerrno);
     void writeServerDone(char *buf, size_t len, Comm::Flag flag, int xerrno);
 
     static void ReadConnectResponseDone(const Comm::ConnectionPointer &, char *buf, size_t len, Comm::Flag errcode, int xerrno, void *data);
     void readConnectResponseDone(char *buf, size_t len, Comm::Flag errcode, int xerrno);
     void copyClientBytes();
 };
 
 static const char *const conn_established = "HTTP/1.1 200 Connection established\r\n\r\n";
 
 static CNCB tunnelConnectDone;
 static ERCB tunnelErrorComplete;
 static CLCB tunnelServerClosed;
 static CLCB tunnelClientClosed;
 static CTCB tunnelTimeout;
 static PSC tunnelPeerSelectComplete;
+static EVH tunnelDelayedClientRead;
+static EVH tunnelDelayedServerRead;
 static void tunnelConnected(const Comm::ConnectionPointer &server, void *);
 static void tunnelRelayConnectRequest(const Comm::ConnectionPointer &server, void *);
 
 static void
 tunnelServerClosed(const CommCloseCbParams &params)
 {
     TunnelStateData *tunnelState = (TunnelStateData *)params.data;
     debugs(26, 3, HERE << tunnelState->server.conn);
     tunnelState->server.conn = NULL;
 
     if (tunnelState->request != NULL)
         tunnelState->request->hier.stopPeerClock(false);
 
     if (tunnelState->noConnections()) {
         delete tunnelState;
         return;
     }
 
     if (!tunnelState->server.len) {
         tunnelState->client.conn->close();
@@ -245,53 +253,58 @@
         delete tunnelState;
         return;
     }
 
     if (!tunnelState->client.len) {
         tunnelState->server.conn->close();
         return;
     }
 }
 
 TunnelStateData::TunnelStateData() :
     url(NULL),
     http(),
     request(NULL),
     status_ptr(NULL),
     logTag_ptr(NULL),
     connectRespBuf(NULL),
     connectReqWriting(false)
 {
     debugs(26, 3, "TunnelStateData constructed this=" << this);
+    client.readPendingFunc = &tunnelDelayedClientRead;
+    server.readPendingFunc = &tunnelDelayedServerRead;
 }
 
 TunnelStateData::~TunnelStateData()
 {
     debugs(26, 3, "TunnelStateData destructed this=" << this);
     assert(noConnections());
     xfree(url);
     serverDestinations.clear();
     delete connectRespBuf;
 }
 
 TunnelStateData::Connection::~Connection()
 {
+    if (readPending)
+        eventDelete(readPendingFunc, readPending);
+
     safe_free(buf);
 }
 
 int
 TunnelStateData::Connection::bytesWanted(int lowerbound, int upperbound) const
 {
 #if USE_DELAY_POOLS
     return delayId.bytesWanted(lowerbound, upperbound);
 #else
 
     return upperbound;
 #endif
 }
 
 void
 TunnelStateData::Connection::bytesIn(int const &count)
 {
     debugs(26, 3, HERE << "len=" << len << " + count=" << count);
 #if USE_DELAY_POOLS
     delayId.bytesIn(count);
@@ -314,40 +327,41 @@
         return 3;
 
     return 1;
 }
 
 /* Read from server side and queue it for writing to the client */
 void
 TunnelStateData::ReadServer(const Comm::ConnectionPointer &c, char *buf, size_t len, Comm::Flag errcode, int xerrno, void *data)
 {
     TunnelStateData *tunnelState = (TunnelStateData *)data;
     assert(cbdataReferenceValid(tunnelState));
     debugs(26, 3, HERE << c);
 
     tunnelState->readServer(buf, len, errcode, xerrno);
 }
 
 void
 TunnelStateData::readServer(char *, size_t len, Comm::Flag errcode, int xerrno)
 {
     debugs(26, 3, HERE << server.conn << ", read " << len << " bytes, err=" << errcode);
+    server.delayedLoops=0;
 
     /*
      * Bail out early on Comm::ERR_CLOSING
      * - close handlers will tidy up for us
      */
 
     if (errcode == Comm::ERR_CLOSING)
         return;
 
     if (len > 0) {
         server.bytesIn(len);
         kb_incr(&(statCounter.server.all.kbytes_in), len);
         kb_incr(&(statCounter.server.other.kbytes_in), len);
     }
 
     if (keepGoingAfterRead(len, errcode, xerrno, server, client))
         copy(len, server, client, WriteClientDone);
 }
 
 /// Called when we read [a part of] CONNECT response from the peer
@@ -459,40 +473,41 @@
     debugs(50, debugLevelForError(xerrno), HERE << conn << ": read/write failure: " << xstrerror());
 
     if (!ignoreErrno(xerrno))
         conn->close();
 }
 
 /* Read from client side and queue it for writing to the server */
 void
 TunnelStateData::ReadClient(const Comm::ConnectionPointer &, char *buf, size_t len, Comm::Flag errcode, int xerrno, void *data)
 {
     TunnelStateData *tunnelState = (TunnelStateData *)data;
     assert (cbdataReferenceValid (tunnelState));
 
     tunnelState->readClient(buf, len, errcode, xerrno);
 }
 
 void
 TunnelStateData::readClient(char *, size_t len, Comm::Flag errcode, int xerrno)
 {
     debugs(26, 3, HERE << client.conn << ", read " << len << " bytes, err=" << errcode);
+    client.delayedLoops=0;
 
     /*
      * Bail out early on Comm::ERR_CLOSING
      * - close handlers will tidy up for us
      */
 
     if (errcode == Comm::ERR_CLOSING)
         return;
 
     if (len > 0) {
         client.bytesIn(len);
         kb_incr(&(statCounter.client_http.kbytes_in), len);
     }
 
     if (keepGoingAfterRead(len, errcode, xerrno, client, server))
         copy(len, client, server, WriteServerDone);
 }
 
 /// Updates state after reading from client or server.
 /// Returns whether the caller should use the data just read.
@@ -659,47 +674,85 @@
 
 static void
 tunnelTimeout(const CommTimeoutCbParams &io)
 {
     TunnelStateData *tunnelState = static_cast<TunnelStateData *>(io.data);
     debugs(26, 3, HERE << io.conn);
     /* Temporary lock to protect our own feets (comm_close -> tunnelClientClosed -> Free) */
     CbcPointer<TunnelStateData> safetyLock(tunnelState);
 
     tunnelState->client.closeIfOpen();
     tunnelState->server.closeIfOpen();
 }
 
 void
 TunnelStateData::Connection::closeIfOpen()
 {
     if (Comm::IsConnOpen(conn))
         conn->close();
 }
 
+static void
+tunnelDelayedClientRead(void *data)
+{
+    if (!data)
+        return;
+    TunnelStateData *tunnel = static_cast<TunnelStateData*>(data);
+    if (!tunnel)
+        return;
+    tunnel->client.readPending = NULL;
+    static uint64_t counter=0;
+    debugs(26, 7, "Client read(2) delayed " << ++counter << " times");
+    tunnel->copyRead(tunnel->client, TunnelStateData::ReadClient);
+}
+
+static void
+tunnelDelayedServerRead(void *data)
+{
+    if (!data)
+        return;
+    TunnelStateData *tunnel = static_cast<TunnelStateData*>(data);
+    if (!tunnel)
+        return;
+    tunnel->server.readPending = NULL;
+    static uint64_t counter=0;
+    debugs(26, 7, "Server read(2) delayed " << ++counter << " times");
+    tunnel->copyRead(tunnel->server, TunnelStateData::ReadServer);
+}
+
 void
 TunnelStateData::copyRead(Connection &from, IOCB *completion)
 {
     assert(from.len == 0);
+    // If only the minimum permitted read size is going to be attempted
+    // then we schedule an event to try again in a few I/O cycles.
+    // Allow at least 1 byte to be read every (0.3*10) seconds.
+    int bw = from.bytesWanted(1, SQUID_TCP_SO_RCVBUF);
+    if (bw == 1 && ++from.delayedLoops < 10) {
+        from.readPending = this;
+        eventAdd("tunnelDelayedServerRead", from.readPendingFunc, from.readPending, 0.3, true);
+        return;
+    }
+
     AsyncCall::Pointer call = commCbCall(5,4, "TunnelBlindCopyReadHandler",
                                          CommIoCbPtrFun(completion, this));
-    comm_read(from.conn, from.buf, from.bytesWanted(1, SQUID_TCP_SO_RCVBUF), call);
+    comm_read(from.conn, from.buf, bw, call);
 }
 
 void
 TunnelStateData::readConnectResponse()
 {
     assert(waitingForConnectResponse());
 
     AsyncCall::Pointer call = commCbCall(5,4, "readConnectResponseDone",
                                          CommIoCbPtrFun(ReadConnectResponseDone, this));
     comm_read(server.conn, connectRespBuf->space(),
               server.bytesWanted(1, connectRespBuf->spaceSize()), call);
 }
 
 void
 TunnelStateData::copyClientBytes()
 {
     if (preReadClientData.length()) {
         size_t copyBytes = preReadClientData.length() > SQUID_TCP_SO_RCVBUF ? SQUID_TCP_SO_RCVBUF : preReadClientData.length();
         memcpy(client.buf, preReadClientData.rawContent(), copyBytes);
         preReadClientData.consume(copyBytes);



More information about the squid-dev mailing list