[squid-dev] [PATCH] refactor ConnStateData pipeline handling

Amos Jeffries squid3 at treenet.co.nz
Tue Nov 17 10:56:08 UTC 2015


Updated patch attached. AFAICS this contains all the previously
requested changes.

To recap:

This refactors the request pipeline management API to use std::list
instead of a custom linked-list with accessors spread over both
ConnStateData and ClientSocketContext.

To do this, a new class Pipeline is created with methods wrapping the
std::list API and extending it slightly to meet HTTP/1.1 pipeline
behaviours and perform basic stats gathering. The pipeline management
methods and state variables are moved inside this class.
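
The shape of such a wrapper can be sketched in isolation. This is a
minimal illustration and not the attached Pipeline class:
std::shared_ptr stands in for Squid's RefCount, Txn and MiniPipeline are
hypothetical names used only here, and popMe() returns a bool where the
patch uses an assert.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <list>
#include <memory>

// Hypothetical stand-in for a transaction context.
struct Txn {
    explicit Txn(int anId) : id(anId) {}
    int id;
};
typedef std::shared_ptr<Txn> TxnPointer; // stand-in for a RefCount pointer

// FIFO wrapper around std::list, with a lifetime request counter.
class MiniPipeline
{
public:
    MiniPipeline() : nrequests(0) {}
    MiniPipeline(const MiniPipeline &) = delete;
    MiniPipeline &operator =(const MiniPipeline &) = delete;

    // register a new transaction at the back of the queue
    void add(const TxnPointer &t) {
        requests.push_back(t);
        ++nrequests;
    }

    // first transaction in the queue, or a null pointer when empty
    TxnPointer front() const {
        return requests.empty() ? TxnPointer() : requests.front();
    }

    std::size_t count() const { return requests.size(); }
    bool empty() const { return requests.empty(); }

    // only the front transaction may leave (HTTP/1 ordering);
    // returns whether anything was removed
    bool popMe(const TxnPointer &which) {
        if (requests.empty() || which != requests.front())
            return false;
        requests.pop_front();
        return true;
    }

    // transactions seen so far, including ones already removed
    std::uint32_t nrequests;

private:
    std::list<TxnPointer> requests;
};
```

Note that count() shrinks as transactions complete while nrequests only
ever grows, which is what makes it usable for stats.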

ClientSocketContext was performing several layering violations in
relation to ConnStateData when one transaction ended and the next needed
starting. Treating the pipeline properly as a std::list forced removal
of those violations.

* actions for starting or resuming a transaction on the connection are
now moved to ConnStateData::kick(), which gets called after each
transaction completes.
 - with some further cleanup it can be called at any point the
ConnStateData needs to resume processing. However, that is left out of
scope for this patch.

* the ClientSocketContext scope now ends when the finished() method is
used to mark completion of that context's transaction. The context then
marks itself done and de-registers from the Pipeline queue. The
ConnStateData kick() method still needs to be called to resume
processing of other transactions.

* the queue now holds RefCounted Pointers, so that the
ClientSocketContext destructor no longer needs to be careful of
registrations, and the queue entries are guaranteed to still exist while
queued.

* The old freeAllContexts() and notifyAllContexts(int) members of
ConnStateData have been combined into Pipeline::terminateAll(int).
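
The sequence described by the points above can be sketched roughly as
follows. This is an illustration under assumptions and not the patch
code: Ctx and Conn are hypothetical stand-ins for ClientSocketContext
and ConnStateData, std::shared_ptr replaces RefCount, and kick() only
counts its calls instead of parsing or reading anything.

```cpp
#include <cassert>
#include <list>
#include <memory>

struct Conn; // hypothetical connection manager, defined below

// Hypothetical transaction context.
struct Ctx {
    explicit Ctx(Conn &c) : conn(c), registered(false), ioError(-1) {}
    void noteIoError(int xerrno) { ioError = xerrno; }
    void finished(); // marks itself done and leaves the queue
    Conn &conn;
    bool registered;
    int ioError; // -1 means no error was reported
};
typedef std::shared_ptr<Ctx> CtxPointer;

struct Conn {
    Conn() : kicks(0) {}

    // create a context and register it at the back of the queue
    CtxPointer start() {
        CtxPointer c = std::make_shared<Ctx>(*this);
        c->registered = true;
        queue.push_back(c);
        return c;
    }

    // resume processing; in this sketch it only counts calls
    void kick() { ++kicks; }

    // notify and then finish every queued context, in one pass
    void terminateAll(int xerrno) {
        while (!queue.empty()) {
            CtxPointer c = queue.front(); // keeps c alive across finished()
            c->noteIoError(xerrno);
            c->finished();
        }
    }

    std::list<CtxPointer> queue;
    int kicks;
};

void Ctx::finished()
{
    assert(registered);
    registered = false;
    // still assumes HTTP/1 ordering: only the front context may finish
    assert(conn.queue.front().get() == this);
    conn.queue.pop_front();
}
```

A caller would run finished() on the completed context and then kick()
on the connection. Because the queue owns counted references, the
context destructor has no de-registration work left to do.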



The ClientSocketContext and ConnStateData documentation is updated to
describe what they do with regard to connection and transaction
processing.


Initial testing revealed CONNECT tunnels always being logged as ABORTED.
This turns out to be technically correct, since the only way a tunnel
can finish is for client or server to just close the connection.
However, it is not right to log these as abnormal aborts. Instead, I
have now made the context be finished() just prior to the
TunnelStateData being destroyed. That way normal closure should show up
only as TUNNEL, but timeouts and I/O errors should still be recorded as
abnormal.
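
The effect on logging can be sketched as below. The flag rule is the one
visible in ClientSocketContext::noteIoError() in the attached diff;
everything else (ErrFlags, TunnelTxn, teardown()) is a hypothetical
simplification for illustration, not the real tunnel code.

```cpp
#include <cassert>
#include <cerrno>

// Hypothetical subset of the error flags recorded for a transaction.
struct ErrFlags {
    ErrFlags() : timedout(false), aborted(false) {}
    bool timedout;
    bool aborted;
};

// Hypothetical tunnel transaction.
struct TunnelTxn {
    TunnelTxn() : done(false) {}

    // same rule as noteIoError() in the diff: zero xerrno still means aborted
    void noteIoError(int xerrno) {
        err.timedout = (xerrno == ETIMEDOUT);
        err.aborted = (xerrno != ETIMEDOUT);
    }

    void finished() { done = true; }

    ErrFlags err;
    bool done;
};

// Connection teardown: only transactions still queued get notified.
void teardown(TunnelTxn &t, int xerrno)
{
    if (!t.done) {
        t.noteIoError(xerrno);
        t.finished();
    }
}
```

A tunnel that was finished() before teardown keeps both flags clear, so
nothing abnormal is logged. One that is still queued at teardown gets
the aborted or timedout flag as before.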

Potential BUGS:

* The on_unsupported_protocol handling function appears to be a bit
broken. It pop()'s contexts off the pipeline directly without going
through the proper finished() process to release their state data. I
have highlighted that with an XXX and comment.

* The ssl-bump handling logic begins with a terminateAll(0) being run on
all active contexts. It does not check whether there is any existing
pipeline of requests waiting to be processed. And the action prematurely
purges the CONNECT message context, even though it may be a) still
useful for splice to resume as a blind tunnel, and b) is actually still
being processed (albeit as bumping).


Neither of those is new breakage though, so I am marking them with XXX
and leaving them for a followup when the SSL handling logic gets cleaned
up.


Amos

-------------- next part --------------
=== modified file 'src/Makefile.am'
--- src/Makefile.am	2015-11-08 15:09:16 +0000
+++ src/Makefile.am	2015-11-15 09:12:02 +0000
@@ -394,40 +394,42 @@
 	Notes.h \
 	Notes.cc \
 	Parsing.cc \
 	Parsing.h \
 	$(XPROF_STATS_SOURCE) \
 	pconn.cc \
 	pconn.h \
 	PeerDigest.h \
 	peer_digest.cc \
 	peer_proxy_negotiate_auth.h \
 	peer_proxy_negotiate_auth.cc \
 	peer_select.cc \
 	peer_sourcehash.h \
 	peer_sourcehash.cc \
 	peer_userhash.h \
 	peer_userhash.cc \
 	PeerPoolMgr.h \
 	PeerPoolMgr.cc \
 	PeerSelectState.h \
 	PingData.h \
+	Pipeline.cc \
+	Pipeline.h \
 	protos.h \
 	redirect.h \
 	redirect.cc \
 	refresh.h \
 	refresh.cc \
 	RemovalPolicy.cc \
 	RemovalPolicy.h \
 	send-announce.h \
 	send-announce.cc \
 	$(SBUF_SOURCE) \
 	SBufAlgos.h \
 	SBufAlgos.cc \
 	SBufDetailedStats.h \
 	SBufDetailedStats.cc \
 	SBufStatsAction.h \
 	SBufStatsAction.cc \
 	$(SNMP_SOURCE) \
 	SquidMath.h \
 	SquidMath.cc \
 	SquidNew.cc \
@@ -1362,40 +1364,42 @@
 	mime.h \
 	mime.cc \
 	mime_header.h \
 	mime_header.cc \
 	neighbors.h \
 	neighbors.cc \
 	Notes.cc \
 	Notes.h \
 	Parsing.cc \
 	pconn.cc \
 	peer_digest.cc \
 	peer_proxy_negotiate_auth.h \
 	peer_proxy_negotiate_auth.cc \
 	peer_select.cc \
 	peer_sourcehash.h \
 	peer_sourcehash.cc \
 	peer_userhash.h \
 	peer_userhash.cc \
 	PeerPoolMgr.h \
 	PeerPoolMgr.cc \
+	Pipeline.cc \
+	Pipeline.h \
 	redirect.h \
 	tests/stub_redirect.cc \
 	refresh.h \
 	refresh.cc \
 	RemovalPolicy.cc \
 	$(SBUF_SOURCE) \
 	SBufAlgos.h \
 	SBufAlgos.cc \
 	SBufDetailedStats.h \
 	tests/stub_SBufDetailedStats.cc \
 	$(SNMP_SOURCE) \
 	SquidMath.h \
 	SquidMath.cc \
 	IoStats.h \
 	stat.h \
 	stat.cc \
 	StatCounters.h \
 	StatCounters.cc \
 	StatHist.h \
 	StrList.h \
@@ -1801,40 +1805,42 @@
 	mime.h \
 	mime.cc \
 	mime_header.h \
 	mime_header.cc \
 	multicast.h \
 	multicast.cc \
 	neighbors.h \
 	neighbors.cc \
 	Notes.cc \
 	Notes.h \
 	Parsing.cc \
 	pconn.cc \
 	peer_digest.cc \
 	peer_proxy_negotiate_auth.h \
 	peer_proxy_negotiate_auth.cc \
 	peer_select.cc \
 	peer_sourcehash.h \
 	peer_sourcehash.cc \
 	peer_userhash.h \
 	peer_userhash.cc \
+	Pipeline.cc \
+	Pipeline.h \
 	redirect.h \
 	tests/stub_redirect.cc \
 	refresh.h \
 	refresh.cc \
 	RemovalPolicy.cc \
 	StrList.h \
 	StrList.cc \
 	$(SBUF_SOURCE) \
 	SBufAlgos.h \
 	SBufAlgos.cc \
 	SBufDetailedStats.h \
 	tests/stub_SBufDetailedStats.cc \
 	$(SNMP_SOURCE) \
 	SquidMath.cc \
 	SquidMath.h \
 	IoStats.h \
 	stat.h \
 	stat.cc \
 	StatCounters.h \
 	StatCounters.cc \
@@ -2046,40 +2052,42 @@
 	mime.h \
 	mime.cc \
 	mime_header.h \
 	mime_header.cc \
 	multicast.h \
 	multicast.cc \
 	neighbors.h \
 	neighbors.cc \
 	Notes.cc \
 	Notes.h \
 	Parsing.cc \
 	pconn.cc \
 	peer_digest.cc \
 	peer_proxy_negotiate_auth.h \
 	peer_proxy_negotiate_auth.cc \
 	peer_select.cc \
 	peer_sourcehash.h \
 	peer_sourcehash.cc \
 	peer_userhash.h \
 	peer_userhash.cc \
+	Pipeline.cc \
+	Pipeline.h \
 	RemovalPolicy.cc \
 	redirect.h \
 	tests/stub_redirect.cc \
 	refresh.h \
 	refresh.cc \
 	$(SBUF_SOURCE) \
 	SBufAlgos.h \
 	SBufAlgos.cc \
 	SBufDetailedStats.h \
 	tests/stub_SBufDetailedStats.cc \
 	$(SNMP_SOURCE) \
 	SquidMath.h \
 	SquidMath.cc \
 	IoStats.h \
 	stat.h \
 	stat.cc \
 	StatCounters.h \
 	StatCounters.cc \
 	StatHist.h \
 	StatHist.cc \
@@ -2286,40 +2294,42 @@
 	mem_node.cc \
 	mime.h \
 	mime.cc \
 	mime_header.h \
 	mime_header.cc \
 	multicast.h \
 	multicast.cc \
 	neighbors.h \
 	neighbors.cc \
 	Notes.cc \
 	Notes.h \
 	Parsing.cc \
 	peer_digest.cc \
 	peer_proxy_negotiate_auth.h \
 	peer_proxy_negotiate_auth.cc \
 	peer_select.cc \
 	peer_sourcehash.h \
 	peer_sourcehash.cc \
 	peer_userhash.h \
 	peer_userhash.cc \
+	Pipeline.cc \
+	Pipeline.h \
 	pconn.cc \
 	redirect.h \
 	tests/stub_redirect.cc \
 	refresh.h \
 	refresh.cc \
 	RemovalPolicy.cc \
 	$(SBUF_SOURCE) \
 	SBufAlgos.h \
 	SBufAlgos.cc \
 	SBufDetailedStats.h \
 	tests/stub_SBufDetailedStats.cc \
 	$(SNMP_SOURCE) \
 	SquidMath.h \
 	SquidMath.cc \
 	IoStats.h \
 	stat.h \
 	stat.cc \
 	StatCounters.h \
 	StatCounters.cc \
 	StatHist.h \
@@ -2614,40 +2624,42 @@
 	mime.h \
 	mime.cc \
 	mime_header.h \
 	mime_header.cc \
 	neighbors.h \
 	neighbors.cc \
 	Notes.cc \
 	Notes.h \
 	Parsing.cc \
 	pconn.cc \
 	peer_digest.cc \
 	peer_proxy_negotiate_auth.h \
 	peer_proxy_negotiate_auth.cc \
 	peer_select.cc \
 	peer_sourcehash.h \
 	peer_sourcehash.cc \
 	peer_userhash.h \
 	peer_userhash.cc \
 	PeerPoolMgr.h \
 	PeerPoolMgr.cc \
+	Pipeline.cc \
+	Pipeline.h \
 	redirect.h \
 	tests/stub_libauth_acls.cc \
 	tests/stub_redirect.cc \
 	refresh.h \
 	refresh.cc \
 	RemovalPolicy.cc \
 	$(SBUF_SOURCE) \
 	SBufAlgos.h \
 	SBufAlgos.cc \
 	SBufDetailedStats.h \
 	tests/stub_SBufDetailedStats.cc \
 	$(SNMP_SOURCE) \
 	SquidMath.h \
 	SquidMath.cc \
 	IoStats.h \
 	stat.h \
 	stat.cc \
 	StatCounters.h \
 	StatCounters.cc \
 	StatHist.h \
@@ -3475,40 +3487,42 @@
 	MemBuf.cc \
 	MemObject.cc \
 	mime.h \
 	mime.cc \
 	mime_header.h \
 	mime_header.cc \
 	neighbors.h \
 	neighbors.cc \
 	Notes.h \
 	Notes.cc \
 	Parsing.cc \
 	pconn.cc \
 	peer_digest.cc \
 	peer_proxy_negotiate_auth.h \
 	peer_proxy_negotiate_auth.cc \
 	peer_select.cc \
 	peer_sourcehash.h \
 	peer_sourcehash.cc \
 	peer_userhash.h \
 	peer_userhash.cc \
+	Pipeline.cc \
+	Pipeline.h \
 	redirect.h \
 	tests/stub_redirect.cc \
 	refresh.h \
 	refresh.cc \
 	RemovalPolicy.cc \
 	$(SBUF_SOURCE) \
 	SBufAlgos.h \
 	SBufAlgos.cc \
 	SBufDetailedStats.h \
 	tests/stub_SBufDetailedStats.cc \
 	$(SNMP_SOURCE) \
 	SquidMath.h \
 	SquidMath.cc \
 	IoStats.h \
 	stat.h \
 	stat.cc \
 	StatCounters.h \
 	StatCounters.cc \
 	StatHist.h \
 	tests/stub_StatHist.cc \

=== added file 'src/Pipeline.cc'
--- src/Pipeline.cc	1970-01-01 00:00:00 +0000
+++ src/Pipeline.cc	2015-11-17 06:06:41 +0000
@@ -0,0 +1,61 @@
+/*
+ * Copyright (C) 1996-2015 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
+ */
+
+/*
+ * DEBUG: section 33    Client Request Pipeline
+ */
+#include "squid.h"
+#include "client_side.h"
+#include "Debug.h"
+#include "Pipeline.h"
+
+void
+Pipeline::add(const ClientSocketContextPointer &c)
+{
+    requests.push_back(c);
+    ++nrequests;
+    debugs(33, 3, "Pipeline " << (void*)this << " add request " << nrequests << ' ' << c);
+}
+
+ClientSocketContextPointer
+Pipeline::front() const
+{
+    if (requests.empty()) {
+        debugs(33, 3, "Pipeline " << (void*)this << " empty");
+        return ClientSocketContextPointer();
+    }
+
+    debugs(33, 3, "Pipeline " << (void*)this << " front " << requests.front());
+    return requests.front();
+}
+
+void
+Pipeline::terminateAll(int xerrno)
+{
+    while (!requests.empty()) {
+        ClientSocketContextPointer context = requests.front();
+        debugs(33, 3, "Pipeline " << (void*)this << " notify(" << xerrno << ") " << context);
+        context->noteIoError(xerrno);
+        context->finished();  // cleanup and self-deregister
+        assert(context != requests.front());
+    }
+}
+
+void
+Pipeline::popMe(const ClientSocketContextPointer &which)
+{
+    if (requests.empty())
+        return;
+
+    debugs(33, 3, "Pipeline " << (void*)this << " drop " << requests.front());
+    // in reality there may be multiple contexts doing processing in parallel.
+    // XXX: pipeline still assumes HTTP/1 FIFO semantics are obeyed.
+    assert(which == requests.front());
+    requests.pop_front();
+}
+

=== added file 'src/Pipeline.h'
--- src/Pipeline.h	1970-01-01 00:00:00 +0000
+++ src/Pipeline.h	2015-11-17 06:58:01 +0000
@@ -0,0 +1,73 @@
+/*
+ * Copyright (C) 1996-2015 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
+ */
+
+#ifndef SQUID_SRC_PIPELINE_H
+#define SQUID_SRC_PIPELINE_H
+
+#include "base/RefCount.h"
+
+#include <list>
+
+class ClientSocketContext;
+typedef RefCount<ClientSocketContext> ClientSocketContextPointer;
+
+/**
+ * A queue of transactions awaiting completion.
+ *
+ * Transactions in the queue may be fully processed, but not yet delivered,
+ * or only partially processed.
+ *
+ * - HTTP/1 pipelined requests can be processed out of order but
+ *   responses MUST be written to the client in-order.
+ *   The front() context is for the response writing transaction.
+ *   The back context may still be reading a request payload/body.
+ *   Other contexts are in deferred I/O state, but may be accumulating
+ *   payload/body data to be written later.
+ *
+ * - HTTP/2 multiplexed streams can be processed and delivered in any order.
+ *
+ * For consistency we treat the pipeline as a FIFO queue in both cases.
+ */
+class Pipeline
+{
+    Pipeline(const Pipeline &) = delete;
+    Pipeline & operator =(const Pipeline &) = delete;
+
+public:
+    Pipeline() : nrequests(0) {}
+    ~Pipeline() = default;
+
+    /// register a new request context to the pipeline
+    void add(const ClientSocketContextPointer &);
+
+    /// get the first request context in the pipeline
+    ClientSocketContextPointer front() const;
+
+    /// how many requests are currently pipelined
+    size_t count() const {return requests.size();}
+
+    /// whether there are none or any requests currently pipelined
+    bool empty() const {return requests.empty();}
+
+    /// tell everybody about the err, and abort all waiting requests
+    void terminateAll(const int xerrno);
+
+    /// deregister the front request from the pipeline
+    void popMe(const ClientSocketContextPointer &);
+
+    /// Number of requests seen in this pipeline (so far).
+    /// Includes incomplete transactions.
+    uint32_t nrequests;
+
+private:
+    /// requests parsed from the connection but not yet completed.
+    std::list<ClientSocketContextPointer> requests;
+};
+
+#endif /* SQUID_SRC_PIPELINE_H */
+

=== modified file 'src/client_side.cc'
--- src/client_side.cc	2015-11-09 18:12:10 +0000
+++ src/client_side.cc	2015-11-17 08:13:04 +0000
@@ -35,41 +35,41 @@
  * Tell client_side_reply that we *want* an error page before any
  * stream calls occur. Then we simply read as normal.
  *
  *
  \section pconn_logic Persistent connection logic:
  *
  \par
  * requests (httpClientRequest structs) get added to the connection
  * list, with the current one being chr
  *
  \par
  * The request is *immediately* kicked off, and data flows through
  * to clientSocketRecipient.
  *
  \par
  * If the data that arrives at clientSocketRecipient is not for the current
  * request, clientSocketRecipient simply returns, without requesting more
  * data, or sending it.
  *
  \par
- * ClientKeepAliveNextRequest will then detect the presence of data in
+ * ConnStateData::kick() will then detect the presence of data in
  * the next ClientHttpRequest, and will send it, restablishing the
  * data flow.
  */
 
 #include "squid.h"
 #include "acl/FilledChecklist.h"
 #include "anyp/PortCfg.h"
 #include "base/Subscription.h"
 #include "base/TextException.h"
 #include "CachePeer.h"
 #include "client_db.h"
 #include "client_side.h"
 #include "client_side_reply.h"
 #include "client_side_request.h"
 #include "ClientRequestContext.h"
 #include "clientStream.h"
 #include "comm.h"
 #include "comm/Connection.h"
 #include "comm/Loops.h"
 #include "comm/Read.h"
@@ -203,117 +203,87 @@
 ClientSocketContext::getTail() const
 {
     if (http->client_stream.tail)
         return (clientStreamNode *)http->client_stream.tail->data;
 
     return NULL;
 }
 
 clientStreamNode *
 ClientSocketContext::getClientReplyContext() const
 {
     return (clientStreamNode *)http->client_stream.tail->prev->data;
 }
 
 ConnStateData *
 ClientSocketContext::getConn() const
 {
     return http->getConn();
 }
 
-void
-ClientSocketContext::removeFromConnectionList(ConnStateData * conn)
-{
-    ClientSocketContext::Pointer *tempContextPointer;
-    assert(conn != NULL && cbdataReferenceValid(conn));
-    assert(conn->getCurrentContext() != NULL);
-    /* Unlink us from the connection request list */
-    tempContextPointer = & conn->currentobject;
-
-    while (tempContextPointer->getRaw()) {
-        if (*tempContextPointer == this)
-            break;
-
-        tempContextPointer = &(*tempContextPointer)->next;
-    }
-
-    assert(tempContextPointer->getRaw() != NULL);
-    *tempContextPointer = next;
-    next = NULL;
-}
-
 ClientSocketContext::~ClientSocketContext()
 {
     clientStreamNode *node = getTail();
 
     if (node) {
         ClientSocketContext *streamContext = dynamic_cast<ClientSocketContext *> (node->data.getRaw());
 
         if (streamContext) {
             /* We are *always* the tail - prevent recursive free */
             assert(this == streamContext);
             node->data = NULL;
         }
     }
 
-    if (connRegistered_)
-        deRegisterWithConn();
-
     httpRequestFree(http);
-
-    /* clean up connection links to us */
-    assert(this != next.getRaw());
 }
 
 void
 ClientSocketContext::registerWithConn()
 {
     assert (!connRegistered_);
     assert (http);
     assert (http->getConn() != NULL);
     connRegistered_ = true;
-    http->getConn()->addContextToQueue(this);
-}
-
-void
-ClientSocketContext::deRegisterWithConn()
-{
-    assert (connRegistered_);
-    removeFromConnectionList(http->getConn());
-    connRegistered_ = false;
+    http->getConn()->pipeline.add(ClientSocketContext::Pointer(this));
 }
 
 void
-ClientSocketContext::connIsFinished()
+ClientSocketContext::finished()
 {
     assert (http);
     assert (http->getConn() != NULL);
-    deRegisterWithConn();
+    ConnStateData *conn = http->getConn();
+
     /* we can't handle any more stream data - detach */
     clientStreamDetach(getTail(), http);
+
+    assert(connRegistered_);
+    connRegistered_ = false;
+    assert(conn->pipeline.front() == this); // XXX: still assumes HTTP/1 semantics
+    conn->pipeline.popMe(ClientSocketContext::Pointer(this));
 }
 
 ClientSocketContext::ClientSocketContext(const Comm::ConnectionPointer &aConn, ClientHttpRequest *aReq) :
     clientConnection(aConn),
     http(aReq),
     reply(NULL),
-    next(NULL),
     writtenToSocket(0),
     mayUseConnection_ (false),
     connRegistered_ (false)
 {
     assert(http != NULL);
     memset (reqbuf, '\0', sizeof (reqbuf));
     flags.deferred = 0;
     flags.parsed_ok = 0;
     deferredparams.node = NULL;
     deferredparams.rep = NULL;
 }
 
 void
 ClientSocketContext::writeControlMsg(HttpControlMsg &msg)
 {
     HttpReply::Pointer rep(msg.reply);
     Must(rep != NULL);
 
     // remember the callback
     cbControlMsgSent = msg.cbSuccess;
@@ -641,77 +611,40 @@
 ClientHttpRequest::freeResources()
 {
     safe_free(uri);
     safe_free(log_uri);
     safe_free(redirect.location);
     range_iter.boundary.clean();
     HTTPMSGUNLOCK(request);
 
     if (client_stream.tail)
         clientStreamAbort((clientStreamNode *)client_stream.tail->data, this);
 }
 
 void
 httpRequestFree(void *data)
 {
     ClientHttpRequest *http = (ClientHttpRequest *)data;
     assert(http != NULL);
     delete http;
 }
 
-bool
-ConnStateData::areAllContextsForThisConnection() const
-{
-    ClientSocketContext::Pointer context = getCurrentContext();
-
-    while (context.getRaw()) {
-        if (context->http->getConn() != this)
-            return false;
-
-        context = context->next;
-    }
-
-    return true;
-}
-
-void
-ConnStateData::freeAllContexts()
-{
-    ClientSocketContext::Pointer context;
-
-    while ((context = getCurrentContext()).getRaw() != NULL) {
-        assert(getCurrentContext() !=
-               getCurrentContext()->next);
-        context->connIsFinished();
-        assert (context != currentobject);
-    }
-}
-
-/// propagates abort event to all contexts
-void
-ConnStateData::notifyAllContexts(int xerrno)
-{
-    typedef ClientSocketContext::Pointer CSCP;
-    for (CSCP c = getCurrentContext(); c.getRaw(); c = c->next)
-        c->noteIoError(xerrno);
-}
-
 /* This is a handler normally called by comm_close() */
 void ConnStateData::connStateClosed(const CommCloseCbParams &)
 {
     deleteThis("ConnStateData::connStateClosed");
 }
 
 #if USE_AUTH
 void
 ConnStateData::setAuth(const Auth::UserRequest::Pointer &aur, const char *by)
 {
     if (auth_ == NULL) {
         if (aur != NULL) {
             debugs(33, 2, "Adding connection-auth to " << clientConnection << " from " << by);
             auth_ = aur;
         }
         return;
     }
 
     // clobered with self-pointer
     // NP: something nasty is going on in Squid, but harmless.
@@ -773,42 +706,41 @@
         auth_->releaseAuthServer();
         auth_ = NULL;
         // this is a fatal type of problem.
         // Close the connection immediately with TCP RST to abort all traffic flow
         comm_reset_close(clientConnection);
         return;
     }
 
     /* NOT REACHABLE */
 }
 #endif
 
 // cleans up before destructor is called
 void
 ConnStateData::swanSong()
 {
     debugs(33, 2, HERE << clientConnection);
     flags.readMore = false;
     DeregisterRunner(this);
     clientdbEstablished(clientConnection->remote, -1);  /* decrement */
-    assert(areAllContextsForThisConnection());
-    freeAllContexts();
+    pipeline.terminateAll(0);
 
     unpinConnection(true);
 
     Server::swanSong(); // closes the client connection
 
 #if USE_AUTH
     // NP: do this bit after closing the connections to avoid side effects from unwanted TCP RST
     setAuth(NULL, "ConnStateData::SwanSong cleanup");
 #endif
 
     flags.swanSang = true;
 }
 
 bool
 ConnStateData::isOpen() const
 {
     return cbdataReferenceValid(this) && // XXX: checking "this" in a method
            Comm::IsConnOpen(clientConnection) &&
            !fd_table[clientConnection->fd].closing();
 }
@@ -869,47 +801,40 @@
         return (r->content_length <= 0 || Config.onoff.request_entities);
 
     default:
         /* For other types of requests we don't care */
         return 1;
     }
 
     /* NOT REACHED */
 }
 
 int
 clientIsRequestBodyTooLargeForPolicy(int64_t bodyLength)
 {
     if (Config.maxRequestBodySize &&
             bodyLength > Config.maxRequestBodySize)
         return 1;       /* too large */
 
     return 0;
 }
 
-// careful: the "current" context may be gone if we wrote an early response
-ClientSocketContext::Pointer
-ConnStateData::getCurrentContext() const
-{
-    return currentobject;
-}
-
 void
 ClientSocketContext::deferRecipientForLater(clientStreamNode * node, HttpReply * rep, StoreIOBuffer receivedData)
 {
     debugs(33, 2, "clientSocketRecipient: Deferring request " << http->uri);
     assert(flags.deferred == 0);
     flags.deferred = 1;
     deferredparams.node = node;
     deferredparams.rep = rep;
     deferredparams.queuedBuffer = receivedData;
     return;
 }
 
 bool
 ClientSocketContext::startOfOutput() const
 {
     return http->out.size == 0;
 }
 
 size_t
 ClientSocketContext::lengthToSend(Range<int64_t> const &available)
@@ -1402,41 +1327,42 @@
 {
     // dont tryt to deliver if client already ABORTED
     if (!http->getConn() || !cbdataReferenceValid(http->getConn()) || !Comm::IsConnOpen(http->getConn()->clientConnection))
         return;
 
     /* Test preconditions */
     assert(node != NULL);
     PROF_start(clientSocketRecipient);
     /* TODO: handle this rather than asserting
      * - it should only ever happen if we cause an abort and
      * the callback chain loops back to here, so we can simply return.
      * However, that itself shouldn't happen, so it stays as an assert for now.
      */
     assert(cbdataReferenceValid(node));
     assert(node->node.next == NULL);
     ClientSocketContext::Pointer context = dynamic_cast<ClientSocketContext *>(node->data.getRaw());
     assert(context != NULL);
 
     /* TODO: check offset is what we asked for */
 
-    if (context != http->getConn()->getCurrentContext())
+    // TODO: enforces HTTP/1 MUST on pipeline order, but is irrelevant to HTTP/2
+    if (context != http->getConn()->pipeline.front())
         context->deferRecipientForLater(node, rep, receivedData);
     else
         http->getConn()->handleReply(rep, receivedData);
 
     PROF_stop(clientSocketRecipient);
 }
 
 /**
  * Called when a downstream node is no longer interested in
  * our data. As we are a terminal node, this means on aborts
  * only
  */
 void
 clientSocketDetach(clientStreamNode * node, ClientHttpRequest * http)
 {
     /* Test preconditions */
     assert(node != NULL);
     /* TODO: handle this rather than asserting
      * - it should only ever happen if we cause an abort and
      * the callback chain loops back to here, so we can simply return.
@@ -1483,121 +1409,118 @@
 {
     debugs(33, 2, HERE << conn->clientConnection << " Sending next");
 
     /** If the client stream is waiting on a socket write to occur, then */
 
     if (deferredRequest->flags.deferred) {
         /** NO data is allowed to have been sent. */
         assert(deferredRequest->http->out.size == 0);
         /** defer now. */
         clientSocketRecipient(deferredRequest->deferredparams.node,
                               deferredRequest->http,
                               deferredRequest->deferredparams.rep,
                               deferredRequest->deferredparams.queuedBuffer);
     }
 
     /** otherwise, the request is still active in a callbacksomewhere,
      * and we are done
      */
 }
 
-/// called when we have successfully finished writing the response
 void
-ClientSocketContext::keepaliveNextRequest()
+ConnStateData::kick()
 {
-    ConnStateData * conn = http->getConn();
-
-    debugs(33, 3, HERE << "ConnnStateData(" << conn->clientConnection << "), Context(" << clientConnection << ")");
-    connIsFinished();
+    if (!Comm::IsConnOpen(clientConnection)) {
+        debugs(33, 2, clientConnection << " Connection was closed");
+        return;
+    }
 
-    if (conn->pinning.pinned && !Comm::IsConnOpen(conn->pinning.serverConnection)) {
-        debugs(33, 2, HERE << conn->clientConnection << " Connection was pinned but server side gone. Terminating client connection");
-        conn->clientConnection->close();
+    if (pinning.pinned && !Comm::IsConnOpen(pinning.serverConnection)) {
+        debugs(33, 2, clientConnection << " Connection was pinned but server side gone. Terminating client connection");
+        clientConnection->close();
         return;
     }
 
     /** \par
      * We are done with the response, and we are either still receiving request
      * body (early response!) or have already stopped receiving anything.
      *
      * If we are still receiving, then clientParseRequest() below will fail.
      * (XXX: but then we will call readNextRequest() which may succeed and
      * execute a smuggled request as we are not done with the current request).
      *
      * If we stopped because we got everything, then try the next request.
      *
      * If we stopped receiving because of an error, then close now to avoid
      * getting stuck and to prevent accidental request smuggling.
      */
 
-    if (const char *reason = conn->stoppedReceiving()) {
-        debugs(33, 3, HERE << "closing for earlier request error: " << reason);
-        conn->clientConnection->close();
+    if (const char *reason = stoppedReceiving()) {
+        debugs(33, 3, "closing for earlier request error: " << reason);
+        clientConnection->close();
         return;
     }
 
     /** \par
      * Attempt to parse a request from the request buffer.
      * If we've been fed a pipelined request it may already
      * be in our read buffer.
      *
      \par
      * This needs to fall through - if we're unlucky and parse the _last_ request
      * from our read buffer we may never re-register for another client read.
      */
 
-    if (conn->clientParseRequests()) {
-        debugs(33, 3, HERE << conn->clientConnection << ": parsed next request from buffer");
+    if (clientParseRequests()) {
+        debugs(33, 3, clientConnection << ": parsed next request from buffer");
     }
 
     /** \par
      * Either we need to kick-start another read or, if we have
      * a half-closed connection, kill it after the last request.
      * This saves waiting for half-closed connections to finished being
      * half-closed _AND_ then, sometimes, spending "Timeout" time in
      * the keepalive "Waiting for next request" state.
      */
-    if (commIsHalfClosed(conn->clientConnection->fd) && (conn->getConcurrentRequestCount() == 0)) {
-        debugs(33, 3, "ClientSocketContext::keepaliveNextRequest: half-closed client with no pending requests, closing");
-        conn->clientConnection->close();
+    if (commIsHalfClosed(clientConnection->fd) && pipeline.empty()) {
+        debugs(33, 3, "half-closed client with no pending requests, closing");
+        clientConnection->close();
         return;
     }
 
-    ClientSocketContext::Pointer deferredRequest;
-
     /** \par
      * At this point we either have a parsed request (which we've
      * kicked off the processing for) or not. If we have a deferred
      * request (parsed but deferred for pipeling processing reasons)
      * then look at processing it. If not, simply kickstart
      * another read.
      */
-
-    if ((deferredRequest = conn->getCurrentContext()).getRaw()) {
-        debugs(33, 3, HERE << conn->clientConnection << ": calling PushDeferredIfNeeded");
-        ClientSocketContextPushDeferredIfNeeded(deferredRequest, conn);
-    } else if (conn->flags.readMore) {
-        debugs(33, 3, HERE << conn->clientConnection << ": calling conn->readNextRequest()");
-        conn->readNextRequest();
+    ClientSocketContext::Pointer deferredRequest = pipeline.front();
+    if (deferredRequest != nullptr) {
+        debugs(33, 3, clientConnection << ": calling PushDeferredIfNeeded");
+        ClientSocketContextPushDeferredIfNeeded(deferredRequest, this);
+    } else if (flags.readMore) {
+        debugs(33, 3, clientConnection << ": calling readNextRequest()");
+        readNextRequest();
     } else {
         // XXX: Can this happen? CONNECT tunnels have deferredRequest set.
-        debugs(33, DBG_IMPORTANT, HERE << "abandoning " << conn->clientConnection);
+        debugs(33, DBG_IMPORTANT, MYNAME << "abandoning " << clientConnection);
     }
 }
 
 void
 clientUpdateSocketStats(const LogTags &logType, size_t size)
 {
     if (size == 0)
         return;
 
     statCounter.client_http.kbytes_out += size;
 
     if (logType.isTcpHit())
         statCounter.client_http.hit_kbytes_out += size;
 }
 
 /**
  * increments iterator "i"
  * used by clientPackMoreRanges
  *
  \retval true    there is still data available to pack more ranges
@@ -1758,40 +1681,41 @@
 void
 ClientSocketContext::noteIoError(const int xerrno)
 {
     if (http) {
         http->logType.err.timedout = (xerrno == ETIMEDOUT);
         // aborted even if xerrno is zero (which means read abort/eof)
         http->logType.err.aborted = (xerrno != ETIMEDOUT);
     }
 }
 
 void
 ClientSocketContext::doClose()
 {
     clientConnection->close();
 }
 
 /// called when we encounter a response-related error
 void
 ClientSocketContext::initiateClose(const char *reason)
 {
+    debugs(33, 4, clientConnection << " because " << reason);
     http->getConn()->stopSending(reason); // closes ASAP
 }
 
 void
 ConnStateData::stopSending(const char *error)
 {
     debugs(33, 4, HERE << "sending error (" << clientConnection << "): " << error <<
            "; old receiving error: " <<
            (stoppedReceiving() ? stoppedReceiving_ : "none"));
 
     if (const char *oldError = stoppedSending()) {
         debugs(33, 3, HERE << "already stopped sending: " << oldError);
         return; // nothing has changed as far as this connection is concerned
     }
     stoppedSending_ = error;
 
     if (!stoppedReceiving()) {
         if (const int64_t expecting = mayNeedToReadMoreBody()) {
             debugs(33, 5, HERE << "must still read " << expecting <<
                    " request body bytes with " << inBuf.length() << " unused");
@@ -1812,86 +1736,88 @@
            (entry ? entry->objectLen() : 0));
     clientUpdateSocketStats(http->logType, size);
 
     /* Bail out quickly on Comm::ERR_CLOSING - close handlers will tidy up */
 
     if (errflag == Comm::ERR_CLOSING || !Comm::IsConnOpen(conn))
         return;
 
     if (errflag || clientHttpRequestStatus(conn->fd, http)) {
         initiateClose("failure or true request status");
         /* Do we leak here ? */
         return;
     }
 
     switch (socketState()) {
 
     case STREAM_NONE:
         pullData();
         break;
 
-    case STREAM_COMPLETE:
+    case STREAM_COMPLETE: {
         debugs(33, 5, conn << " Stream complete, keepalive is " << http->request->flags.proxyKeepalive);
-        if (http->request->flags.proxyKeepalive)
-            keepaliveNextRequest();
-        else
-            initiateClose("STREAM_COMPLETE NOKEEPALIVE");
+        ConnStateData *c = http->getConn();
+        if (!http->request->flags.proxyKeepalive)
+            clientConnection->close();
+        finished();
+        c->kick();
+        }
         return;
 
     case STREAM_UNPLANNED_COMPLETE:
         initiateClose("STREAM_UNPLANNED_COMPLETE");
         return;
 
     case STREAM_FAILED:
         initiateClose("STREAM_FAILED");
         return;
 
     default:
         fatal("Hit unreachable code in clientWriteComplete\n");
     }
 }
 
 ClientSocketContext *
 ConnStateData::abortRequestParsing(const char *const uri)
 {
     ClientHttpRequest *http = new ClientHttpRequest(this);
     http->req_sz = inBuf.length();
     http->uri = xstrdup(uri);
     setLogUri (http, uri);
     ClientSocketContext *context = new ClientSocketContext(clientConnection, http);
     StoreIOBuffer tempBuffer;
     tempBuffer.data = context->reqbuf;
     tempBuffer.length = HTTP_REQBUF_SZ;
     clientStreamInit(&http->client_stream, clientGetMoreData, clientReplyDetach,
                      clientReplyStatus, new clientReplyContext(http), clientSocketRecipient,
                      clientSocketDetach, context, tempBuffer);
     return context;
 }
 
 void
 ConnStateData::startShutdown()
 {
     // RegisteredRunner API callback - Squid has been shut down
 
     // if connection is idle terminate it now,
     // otherwise wait for grace period to end
-    if (getConcurrentRequestCount() == 0)
+    if (pipeline.empty())
         endingShutdown();
 }
 
 void
 ConnStateData::endingShutdown()
 {
     // RegisteredRunner API callback - Squid shutdown grace period is over
 
     // force the client connection to close immediately
     // swanSong() in the close handler will cleanup.
     if (Comm::IsConnOpen(clientConnection))
         clientConnection->close();
 
     // deregister now to ensure finalShutdown() does not kill us prematurely.
     // fd_table purge will cleanup if close handler was not fast enough.
     DeregisterRunner(this);
 }
 
 char *
 skipLeadingSpace(char *aString)
@@ -2243,75 +2169,52 @@
         //  But have not parsed there yet!! flag for local-only handling.
         http->flags.internal = true;
 
     } else if (csd->port->flags.accelSurrogate || csd->switchedToHttps()) {
         /* accelerator mode */
         prepareAcceleratedURL(csd, http, hp);
     }
 
     if (!http->uri) {
         /* No special rewrites have been applied above, use the
          * requested url. may be rewritten later, so make extra room */
         int url_sz = hp->requestUri().length() + Config.appendDomainLen + 5;
         http->uri = (char *)xcalloc(url_sz, 1);
         SBufToCstring(http->uri, hp->requestUri());
     }
 
     result->flags.parsed_ok = 1;
     return result;
 }
 
-void
-ConnStateData::addContextToQueue(ClientSocketContext * context)
-{
-    ClientSocketContext::Pointer *S;
-
-    for (S = (ClientSocketContext::Pointer *) & currentobject; S->getRaw();
-            S = &(*S)->next);
-    *S = context;
-
-    ++nrequests;
-}
-
-int
-ConnStateData::getConcurrentRequestCount() const
-{
-    int result = 0;
-    ClientSocketContext::Pointer *T;
-
-    for (T = (ClientSocketContext::Pointer *) &currentobject;
-            T->getRaw(); T = &(*T)->next, ++result);
-    return result;
-}
-
 bool
 ConnStateData::connFinishedWithConn(int size)
 {
     if (size == 0) {
-        if (getConcurrentRequestCount() == 0 && inBuf.isEmpty()) {
+        if (pipeline.empty() && inBuf.isEmpty()) {
             /* no current or pending requests */
             debugs(33, 4, HERE << clientConnection << " closed");
             return true;
         } else if (!Config.onoff.half_closed_clients) {
             /* admin doesn't want to support half-closed client sockets */
             debugs(33, 3, HERE << clientConnection << " aborted (half_closed_clients disabled)");
-            notifyAllContexts(0); // no specific error implies abort
+            pipeline.terminateAll(0);
             return true;
         }
     }
 
     return false;
 }
 
 void
 ConnStateData::consumeInput(const size_t byteCount)
 {
     assert(byteCount > 0 && byteCount <= inBuf.length());
     inBuf.consume(byteCount);
     debugs(33, 5, "inBuf has " << inBuf.length() << " unused bytes");
 }
 
 void
 ConnStateData::clientAfterReadingRequests()
 {
     // Were we expecting to read more request body from half-closed connection?
     if (mayNeedToReadMoreBody() && commIsHalfClosed(clientConnection->fd)) {
@@ -2413,51 +2316,56 @@
                 repContext->setReplyToError(request->method, err);
                 assert(context->http->out.offset == 0);
                 context->pullData();
                 return true;
             }
         }
     }
 
     return false;
 }
 #endif // USE_OPENSSL
 
 /**
  * Check on_unsupported_protocol checklist and return true if tunnel mode selected
  * or false otherwise
  */
 bool
 clientTunnelOnError(ConnStateData *conn, ClientSocketContext *context, HttpRequest *request, const HttpRequestMethod& method, err_type requestError, Http::StatusCode errStatusCode, const char *requestErrorBytes)
 {
     if (conn->port->flags.isIntercepted() &&
-            Config.accessList.on_unsupported_protocol && conn->nrequests <= 1) {
+            Config.accessList.on_unsupported_protocol && conn->pipeline.nrequests <= 1) {
         ACLFilledChecklist checklist(Config.accessList.on_unsupported_protocol, request, NULL);
         checklist.requestErrorType = requestError;
         checklist.src_addr = conn->clientConnection->remote;
         checklist.my_addr = conn->clientConnection->local;
         checklist.conn(conn);
         allow_t answer = checklist.fastCheck();
         if (answer == ACCESS_ALLOWED && answer.kind == 1) {
             debugs(33, 3, "Request will be tunneled to server");
-            if (context)
-                context->removeFromConnectionList(conn);
+            if (context) {
+                // XXX: Either the context is finished() or it should stay queued.
+                // The below may leak client streams BodyPipe objects. BUT, we need
+                // to check if client-streams detach is safe to do here (finished() will detach).
+                assert(conn->pipeline.front() == context); // XXX: still assumes HTTP/1 semantics
+                conn->pipeline.popMe(ClientSocketContextPointer(context));
+            }
             Comm::SetSelect(conn->clientConnection->fd, COMM_SELECT_READ, NULL, NULL, 0);
             conn->fakeAConnectRequest("unknown-protocol", conn->preservedClientData);
             return true;
         } else {
             debugs(33, 3, "Continue with returning the error: " << requestError);
         }
     }
 
     if (context) {
         conn->quitAfterError(request);
         clientStreamNode *node = context->getClientReplyContext();
         clientReplyContext *repContext = dynamic_cast<clientReplyContext *>(node->data.getRaw());
         assert (repContext);
 
         repContext->setReplyToError(requestError, errStatusCode, method, context->http->uri, conn->clientConnection->remote, NULL, requestErrorBytes, NULL);
 
         assert(context->http->out.offset == 0);
         context->pullData();
     } // else Probably an ERR_REQUEST_START_TIMEOUT error so just return.
     return false;
@@ -2668,41 +2576,41 @@
     clientProcessRequestFinished(conn, request);
 }
 
 int
 ConnStateData::pipelinePrefetchMax() const
 {
     // TODO: Support pipelined requests through pinned connections.
     if (pinning.pinned)
         return 0;
     return Config.pipeline_max_prefetch;
 }
 
 /**
  * Limit the number of concurrent requests.
  * \return true  when there are available position(s) in the pipeline queue for another request.
  * \return false when the pipeline queue is full or disabled.
  */
 bool
 ConnStateData::concurrentRequestQueueFilled() const
 {
-    const int existingRequestCount = getConcurrentRequestCount();
+    const int existingRequestCount = pipeline.count();
 
     // default to the configured pipeline size.
     // add 1 because the head of pipeline is counted in concurrent requests and not prefetch queue
 #if USE_OPENSSL
     const int internalRequest = (transparent() && sslBumpMode == Ssl::bumpSplice) ? 1 : 0;
 #else
     const int internalRequest = 0;
 #endif
     const int concurrentRequestLimit = pipelinePrefetchMax() + 1 + internalRequest;
 
     // when queue filled already we cant add more.
     if (existingRequestCount >= concurrentRequestLimit) {
         debugs(33, 3, clientConnection << " max concurrent requests reached (" << concurrentRequestLimit << ")");
         debugs(33, 5, clientConnection << " deferring new request until one is done");
         return true;
     }
 
     return false;
 }
 
@@ -3032,55 +2940,55 @@
             if (context->mayUseConnection()) {
                 debugs(33, 3, HERE << "Not parsing new requests, as this request may need the connection");
                 break;
             }
         } else {
             debugs(33, 5, clientConnection << ": not enough request data: " <<
                    inBuf.length() << " < " << Config.maxRequestHeaderSize);
             Must(inBuf.length() < Config.maxRequestHeaderSize);
             break;
         }
     }
 
     /* XXX where to 'finish' the parsing pass? */
     return parsed_req;
 }
 
 void
 ConnStateData::afterClientRead()
 {
     /* Process next request */
-    if (getConcurrentRequestCount() == 0)
+    if (pipeline.empty())
         fd_note(clientConnection->fd, "Reading next request");
 
     if (!clientParseRequests()) {
         if (!isOpen())
             return;
         /*
          * If the client here is half closed and we failed
          * to parse a request, close the connection.
          * The above check with connFinishedWithConn() only
          * succeeds _if_ the buffer is empty which it won't
          * be if we have an incomplete request.
-         * XXX: This duplicates ClientSocketContext::keepaliveNextRequest
+         * XXX: This duplicates ConnStateData::kick
          */
-        if (getConcurrentRequestCount() == 0 && commIsHalfClosed(clientConnection->fd)) {
+        if (pipeline.empty() && commIsHalfClosed(clientConnection->fd)) {
             debugs(33, 5, clientConnection << ": half-closed connection, no completed request parsed, connection closing.");
             clientConnection->close();
             return;
         }
     }
 
     if (!isOpen())
         return;
 
     clientAfterReadingRequests();
 }
 
 /**
  * called when new request data has been read from the socket
  *
  * \retval false called comm_close or setReplyToError (the caller should bail)
  * \retval true  we did not call comm_close or setReplyToError
  */
 bool
 ConnStateData::handleReadData()
@@ -3170,41 +3078,41 @@
         Must(!bodyParser->needsMoreSpace() || bodyPipe->buf().hasContent());
     } catch (...) { // TODO: be more specific
         debugs(33, 3, HERE << "malformed chunks" << bodyPipe->status());
         return ERR_INVALID_REQ;
     }
 
     debugs(33, 7, HERE << "need more chunked data" << *bodyPipe->status());
     return ERR_NONE;
 }
 
 /// quit on errors related to chunked request body handling
 void
 ConnStateData::abortChunkedRequestBody(const err_type error)
 {
     finishDechunkingRequest(false);
 
     // XXX: The code below works if we fail during initial request parsing,
     // but if we fail when the server connection is used already, the server may send
     // us its response too, causing various assertions. How to prevent that?
 #if WE_KNOW_HOW_TO_SEND_ERRORS
-    ClientSocketContext::Pointer context = getCurrentContext();
+    ClientSocketContext::Pointer context = pipeline.front();
     if (context != NULL && !context->http->out.offset) { // output nothing yet
         clientStreamNode *node = context->getClientReplyContext();
         clientReplyContext *repContext = dynamic_cast<clientReplyContext*>(node->data.getRaw());
         assert(repContext);
         const Http::StatusCode scode = (error == ERR_TOO_BIG) ?
                                        Http::scPayloadTooLarge : HTTP_BAD_REQUEST;
         repContext->setReplyToError(error, scode,
                                     repContext->http->request->method,
                                     repContext->http->uri,
                                     CachePeer,
                                     repContext->http->request,
                                     inBuf, NULL);
         context->pullData();
     } else {
         // close or otherwise we may get stuck as nobody will notice the error?
         comm_reset_close(clientConnection);
     }
 #else
     debugs(33, 3, HERE << "aborting chunked request without error " << error);
     comm_reset_close(clientConnection);
@@ -3257,41 +3165,40 @@
     */
     debugs(33, 3, "requestTimeout: FD " << io.fd << ": lifetime is expired.");
     io.conn->close();
 }
 
 static void
 clientLifetimeTimeout(const CommTimeoutCbParams &io)
 {
     ClientHttpRequest *http = static_cast<ClientHttpRequest *>(io.data);
     debugs(33, DBG_IMPORTANT, "WARNING: Closing client connection due to lifetime timeout");
     debugs(33, DBG_IMPORTANT, "\t" << http->uri);
     http->logType.err.timedout = true;
     if (Comm::IsConnOpen(io.conn))
         io.conn->close();
 }
 
 ConnStateData::ConnStateData(const MasterXaction::Pointer &xact) :
     AsyncJob("ConnStateData"), // kids overwrite
     Server(xact),
     bodyParser(nullptr),
-    nrequests(0),
 #if USE_OPENSSL
     sslBumpMode(Ssl::bumpEnd),
 #endif
     needProxyProtocolHeader_(false),
 #if USE_OPENSSL
     switchedToHttps_(false),
     sslServerBump(NULL),
     signAlgorithm(Ssl::algSignTrusted),
 #endif
     stoppedSending_(NULL),
     stoppedReceiving_(NULL)
 {
     flags.readMore = true; // kids may overwrite
     flags.swanSang = false;
 
     pinning.host = NULL;
     pinning.port = -1;
     pinning.pinned = false;
     pinning.auth = false;
     pinning.zeroReply = false;
@@ -3806,43 +3713,44 @@
 
     if (certProperties.signAlgorithm == Ssl::algSignUntrusted) {
         assert(port->untrustedSigningCert.get());
         certProperties.signWithX509.resetAndLock(port->untrustedSigningCert.get());
         certProperties.signWithPkey.resetAndLock(port->untrustedSignPkey.get());
     } else {
         assert(port->signingCert.get());
         certProperties.signWithX509.resetAndLock(port->signingCert.get());
 
         if (port->signPkey.get())
             certProperties.signWithPkey.resetAndLock(port->signPkey.get());
     }
     signAlgorithm = certProperties.signAlgorithm;
 
     certProperties.signHash = Ssl::DefaultSignHash;
 }
 
 void
 ConnStateData::getSslContextStart()
 {
-    assert(areAllContextsForThisConnection());
-    freeAllContexts();
-    /* careful: freeAllContexts() above frees request, host, etc. */
+    // XXX starting SSL with a pipeline of requests still waiting for non-SSL replies?
+    assert(pipeline.count() < 2); // the CONNECT is okay for now. Anything else is a bug.
+    pipeline.terminateAll(0);
+    /* careful: terminateAll(0) above frees request, host, etc. */
 
     if (port->generateHostCertificates) {
         Ssl::CertificateProperties certProperties;
         buildSslCertGenerationParams(certProperties);
         sslBumpCertKey = certProperties.dbKey().c_str();
         assert(sslBumpCertKey.size() > 0 && sslBumpCertKey[0] != '\0');
 
         // Disable caching for bumpPeekAndSplice mode
         if (!(sslServerBump && (sslServerBump->act.step1 == Ssl::bumpPeek || sslServerBump->act.step1 == Ssl::bumpStare))) {
             debugs(33, 5, "Finding SSL certificate for " << sslBumpCertKey << " in cache");
             Ssl::LocalContextStorage * ssl_ctx_cache = Ssl::TheGlobalContextStorage.getLocalStorage(port->s);
             Security::ContextPointer dynCtx = nullptr;
             Ssl::SSL_CTX_Pointer *cachedCtx = ssl_ctx_cache ? ssl_ctx_cache->get(sslBumpCertKey.termedBuf()) : NULL;
             if (cachedCtx && (dynCtx = cachedCtx->get())) {
                 debugs(33, 5, "SSL certificate for " << sslBumpCertKey << " found in cache");
                 if (Ssl::verifySslCertificate(dynCtx, certProperties)) {
                     debugs(33, 5, "Cached SSL certificate for " << sslBumpCertKey << " is valid");
                     getSslContextDone(dynCtx);
                     return;
                 } else {
@@ -4114,41 +4022,41 @@
     debugs(83,5, "Bio for  " << clientConnection << " read " << rbuf.contentSize() << " helo bytes");
     // Do splice:
     fd_table[clientConnection->fd].read_method = &default_read_method;
     fd_table[clientConnection->fd].write_method = &default_write_method;
 
     if (transparent()) {
         // set the current protocol to something sensible (was "HTTPS" for the bumping process)
         // we are sending a faked-up HTTP/1.1 message wrapper, so go with that.
         transferProtocol = Http::ProtocolVersion();
         // XXX: copy from MemBuf reallocates, not a regression since old code did too
         SBuf temp;
         temp.append(rbuf.content(), rbuf.contentSize());
         fakeAConnectRequest("intercepted TLS spliced", temp);
     } else {
         // XXX: assuming that there was an HTTP/1.1 CONNECT to begin with...
 
         // reset the current protocol to HTTP/1.1 (was "HTTPS" for the bumping process)
         transferProtocol = Http::ProtocolVersion();
         // inBuf still has the "CONNECT ..." request data, reset it to SSL hello message
         inBuf.append(rbuf.content(), rbuf.contentSize());
-        ClientSocketContext::Pointer context = getCurrentContext();
+        ClientSocketContext::Pointer context = pipeline.front();
         ClientHttpRequest *http = context->http;
         tunnelStart(http);
     }
 }
 
 void
 ConnStateData::startPeekAndSpliceDone()
 {
     // This is the Step2 of the SSL bumping
     assert(sslServerBump);
     if (sslServerBump->step == Ssl::bumpStep1) {
         sslServerBump->step = Ssl::bumpStep2;
         // Run a accessList check to check if want to splice or continue bumping
 
         ACLFilledChecklist *acl_checklist = new ACLFilledChecklist(Config.accessList.ssl_bump, sslServerBump->request.getRaw(), NULL);
         //acl_checklist->src_addr = params.conn->remote;
         //acl_checklist->my_addr = s->s;
         acl_checklist->banAction(allow_t(ACCESS_ALLOWED, Ssl::bumpNone));
         acl_checklist->banAction(allow_t(ACCESS_ALLOWED, Ssl::bumpClientFirst));
         acl_checklist->banAction(allow_t(ACCESS_ALLOWED, Ssl::bumpServerFirst));
@@ -4170,42 +4078,42 @@
     debugs(33, 5, "PeekAndSplice mode, proceed with client negotiation. Currrent state:" << SSL_state_string_long(ssl));
     bio->hold(false);
 
     Comm::SetSelect(clientConnection->fd, COMM_SELECT_WRITE, clientNegotiateSSL, this, 0);
     switchedToHttps_ = true;
 }
 
 void
 ConnStateData::httpsPeeked(Comm::ConnectionPointer serverConnection)
 {
     Must(sslServerBump != NULL);
 
     if (Comm::IsConnOpen(serverConnection)) {
         pinConnection(serverConnection, NULL, NULL, false);
 
         debugs(33, 5, HERE << "bumped HTTPS server: " << sslConnectHostOrIp);
     } else {
         debugs(33, 5, HERE << "Error while bumping: " << sslConnectHostOrIp);
 
         //  copy error detail from bump-server-first request to CONNECT request
-        if (currentobject != NULL && currentobject->http != NULL && currentobject->http->request)
-            currentobject->http->request->detailError(sslServerBump->request->errType, sslServerBump->request->errDetail);
+        if (!pipeline.empty() && pipeline.front()->http != nullptr && pipeline.front()->http->request)
+            pipeline.front()->http->request->detailError(sslServerBump->request->errType, sslServerBump->request->errDetail);
     }
 
     getSslContextStart();
 }
 
 #endif /* USE_OPENSSL */
 
 void
 ConnStateData::fakeAConnectRequest(const char *reason, const SBuf &payload)
 {
     // fake a CONNECT request to force connState to tunnel
     SBuf connectHost;
 #if USE_OPENSSL
     if (serverBump() && !serverBump()->clientSni.isEmpty()) {
         connectHost.assign(serverBump()->clientSni);
         if (clientConnection->local.port() > 0)
             connectHost.appendf(":%d",clientConnection->local.port());
     } else
 #endif
     {
@@ -4604,61 +4512,61 @@
 {
     Must(bodyPipe != NULL);
     debugs(33, 5, HERE << "start dechunking" << bodyPipe->status());
     assert(!bodyParser);
     bodyParser = new Http1::TeChunkedParser;
 }
 
 /// put parsed content into input buffer and clean up
 void
 ConnStateData::finishDechunkingRequest(bool withSuccess)
 {
     debugs(33, 5, HERE << "finish dechunking: " << withSuccess);
 
     if (bodyPipe != NULL) {
         debugs(33, 7, HERE << "dechunked tail: " << bodyPipe->status());
         BodyPipe::Pointer myPipe = bodyPipe;
         stopProducingFor(bodyPipe, withSuccess); // sets bodyPipe->bodySize()
         Must(!bodyPipe); // we rely on it being nil after we are done with body
         if (withSuccess) {
             Must(myPipe->bodySizeKnown());
-            ClientSocketContext::Pointer context = getCurrentContext();
+            ClientSocketContext::Pointer context = pipeline.front();
             if (context != NULL && context->http && context->http->request)
                 context->http->request->setContentLength(myPipe->bodySize());
         }
     }
 
     delete bodyParser;
     bodyParser = NULL;
 }
 
+// XXX: this is an HTTP/1-only operation
 void
 ConnStateData::sendControlMsg(HttpControlMsg msg)
 {
     if (!isOpen()) {
         debugs(33, 3, HERE << "ignoring 1xx due to earlier closure");
         return;
     }
 
-    ClientSocketContext::Pointer context = getCurrentContext();
-    if (context != NULL) {
-        context->writeControlMsg(msg); // will call msg.cbSuccess
+    if (!pipeline.empty()) {
+        pipeline.front()->writeControlMsg(msg); // will call msg.cbSuccess
         return;
     }
 
     debugs(33, 3, HERE << " closing due to missing context for 1xx");
     clientConnection->close();
 }
 
 /// Our close handler called by Comm when the pinned connection is closed
 void
 ConnStateData::clientPinnedConnectionClosed(const CommCloseCbParams &io)
 {
     // FwdState might repin a failed connection sooner than this close
     // callback is called for the failed connection.
     assert(pinning.serverConnection == io.conn);
     pinning.closeHandler = NULL; // Comm unregisters handlers before calling
     const bool sawZeroReply = pinning.zeroReply; // reset when unpinning
     pinning.serverConnection->noteClosure();
     unpinConnection(false);
 
     if (sawZeroReply && clientConnection != NULL) {
@@ -4784,50 +4692,49 @@
 }
 #endif
 
 /// Our read handler called by Comm when the server either closes an idle pinned connection or
 /// perhaps unexpectedly sends something on that idle (from Squid p.o.v.) connection.
 void
 ConnStateData::clientPinnedConnectionRead(const CommIoCbParams &io)
 {
     pinning.readHandler = NULL; // Comm unregisters handlers before calling
 
     if (io.flag == Comm::ERR_CLOSING)
         return; // close handler will clean up
 
     Must(pinning.serverConnection == io.conn);
 
 #if USE_OPENSSL
     if (handleIdleClientPinnedTlsRead())
         return;
 #endif
 
-    // We could use getConcurrentRequestCount(), but this may be faster.
-    const bool clientIsIdle = !getCurrentContext();
+    const bool clientIsIdle = pipeline.empty();
 
     debugs(33, 3, "idle pinned " << pinning.serverConnection << " read " <<
            io.size << (clientIsIdle ? " with idle client" : ""));
 
     pinning.serverConnection->close();
 
     // If we are still sending data to the client, do not close now. When we are done sending,
-    // ClientSocketContext::keepaliveNextRequest() checks pinning.serverConnection and will close.
+    // ConnStateData::kick() checks pinning.serverConnection and will close.
     // However, if we are idle, then we must close to inform the idle client and minimize races.
     if (clientIsIdle && clientConnection != NULL)
         clientConnection->close();
 }
 
 const Comm::ConnectionPointer
 ConnStateData::validatePinnedConnection(HttpRequest *request, const CachePeer *aPeer)
 {
     debugs(33, 7, HERE << pinning.serverConnection);
 
     bool valid = true;
     if (!Comm::IsConnOpen(pinning.serverConnection))
         valid = false;
     else if (pinning.auth && pinning.host && request && strcasecmp(pinning.host, request->url.host()) != 0)
         valid = false;
     else if (request && pinning.port != request->url.port())
         valid = false;
     else if (pinning.peer && !cbdataReferenceValid(pinning.peer))
         valid = false;
     else if (aPeer != pinning.peer)

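Reviewer aid, not part of the patch: the call sites in client_side.cc above
rely on a small queue API (empty(), count(), front(), popMe(),
terminateAll(int) and the nrequests counter). The following is only a
minimal sketch of how such a std::list wrapper could look. ContextSketch and
PipelineSketch are hypothetical names, std::shared_ptr stands in for Squid's
RefCount pointer, and add() is an assumed registration method that does not
appear in this part of the diff. In the patch finished() de-registers the
context itself; here terminateAll() pops entries directly to keep the sketch
short.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <list>
#include <memory>

// Hypothetical stand-in for ClientSocketContext: records only what the sketch needs.
struct ContextSketch {
    int lastError = -1;   // -1 means noteIoError() was never called
    bool done = false;    // set once finished() has run
    void noteIoError(int xerrno) { lastError = xerrno; }
    void finished() { done = true; }
};

// Assumption: std::shared_ptr replaces Squid's RefCount<ClientSocketContext>.
using ContextPointer = std::shared_ptr<ContextSketch>;

class PipelineSketch {
public:
    // queue a newly parsed transaction and count it (assumed method name)
    void add(const ContextPointer &c) {
        requests.push_back(c);
        ++nrequests;
    }

    // oldest queued transaction, or nullptr when the connection is idle
    ContextPointer front() const {
        if (requests.empty())
            return nullptr;
        return requests.front();
    }

    bool empty() const { return requests.empty(); }
    std::size_t count() const { return requests.size(); }

    // abort every queued transaction, oldest first
    void terminateAll(int xerrno) {
        while (!requests.empty()) {
            ContextPointer c = requests.front(); // keeps the entry alive while used
            requests.pop_front();
            c->noteIoError(xerrno);
            c->finished();
        }
    }

    // de-register a transaction; HTTP/1 ordering means it must be the front entry
    void popMe(const ContextPointer &which) {
        assert(!requests.empty() && requests.front() == which);
        requests.pop_front();
    }

    uint32_t nrequests = 0; // total transactions ever queued, never decremented

private:
    std::list<ContextPointer> requests;
};
```

With two queued contexts, popMe() on the first leaves the second as front(),
and terminateAll(0) then marks that one aborted and empties the queue while
nrequests stays at 2.
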
=== modified file 'src/client_side.h'
--- src/client_side.h	2015-11-09 16:24:34 +0000
+++ src/client_side.h	2015-11-17 08:18:04 +0000
@@ -20,234 +20,247 @@
 #include "ipc/FdNotes.h"
 #include "SBuf.h"
 #include "servers/Server.h"
 #if USE_AUTH
 #include "auth/UserRequest.h"
 #endif
 #if USE_OPENSSL
 #include "ssl/support.h"
 #endif
 
 class ConnStateData;
 class ClientHttpRequest;
 class clientStreamNode;
 namespace AnyP
 {
 class PortCfg;
 } // namespace Anyp
 
 /**
  * Badly named.
- * This is in fact the processing context for a single HTTP request.
+ * This is in fact the processing context for a single HTTP transaction.
  *
- * Managing what has been done, and what happens next to the data buffer
- * holding what we hope is an HTTP request.
+ * A context lifetime extends from directly after a request has been parsed
+ * off the client connection buffer, until the last byte of both request
+ * and reply payload (if any) have been written.
+ *
+ * (NOTE: if the server sends an early reply to a POST/PUT, it is not yet
+ * certain whether the context remains in the pipeline until its request
+ * payload has finished being read. It is supposed to, but may not.)
+ *
+ * Contexts self-register with the Pipeline being managed by the Server
+ * for the connection on which the request was received.
+ *
+ * When an HTTP/1 pipeline is operating there may be multiple transactions using
+ * the clientConnection. Only the back() context may read from the connection,
+ * and only the front() context may write to it. A context which needs to read
+ * or write to the connection but does not meet those criteria must be shifted
+ * to the deferred state.
+ *
+ * When a context is completed the finished() method needs to be called which
+ * will perform all cleanup and deregistration operations. If the reason for
+ * finishing is an error, then noteIoError() needs to be called prior to
+ * the finished() method.
+ * The caller should follow finished() with a call to ConnStateData::kick()
+ * to resume processing of other transactions or I/O on the connection.
  *
- * Parsing is still a mess of global functions done in conjunction with the
- * real socket controller which generated ClientHttpRequest.
- * It also generates one of us and passes us control from there based on
- * the results of the parse.
+ * Alternatively the initiateClose() method can be called to terminate the
+ * whole client connection and all other pending contexts.
  *
- * After that all the request interpretation and adaptation is in our scope.
- * Then finally the reply fetcher is created by this and we get the result
- * back. Which we then have to manage writing of it to the ConnStateData.
- *
- * The socket level management is done by a ConnStateData which owns us.
+ * The socket level management is done by a Server which owns us.
  * The scope of this objects control over a socket consists of the data
- * buffer received from ConnStateData with an initially unknown length.
- * When that length is known it sets the end bounary of our acces to the
+ * buffer received from the Server with an initially unknown length.
+ * When that length is known it sets the end boundary of our access to the
  * buffer.
  *
  * The individual processing actions are done by other Jobs which we
  * kick off as needed.
  *
  * XXX: If an async call ends the ClientHttpRequest job, ClientSocketContext
  * (and ConnStateData) may not know about it, leading to segfaults and
- * assertions like areAllContextsForThisConnection(). This is difficult to fix
+ * assertions. This is difficult to fix
  * because ClientHttpRequest lacks a good way to communicate its ongoing
  * destruction back to the ClientSocketContext which pretends to "own" *http.
  */
 class ClientSocketContext : public RefCountable
 {
     CBDATA_CLASS(ClientSocketContext);
 
 public:
     typedef RefCount<ClientSocketContext> Pointer;
     ClientSocketContext(const Comm::ConnectionPointer &aConn, ClientHttpRequest *aReq);
     ~ClientSocketContext();
     bool startOfOutput() const;
     void writeComplete(const Comm::ConnectionPointer &conn, char *bufnotused, size_t size, Comm::Flag errflag);
-    void keepaliveNextRequest();
 
     Comm::ConnectionPointer clientConnection; /// details about the client connection socket.
     ClientHttpRequest *http;    /* we pretend to own that job */
     HttpReply *reply;
     char reqbuf[HTTP_REQBUF_SZ];
-    Pointer next;
 
     struct {
 
         unsigned deferred:1; /* This is a pipelined request waiting for the current object to complete */
 
         unsigned parsed_ok:1; /* Was this parsed correctly? */
     } flags;
     bool mayUseConnection() const {return mayUseConnection_;}
 
     void mayUseConnection(bool aBool) {
         mayUseConnection_ = aBool;
         debugs(33,3, HERE << "This " << this << " marked " << aBool);
     }
 
     class DeferredParams
     {
 
     public:
         clientStreamNode *node;
         HttpReply *rep;
         StoreIOBuffer queuedBuffer;
     };
 
     DeferredParams deferredparams;
     int64_t writtenToSocket;
     void pullData();
     int64_t getNextRangeOffset() const;
     bool canPackMoreRanges() const;
     clientStream_status_t socketState();
     void sendBody(HttpReply * rep, StoreIOBuffer bodyData);
     void sendStartOfMessage(HttpReply * rep, StoreIOBuffer bodyData);
     size_t lengthToSend(Range<int64_t> const &available);
     void noteSentBodyBytes(size_t);
     void buildRangeHeader(HttpReply * rep);
     clientStreamNode * getTail() const;
     clientStreamNode * getClientReplyContext() const;
     ConnStateData *getConn() const;
-    void connIsFinished();
-    void removeFromConnectionList(ConnStateData * conn);
+    void finished(); ///< cleanup when the transaction has finished. may destroy 'this'
     void deferRecipientForLater(clientStreamNode * node, HttpReply * rep, StoreIOBuffer receivedData);
     bool multipartRangeRequest() const;
     void registerWithConn();
     void noteIoError(const int xerrno); ///< update state to reflect I/O error
 
     /// starts writing 1xx control message to the client
     void writeControlMsg(HttpControlMsg &msg);
 
 protected:
     static IOCB WroteControlMsg;
     void wroteControlMsg(const Comm::ConnectionPointer &conn, char *bufnotused, size_t size, Comm::Flag errflag, int xerrno);
 
 private:
     void prepareReply(HttpReply * rep);
     void packChunk(const StoreIOBuffer &bodyData, MemBuf &mb);
     void packRange(StoreIOBuffer const &, MemBuf * mb);
-    void deRegisterWithConn();
     void doClose();
     void initiateClose(const char *reason);
 
     AsyncCall::Pointer cbControlMsgSent; ///< notifies HttpControlMsg Source
 
     bool mayUseConnection_; /* This request may use the connection. Don't read anymore requests for now */
     bool connRegistered_;
 };
 
 class ConnectionDetail;
 #if USE_OPENSSL
 namespace Ssl
 {
 class ServerBump;
 }
 #endif
+
 /**
- * Manages a connection to a client.
+ * Legacy Server code managing a connection to a client.
+ *
+ * NP: presents AsyncJob API but does not operate autonomously as a Job.
+ *     So Must() is not safe to use.
+ *
+ * Multiple requests (up to pipeline_prefetch) can be pipelined.
+ * This object is responsible for managing which one is currently being
+ * fulfilled and what happens to the queue if the current one causes the client
+ * connection to be closed early.
+ *
+ * Acts as a manager for the client connection and passes buffered data to a
+ * Parser relevant to the state (message headers vs body) that is being
+ * processed.
+ *
+ * Performs HTTP message processing to kick off the actual HTTP request
+ * handling objects (ClientSocketContext, ClientHttpRequest, HttpRequest).
  *
- * Multiple requests (up to pipeline_prefetch) can be pipelined. This object is responsible for managing
- * which one is currently being fulfilled and what happens to the queue if the current one
- * causes the client connection to be closed early.
- *
- * Act as a manager for the connection and passes data in buffer to the current parser.
- * the parser has ambiguous scope at present due to being made from global functions
- * I believe this object uses the parser to identify boundaries and kick off the
- * actual HTTP request handling objects (ClientSocketContext, ClientHttpRequest, HttpRequest)
+ * Performs SSL-Bump processing for switching between HTTP and HTTPS protocols.
  *
- * If the above can be confirmed accurate we can call this object PipelineManager or similar
+ * To terminate a ConnStateData, close() the client Comm::Connection it is
+ * managing. For a graceful half-close, use the stopReceiving() or
+ * stopSending() methods instead.
  */
 class ConnStateData : public Server, public HttpControlMsgSink, public RegisteredRunner
 {
 
 public:
     explicit ConnStateData(const MasterXaction::Pointer &xact);
     virtual ~ConnStateData();
 
     /* ::Server API */
-    virtual void notifyAllContexts(const int xerrno);
     virtual void receivedFirstByte();
     virtual bool handleReadData();
     virtual void afterClientRead();
 
-    bool areAllContextsForThisConnection() const;
-    void freeAllContexts();
     /// Traffic parsing
     bool clientParseRequests();
     void readNextRequest();
-    ClientSocketContext::Pointer getCurrentContext() const;
-    void addContextToQueue(ClientSocketContext * context);
-    int getConcurrentRequestCount() const;
+
+    /// try to make progress on a transaction or read more I/O
+    void kick();
+
     bool isOpen() const;
 
     // HttpControlMsgSink API
     virtual void sendControlMsg(HttpControlMsg msg);
 
     Http1::TeChunkedParser *bodyParser; ///< parses HTTP/1.1 chunked request body
 
     /** number of body bytes we need to comm_read for the "current" request
      *
      * \retval 0         We do not need to read any [more] body bytes
      * \retval negative  May need more but do not know how many; could be zero!
      * \retval positive  Need to read exactly that many more body bytes
      */
     int64_t mayNeedToReadMoreBody() const;
 
 #if USE_AUTH
     /**
      * Fetch the user details for connection based authentication
      * NOTE: this is ONLY connection based because NTLM and Negotiate is against HTTP spec.
      */
     const Auth::UserRequest::Pointer &getAuth() const { return auth_; }
 
     /**
      * Set the user details for connection-based authentication to use from now until connection closure.
      *
      * Any change to existing credentials shows that something invalid has happened. Such as:
      * - NTLM/Negotiate auth was violated by the per-request headers missing a revalidation token
      * - NTLM/Negotiate auth was violated by the per-request headers being for another user
      * - SSL-Bump CONNECT tunnel with persistent credentials has ended
      */
     void setAuth(const Auth::UserRequest::Pointer &aur, const char *cause);
 #endif
 
-    /**
-     * used by the owner of the connection, opaque otherwise
-     * TODO: generalise the connection owner concept.
-     */
-    ClientSocketContext::Pointer currentobject;
-
     Ip::Address log_addr;
-    int nrequests;
 
     struct {
         bool readMore; ///< needs comm_read (for this request or new requests)
         bool swanSang; // XXX: temporary flag to check proper cleanup
     } flags;
     struct {
         Comm::ConnectionPointer serverConnection; /* pinned server side connection */
         char *host;             /* host name of pinned connection */
         int port;               /* port of pinned connection */
         bool pinned;             /* this connection was pinned */
         bool auth;               /* pinned for www authentication */
         bool reading;   ///< we are monitoring for peer connection closure
         bool zeroReply; ///< server closed w/o response (ERR_ZERO_SIZE_OBJECT)
         CachePeer *peer;             /* CachePeer the connection goes via */
         AsyncCall::Pointer readHandler; ///< detects serverConnection closure
         AsyncCall::Pointer closeHandler; /*The close handler for pinned server side connection*/
     } pinning;
 
     bool transparent() const;
 

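Note for readers of the client_side.h changes above: the accessors removed from ConnStateData (getCurrentContext(), getConcurrentRequestCount(), notifyAllContexts(), freeAllContexts()) are replaced in the callers by pipeline.front(), pipeline.count() and pipeline.terminateAll(). The following is only a minimal stand-alone sketch of that shape, not the code in the patch. It assumes std::shared_ptr in place of Squid's RefCount pointers and a stand-in Context type; add(), popFront() and the Context fields are hypothetical names chosen for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <list>
#include <memory>

// Stand-in for ClientSocketContext; fields are illustrative only.
struct Context {
    int lastError = 0;
    void noteIoError(int xerrno) { lastError = xerrno; }
};
using ContextPointer = std::shared_ptr<Context>;

// Sketch of a std::list-backed request pipeline.
class Pipeline
{
public:
    // hypothetical: register a freshly parsed transaction
    void add(const ContextPointer &c) {
        assert(c != nullptr);
        requests.push_back(c);
        ++nrequests;
    }

    // oldest queued transaction, or nullptr when nothing is queued
    ContextPointer front() const {
        if (requests.empty())
            return nullptr;
        return requests.front();
    }

    // number of transactions currently queued
    std::size_t count() const { return requests.size(); }

    // hypothetical: de-register the transaction at the head of the queue
    void popFront() {
        if (!requests.empty())
            requests.pop_front();
    }

    // report the error to every queued transaction, then drop them all
    void terminateAll(int xerrno) {
        for (auto &c : requests)
            c->noteIoError(xerrno);
        requests.clear();
    }

    unsigned nrequests = 0; // total transactions ever queued (basic stats)

private:
    std::list<ContextPointer> requests; // entries stay alive while queued
};
```

Because the list holds counted pointers, an entry cannot be destroyed while it is still queued, which is the property the recap relies on when it says the destructor no longer needs to care about registrations.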
=== modified file 'src/servers/FtpServer.cc'
--- src/servers/FtpServer.cc	2015-11-07 12:08:33 +0000
+++ src/servers/FtpServer.cc	2015-11-17 07:10:06 +0000
@@ -107,68 +107,68 @@
     if (reader != NULL)
         return;
 
     const size_t availSpace = sizeof(uploadBuf) - uploadAvailSize;
     if (availSpace <= 0)
         return;
 
     debugs(33, 4, dataConn << ": reading FTP data...");
 
     typedef CommCbMemFunT<Server, CommIoCbParams> Dialer;
     reader = JobCallback(33, 5, Dialer, this, Ftp::Server::readUploadData);
     comm_read(dataConn, uploadBuf + uploadAvailSize, availSpace,
               reader);
 }
 
 /// react to the freshly parsed request
 void
 Ftp::Server::doProcessRequest()
 {
     // zero pipelinePrefetchMax() ensures that there is only parsed request
-    ClientSocketContext::Pointer context = getCurrentContext();
-    Must(context != NULL);
-    Must(getConcurrentRequestCount() == 1);
+    Must(pipeline.count() == 1);
+    ClientSocketContext::Pointer context = pipeline.front();
+    Must(context != nullptr);
 
     ClientHttpRequest *const http = context->http;
     assert(http != NULL);
 
     HttpRequest *const request = http->request;
     Must(http->storeEntry() || request);
     const bool mayForward = !http->storeEntry() && handleRequest(request);
 
     if (http->storeEntry() != NULL) {
         debugs(33, 4, "got an immediate response");
         clientSetKeepaliveFlag(http);
         context->pullData();
     } else if (mayForward) {
         debugs(33, 4, "forwarding request to server side");
         assert(http->storeEntry() == NULL);
         clientProcessRequest(this, Http1::RequestParserPointer(), context.getRaw());
     } else {
         debugs(33, 4, "will resume processing later");
     }
 }
 
 void
 Ftp::Server::processParsedRequest(ClientSocketContext *)
 {
-    Must(getConcurrentRequestCount() == 1);
+    Must(pipeline.count() == 1);
 
     // Process FTP request asynchronously to make sure FTP
     // data connection accept callback is fired first.
     CallJobHere(33, 4, CbcPointer<Server>(this),
                 Ftp::Server, doProcessRequest);
 }
 
 /// imports more upload data from the data connection
 void
 Ftp::Server::readUploadData(const CommIoCbParams &io)
 {
     debugs(33, 5, io.conn << " size " << io.size);
     Must(reader != NULL);
     reader = NULL;
 
     assert(Comm::IsConnOpen(dataConn));
     assert(io.conn->fd == dataConn->fd);
 
     if (io.flag == Comm::OK && bodyPipe != NULL) {
         if (io.size > 0) {
@@ -271,42 +271,42 @@
         clientStartListeningOn(s, subCall, Ipc::fdnFtpSocket);
     }
 }
 
 void
 Ftp::StopListening()
 {
     for (AnyP::PortCfgPointer s = FtpPortList; s != NULL; s = s->next) {
         if (s->listenConn != NULL) {
             debugs(1, DBG_IMPORTANT, "Closing FTP port " << s->listenConn->local);
             s->listenConn->close();
             s->listenConn = NULL;
         }
     }
 }
 
 void
 Ftp::Server::notePeerConnection(Comm::ConnectionPointer conn)
 {
     // find request
-    ClientSocketContext::Pointer context = getCurrentContext();
-    Must(context != NULL);
+    ClientSocketContext::Pointer context = pipeline.front();
+    Must(context != nullptr);
     ClientHttpRequest *const http = context->http;
     Must(http != NULL);
     HttpRequest *const request = http->request;
     Must(request != NULL);
 
     // this is not an idle connection, so we do not want I/O monitoring
     const bool monitor = false;
 
     // make FTP peer connection exclusive to our request
     pinConnection(conn, request, conn->getPeer(), false, monitor);
 }
 
 void
 Ftp::Server::clientPinnedConnectionClosed(const CommCloseCbParams &io)
 {
     ConnStateData::clientPinnedConnectionClosed(io);
 
     // if the server control connection is gone, reset state to login again
     resetLogin("control connection closure");
 
@@ -744,80 +744,80 @@
         new ClientSocketContext(clientConnection, http);
 
     StoreIOBuffer tempBuffer;
     tempBuffer.data = result->reqbuf;
     tempBuffer.length = HTTP_REQBUF_SZ;
 
     ClientStreamData newServer = new clientReplyContext(http);
     ClientStreamData newClient = result;
     clientStreamInit(&http->client_stream, clientGetMoreData, clientReplyDetach,
                      clientReplyStatus, newServer, clientSocketRecipient,
                      clientSocketDetach, newClient, tempBuffer);
 
     result->flags.parsed_ok = 1;
     return result;
 }
 
 void
 Ftp::Server::handleReply(HttpReply *reply, StoreIOBuffer data)
 {
     // the caller guarantees that we are dealing with the current context only
-    ClientSocketContext::Pointer context = getCurrentContext();
-    assert(context != NULL);
+    ClientSocketContext::Pointer context = pipeline.front();
+    assert(context != nullptr);
 
     if (context->http && context->http->al != NULL &&
             !context->http->al->reply && reply) {
         context->http->al->reply = reply;
         HTTPMSGLOCK(context->http->al->reply);
     }
 
     static ReplyHandler handlers[] = {
         NULL, // fssBegin
         NULL, // fssConnected
         &Ftp::Server::handleFeatReply, // fssHandleFeat
         &Ftp::Server::handlePasvReply, // fssHandlePasv
         &Ftp::Server::handlePortReply, // fssHandlePort
         &Ftp::Server::handleDataReply, // fssHandleDataRequest
         &Ftp::Server::handleUploadReply, // fssHandleUploadRequest
         &Ftp::Server::handleEprtReply,// fssHandleEprt
         &Ftp::Server::handleEpsvReply,// fssHandleEpsv
         NULL, // fssHandleCwd
         NULL, // fssHandlePass
         NULL, // fssHandleCdup
         &Ftp::Server::handleErrorReply // fssError
     };
     try {
         const Server &server = dynamic_cast<const Ftp::Server&>(*context->getConn());
         if (const ReplyHandler handler = handlers[server.master->serverState])
             (this->*handler)(reply, data);
         else
             writeForwardedReply(reply);
     } catch (const std::exception &e) {
         callException(e);
         throw TexcHere(e.what());
     }
 }
 
 void
 Ftp::Server::handleFeatReply(const HttpReply *reply, StoreIOBuffer)
 {
-    if (getCurrentContext()->http->request->errType != ERR_NONE) {
+    if (pipeline.front()->http->request->errType != ERR_NONE) {
         writeCustomReply(502, "Server does not support FEAT", reply);
         return;
     }
 
     Must(reply);
     HttpReply::Pointer featReply = Ftp::HttpReplyWrapper(211, "End", Http::scNoContent, 0);
     HttpHeader const &serverReplyHeader = reply->header;
 
     HttpHeaderPos pos = HttpHeaderInitPos;
     bool hasEPRT = false;
     bool hasEPSV = false;
     int prependSpaces = 1;
 
     featReply->header.putStr(Http::HdrType::FTP_PRE, "\"211-Features:\"");
     const int scode = serverReplyHeader.getInt(Http::HdrType::FTP_STATUS);
     if (scode == 211) {
         while (const HttpHeaderEntry *e = serverReplyHeader.getEntry(&pos)) {
             if (e->id == Http::HdrType::FTP_PRE) {
                 // assume RFC 2389 FEAT response format, quoted by Squid:
                 // <"> SP NAME [SP PARAMS] <">
@@ -852,81 +852,81 @@
     } // else we got a FEAT error and will only report Squid-supported features
 
     char buf[256];
     if (!hasEPRT) {
         snprintf(buf, sizeof(buf), "\"%*s\"", prependSpaces + 4, "EPRT");
         featReply->header.putStr(Http::HdrType::FTP_PRE, buf);
     }
     if (!hasEPSV) {
         snprintf(buf, sizeof(buf), "\"%*s\"", prependSpaces + 4, "EPSV");
         featReply->header.putStr(Http::HdrType::FTP_PRE, buf);
     }
 
     featReply->header.refreshMask();
 
     writeForwardedReply(featReply.getRaw());
 }
 
 void
 Ftp::Server::handlePasvReply(const HttpReply *reply, StoreIOBuffer)
 {
-    ClientSocketContext::Pointer context = getCurrentContext();
-    assert(context != NULL);
+    const ClientSocketContext::Pointer context(pipeline.front());
+    assert(context != nullptr);
 
     if (context->http->request->errType != ERR_NONE) {
         writeCustomReply(502, "Server does not support PASV", reply);
         return;
     }
 
     const unsigned short localPort = listenForDataConnection();
     if (!localPort)
         return;
 
     char addr[MAX_IPSTRLEN];
     // remote server in interception setups and local address otherwise
     const Ip::Address &server = transparent() ?
                                 clientConnection->local : dataListenConn->local;
     server.toStr(addr, MAX_IPSTRLEN, AF_INET);
     addr[MAX_IPSTRLEN - 1] = '\0';
     for (char *c = addr; *c != '\0'; ++c) {
         if (*c == '.')
             *c = ',';
     }
 
     // In interception setups, we combine remote server address with a
     // local port number and hope that traffic will be redirected to us.
     // Do not use "227 =a,b,c,d,p1,p2" format or omit parens: some nf_ct_ftp
     // versions block responses that use those alternative syntax rules!
     MemBuf mb;
     mb.init();
     mb.appendf("227 Entering Passive Mode (%s,%i,%i).\r\n",
                addr,
                static_cast<int>(localPort / 256),
                static_cast<int>(localPort % 256));
     debugs(9, 3, Raw("writing", mb.buf, mb.size));
     writeReply(mb);
 }
 
 void
 Ftp::Server::handlePortReply(const HttpReply *reply, StoreIOBuffer)
 {
-    if (getCurrentContext()->http->request->errType != ERR_NONE) {
+    if (pipeline.front()->http->request->errType != ERR_NONE) {
         writeCustomReply(502, "Server does not support PASV (converted from PORT)", reply);
         return;
     }
 
     writeCustomReply(200, "PORT successfully converted to PASV.");
 
     // and wait for RETR
 }
 
 void
 Ftp::Server::handleErrorReply(const HttpReply *reply, StoreIOBuffer)
 {
     if (!pinning.pinned) // we failed to connect to server
         uri.clear();
     // 421: we will close due to fssError
     writeErrorReply(reply, 421);
 }
 
 void
 Ftp::Server::handleDataReply(const HttpReply *reply, StoreIOBuffer data)
@@ -950,70 +950,70 @@
         writeCustomReply(425, "Data connection is not established.");
         closeDataConnection();
         return;
     }
 
     debugs(33, 7, data.length);
 
     if (data.length <= 0) {
         replyDataWritingCheckpoint(); // skip the actual write call
         return;
     }
 
     MemBuf mb;
     mb.init(data.length + 1, data.length + 1);
     mb.append(data.data, data.length);
 
     typedef CommCbMemFunT<Server, CommIoCbParams> Dialer;
     AsyncCall::Pointer call = JobCallback(33, 5, Dialer, this, Ftp::Server::wroteReplyData);
     Comm::Write(dataConn, &mb, call);
 
-    getCurrentContext()->noteSentBodyBytes(data.length);
+    pipeline.front()->noteSentBodyBytes(data.length);
 }
 
 /// called when we are done writing a chunk of the response data
 void
 Ftp::Server::wroteReplyData(const CommIoCbParams &io)
 {
     if (io.flag == Comm::ERR_CLOSING)
         return;
 
     if (io.flag != Comm::OK) {
         debugs(33, 3, "FTP reply data writing failed: " << xstrerr(io.xerrno));
         closeDataConnection();
         writeCustomReply(426, "Data connection error; transfer aborted");
         return;
     }
 
-    assert(getCurrentContext()->http);
-    getCurrentContext()->http->out.size += io.size;
+    assert(pipeline.front()->http);
+    pipeline.front()->http->out.size += io.size;
     replyDataWritingCheckpoint();
 }
 
 /// ClientStream checks after (actual or skipped) reply data writing
 void
 Ftp::Server::replyDataWritingCheckpoint()
 {
-    switch (getCurrentContext()->socketState()) {
+    switch (pipeline.front()->socketState()) {
     case STREAM_NONE:
         debugs(33, 3, "Keep going");
-        getCurrentContext()->pullData();
+        pipeline.front()->pullData();
         return;
     case STREAM_COMPLETE:
         debugs(33, 3, "FTP reply data transfer successfully complete");
         writeCustomReply(226, "Transfer complete");
         break;
     case STREAM_UNPLANNED_COMPLETE:
         debugs(33, 3, "FTP reply data transfer failed: STREAM_UNPLANNED_COMPLETE");
         writeCustomReply(451, "Server error; transfer aborted");
         break;
     case STREAM_FAILED:
         debugs(33, 3, "FTP reply data transfer failed: STREAM_FAILED");
         writeCustomReply(451, "Server error; transfer aborted");
         break;
     default:
         fatal("unreachable code");
     }
 
     closeDataConnection();
 }
 
@@ -1027,77 +1027,77 @@
 void
 Ftp::Server::writeForwardedReply(const HttpReply *reply)
 {
     Must(reply);
 
     const HttpHeader &header = reply->header;
     // adaptation and forwarding errors lack Http::HdrType::FTP_STATUS
     if (!header.has(Http::HdrType::FTP_STATUS)) {
         writeForwardedForeign(reply); // will get to Ftp::Server::wroteReply
         return;
     }
 
     typedef CommCbMemFunT<Server, CommIoCbParams> Dialer;
     AsyncCall::Pointer call = JobCallback(33, 5, Dialer, this, Ftp::Server::wroteReply);
     writeForwardedReplyAndCall(reply, call);
 }
 
 void
 Ftp::Server::handleEprtReply(const HttpReply *reply, StoreIOBuffer)
 {
-    if (getCurrentContext()->http->request->errType != ERR_NONE) {
+    if (pipeline.front()->http->request->errType != ERR_NONE) {
         writeCustomReply(502, "Server does not support PASV (converted from EPRT)", reply);
         return;
     }
 
     writeCustomReply(200, "EPRT successfully converted to PASV.");
 
     // and wait for RETR
 }
 
 void
 Ftp::Server::handleEpsvReply(const HttpReply *reply, StoreIOBuffer)
 {
-    if (getCurrentContext()->http->request->errType != ERR_NONE) {
+    if (pipeline.front()->http->request->errType != ERR_NONE) {
         writeCustomReply(502, "Cannot connect to server", reply);
         return;
     }
 
     const unsigned short localPort = listenForDataConnection();
     if (!localPort)
         return;
 
     // In interception setups, we use a local port number and hope that data
     // traffic will be redirected to us.
     MemBuf mb;
     mb.init();
     mb.appendf("229 Entering Extended Passive Mode (|||%u|)\r\n", localPort);
 
     debugs(9, 3, Raw("writing", mb.buf, mb.size));
     writeReply(mb);
 }
 
 /// writes FTP error response with given status and reply-derived error details
 void
 Ftp::Server::writeErrorReply(const HttpReply *reply, const int scode)
 {
-    const HttpRequest *request = getCurrentContext()->http->request;
+    const HttpRequest *request = pipeline.front()->http->request;
     assert(request);
 
     MemBuf mb;
     mb.init();
 
     if (request->errType != ERR_NONE)
         mb.appendf("%i-%s\r\n", scode, errorPageName(request->errType));
 
     if (request->errDetail > 0) {
         // XXX: > 0 may not always mean that this is an errno
         mb.appendf("%i-Error: (%d) %s\r\n", scode,
                    request->errDetail,
                    strerror(request->errDetail));
     }
 
 #if USE_ADAPTATION
     // XXX: Remove hard coded names. Use an error page template instead.
     const Adaptation::History::Pointer ah = request->adaptHistory();
     if (ah != NULL) { // XXX: add adapt::<all_h but use lastMeta here
         const String info = ah->allMeta.getByName("X-Response-Info");
@@ -1210,88 +1210,89 @@
 
     if (header.has(Http::HdrType::FTP_STATUS)) {
         const char *reason = header.getStr(Http::HdrType::FTP_REASON);
         mb.appendf("%i %s\r\n", header.getInt(Http::HdrType::FTP_STATUS),
                    (reason ? reason : 0));
     }
 }
 
 void
 Ftp::Server::wroteEarlyReply(const CommIoCbParams &io)
 {
     if (io.flag == Comm::ERR_CLOSING)
         return;
 
     if (io.flag != Comm::OK) {
         debugs(33, 3, "FTP reply writing failed: " << xstrerr(io.xerrno));
         io.conn->close();
         return;
     }
 
-    ClientSocketContext::Pointer context = getCurrentContext();
-    if (context != NULL && context->http) {
+    ClientSocketContext::Pointer context = pipeline.front();
+    if (context != nullptr && context->http) {
         context->http->out.size += io.size;
         context->http->out.headers_sz += io.size;
     }
 
     flags.readMore = true;
     readSomeData();
 }
 
 void
 Ftp::Server::wroteReply(const CommIoCbParams &io)
 {
     if (io.flag == Comm::ERR_CLOSING)
         return;
 
     if (io.flag != Comm::OK) {
         debugs(33, 3, "FTP reply writing failed: " << xstrerr(io.xerrno));
         io.conn->close();
         return;
     }
 
-    ClientSocketContext::Pointer context = getCurrentContext();
+    ClientSocketContext::Pointer context = pipeline.front();
     assert(context->http);
     context->http->out.size += io.size;
     context->http->out.headers_sz += io.size;
 
     if (master->serverState == fssError) {
         debugs(33, 5, "closing on FTP server error");
         io.conn->close();
         return;
     }
 
     const clientStream_status_t socketState = context->socketState();
     debugs(33, 5, "FTP client stream state " << socketState);
     switch (socketState) {
     case STREAM_UNPLANNED_COMPLETE:
     case STREAM_FAILED:
         io.conn->close();
         return;
 
     case STREAM_NONE:
     case STREAM_COMPLETE:
         flags.readMore = true;
         changeState(fssConnected, "Ftp::Server::wroteReply");
         if (bodyParser)
             finishDechunkingRequest(false);
-        context->keepaliveNextRequest();
+        context->finished();
+        kick();
         return;
     }
 }
 
 bool
 Ftp::Server::handleRequest(HttpRequest *request)
 {
     debugs(33, 9, request);
     Must(request);
 
     HttpHeader &header = request->header;
     Must(header.has(Http::HdrType::FTP_COMMAND));
     String &cmd = header.findEntry(Http::HdrType::FTP_COMMAND)->value;
     Must(header.has(Http::HdrType::FTP_ARGUMENTS));
     String &params = header.findEntry(Http::HdrType::FTP_ARGUMENTS)->value;
 
     if (do_debug(9, 2)) {
         MemBuf mb;
         mb.init();
         request->pack(&mb);
@@ -1477,41 +1478,41 @@
 }
 
 bool
 Ftp::Server::handleDataRequest(String &, String &)
 {
     if (!checkDataConnPre())
         return false;
 
     changeState(fssHandleDataRequest, "handleDataRequest");
 
     return true;
 }
 
 bool
 Ftp::Server::handleUploadRequest(String &, String &)
 {
     if (!checkDataConnPre())
         return false;
 
     if (Config.accessList.forceRequestBodyContinuation) {
-        ClientHttpRequest *http = getCurrentContext()->http;
+        ClientHttpRequest *http = pipeline.front()->http;
         HttpRequest *request = http->request;
         ACLFilledChecklist bodyContinuationCheck(Config.accessList.forceRequestBodyContinuation, request, NULL);
         if (bodyContinuationCheck.fastCheck() == ACCESS_ALLOWED) {
             request->forcedBodyContinuation = true;
             if (checkDataConnPost()) {
                 // Write control Msg
                 writeEarlyReply(150, "Data connection opened");
                 maybeReadUploadData();
             } else {
                 // wait for acceptDataConnection but tell it to call wroteEarlyReply
                 // after writing "150 Data connection opened"
                 typedef CommCbMemFunT<Server, CommIoCbParams> Dialer;
                 AsyncCall::Pointer call = JobCallback(33, 5, Dialer, this, Ftp::Server::wroteEarlyReply);
                 onDataAcceptCall = call;
             }
         }
     }
 
     changeState(fssHandleUploadRequest, "handleDataRequest");
 
@@ -1581,41 +1582,41 @@
 
 bool
 Ftp::Server::handlePassRequest(String &, String &)
 {
     changeState(fssHandlePass, "handlePassRequest");
     return true;
 }
 
 bool
 Ftp::Server::handleCdupRequest(String &, String &)
 {
     changeState(fssHandleCdup, "handleCdupRequest");
     return true;
 }
 
 // Convert user PORT, EPRT, PASV, or EPSV data command to Squid PASV command.
 // Squid FTP client decides what data command to use with peers.
 void
 Ftp::Server::setDataCommand()
 {
-    ClientHttpRequest *const http = getCurrentContext()->http;
+    ClientHttpRequest *const http = pipeline.front()->http;
     assert(http != NULL);
     HttpRequest *const request = http->request;
     assert(request != NULL);
     HttpHeader &header = request->header;
     header.delById(Http::HdrType::FTP_COMMAND);
     header.putStr(Http::HdrType::FTP_COMMAND, "PASV");
     header.delById(Http::HdrType::FTP_ARGUMENTS);
     header.putStr(Http::HdrType::FTP_ARGUMENTS, "");
     debugs(9, 5, "client data command converted to fake PASV");
 }
 
 /// check that client data connection is ready for future I/O or at least
 /// has a chance of becoming ready soon.
 bool
 Ftp::Server::checkDataConnPre()
 {
     if (Comm::IsConnOpen(dataConn))
         return true;
 
     if (Comm::IsConnOpen(dataListenConn)) {
@@ -1647,56 +1648,56 @@
 Ftp::Server::checkDataConnPost() const
 {
     if (!Comm::IsConnOpen(dataConn)) {
         debugs(33, 3, "missing client data conn: " << dataConn);
         return false;
     }
     return true;
 }
 
 /// Done establishing a data connection to the user.
 void
 Ftp::Server::connectedForData(const CommConnectCbParams &params)
 {
     connector = NULL;
 
     if (params.flag != Comm::OK) {
         /* it might have been a timeout with a partially open link */
         if (params.conn != NULL)
             params.conn->close();
         setReply(425, "Cannot open data connection.");
-        ClientSocketContext::Pointer context = getCurrentContext();
+        ClientSocketContext::Pointer context = pipeline.front();
         Must(context->http);
         Must(context->http->storeEntry() != NULL);
     } else {
         Must(dataConn == params.conn);
         Must(Comm::IsConnOpen(params.conn));
         fd_note(params.conn->fd, "active client ftp data");
     }
 
     doProcessRequest();
 }
 
 void
 Ftp::Server::setReply(const int code, const char *msg)
 {
-    ClientSocketContext::Pointer context = getCurrentContext();
+    ClientSocketContext::Pointer context = pipeline.front();
     ClientHttpRequest *const http = context->http;
     assert(http != NULL);
     assert(http->storeEntry() == NULL);
 
     HttpReply *const reply = Ftp::HttpReplyWrapper(code, msg, Http::scNoContent, 0);
 
     setLogUri(http, urlCanonicalClean(http->request));
 
     clientStreamNode *const node = context->getClientReplyContext();
     clientReplyContext *const repContext =
         dynamic_cast<clientReplyContext *>(node->data.getRaw());
     assert(repContext != NULL);
 
     RequestFlags reqFlags;
     reqFlags.cachable = false; // force releaseRequest() in storeCreateEntry()
     reqFlags.noCache = true;
     repContext->createStoreEntry(http->request->method, reqFlags);
     http->storeEntry()->replaceHttpReply(reply);
 }
 

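Note on the wroteReply() change above, where keepaliveNextRequest() becomes finished() followed by kick(): the context only marks itself done and leaves the queue, and the connection object is then responsible for resuming work. A minimal stand-alone sketch of that ordering follows. It assumes std::shared_ptr instead of RefCount, and Conn, Context, the log vector and transactionDone() are hypothetical stand-ins, not Squid classes.

```cpp
#include <cassert>
#include <list>
#include <memory>
#include <string>
#include <vector>

struct Conn;

// Stand-in for ClientSocketContext.
struct Context {
    std::string name;
    Conn *conn;
    void finished(); // de-registers from the pipeline; may destroy 'this'
};

// Stand-in for ConnStateData.
struct Conn {
    std::list<std::shared_ptr<Context>> pipeline;
    std::vector<std::string> log; // records what kick() decided to do

    // try to make progress on a transaction or read more I/O
    void kick() {
        if (!pipeline.empty())
            log.push_back("resume " + pipeline.front()->name);
        else
            log.push_back("read more");
    }
};

void Context::finished() {
    // this sketch assumes transactions complete in pipeline order
    assert(!conn->pipeline.empty() && conn->pipeline.front().get() == this);
    conn->pipeline.pop_front(); // 'this' may be gone unless the caller holds a reference
}

// Mirrors the call order used in the patch: finished() first, then kick().
void transactionDone(Conn &conn) {
    std::shared_ptr<Context> context = conn.pipeline.front(); // local reference keeps it alive
    context->finished();
    conn.kick();
}
```

The local pointer taken before calling finished() is what keeps the object valid for the duration of the call, much as the ClientSocketContext::Pointer local does in wroteReply().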
=== modified file 'src/servers/Http1Server.cc'
--- src/servers/Http1Server.cc	2015-11-07 12:08:33 +0000
+++ src/servers/Http1Server.cc	2015-11-15 13:03:14 +0000
@@ -225,78 +225,78 @@
                 const AsyncCall::Pointer cb = asyncCall(11, 3,  "Http1::Server::proceedAfterBodyContinuation", CbDialer(this, &Http1::Server::proceedAfterBodyContinuation, ClientSocketContext::Pointer(context)));
                 sendControlMsg(HttpControlMsg(rep, cb));
                 return;
             }
         }
     }
     clientProcessRequest(this, parser_, context);
 }
 
 void
 Http::One::Server::noteBodyConsumerAborted(BodyPipe::Pointer ptr)
 {
     ConnStateData::noteBodyConsumerAborted(ptr);
     stopReceiving("virgin request body consumer aborted"); // closes ASAP
 }
 
 void
 Http::One::Server::handleReply(HttpReply *rep, StoreIOBuffer receivedData)
 {
     // the caller guarantees that we are dealing with the current context only
-    ClientSocketContext::Pointer context = getCurrentContext();
-    Must(context != NULL);
+    ClientSocketContext::Pointer context = pipeline.front();
+    Must(context != nullptr);
     const ClientHttpRequest *http = context->http;
     Must(http != NULL);
 
     // After sending Transfer-Encoding: chunked (at least), always send
     // the last-chunk if there was no error, ignoring responseFinishedOrFailed.
     const bool mustSendLastChunk = http->request->flags.chunkedReply &&
                                    !http->request->flags.streamError &&
                                    !EBIT_TEST(http->storeEntry()->flags, ENTRY_BAD_LENGTH) &&
                                    !context->startOfOutput();
     const bool responseFinishedOrFailed = !rep &&
                                           !receivedData.data &&
                                           !receivedData.length;
     if (responseFinishedOrFailed && !mustSendLastChunk) {
         context->writeComplete(context->clientConnection, NULL, 0, Comm::OK);
         return;
     }
 
     if (!context->startOfOutput()) {
         context->sendBody(rep, receivedData);
         return;
     }
 
     assert(rep);
     http->al->reply = rep;
     HTTPMSGLOCK(http->al->reply);
     context->sendStartOfMessage(rep, receivedData);
 }
 
 void
 Http::One::Server::writeControlMsgAndCall(ClientSocketContext *context, HttpReply *rep, AsyncCall::Pointer &call)
 {
     // apply selected clientReplyContext::buildReplyHeader() mods
     // it is not clear what headers are required for control messages
     rep->header.removeHopByHopEntries();
     rep->header.putStr(Http::HdrType::CONNECTION, "keep-alive");
-    httpHdrMangleList(&rep->header, getCurrentContext()->http->request, ROR_REPLY);
+    httpHdrMangleList(&rep->header, pipeline.front()->http->request, ROR_REPLY);
 
     MemBuf *mb = rep->pack();
 
     debugs(11, 2, "HTTP Client " << clientConnection);
     debugs(11, 2, "HTTP Client CONTROL MSG:\n---------\n" << mb->buf << "\n----------");
 
     Comm::Write(context->clientConnection, mb, call);
 
     delete mb;
 }
 
 ConnStateData *
 Http::NewServer(MasterXactionPointer &xact)
 {
     return new Http1::Server(xact, false);
 }
 
 ConnStateData *
 Https::NewServer(MasterXactionPointer &xact)
 {

=== modified file 'src/servers/Server.cc'
--- src/servers/Server.cc	2015-11-07 12:12:13 +0000
+++ src/servers/Server.cc	2015-11-15 11:10:03 +0000
@@ -1,30 +1,31 @@
 /*
  * Copyright (C) 1996-2015 The Squid Software Foundation and contributors
  *
  * Squid software is distributed under GPLv2+ license and includes
  * contributions from numerous individuals and organizations.
  * Please see the COPYING and CONTRIBUTORS files for details.
  */
 
 #include "squid.h"
 #include "anyp/PortCfg.h"
+#include "client_side.h"
 #include "comm.h"
 #include "comm/Read.h"
 #include "Debug.h"
 #include "fd.h"
 #include "fde.h"
 #include "MasterXaction.h"
 #include "servers/Server.h"
 #include "SquidConfig.h"
 #include "StatCounters.h"
 #include "tools.h"
 
 Server::Server(const MasterXaction::Pointer &xact) :
     AsyncJob("::Server"), // kids overwrite
     clientConnection(xact->tcpClient),
     transferProtocol(xact->squidPort->transport),
     port(xact->squidPort),
     receivedFirstByte_(false)
 {}
 
 bool
@@ -150,41 +151,41 @@
             clientConnection->close();
             return;
         }
 
         /* It might be half-closed, we can't tell */
         fd_table[io.conn->fd].flags.socket_eof = true;
         commMarkHalfClosed(io.conn->fd);
         fd_note(io.conn->fd, "half-closed");
 
         /* There is one more close check at the end, to detect aborted
          * (partial) requests. At this point we can't tell if the request
          * is partial.
          */
 
         /* Continue to process previously read data */
         break;
 
     // case Comm::COMM_ERROR:
     default: // no other flags should ever occur
         debugs(33, 2, io.conn << ": got flag " << rd.flag << "; " << xstrerr(rd.xerrno));
-        notifyAllContexts(rd.xerrno);
+        pipeline.terminateAll(rd.xerrno);
         io.conn->close();
         return;
     }
 
     afterClientRead();
 }
 
 void
 Server::clientWriteDone(const CommIoCbParams &io)
 {
     debugs(33,5, io.conn);
     Must(writer != NULL);
     writer = NULL;
 
     /* Bail out quickly on Comm::ERR_CLOSING - close handlers will tidy up */
     if (io.flag == Comm::ERR_CLOSING) {
         debugs(33,5, io.conn << " closing Bailout.");
         return;
     }
 

=== modified file 'src/servers/Server.h'
--- src/servers/Server.h	2015-11-07 12:12:13 +0000
+++ src/servers/Server.h	2015-11-15 11:10:19 +0000
@@ -1,59 +1,57 @@
 /*
  * Copyright (C) 1996-2015 The Squid Software Foundation and contributors
  *
  * Squid software is distributed under GPLv2+ license and includes
  * contributions from numerous individuals and organizations.
  * Please see the COPYING and CONTRIBUTORS files for details.
  */
 
 /* DEBUG: section 33    Client-side Routines */
 
 #ifndef SQUID_SERVERS_SERVER_H
 #define SQUID_SERVERS_SERVER_H
 
 #include "anyp/forward.h"
 #include "anyp/ProtocolVersion.h"
 #include "base/AsyncJob.h"
 #include "BodyPipe.h"
 #include "comm/forward.h"
 #include "CommCalls.h"
+#include "Pipeline.h"
 #include "SBuf.h"
 
 /**
  * Common base for all Server classes used
  * to manage connections from clients.
  */
 class Server : virtual public AsyncJob, public BodyProducer
 {
 public:
     Server(const MasterXaction::Pointer &xact);
     virtual ~Server() {}
 
     /* AsyncJob API */
     virtual void start();
     virtual bool doneAll() const;
     virtual void swanSong();
 
-    /// tell all active contexts on a connection about an error
-    virtual void notifyAllContexts(const int xerrno) = 0;
-
     /// ??
     virtual bool connFinishedWithConn(int size) = 0;
 
     /// processing to be done after a Comm::Read()
     virtual void afterClientRead() = 0;
 
     /// maybe grow the inBuf and schedule Comm::Read()
     void readSomeData();
 
     /**
      * called when new request data has been read from the socket
      *
      * \retval false called comm_close or setReplyToError (the caller should bail)
      * \retval true  we did not call comm_close or setReplyToError
      */
     virtual bool handleReadData() = 0;
 
     /// whether Comm::Read() is scheduled
     bool reading() const {return reader != NULL;}
 
@@ -79,30 +77,33 @@
     bool maybeMakeSpaceAvailable();
 
     // Client TCP connection details from comm layer.
     Comm::ConnectionPointer clientConnection;
 
     /**
      * The transfer protocol currently being spoken on this connection.
      * HTTP/1.x CONNECT, HTTP/1.1 Upgrade and HTTP/2 SETTINGS offer the
      * ability to change protocols on the fly.
      */
     AnyP::ProtocolVersion transferProtocol;
 
     /// Squid listening port details where this connection arrived.
     AnyP::PortCfgPointer port;
 
     /// read I/O buffer for the client connection
     SBuf inBuf;
 
     bool receivedFirstByte_; ///< true if at least one byte received on this connection
 
+    /// set of requests waiting to be serviced
+    Pipeline pipeline;
+
 protected:
     void doClientRead(const CommIoCbParams &io);
     void clientWriteDone(const CommIoCbParams &io);
 
     AsyncCall::Pointer reader; ///< set when we are reading
     AsyncCall::Pointer writer; ///< set when we are writing
 };
 
 #endif /* SQUID_SERVERS_SERVER_H */
 

=== modified file 'src/stat.cc'
--- src/stat.cc	2015-11-07 12:08:33 +0000
+++ src/stat.cc	2015-11-15 09:01:18 +0000
@@ -1845,42 +1845,41 @@
     char buf[MAX_IPSTRLEN];
 
     for (i = ClientActiveRequests.head; i; i = i->next) {
         const char *p = NULL;
         http = static_cast<ClientHttpRequest *>(i->data);
         assert(http);
         ConnStateData * conn = http->getConn();
         storeAppendPrintf(s, "Connection: %p\n", conn);
 
         if (conn != NULL) {
             const int fd = conn->clientConnection->fd;
             storeAppendPrintf(s, "\tFD %d, read %" PRId64 ", wrote %" PRId64 "\n", fd,
                               fd_table[fd].bytes_read, fd_table[fd].bytes_written);
             storeAppendPrintf(s, "\tFD desc: %s\n", fd_table[fd].desc);
             storeAppendPrintf(s, "\tin: buf %p, used %ld, free %ld\n",
                               conn->inBuf.rawContent(), (long int) conn->inBuf.length(), (long int) conn->inBuf.spaceSize());
             storeAppendPrintf(s, "\tremote: %s\n",
                               conn->clientConnection->remote.toUrl(buf,MAX_IPSTRLEN));
             storeAppendPrintf(s, "\tlocal: %s\n",
                               conn->clientConnection->local.toUrl(buf,MAX_IPSTRLEN));
-            storeAppendPrintf(s, "\tnrequests: %d\n",
-                              conn->nrequests);
+            storeAppendPrintf(s, "\tnrequests: %u\n", conn->pipeline.nrequests);
         }
 
         storeAppendPrintf(s, "uri %s\n", http->uri);
         storeAppendPrintf(s, "logType %s\n", http->logType.c_str());
         storeAppendPrintf(s, "out.offset %ld, out.size %lu\n",
                           (long int) http->out.offset, (unsigned long int) http->out.size);
         storeAppendPrintf(s, "req_sz %ld\n", (long int) http->req_sz);
         e = http->storeEntry();
         storeAppendPrintf(s, "entry %p/%s\n", e, e ? e->getMD5Text() : "N/A");
         storeAppendPrintf(s, "start %ld.%06d (%f seconds ago)\n",
                           (long int) http->al->cache.start_time.tv_sec,
                           (int) http->al->cache.start_time.tv_usec,
                           tvSubDsec(http->al->cache.start_time, current_time));
 #if USE_AUTH
         if (http->request->auth_user_request != NULL)
             p = http->request->auth_user_request->username();
         else
 #endif
             if (http->request->extacl_user.size() > 0) {
                 p = http->request->extacl_user.termedBuf();

=== modified file 'src/tests/stub_client_side.cc'
--- src/tests/stub_client_side.cc	2015-11-07 12:08:33 +0000
+++ src/tests/stub_client_side.cc	2015-11-17 03:30:09 +0000
@@ -1,66 +1,60 @@
 /*
  * Copyright (C) 1996-2015 The Squid Software Foundation and contributors
  *
  * Squid software is distributed under GPLv2+ license and includes
  * contributions from numerous individuals and organizations.
  * Please see the COPYING and CONTRIBUTORS files for details.
  */
 
 #include "squid.h"
 #include "client_side.h"
 
 #define STUB_API "client_side.cc"
 #include "tests/STUB.h"
 
 //ClientSocketContext::ClientSocketContext(const ConnectionPointer&, ClientHttpRequest*) STUB
 //ClientSocketContext::~ClientSocketContext() STUB
 bool ClientSocketContext::startOfOutput() const STUB_RETVAL(false)
 void ClientSocketContext::writeComplete(const Comm::ConnectionPointer &conn, char *bufnotused, size_t size, Comm::Flag errflag) STUB
-void ClientSocketContext::keepaliveNextRequest() STUB
 void ClientSocketContext::pullData() STUB
 int64_t ClientSocketContext::getNextRangeOffset() const STUB_RETVAL(0)
 bool ClientSocketContext::canPackMoreRanges() const STUB_RETVAL(false)
 clientStream_status_t ClientSocketContext::socketState() STUB_RETVAL(STREAM_NONE)
 void ClientSocketContext::sendBody(HttpReply * rep, StoreIOBuffer bodyData) STUB
 void ClientSocketContext::sendStartOfMessage(HttpReply * rep, StoreIOBuffer bodyData) STUB
 size_t ClientSocketContext::lengthToSend(Range<int64_t> const &available) STUB_RETVAL(0)
 void ClientSocketContext::noteSentBodyBytes(size_t) STUB
 void ClientSocketContext::buildRangeHeader(HttpReply * rep) STUB
 clientStreamNode * ClientSocketContext::getTail() const STUB_RETVAL(NULL)
 clientStreamNode * ClientSocketContext::getClientReplyContext() const STUB_RETVAL(NULL)
-void ClientSocketContext::connIsFinished() STUB
-void ClientSocketContext::removeFromConnectionList(ConnStateData * conn) STUB
+void ClientSocketContext::finished() STUB
 void ClientSocketContext::deferRecipientForLater(clientStreamNode * node, HttpReply * rep, StoreIOBuffer receivedData) STUB
 bool ClientSocketContext::multipartRangeRequest() const STUB_RETVAL(false)
 void ClientSocketContext::registerWithConn() STUB
 void ClientSocketContext::noteIoError(const int xerrno) STUB
 void ClientSocketContext::writeControlMsg(HttpControlMsg &msg) STUB
 
-bool ConnStateData::areAllContextsForThisConnection() const STUB_RETVAL(false)
-void ConnStateData::freeAllContexts() STUB
-void ConnStateData::notifyAllContexts(const int xerrno) STUB
 bool ConnStateData::clientParseRequests() STUB_RETVAL(false)
 void ConnStateData::readNextRequest() STUB
-void ConnStateData::addContextToQueue(ClientSocketContext * context) STUB
-int ConnStateData::getConcurrentRequestCount() const STUB_RETVAL(0)
 bool ConnStateData::isOpen() const STUB_RETVAL(false)
+void ConnStateData::kick() STUB
 void ConnStateData::sendControlMsg(HttpControlMsg msg) STUB
 int64_t ConnStateData::mayNeedToReadMoreBody() const STUB_RETVAL(0)
 #if USE_AUTH
 void ConnStateData::setAuth(const Auth::UserRequest::Pointer &aur, const char *cause) STUB
 #endif
 bool ConnStateData::transparent() const STUB_RETVAL(false)
 void ConnStateData::stopReceiving(const char *error) STUB
 void ConnStateData::stopSending(const char *error) STUB
 void ConnStateData::expectNoForwarding() STUB
 void ConnStateData::noteMoreBodySpaceAvailable(BodyPipe::Pointer) STUB
 void ConnStateData::noteBodyConsumerAborted(BodyPipe::Pointer) STUB
 bool ConnStateData::handleReadData() STUB_RETVAL(false)
 bool ConnStateData::handleRequestBodyData() STUB_RETVAL(false)
 void ConnStateData::pinConnection(const Comm::ConnectionPointer &pinServerConn, HttpRequest *request, CachePeer *peer, bool auth, bool monitor) STUB
 void ConnStateData::unpinConnection(const bool andClose) STUB
 const Comm::ConnectionPointer ConnStateData::validatePinnedConnection(HttpRequest *request, const CachePeer *peer) STUB_RETVAL(NULL)
 void ConnStateData::clientPinnedConnectionClosed(const CommCloseCbParams &io) STUB
 void ConnStateData::connStateClosed(const CommCloseCbParams &io) STUB
 void ConnStateData::requestTimeout(const CommTimeoutCbParams &params) STUB
 void ConnStateData::swanSong() STUB

=== modified file 'src/tunnel.cc'
--- src/tunnel.cc	2015-11-09 21:38:44 +0000
+++ src/tunnel.cc	2015-11-17 10:06:53 +0000
@@ -225,59 +225,67 @@
 static CLCB tunnelClientClosed;
 static CTCB tunnelTimeout;
 static PSC tunnelPeerSelectComplete;
 static EVH tunnelDelayedClientRead;
 static EVH tunnelDelayedServerRead;
 static void tunnelConnected(const Comm::ConnectionPointer &server, void *);
 static void tunnelRelayConnectRequest(const Comm::ConnectionPointer &server, void *);
 
 static void
 tunnelServerClosed(const CommCloseCbParams &params)
 {
     TunnelStateData *tunnelState = (TunnelStateData *)params.data;
     debugs(26, 3, HERE << tunnelState->server.conn);
     tunnelState->server.conn = NULL;
     tunnelState->server.writer = NULL;
 
     if (tunnelState->request != NULL)
         tunnelState->request->hier.stopPeerClock(false);
 
     if (tunnelState->noConnections()) {
+        // ConnStateData pipeline should contain the CONNECT we are performing
+        auto ctx = tunnelState->http->getConn()->pipeline.front();
+        if (ctx != nullptr)
+            ctx->finished();
         delete tunnelState;
         return;
     }
 
     if (!tunnelState->client.writer) {
         tunnelState->client.conn->close();
         return;
     }
 }
 
 static void
 tunnelClientClosed(const CommCloseCbParams &params)
 {
     TunnelStateData *tunnelState = (TunnelStateData *)params.data;
     debugs(26, 3, HERE << tunnelState->client.conn);
     tunnelState->client.conn = NULL;
     tunnelState->client.writer = NULL;
 
     if (tunnelState->noConnections()) {
+        // ConnStateData pipeline should contain the CONNECT we are performing
+        auto ctx = tunnelState->http->getConn()->pipeline.front();
+        if (ctx != nullptr)
+            ctx->finished();
         delete tunnelState;
         return;
     }
 
     if (!tunnelState->server.writer) {
         tunnelState->server.conn->close();
         return;
     }
 }
 
 TunnelStateData::TunnelStateData() :
     url(NULL),
     http(),
     request(NULL),
     status_ptr(NULL),
     logTag_ptr(NULL),
     connectRespBuf(NULL),
     connectReqWriting(false),
     started(squid_curtime)
 {
@@ -1216,42 +1224,42 @@
     /* Create state structure. */
     TunnelStateData *tunnelState = NULL;
     const SBuf url(request->effectiveRequestUri());
 
     debugs(26, 3, request->method << " " << url << " " << request->http_ver);
     ++statCounter.server.all.requests;
     ++statCounter.server.other.requests;
 
     tunnelState = new TunnelStateData;
     tunnelState->url = SBufToCstring(url);
     tunnelState->request = request;
     tunnelState->server.size_ptr = NULL; //Set later if ClientSocketContext is available
 
     // Temporary static variable to store the unneeded for our case status code
     static int status_code = 0;
     tunnelState->status_ptr = &status_code;
     tunnelState->client.conn = clientConn;
 
     ConnStateData *conn;
     if ((conn = request->clientConnectionManager.get())) {
-        ClientSocketContext::Pointer context = conn->getCurrentContext();
-        if (context != NULL && context->http != NULL) {
+        ClientSocketContext::Pointer context = conn->pipeline.front();
+        if (context != nullptr && context->http != nullptr) {
             tunnelState->logTag_ptr = &context->http->logType;
             tunnelState->server.size_ptr = &context->http->out.size;
 
 #if USE_DELAY_POOLS
             /* no point using the delayIsNoDelay stuff since tunnel is nice and simple */
             if (srvConn->getPeer() && srvConn->getPeer()->options.no_delay)
                 tunnelState->server.setDelayId(DelayId::DelayClient(context->http));
 #endif
         }
     }
 
     comm_add_close_handler(tunnelState->client.conn->fd,
                            tunnelClientClosed,
                            tunnelState);
 
     AsyncCall::Pointer timeoutCall = commCbCall(5, 4, "tunnelTimeout",
                                      CommTimeoutCbPtrFun(tunnelTimeout, tunnelState));
     commSetConnTimeout(tunnelState->client.conn, Config.Timeout.lifetime, timeoutCall);
     fd_table[clientConn->fd].read_method = &default_read_method;
     fd_table[clientConn->fd].write_method = &default_write_method;


