[squid-dev] [PATCH] Bug 2907: high CPU usage on CONNECT when using delay pools

Amos Jeffries squid3 at treenet.co.nz
Sat Feb 21 06:20:41 UTC 2015


When delay pools are active on a CONNECT tunnel and the pool is drained,
the I/O loop spins rapidly, transferring 1 byte at a time until the pool
is topped up at the end of the second.

Instead of looping constantly trying to read 1 byte at a time, add an
asynchronous event to wait for a few I/O cycles or until more bytes can
be read.

To protect against waiting forever when many tunnels are competing for
the pool allowance, we only delay for a limited number of loops before
allowing at least 1 byte through. Also, the amount of time waited is an
odd fraction of 1 second, so retries naturally spread fairly across any
given second, with connections rotating closer to or further from the
time when the pool top-up happens.

Amos
-------------- next part --------------
=== modified file 'src/tunnel.cc'
--- src/tunnel.cc	2015-01-16 16:18:05 +0000
+++ src/tunnel.cc	2015-02-21 03:30:27 +0000
@@ -1,34 +1,35 @@
 /*
  * Copyright (C) 1996-2015 The Squid Software Foundation and contributors
  *
  * Squid software is distributed under GPLv2+ license and includes
  * contributions from numerous individuals and organizations.
  * Please see the COPYING and CONTRIBUTORS files for details.
  */
 
 /* DEBUG: section 26    Secure Sockets Layer Proxy */
 
 #include "squid.h"
 #include "acl/FilledChecklist.h"
 #include "base/CbcPointer.h"
 #include "CachePeer.h"
+#include "cbdata.h"
 #include "client_side.h"
 #include "client_side_request.h"
 #include "comm.h"
 #include "comm/Connection.h"
 #include "comm/ConnOpener.h"
 #include "comm/Read.h"
 #include "comm/Write.h"
 #include "errorpage.h"
 #include "fde.h"
 #include "FwdState.h"
 #include "globals.h"
 #include "http.h"
 #include "HttpRequest.h"
 #include "HttpStateFlags.h"
 #include "ip/QosConfig.h"
 #include "LogTags.h"
 #include "MemBuf.h"
 #include "PeerSelectState.h"
 #include "SBuf.h"
 #include "SquidConfig.h"
@@ -98,63 +99,63 @@
     /// Whether we are reading a CONNECT response from a peer.
     bool waitingForConnectResponse() const { return connectRespBuf; }
     /// Whether we are waiting for the CONNECT request/response exchange with the peer.
     bool waitingForConnectExchange() const { return waitingForConnectRequest() || waitingForConnectResponse(); }
 
     /// Whether the client sent a CONNECT request to us.
     bool clientExpectsConnectResponse() const {
 #if USE_OPENSSL
         // We are bumping and we had already send "OK CONNECTED"
         if (http.valid() && http->getConn() && http->getConn()->serverBump() && http->getConn()->serverBump()->step > Ssl::bumpStep1)
             return false;
 #endif
         return !(request != NULL &&
                  (request->flags.interceptTproxy || request->flags.intercepted));
     }
 
     class Connection
     {
 
     public:
-        Connection() : len (0), buf ((char *)xmalloc(SQUID_TCP_SO_RCVBUF)), size_ptr(NULL) {}
+        Connection() : len (0), buf ((char *)xmalloc(SQUID_TCP_SO_RCVBUF)), size_ptr(NULL), delayedLoops(0) {}
 
         ~Connection();
 
         int bytesWanted(int lower=0, int upper = INT_MAX) const;
         void bytesIn(int const &);
 #if USE_DELAY_POOLS
 
         void setDelayId(DelayId const &);
 #endif
 
         void error(int const xerrno);
         int debugLevelForError(int const xerrno) const;
         /// handles a non-I/O error associated with this Connection
         void logicError(const char *errMsg);
         void closeIfOpen();
         void dataSent (size_t amount);
         int len;
         char *buf;
         int64_t *size_ptr;      /* pointer to size in an ConnStateData for logging */
 
         Comm::ConnectionPointer conn;    ///< The currently connected connection.
-
+        uint8_t delayedLoops; ///< how many times a read on this connection has been postponed.
     private:
 #if USE_DELAY_POOLS
 
         DelayId delayId;
 #endif
 
     };
 
     Connection client, server;
     int *status_ptr;        ///< pointer for logging HTTP status
     LogTags *logTag_ptr;    ///< pointer for logging Squid processing code
     MemBuf *connectRespBuf; ///< accumulates peer CONNECT response when we need it
     bool connectReqWriting; ///< whether we are writing a CONNECT request to a peer
     SBuf preReadClientData;
 
     void copyRead(Connection &from, IOCB *completion);
 
     /// continue to set up connection to a peer, going async for SSL peers
     void connectToPeer();
 
