[squid-dev] [PATCH] Http::Stream ID numbering

Amos Jeffries squid3 at treenet.co.nz
Wed Feb 3 11:29:01 UTC 2016


On 24/01/2016 2:17 a.m., Amos Jeffries wrote:
> On 23/01/2016 7:59 a.m., Alex Rousskov wrote:
>> On 01/14/2016 05:53 AM, Amos Jeffries wrote:
>>
>> The renaming/moving part of the patch scope changes lots of code. It is
>> very unfortunate that you have decided to combine the above
>> functionality-preserving polishing with the functionality changes below.
>> The former makes it very difficult to see the latter. If I had enough
>> guts, I would -1 this patch for this reason alone. Renaming/moving
>> changes should have been done separately!
> 
> The Pipeline related ID changes are small and only needed for HTTP/2
> (the combo was due to being sponsored as an line-item). I can separate
> them out again.
> 
> With that Pipeline update gone there is no need for an audit of the
> shuffling according to our policy. Once we settle on the naming (above)
> are you happy for the non-logic shuffling bit to go in? and a new patch
> to audit only the Pipeline ID part?
> 

Okay, given a week with no response I merged the non-logic shuffling part.

The logic changes are in this updated patch for review:

>>
>>> Pipeline class is updated to use the ID number to manage its contents
>>> rather than Pointer value matching. It is also updated to drop the
>>> HTTP/1 specific assumptions within the Pipeline implementation. As a
>>> behavioural requirement the sequential flow is now left for the Server
>>> and ClientHttpRequest Jobs to ensure correctness.
>>
>>
>> The new Pipeline::popById() method is a bad idea IMO: Linear search is
>> wrong for both HTTP/1 and HTTP/2.
>>
> 
> Do you have a better algorithm? it needs to pop using only an ID (HTTP/2
> limit due to framing) and works on linear/list storage (optimal for
> HTTP/1 sequentialism).
> 


Amos

-------------- next part --------------
=== modified file 'src/Pipeline.cc'
--- src/Pipeline.cc	2016-01-24 17:41:43 +0000
+++ src/Pipeline.cc	2016-01-31 12:47:55 +0000
@@ -21,6 +21,7 @@
 {
     requests.push_back(c);
     ++nrequests;
+    ++nactive;
     debugs(33, 3, "Pipeline " << (void*)this << " add request " << nrequests << ' ' << c);
 }
 
@@ -49,15 +50,24 @@
 }
 
 void
-Pipeline::popMe(const Http::StreamPointer &which)
+Pipeline::popById(uint32_t which)
 {
     if (requests.empty())
         return;
 
-    debugs(33, 3, "Pipeline " << (void*)this << " drop " << requests.front());
-    // in reality there may be multiple contexts doing processing in parallel.
-    // XXX: pipeline still assumes HTTP/1 FIFO semantics are obeyed.
-    assert(which == requests.front());
-    requests.pop_front();
+    debugs(33, 3, "Pipeline " << (void*)this << " drop id=" << which);
+
+    // find the context and clear its Pointer
+    for (auto &&i : requests) {
+        if (i->id == which) {
+            i = nullptr;
+            --nactive;
+            break;
+        }
+    }
+
+    // trim closed contexts from the list head (if any)
+    while (!requests.empty() && !requests.front())
+        requests.pop_front();
 }
 

=== modified file 'src/Pipeline.h'
--- src/Pipeline.h	2016-01-24 17:41:43 +0000
+++ src/Pipeline.h	2016-01-31 12:47:55 +0000
@@ -37,7 +37,7 @@
     Pipeline & operator =(const Pipeline &) = delete;
 
 public:
-    Pipeline() : nrequests(0) {}
+    Pipeline() : nrequests(0), nactive(0) {}
     ~Pipeline() = default;
 
     /// register a new request context to the pipeline
@@ -47,7 +47,7 @@
     Http::StreamPointer front() const;
 
     /// how many requests are currently pipelined
-    size_t count() const {return requests.size();}
+    size_t count() const {return nactive;}
 
     /// whether there are none or any requests currently pipelined
     bool empty() const {return requests.empty();}
@@ -55,8 +55,8 @@
     /// tell everybody about the err, and abort all waiting requests
     void terminateAll(const int xerrno);
 
-    /// deregister the front request from the pipeline
-    void popMe(const Http::StreamPointer &);
+    /// deregister a request from the pipeline
+    void popById(uint32_t);
 
     /// Number of requests seen in this pipeline (so far).
     /// Includes incomplete transactions.
@@ -65,6 +65,10 @@
 private:
     /// requests parsed from the connection but not yet completed.
     std::list<Http::StreamPointer> requests;
+
+    /// Number of still-active streams in this pipeline (so far).
+    /// Includes incomplete transactions.
+    uint32_t nactive;
 };
 
 #endif /* SQUID_SRC_PIPELINE_H */

=== modified file 'src/client_side.cc'
--- src/client_side.cc	2016-02-01 11:52:03 +0000
+++ src/client_side.cc	2016-02-02 14:07:04 +0000
@@ -1010,7 +1010,7 @@
     http->req_sz = inBuf.length();
     http->uri = xstrdup(uri);
     setLogUri (http, uri);
-    auto *context = new Http::Stream(clientConnection, http);
+    auto *context = new Http::Stream(nextStreamId(), clientConnection, http);
     StoreIOBuffer tempBuffer;
     tempBuffer.data = context->reqbuf;
     tempBuffer.length = HTTP_REQBUF_SZ;
