[squid-dev] [REVIEW] SBuf I/O Comm::Write API

Amos Jeffries squid3 at treenet.co.nz
Thu Oct 23 11:42:33 UTC 2014


This patch is the proposed Comm::Write API update to handle SBuf writes.
It is PREVIEW because I have not yet altered or written any code to
actually use it as proof of correct operation. I intend to follow up
with this as part of a larger tunnel conversion later if the design is
agreed.


It follows the same model as implemented for Comm::Read:

* The code requiring a write(2) uses Comm::Write() to register interest
in writes with a callback.

* When write(2) is possible the registered callback will be scheduled.

* The recipient of a write callback is permitted to use Comm::WriteNow()
to perform a synchronous write(2) operation (or not, if it chooses).

* The results of Comm::WriteNow() call are:
 - a Comm::Flag indicating OK/ERROR/INPROGRESS - where INPROGRESS
represents delay pool preventing write(2), or some minor error requiring
a retry,
 - the buffer passed for immediate writing is consume()'d by the amount
actually written (which may leave it with remaining bytes).

* Comm::MonitorsWrite(fd) test is provided to check if a specific FD is
already active with a waiting callback.

Amos
-------------- next part --------------
=== modified file 'src/comm/Write.cc'
--- src/comm/Write.cc	2014-09-13 13:59:43 +0000
+++ src/comm/Write.cc	2014-10-23 09:42:40 +0000
@@ -1,162 +1,264 @@
 /*
  * Copyright (C) 1996-2014 The Squid Software Foundation and contributors
  *
  * Squid software is distributed under GPLv2+ license and includes
  * contributions from numerous individuals and organizations.
  * Please see the COPYING and CONTRIBUTORS files for details.
  */
 
 #include "squid.h"
+#include "comm/comm_internal.h"
 #include "comm/Connection.h"
 #include "comm/IoCallback.h"
+#include "comm/Loops.h"
 #include "comm/Write.h"
 #include "fd.h"
 #include "fde.h"
 #include "globals.h"
 #include "MemBuf.h"
 #include "profiler/Profiler.h"
 #include "SquidTime.h"
 #include "StatCounters.h"
 #if USE_DELAY_POOLS
 #include "ClientInfo.h"
 #endif
 
 #include <cerrno>
 
+// Does comm check this fd for write readiness?
+// Note that when comm is not monitoring, there can be a pending callback
+// call, which may resume comm monitoring once fired.
+bool
+Comm::MonitorsWrite(int fd)
+{
+    assert(isOpen(fd) && COMMIO_FD_WRITECB(fd));
+    // Being active is usually the same as monitoring because we always
+    // start monitoring the FD when we configure Comm::IoCallback for I/O
+    // and we usually configure Comm::IoCallback for I/O when we start
+    // monitoring a FD for writing.
+    return COMMIO_FD_WRITECB(fd)->active();
+}
+
+static void
+handleWriteNow(int fd, void *)
+{
+    PROF_start(commHandleWrite);
+    ScheduleCallHere(COMMIO_FD_WRITECB(fd)->callback);
+    PROF_stop(commHandleWrite);
+}
+
+void
+Comm::Write(const Comm::ConnectionPointer &conn, AsyncCall::Pointer &callback)
+{
+    debugs(5, 5, conn << ": sz (undefined): asynCall " << callback);
+
+    /* Make sure we are open, not closing, and not writing */
+    assert(fd_table[conn->fd].flags.open);
+    assert(!fd_table[conn->fd].closing());
+    Comm::IoCallback *ccb = COMMIO_FD_WRITECB(conn->fd);
+    assert(!ccb->active());
+
+    fd_table[conn->fd].writeStart = squid_curtime;
+    ccb->conn = conn;
+    /* Queue the write */
+    ccb->setCallback(IOCB_WRITE, callback, NULL, NULL, 0);
+    SetSelect(conn->fd, COMM_SELECT_WRITE, handleWriteNow, NULL, 0);
+}
+
 void
 Comm::Write(const Comm::ConnectionPointer &conn, MemBuf *mb, AsyncCall::Pointer &callback)
 {
     Comm::Write(conn, mb->buf, mb->size, callback, mb->freeFunc());
 }
 
 void
 Comm::Write(const Comm::ConnectionPointer &conn, const char *buf, int size, AsyncCall::Pointer &callback, FREE * free_func)
 {
     debugs(5, 5, HERE << conn << ": sz " << size << ": asynCall " << callback);
 
     /* Make sure we are open, not closing, and not writing */
     assert(fd_table[conn->fd].flags.open);
     assert(!fd_table[conn->fd].closing());
     Comm::IoCallback *ccb = COMMIO_FD_WRITECB(conn->fd);
     assert(!ccb->active());
 
     fd_table[conn->fd].writeStart = squid_curtime;
     ccb->conn = conn;
     /* Queue the write */
     ccb->setCallback(IOCB_WRITE, callback, (char *)buf, free_func, size);
     ccb->selectOrQueueWrite();
 }
 
