[squid-dev] [REVIEW] SBuf I/O Comm::Write API
Amos Jeffries
squid3 at treenet.co.nz
Thu Oct 23 11:42:33 UTC 2014
This patch is the proposed Comm::Write API update to handle SBuf writes.
It is a PREVIEW because I have not yet altered or written any code that
actually uses it as proof of correct operation. I intend to follow up
with this as part of a larger tunnel conversion later if the design is
agreed.
It follows the same model as implemented for Comm::Read:
* The code requiring a write(2) uses Comm::Write() to register interest
in writes with a callback.
* When write(2) is possible the registered callback will be scheduled.
* The recipient of a write callback is permitted to use Comm::WriteNow()
to perform a synchronous write(2) operation (or not, if it chooses).
* The results of Comm::WriteNow() call are:
- a Comm::Flag indicating OK/ERROR/INPROGRESS - where INPROGRESS
represents delay pool preventing write(2), or some minor error requiring
a retry,
- the buffer passed for immediate writing is consume()'d by the amount
actually written (which may leave it with remaining bytes).
* Comm::MonitorsWrite(fd) test is provided to check if a specific FD is
already active with a waiting callback.
Amos
-------------- next part --------------
=== modified file 'src/comm/Write.cc'
--- src/comm/Write.cc 2014-09-13 13:59:43 +0000
+++ src/comm/Write.cc 2014-10-23 09:42:40 +0000
@@ -1,162 +1,264 @@
/*
* Copyright (C) 1996-2014 The Squid Software Foundation and contributors
*
* Squid software is distributed under GPLv2+ license and includes
* contributions from numerous individuals and organizations.
* Please see the COPYING and CONTRIBUTORS files for details.
*/
#include "squid.h"
+#include "comm/comm_internal.h"
#include "comm/Connection.h"
#include "comm/IoCallback.h"
+#include "comm/Loops.h"
#include "comm/Write.h"
#include "fd.h"
#include "fde.h"
#include "globals.h"
#include "MemBuf.h"
#include "profiler/Profiler.h"
#include "SquidTime.h"
#include "StatCounters.h"
#if USE_DELAY_POOLS
#include "ClientInfo.h"
#endif
#include <cerrno>
+/// Reports whether the write callback registered for this FD is active.
+/// Note that when comm is not monitoring, there can still be a pending
+/// callback call, which may resume comm monitoring once fired.
+bool
+Comm::MonitorsWrite(int fd)
+{
+    assert(isOpen(fd) && COMMIO_FD_WRITECB(fd));
+
+    // "Active" stands in for "monitored" here: we always start monitoring
+    // the FD when we configure Comm::IoCallback for I/O, and we usually
+    // configure Comm::IoCallback for I/O when we start monitoring an FD
+    // for writing.
+    const bool writerActive = COMMIO_FD_WRITECB(fd)->active();
+    return writerActive;
+}
+
+/// Select-loop handler installed by the bufferless Comm::Write().
+/// Performs no write(2) itself: it only tells the registered callback that
+/// the FD is ready, so the recipient may call Comm::WriteNow().
+static void
+handleWriteNow(int fd, void *)
+{
+    PROF_start(commHandleWrite);
+    // Conclude the registration through finish(), the same way
+    // Comm::HandleWrite() delivers its results, instead of scheduling the
+    // raw callback. Scheduling the raw call left the IoCallback configured,
+    // so a recipient re-registering via Comm::Write() would presumably trip
+    // its assert(!ccb->active()).
+    // NOTE(review): assumes finish() fills the callback params and resets
+    // the IoCallback -- confirm against Comm::IoCallback::finish().
+    COMMIO_FD_WRITECB(fd)->finish(Comm::OK, 0);
+    PROF_stop(commHandleWrite);
+}
+
+/// Bufferless write registration: remembers callback on the connection's
+/// write IoCallback and asks the select loop to run handleWriteNow() for
+/// the FD. No data is written here.
+void
+Comm::Write(const Comm::ConnectionPointer &conn, AsyncCall::Pointer &callback)
+{
+    debugs(5, 5, conn << ": sz (undefined): asynCall " << callback);
+
+    const int fd = conn->fd;
+
+    /* The FD must be open and not closing ... */
+    assert(fd_table[fd].flags.open);
+    assert(!fd_table[fd].closing());
+
+    /* ... and must not already have a write registered */
+    Comm::IoCallback *writer = COMMIO_FD_WRITECB(fd);
+    assert(!writer->active());
+
+    fd_table[fd].writeStart = squid_curtime;
+    writer->conn = conn;
+
+    /* Register the callback without any buffer, then wait for write readiness */
+    writer->setCallback(IOCB_WRITE, callback, NULL, NULL, 0);
+    SetSelect(fd, COMM_SELECT_WRITE, handleWriteNow, NULL, 0);
+}
+
+/**
+ * Legacy MemBuf overload: forwards the MemBuf's buf, size and freeFunc()
+ * result to the buffer-based Comm::Write() overload.
+ */
void
Comm::Write(const Comm::ConnectionPointer &conn, MemBuf *mb, AsyncCall::Pointer &callback)
{
Comm::Write(conn, mb->buf, mb->size, callback, mb->freeFunc());
}
+/**
+ * Legacy buffer-based write: records the caller's buffer, size, free
+ * function and callback on the connection's write IoCallback, then hands
+ * over to selectOrQueueWrite().
+ * The FD must be open, not closing, and have no write callback active;
+ * each of these is asserted.
+ */
void
Comm::Write(const Comm::ConnectionPointer &conn, const char *buf, int size, AsyncCall::Pointer &callback, FREE * free_func)
{
debugs(5, 5, HERE << conn << ": sz " << size << ": asynCall " << callback);
/* Make sure we are open, not closing, and not writing */
assert(fd_table[conn->fd].flags.open);
assert(!fd_table[conn->fd].closing());
Comm::IoCallback *ccb = COMMIO_FD_WRITECB(conn->fd);
assert(!ccb->active());
fd_table[conn->fd].writeStart = squid_curtime;
ccb->conn = conn;
/* Queue the write */
ccb->setCallback(IOCB_WRITE, callback, (char *)buf, free_func, size);
ccb->selectOrQueueWrite();
}
-/** Write to FD.
- * This function is used by the lowest level of IO loop which only has access to FD numbers.
- * We have to use the comm iocb_table to map FD numbers to waiting data and Comm::Connections.
- * Once the write has been concluded we schedule the waiting call with success/fail results.
+/**
+ * Adjust the amount to write for available delay pool bandwidth.
+ *
+ * \param wanted number of bytes the caller would like to write
+ * \param fd     descriptor the write is intended for
+ * \return the number of bytes that may be written now (at most wanted),
+ *         or -1 when no quota is left and the FD has been put back on
+ *         the client's quota queue. Without USE_DELAY_POOLS, or when
+ *         write limiting is not active for the FD, wanted is returned as is.
*/
-void
-Comm::HandleWrite(int fd, void *data)
+static int
+delayPoolAdjustWriteSz(int wanted, int fd)
{
- Comm::IoCallback *state = static_cast<Comm::IoCallback *>(data);
- int len = 0;
- int nleft;
-
- assert(state->conn != NULL && state->conn->fd == fd);
-
- PROF_start(commHandleWrite);
- debugs(5, 5, HERE << state->conn << ": off " <<
- (long int) state->offset << ", sz " << (long int) state->size << ".");
-
- nleft = state->size - state->offset;
-
#if USE_DELAY_POOLS
ClientInfo * clientInfo=fd_table[fd].clientInfo;
if (clientInfo && !clientInfo->writeLimitingActive)
clientInfo = NULL; // we only care about quota limits here
if (clientInfo) {
assert(clientInfo->selectWaiting);
clientInfo->selectWaiting = false;
assert(clientInfo->hasQueue());
assert(clientInfo->quotaPeekFd() == fd);
clientInfo->quotaDequeue(); // we will write or requeue below
- if (nleft > 0) {
+ if (wanted > 0) {
const int quota = clientInfo->quotaForDequed();
if (!quota) { // if no write quota left, queue this fd
-state->quotaQueueReserv = clientInfo->quotaEnqueue(fd);
+ // The old 'state' pointer does not exist in this helper; use the FD's
+ // write IoCallback directly.
+ // NOTE(review): assumes HandleWrite()'s state is always COMMIO_FD_WRITECB(fd) -- confirm.
+ COMMIO_FD_WRITECB(fd)->quotaQueueReserv = clientInfo->quotaEnqueue(fd);
clientInfo->kickQuotaQueue();
- PROF_stop(commHandleWrite);
- return;
+ return -1;
}
- const int nleft_corrected = min(nleft, quota);
- if (nleft != nleft_corrected) {
- debugs(5, 5, HERE << state->conn << " writes only " <<
- nleft_corrected << " out of " << nleft);
- nleft = nleft_corrected;
+ const int corrected = min(wanted, quota);
+ if (wanted != corrected) {
+ debugs(5, 5, "FD " << fd << " writes only " << corrected << " out of " << wanted);
+ wanted = corrected;
}
-
}
}
#endif /* USE_DELAY_POOLS */
- /* actually WRITE data */
- len = FD_WRITE_METHOD(fd, state->buf + state->offset, nleft);
- debugs(5, 5, HERE << "write() returns " << len);
+ return wanted;
+}
+
+/**
+ * Account for the outcome of one write attempt on fd.
+ *
+ * Passes len to fd_bytes(), bumps the socket write syscall counter and
+ * refreshes fde::writeStart; all three happen whatever the value of len.
+ * With USE_DELAY_POOLS and write limiting active for the FD, a positive
+ * len is also drained from the client's bucket (clamped at zero) and the
+ * quota queue is kicked.
+ *
+ * \param fd  descriptor the write was attempted on
+ * \param len result of the write call (callers pass it through unchecked)
+ */
+static void
+countWritten(int fd, int len)
+{
+ fd_bytes(fd, len, FD_WRITE);
+ ++statCounter.syscalls.sock.writes;
+ // After each successful partial write,
+ // reset fde::writeStart to the current time.
+ fd_table[fd].writeStart = squid_curtime;
#if USE_DELAY_POOLS
+ ClientInfo * clientInfo=fd_table[fd].clientInfo;
+
+ if (clientInfo && !clientInfo->writeLimitingActive)
+ clientInfo = NULL; // we only care about quota limits here
+
if (clientInfo) {
if (len > 0) {
/* we wrote data - drain them from bucket */
clientInfo->bucketSize -= len;
if (clientInfo->bucketSize < 0.0) {
- debugs(5, DBG_IMPORTANT, HERE << "drained too much"); // should not happen
+ debugs(5, DBG_IMPORTANT, "FD " << fd << " drained too much"); // should not happen
clientInfo->bucketSize = 0;
}
}
// even if we wrote nothing, we were served; give others a chance
clientInfo->kickQuotaQueue();
}
#endif /* USE_DELAY_POOLS */
+}
- fd_bytes(fd, len, FD_WRITE);
- ++statCounter.syscalls.sock.writes;
- // After each successful partial write,
- // reset fde::writeStart to the current time.
- fd_table[fd].writeStart = squid_curtime;
+/** Write to FD.
+ * This function is used by the lowest level of IO loop which only has access to FD numbers.
+ * We have to use the comm iocb_table to map FD numbers to waiting data and Comm::Connections.
+ * Once the write has been concluded we schedule the waiting call with success/fail results.
+ *
+ * \param fd   descriptor that is ready for writing
+ * \param data the Comm::IoCallback holding the pending buffer-based write;
+ *             its connection must refer to fd (asserted)
+ */
+void
+Comm::HandleWrite(int fd, void *data)
+{
+ Comm::IoCallback *state = static_cast<Comm::IoCallback *>(data);
+ int len = 0;
+ int nleft;
+
+ assert(state->conn != NULL && state->conn->fd == fd);
+
+ PROF_start(commHandleWrite);
+ debugs(5, 5, HERE << state->conn << ": off " <<
+ (long int) state->offset << ", sz " << (long int) state->size << ".");
+
+ nleft = state->size - state->offset;
+
+ /* For legacy callers : Attempt a write */
+ // Keep in sync with Comm::WriteNow()!
+ nleft = delayPoolAdjustWriteSz(nleft, fd);
+ if (nleft < 0) {
+ // No delay pool quota: the FD was re-queued. Stop the profiler timer
+ // before leaving, as the pre-refactoring code did on this path.
+ PROF_stop(commHandleWrite);
+ return;
+ }
+
+ /* actually WRITE data */
+ len = FD_WRITE_METHOD(fd, state->buf + state->offset, nleft);
+ debugs(5, 5, HERE << "write() returns " << len);
+ countWritten(fd, len);
if (len == 0) {
/* Note we even call write if nleft == 0 */
/* We're done */
if (nleft != 0)
debugs(5, DBG_IMPORTANT, "FD " << fd << " write failure: connection closed with " << nleft << " bytes remaining.");
state->finish(nleft ? Comm::COMM_ERROR : Comm::OK, errno);
} else if (len < 0) {
/* An error */
- if (fd_table[fd].flags.socket_eof) {
- debugs(50, 2, HERE << "FD " << fd << " write failure: " << xstrerror() << ".");
- state->finish(nleft ? Comm::COMM_ERROR : Comm::OK, errno);
- } else if (ignoreErrno(errno)) {
+ if (!fd_table[fd].flags.socket_eof && ignoreErrno(errno)) {
debugs(50, 9, HERE << "FD " << fd << " write failure: " << xstrerror() << ".");
state->selectOrQueueWrite();
} else {
debugs(50, 2, HERE << "FD " << fd << " write failure: " << xstrerror() << ".");
state->finish(nleft ? Comm::COMM_ERROR : Comm::OK, errno);
}
} else {
/* A successful write, continue */
state->offset += len;
if (state->offset < state->size) {
/* Not done, reinstall the write handler and write some more */
state->selectOrQueueWrite();
} else {
state->finish(nleft ? Comm::OK : Comm::COMM_ERROR, errno);
}
}
PROF_stop(commHandleWrite);
}
+
+/**
+ * Attempt a single immediate write of buf to params.fd.
+ *
+ * The amount is first limited by delayPoolAdjustWriteSz(). The result is
+ * stored in params.flag and also returned; params.xerrno receives errno as
+ * left by the write call. On a successful write the written bytes are
+ * consume()'d from buf and their count is stored in params.size.
+ *
+ * \retval Comm::OK         bytes were written, or nothing was requested
+ * \retval Comm::COMM_ERROR the write failed or the connection closed with bytes remaining
+ * \retval Comm::INPROGRESS no delay pool quota, or an ignorable error; retry later
+ */
+Comm::Flag
+Comm::WriteNow(CommIoCbParams &params, SBuf &buf)
+{
+    // Keep in sync with the legacy path in Comm::HandleWrite()!
+    int nleft = delayPoolAdjustWriteSz(buf.length(), params.fd);
+    if (nleft < 0) {
+        return (params.flag = Comm::INPROGRESS);
+    }
+
+    /* actually WRITE data */
+    errno = 0;
+    int len = FD_WRITE_METHOD(params.fd, buf.rawContent(), nleft);
+    params.xerrno = errno; // save before any later call can change errno
+    debugs(5, 5, "write() returns " << len);
+    countWritten(params.fd, len);
+
+    if (len == 0) {
+        /* Note we even call write if nleft == 0 */
+        /* We're done */
+        if (nleft != 0)
+            debugs(5, DBG_IMPORTANT, "FD " << params.fd << " write failure: connection closed with " << nleft << " bytes remaining.");
+
+        params.flag = (nleft ? Comm::COMM_ERROR : Comm::OK);
+
+    } else if (len < 0) {
+        /* An error */
+        if (!fd_table[params.fd].flags.socket_eof && ignoreErrno(params.xerrno)) {
+            debugs(50, 9, "FD " << params.fd << " write failure: " << xstrerror() << ".");
+            params.flag = Comm::INPROGRESS;
+        } else {
+            debugs(50, 2, "FD " << params.fd << " write failure: " << xstrerror() << ".");
+            params.flag = (nleft ? Comm::COMM_ERROR : Comm::OK);
+        }
+    } else {
+        /* A successful write, consume buffer content */
+        params.flag = Comm::OK;
+        params.size = len; // the header documents "amount in params.size"
+        buf.consume(len);
+    }
+
+    return params.flag;
+}
=== modified file 'src/comm/Write.h'
--- src/comm/Write.h 2014-09-13 13:59:43 +0000
+++ src/comm/Write.h 2014-10-23 04:45:25 +0000
@@ -1,42 +1,70 @@
/*
* Copyright (C) 1996-2014 The Squid Software Foundation and contributors
*
* Squid software is distributed under GPLv2+ license and includes
* contributions from numerous individuals and organizations.
* Please see the COPYING and CONTRIBUTORS files for details.
*/
#ifndef _SQUID_COMM_IOWRITE_H
#define _SQUID_COMM_IOWRITE_H
#include "base/AsyncCall.h"
+#include "comm/Flag.h"
#include "comm/forward.h"
#include "typedefs.h"
+class CommIoCbParams;
class MemBuf;
+class SBuf;
+
namespace Comm
{
/**
+ * Start monitoring for write.
+ *
+ * callback is scheduled when the write is possible,
+ * or on file descriptor close.
+ */
+void Write(const Comm::ConnectionPointer &conn, AsyncCall::Pointer &callback);
+
+/// whether the FD socket is being monitored for write
+bool MonitorsWrite(int fd);
+
+/**
+ * Perform a write(2) on a connection immediately.
+ *
+ * The returned flag is also placed in params.flag.
+ *
+ * \retval Comm::OK data has been written from buf and consumed from it, amount in params.size
+ * \retval Comm::COMM_ERROR an error occurred, the code is placed in params.xerrno
+ * \retval Comm::INPROGRESS unable to write at this time, or a minor error occurred
+ */
+Comm::Flag WriteNow(CommIoCbParams &params, SBuf &buf);
+
+/**
* Queue a write. callback is scheduled when the write
* completes, on error, or on file descriptor close.
*
* free_func is used to free the passed buffer when the write has completed.
+ * \deprecated use bufferless Comm::Write() API instead.
*/
void Write(const Comm::ConnectionPointer &conn, const char *buf, int size, AsyncCall::Pointer &callback, FREE *free_func);
/**
* Queue a write. callback is scheduled when the write
* completes, on error, or on file descriptor close.
+ * \deprecated use bufferless Comm::Write() API instead.
*/
void Write(const Comm::ConnectionPointer &conn, MemBuf *mb, AsyncCall::Pointer &callback);
/// Cancel the write pending on FD. No action if none pending.
void WriteCancel(const Comm::ConnectionPointer &conn, const char *reason);
// callback handler to process an FD which is available for writing.
extern PF HandleWrite;
} // namespace Comm
#endif /* _SQUID_COMM_IOWRITE_H */
More information about the squid-dev
mailing list