@@ -314,40 +315,41 @@
         return 3;
 
     return 1;
 }
 
 /* Read from server side and queue it for writing to the client */
 void
 TunnelStateData::ReadServer(const Comm::ConnectionPointer &c, char *buf, size_t len, Comm::Flag errcode, int xerrno, void *data)
 {
     TunnelStateData *tunnelState = (TunnelStateData *)data;
     assert(cbdataReferenceValid(tunnelState));
     debugs(26, 3, HERE << c);
 
     tunnelState->readServer(buf, len, errcode, xerrno);
 }
 
 void
 TunnelStateData::readServer(char *, size_t len, Comm::Flag errcode, int xerrno)
 {
     debugs(26, 3, HERE << server.conn << ", read " << len << " bytes, err=" << errcode);
+    server.delayedLoops=0;
 
     /*
      * Bail out early on Comm::ERR_CLOSING
      * - close handlers will tidy up for us
      */
 
     if (errcode == Comm::ERR_CLOSING)
         return;
 
     if (len > 0) {
         server.bytesIn(len);
         kb_incr(&(statCounter.server.all.kbytes_in), len);
         kb_incr(&(statCounter.server.other.kbytes_in), len);
     }
 
     if (keepGoingAfterRead(len, errcode, xerrno, server, client))
         copy(len, server, client, WriteClientDone);
 }
 
 /// Called when we read [a part of] CONNECT response from the peer
@@ -459,40 +461,41 @@
     debugs(50, debugLevelForError(xerrno), HERE << conn << ": read/write failure: " << xstrerror());
 
     if (!ignoreErrno(xerrno))
         conn->close();
 }
 
 /* Read from client side and queue it for writing to the server */
 void
 TunnelStateData::ReadClient(const Comm::ConnectionPointer &, char *buf, size_t len, Comm::Flag errcode, int xerrno, void *data)
 {
     TunnelStateData *tunnelState = (TunnelStateData *)data;
     assert (cbdataReferenceValid (tunnelState));
 
     tunnelState->readClient(buf, len, errcode, xerrno);
 }
 
 void
 TunnelStateData::readClient(char *, size_t len, Comm::Flag errcode, int xerrno)
 {
     debugs(26, 3, HERE << client.conn << ", read " << len << " bytes, err=" << errcode);
+    client.delayedLoops=0;
 
     /*
      * Bail out early on Comm::ERR_CLOSING
      * - close handlers will tidy up for us
      */
 
     if (errcode == Comm::ERR_CLOSING)
         return;
 
     if (len > 0) {
         client.bytesIn(len);
         kb_incr(&(statCounter.client_http.kbytes_in), len);
     }
 
     if (keepGoingAfterRead(len, errcode, xerrno, client, server))
         copy(len, client, server, WriteServerDone);
 }
 
 /// Updates state after reading from client or server.
 /// Returns whether the caller should use the data just read.