-/** Write to FD.
- * This function is used by the lowest level of IO loop which only has access to FD numbers.
- * We have to use the comm iocb_table to map FD numbers to waiting data and Comm::Connections.
- * Once the write has been concluded we schedule the waiting call with success/fail results.
+/**
+ * Adjust the amount to write for available delay pool bandwidth.
  */
-void
-Comm::HandleWrite(int fd, void *data)
+static int
+delayPoolAdjustWriteSz(int wanted, int fd)
 {
-    Comm::IoCallback *state = static_cast<Comm::IoCallback *>(data);
-    int len = 0;
-    int nleft;
-
-    assert(state->conn != NULL && state->conn->fd == fd);
-
-    PROF_start(commHandleWrite);
-    debugs(5, 5, HERE << state->conn << ": off " <<
-           (long int) state->offset << ", sz " << (long int) state->size << ".");
-
-    nleft = state->size - state->offset;
-
 #if USE_DELAY_POOLS
     ClientInfo * clientInfo=fd_table[fd].clientInfo;
 
     if (clientInfo && !clientInfo->writeLimitingActive)
         clientInfo = NULL; // we only care about quota limits here
 
     if (clientInfo) {
         assert(clientInfo->selectWaiting);
         clientInfo->selectWaiting = false;
 
         assert(clientInfo->hasQueue());
         assert(clientInfo->quotaPeekFd() == fd);
         clientInfo->quotaDequeue(); // we will write or requeue below
 
-        if (nleft > 0) {
+        if (wanted > 0) {
             const int quota = clientInfo->quotaForDequed();
             if (!quota) {  // if no write quota left, queue this fd
                 state->quotaQueueReserv = clientInfo->quotaEnqueue(fd);
                 clientInfo->kickQuotaQueue();
-                PROF_stop(commHandleWrite);
-                return;
+                return -1;
             }
 
-            const int nleft_corrected = min(nleft, quota);
-            if (nleft != nleft_corrected) {
-                debugs(5, 5, HERE << state->conn << " writes only " <<
-                       nleft_corrected << " out of " << nleft);
-                nleft = nleft_corrected;
+            const int corrected = min(wanted, quota);
+            if (wanted != corrected) {
+                debugs(5, 5, "FD " << fd << " writes only " << corrected << " out of " << wanted);
+                wanted = corrected;
             }
-
         }
     }
 #endif /* USE_DELAY_POOLS */
 
-    /* actually WRITE data */
-    len = FD_WRITE_METHOD(fd, state->buf + state->offset, nleft);
-    debugs(5, 5, HERE << "write() returns " << len);
+    return wanted;
+}
+
+static void
+countWritten(int fd, int len)
+{
+    fd_bytes(fd, len, FD_WRITE);
+    ++statCounter.syscalls.sock.writes;
+    // After each successful partial write,
+    // reset fde::writeStart to the current time.
+    fd_table[fd].writeStart = squid_curtime;
 
 #if USE_DELAY_POOLS
+    ClientInfo * clientInfo=fd_table[fd].clientInfo;
+
+    if (clientInfo && !clientInfo->writeLimitingActive)
+        clientInfo = NULL; // we only care about quota limits here
+
     if (clientInfo) {
         if (len > 0) {
             /* we wrote data - drain them from bucket */
             clientInfo->bucketSize -= len;
             if (clientInfo->bucketSize < 0.0) {
-                debugs(5, DBG_IMPORTANT, HERE << "drained too much"); // should not happen
+                debugs(5, DBG_IMPORTANT, "FD " << fd << " drained too much"); // should not happen
                 clientInfo->bucketSize = 0;
             }
         }
 
         // even if we wrote nothing, we were served; give others a chance
         clientInfo->kickQuotaQueue();
     }
 #endif /* USE_DELAY_POOLS */
+}
 
-    fd_bytes(fd, len, FD_WRITE);
-    ++statCounter.syscalls.sock.writes;
-    // After each successful partial write,
-    // reset fde::writeStart to the current time.
-    fd_table[fd].writeStart = squid_curtime;
+/** Write to FD.
+ * This function is used by the lowest level of IO loop which only has access to FD numbers.
+ * We have to use the comm iocb_table to map FD numbers to waiting data and Comm::Connections.
+ * Once the write has been concluded we schedule the waiting call with success/fail results.
+ */
+void
+Comm::HandleWrite(int fd, void *data)
+{
+    Comm::IoCallback *state = static_cast<Comm::IoCallback *>(data);
+    int len = 0;
+    int nleft;
+
+    assert(state->conn != NULL && state->conn->fd == fd);
+
+    PROF_start(commHandleWrite);
+    debugs(5, 5, HERE << state->conn << ": off " <<
+           (long int) state->offset << ", sz " << (long int) state->size << ".");
+
+    nleft = state->size - state->offset;
+
+    /* For legacy callers : Attempt a write */
+    // Keep in sync with Comm::WriteNow()!
+    nleft = delayPoolAdjustWriteSz(nleft, fd);
+    if (nleft < 0)
+        return;
+
+    /* actually WRITE data */
+    len = FD_WRITE_METHOD(fd, state->buf + state->offset, nleft);
+    debugs(5, 5, HERE << "write() returns " << len);
+    countWritten(fd, len);
 
     if (len == 0) {
         /* Note we even call write if nleft == 0 */
         /* We're done */
         if (nleft != 0)
             debugs(5, DBG_IMPORTANT, "FD " << fd << " write failure: connection closed with " << nleft << " bytes remaining.");
 
         state->finish(nleft ? Comm::COMM_ERROR : Comm::OK, errno);
     } else if (len < 0) {
         /* An error */
-        if (fd_table[fd].flags.socket_eof) {
-            debugs(50, 2, HERE << "FD " << fd << " write failure: " << xstrerror() << ".");
-            state->finish(nleft ? Comm::COMM_ERROR : Comm::OK, errno);
-        } else if (ignoreErrno(errno)) {
+        if (!fd_table[fd].flags.socket_eof && ignoreErrno(errno)) {
             debugs(50, 9, HERE << "FD " << fd << " write failure: " << xstrerror() << ".");
             state->selectOrQueueWrite();
         } else {
             debugs(50, 2, HERE << "FD " << fd << " write failure: " << xstrerror() << ".");
             state->finish(nleft ? Comm::COMM_ERROR : Comm::OK, errno);
         }
     } else {
         /* A successful write, continue */
         state->offset += len;
 
         if (state->offset < state->size) {
             /* Not done, reinstall the write handler and write some more */
             state->selectOrQueueWrite();
         } else {
             state->finish(nleft ? Comm::OK : Comm::COMM_ERROR, errno);
         }
     }
 
     PROF_stop(commHandleWrite);
 }
+
+Comm::Flag
+Comm::WriteNow(CommIoCbParams &params, SBuf &buf)
+{
+    int nleft = delayPoolAdjustWriteSz(buf.length(), params.fd);
+    if (nleft < 0) {
+        return (params.flag = Comm::INPROGRESS);
+    }
+
+    /* actually WRITE data */
+    errno = 0;
+    int len = FD_WRITE_METHOD(params.fd, buf.rawContent(), nleft);
+    params.xerrno = errno;
+    debugs(5, 5, "write() returns " << len);
+    countWritten(params.fd, len);
+
+    if (len == 0) {
+        /* Note we even call write if nleft == 0 */
+        /* We're done */
+        if (nleft != 0)
+            debugs(5, DBG_IMPORTANT, "FD " << params.fd << " write failure: connection closed with " << nleft << " bytes remaining.");
+
+        params.flag = (nleft ? Comm::COMM_ERROR : Comm::OK);
+
+    } else if (len < 0) {
+        /* An error */
+        if (!fd_table[params.fd].flags.socket_eof && ignoreErrno(params.xerrno)) {
+            debugs(50, 9, "FD " << params.fd << " write failure: " << xstrerror() << ".");
+            params.flag = Comm::INPROGRESS;
+        } else {
+            debugs(50, 2, "FD " << params.fd << " write failure: " << xstrerror() << ".");
+            params.flag = (nleft ? Comm::COMM_ERROR : Comm::OK);
+        }
+    } else {
+        /* A successful write, consume buffer content */
+        params.flag = Comm::OK;
+        buf.consume(len);
+    }
+
+    return params.flag;
+}

=== modified file 'src/comm/Write.h'
--- src/comm/Write.h	2014-09-13 13:59:43 +0000
+++ src/comm/Write.h	2014-10-23 04:45:25 +0000
@@ -1,42 +1,70 @@
 /*
  * Copyright (C) 1996-2014 The Squid Software Foundation and contributors
  *
  * Squid software is distributed under GPLv2+ license and includes
  * contributions from numerous individuals and organizations.
  * Please see the COPYING and CONTRIBUTORS files for details.
  */
 
 #ifndef _SQUID_COMM_IOWRITE_H
 #define _SQUID_COMM_IOWRITE_H
 
 #include "base/AsyncCall.h"
+#include "comm/Flag.h"
 #include "comm/forward.h"
 #include "typedefs.h"
 
+class CommIoCbParams;
 class MemBuf;
+class SBuf;
+
 namespace Comm
 {
 
 /**
+ * Start monitoring for write.
+ *
+ * callback is scheduled when the write is possible,
+ * or on file descriptor close.
+ */
+void Write(const Comm::ConnectionPointer &conn, AsyncCall::Pointer &callback);
+
+/// whether the FD socket is being monitored for write
+bool MonitorsWrite(int fd);
+
+/**
+ * Perform a write(2) on a connection immediately.
+ *
+ * The returned flag is also placed in params.flag.
+ *
+ * \retval Comm::OK          data has been written and placed in buf, amount in params.size
+ * \retval Comm::COMM_ERROR  an error occurred, the code is placed in params.xerrno
+ * \retval Comm::INPROGRESS  unable to write at this time, or a minor error occurred
+ */
+Comm::Flag WriteNow(CommIoCbParams &params, SBuf &buf);
+
+/**
  * Queue a write. callback is scheduled when the write
  * completes, on error, or on file descriptor close.
  *
  * free_func is used to free the passed buffer when the write has completed.
+ * \deprecated use bufferless Comm::Write() API instead.
  */
 void Write(const Comm::ConnectionPointer &conn, const char *buf, int size, AsyncCall::Pointer &callback, FREE *free_func);
 
 /**
  * Queue a write. callback is scheduled when the write
  * completes, on error, or on file descriptor close.
+ * \deprecated use bufferless Comm::Write() API instead.
  */
 void Write(const Comm::ConnectionPointer &conn, MemBuf *mb, AsyncCall::Pointer &callback);
 
 /// Cancel the write pending on FD. No action if none pending.
 void WriteCancel(const Comm::ConnectionPointer &conn, const char *reason);
 
 // callback handler to process an FD which is available for writing.
 extern PF HandleWrite;
 
 } // namespace Comm
 
 #endif /* _SQUID_COMM_IOWRITE_H */



More information about the squid-dev mailing list