@@ -1356,7 +1356,7 @@
     ClientHttpRequest *http = new ClientHttpRequest(csd);
 
     http->req_sz = hp->messageHeaderSize();
-    Http::Stream *result = new Http::Stream(csd->clientConnection, http);
+    Http::Stream *result = new Http::Stream(csd->nextStreamId(), csd->clientConnection, http);
 
     StoreIOBuffer tempBuffer;
     tempBuffer.data = result->reqbuf;
@@ -1573,8 +1573,7 @@
                 // XXX: Either the context is finished() or it should stay queued.
                 // The below may leak client streams BodyPipe objects. BUT, we need
                 // to check if client-streams detatch is safe to do here (finished() will detatch).
-                assert(conn->pipeline.front() == context); // XXX: still assumes HTTP/1 semantics
-                conn->pipeline.popMe(Http::StreamPointer(context));
+                conn->pipeline.popById(context->id);
             }
             Comm::SetSelect(conn->clientConnection->fd, COMM_SELECT_READ, NULL, NULL, 0);
             conn->fakeAConnectRequest("unknown-protocol", conn->preservedClientData);

=== modified file 'src/client_side.h'
--- src/client_side.h	2016-01-24 17:41:43 +0000
+++ src/client_side.h	2016-01-31 12:47:55 +0000
@@ -69,6 +69,7 @@
     virtual ~ConnStateData();
 
     /* ::Server API */
+    virtual uint32_t nextStreamId() {return ++nextStreamId_;}
     virtual void receivedFirstByte();
     virtual bool handleReadData();
     virtual void afterClientRead();

=== modified file 'src/http/Stream.cc'
--- src/http/Stream.cc	2016-01-24 17:41:43 +0000
+++ src/http/Stream.cc	2016-01-31 12:50:20 +0000
@@ -14,7 +14,8 @@
 #include "Store.h"
 #include "TimeOrTag.h"
 
-Http::Stream::Stream(const Comm::ConnectionPointer &aConn, ClientHttpRequest *aReq) :
+Http::Stream::Stream(uint32_t anId, const Comm::ConnectionPointer &aConn, ClientHttpRequest *aReq) :
+    id(anId),
     clientConnection(aConn),
     http(aReq),
     reply(nullptr),
@@ -546,7 +547,7 @@
 
     assert(connRegistered_);
     connRegistered_ = false;
-    conn->pipeline.popMe(Http::StreamPointer(this));
+    conn->pipeline.popById(id);
 }
 
 /// called when we encounter a response-related error

=== modified file 'src/http/Stream.h'
--- src/http/Stream.h	2016-01-31 12:05:30 +0000
+++ src/http/Stream.h	2016-01-31 12:48:43 +0000
@@ -69,7 +69,7 @@
 
 public:
     /// construct with HTTP/1.x details
-    Stream(const Comm::ConnectionPointer &aConn, ClientHttpRequest *aReq);
+    Stream(uint32_t anId, const Comm::ConnectionPointer &aConn, ClientHttpRequest *aReq);
     ~Stream();
 
     /// register this stream with the Server
@@ -120,6 +120,10 @@
 
     void deferRecipientForLater(clientStreamNode *, HttpReply *, StoreIOBuffer receivedData);
 
+public:
+    // NP: stream ID is relative to the connection, not global.
+    uint32_t id; ///< stream ID within the client connection.
+
 public: // HTTP/1.x state data
 
     Comm::ConnectionPointer clientConnection; ///< details about the client connection socket

=== modified file 'src/servers/FtpServer.cc'
--- src/servers/FtpServer.cc	2016-01-31 12:05:30 +0000
+++ src/servers/FtpServer.cc	2016-01-31 12:47:55 +0000
@@ -749,7 +749,7 @@
     http->uri = newUri;
 
     Http::Stream *const result =
-        new Http::Stream(clientConnection, http);
+        new Http::Stream(nextStreamId(), clientConnection, http);
 
     StoreIOBuffer tempBuffer;
     tempBuffer.data = result->reqbuf;

=== modified file 'src/servers/Server.cc'
--- src/servers/Server.cc	2016-01-24 17:41:43 +0000
+++ src/servers/Server.cc	2016-01-31 12:47:55 +0000
@@ -26,7 +26,8 @@
     clientConnection(xact->tcpClient),
     transferProtocol(xact->squidPort->transport),
     port(xact->squidPort),
-    receivedFirstByte_(false)
+    receivedFirstByte_(false),
+    nextStreamId_(0)
 {}
 
 bool

=== modified file 'src/servers/Server.h'
--- src/servers/Server.h	2016-01-24 17:21:02 +0000
+++ src/servers/Server.h	2016-01-31 12:47:55 +0000
@@ -35,6 +35,9 @@
     virtual bool doneAll() const;
     virtual void swanSong();
 
+    /// fetch the next available stream ID
+    virtual uint32_t nextStreamId() = 0;
+
     /// ??
     virtual bool connFinishedWithConn(int size) = 0;
 
@@ -117,6 +120,7 @@
     void doClientRead(const CommIoCbParams &io);
     void clientWriteDone(const CommIoCbParams &io);
 
+    uint32_t nextStreamId_;    ///< incremented as streams are initiated
     AsyncCall::Pointer reader; ///< set when we are reading
     AsyncCall::Pointer writer; ///< set when we are writing
 };



More information about the squid-dev mailing list