@@ -659,47 +662,87 @@
 
 static void
 tunnelTimeout(const CommTimeoutCbParams &io)
 {
     TunnelStateData *tunnelState = static_cast<TunnelStateData *>(io.data);
     debugs(26, 3, HERE << io.conn);
     /* Temporary lock to protect our own feets (comm_close -> tunnelClientClosed -> Free) */
     CbcPointer<TunnelStateData> safetyLock(tunnelState);
 
     tunnelState->client.closeIfOpen();
     tunnelState->server.closeIfOpen();
 }
 
 void
 TunnelStateData::Connection::closeIfOpen()
 {
     if (Comm::IsConnOpen(conn))
         conn->close();
 }
 
+static void
+tunnelDelayedClientRead(void *data) // event callback scheduled by TunnelStateData::copyRead()
+{
+    if (!data)
+        return;
+    TunnelStateData *tunnel = NULL;
+    static_cast<generic_cbdata*>(data)->unwrap(&tunnel); // NOTE(review): presumably leaves tunnel NULL if the tunnel is no longer valid - confirm
+    if (!tunnel)
+        return;
+    static uint64_t counter=0; // function-local static: one running total shared by every tunnel
+    debugs(26, 5, "Client read(2) delayed " << ++counter << " times"); // level 5, not 0: a per-retry trace must not flood cache.log
+    tunnel->copyRead(tunnel->client, TunnelStateData::ReadClient); // re-attempt the postponed client-side read
+}
+
+static void
+tunnelDelayedServerRead(void *data) // event callback scheduled by TunnelStateData::copyRead()
+{
+    if (!data)
+        return;
+    TunnelStateData *tunnel = NULL;
+    static_cast<generic_cbdata*>(data)->unwrap(&tunnel); // NOTE(review): presumably leaves tunnel NULL if the tunnel is no longer valid - confirm
+    if (!tunnel)
+        return;
+    static uint64_t counter=0; // function-local static: one running total shared by every tunnel
+    debugs(26, 5, "Server read(2) delayed " << ++counter << " times"); // level 5, not 0: a per-retry trace must not flood cache.log
+    tunnel->copyRead(tunnel->server, TunnelStateData::ReadServer); // re-attempt the postponed server-side read
+}
+
 void
 TunnelStateData::copyRead(Connection &from, IOCB *completion)
 {
     assert(from.len == 0);
+    // If bytesWanted() permits only the 1-byte minimum, postpone this read:
+    // schedule an event that calls copyRead() again after a 0.3 second wait.
+    // After 9 postponements in a row (about 2.7s) the read goes ahead anyway.
+    int bw = from.bytesWanted(1, SQUID_TCP_SO_RCVBUF);
+    if (bw == 1 && ++from.delayedLoops < 10) {
+        if (completion == TunnelStateData::ReadServer)
+            eventAdd("tunnelDelayedServerRead", &tunnelDelayedServerRead, new generic_cbdata(this), 0.3, true);
+        else
+            eventAdd("tunnelDelayedClientRead", &tunnelDelayedClientRead, new generic_cbdata(this), 0.3, true);
+        return;
+    }
+
     AsyncCall::Pointer call = commCbCall(5,4, "TunnelBlindCopyReadHandler",
                                          CommIoCbPtrFun(completion, this));
-    comm_read(from.conn, from.buf, from.bytesWanted(1, SQUID_TCP_SO_RCVBUF), call);
+    comm_read(from.conn, from.buf, bw, call);
 }
 
 void
 TunnelStateData::readConnectResponse()
 {
     assert(waitingForConnectResponse());
 
     AsyncCall::Pointer call = commCbCall(5,4, "readConnectResponseDone",
                                          CommIoCbPtrFun(ReadConnectResponseDone, this));
     comm_read(server.conn, connectRespBuf->space(),
               server.bytesWanted(1, connectRespBuf->spaceSize()), call);
 }
 
 void
 TunnelStateData::copyClientBytes()
 {
     if (preReadClientData.length()) {
         size_t copyBytes = preReadClientData.length() > SQUID_TCP_SO_RCVBUF ? SQUID_TCP_SO_RCVBUF : preReadClientData.length();
         memcpy(client.buf, preReadClientData.rawContent(), copyBytes);
         preReadClientData.consume(copyBytes);



More information about the squid-dev mailing list