Browse Source

separate infinibuf.h file

David Mazieres 7 years ago
parent
commit
4a8eaa1d96
5 changed files with 2540 additions and 109 deletions
  1. 2
    2
      Makefile.am
  2. 1
    0
      doc/.gitignore
  3. 2354
    0
      doc/Doxyfile
  4. 19
    107
      infinibuf.cc
  5. 164
    0
      infinibuf.h

+ 2
- 2
Makefile.am View File

@@ -6,8 +6,8 @@ LDADD = $(BOOST_LDFLAGS) $(BOOST_IOSTREAMS_LIB) $(sqlite3_LIBS)	\
6 6
 
7 7
 bin_PROGRAMS = muchsync
8 8
 
9
-muchsync_SOURCES = fdstream.cc maildir.cc muchsync.cc protocol.cc	\
10
-	sqlite.cc xapian_sync.cc muchsync.h fdstream.h
9
+muchsync_SOURCES = fdstream.cc infinibuf.cc maildir.cc muchsync.cc	\
10
+	protocol.cc sqlite.cc xapian_sync.cc muchsync.h fdstream.h
11 11
 
12 12
 CLEANFILES = *~
13 13
 maintainer-clean-local:

+ 1
- 0
doc/.gitignore View File

@@ -0,0 +1 @@
1
+/html

+ 2354
- 0
doc/Doxyfile
File diff suppressed because it is too large
View File


+ 19
- 107
infinibuf.cc View File

@@ -1,106 +1,21 @@
1 1
 #include <array>
2 2
 #include <cassert>
3
-#include <condition_variable>
4 3
 #include <cstring>
5
-#include <deque>
6 4
 #include <iostream>
7
-#include <memory>
8
-#include <mutex>
9 5
 #include <streambuf>
10 6
 #include <unistd.h>
11 7
 #include <sys/socket.h>
12 8
 
9
+#include "infinibuf.h"
10
+
13 11
 #include <thread>
14 12
 
15 13
 using namespace std;
16 14
 
17
-class infinibuf {
18
-protected:
19
-  static constexpr int default_startpos_ = 8;
20
-  static constexpr int chunksize_ = 0x10000;
21
-
22
-  deque<char *> data_;
23
-  int gpos_;
24
-  int ppos_;
25
-  bool eof_{false};
26
-  int errno_{0};
27
-  const int startpos_;		// For putback
28
-
29
-  virtual void notempty() {}
30
-
31
-public:
32
-  explicit infinibuf(int sp = default_startpos_)
33
-    : gpos_(sp), ppos_(sp), startpos_(sp) {
34
-    data_.push_back (new char[chunksize_]);
35
-  }
36
-  infinibuf(const infinibuf &) = delete;
37
-  virtual ~infinibuf() { for (char *p : data_) delete[] p; }
38
-  infinibuf &operator= (const infinibuf &) = delete;
39
-		   
40
-  // These functions are never thread safe:
41
-
42
-  bool empty() { return data_.size() == 1 && gpos_ == ppos_; }
43
-  bool eof() { return eof_; }
44
-  int err() { return errno_; }
45
-  void err(int num) { errno_ = num; peof(); }
46
-
47
-  char *eback() { return data_.front(); }
48
-  char *gptr() { return eback() + gpos_; }
49
-  int gsize() { return (data_.size() > 1 ? chunksize_ : ppos_) - gpos_; }
50
-  char *egptr() { return gptr() + gsize(); }
51
-  void gbump(int n);
52
-  virtual void gwait() {}
53
-
54
-  char *pbase() { return data_.back(); }
55
-  char *pptr() { return pbase() + ppos_; }
56
-  int psize() { return chunksize_ - gpos_; }
57
-  char *epptr() { return pptr() + psize(); }
58
-  void pbump(int n);
59
-  void peof() { eof_ = true; if (empty()) notempty(); }
60
-
61
-  // These functions are thread safe for some subtypes:
62
-
63
-  virtual void lock() {}
64
-  virtual void unlock() {}
65
-  bool output(int fd);
66
-  bool input(int fd);
67
-
68
-  static void output_loop(shared_ptr<infinibuf> ib, int fd);
69
-  static void input_loop(shared_ptr<infinibuf> ib, int fd);
70
-};
71
-
72
-class infinibuf_infd : public infinibuf {
73
-  const int fd_;
74
-public:
75
-  explicit infinibuf_infd (int fd, int sp = default_startpos_)
76
-    : infinibuf(sp), fd_(fd) {}
77
-  void gwait() override { input(fd_); }
78
-};
79
-
80
-class infinibuf_outfd : public infinibuf {
81
-  const int fd_;
82
-public:
83
-  explicit infinibuf_outfd (int fd, int sp = default_startpos_)
84
-    : infinibuf(sp), fd_(fd) {}
85
-  void notempty() override { output(fd_); }
86
-};
87
-
88
-class infinibuf_mt : public infinibuf {
89
-  mutex m_;
90
-  condition_variable cv_;
91
-public:
92
-  explicit infinibuf_mt (int sp = default_startpos_) : infinibuf(sp) {}
93
-  void lock() override { m_.lock(); }
94
-  void unlock() override { m_.unlock(); }
95
-  void notempty() override { cv_.notify_all(); }
96
-  void gwait() override {
97
-    if (empty() && !eof()) {
98
-      unique_lock<mutex> ul (m_, adopt_lock);
99
-      cv_.wait(ul);
100
-      ul.release();
101
-    }
102
-  }
103
-};
15
+infinibuf::~infinibuf()
16
+{
17
+  for (char *p : data_) delete[] p;
18
+}
104 19
 
105 20
 void
106 21
 infinibuf::gbump(int n)
@@ -167,7 +82,7 @@ infinibuf::output(int fd)
167 82
 }
168 83
 
169 84
 bool
170
-infinibuf::input (int fd)
85
+infinibuf::input(int fd)
171 86
 {
172 87
   unique_lock<infinibuf> lk (*this);
173 88
   char *p = pptr();
@@ -195,7 +110,7 @@ infinibuf::input (int fd)
195 110
 }
196 111
 
197 112
 void
198
-infinibuf::output_loop (shared_ptr<infinibuf> ib, int fd)
113
+infinibuf::output_loop(shared_ptr<infinibuf> ib, int fd)
199 114
 {
200 115
   while (ib->output(fd)) {
201 116
     lock_guard<infinibuf> _lk (*ib);
@@ -204,24 +119,21 @@ infinibuf::output_loop (shared_ptr<infinibuf> ib, int fd)
204 119
 }
205 120
 
206 121
 void
207
-infinibuf::input_loop (shared_ptr<infinibuf> ib, int fd)
122
+infinibuf::input_loop(shared_ptr<infinibuf> ib, int fd)
208 123
 {
209 124
   while (ib->input(fd))
210 125
     ;
211 126
 }
212 127
 
213
-class infinistreambuf : public streambuf {
214
-protected:
215
-  shared_ptr<infinibuf> ib_;
216
-  int_type underflow() override;
217
-  int_type overflow(int_type ch) override;
218
-  int sync() override;
219
-public:
220
-  explicit infinistreambuf (shared_ptr<infinibuf> ib);
221
-  explicit infinistreambuf (infinibuf *ib)
222
-    : infinistreambuf(shared_ptr<infinibuf>(ib)) {}
223
-  shared_ptr<infinibuf> get_infinibuf() { return ib_; }
224
-};
128
+infinibuf_infd::~infinibuf_infd()
129
+{
130
+  close(fd_);
131
+}
132
+
133
+infinibuf_outfd::~infinibuf_outfd()
134
+{
135
+  close(fd_);
136
+}
225 137
 
226 138
 infinistreambuf::int_type
227 139
 infinistreambuf::underflow()
@@ -255,7 +167,7 @@ infinistreambuf::sync()
255 167
   return err ? -1 : 0;
256 168
 }
257 169
 
258
-infinistreambuf::infinistreambuf (shared_ptr<infinibuf> ib)
170
+infinistreambuf::infinistreambuf(shared_ptr<infinibuf> ib)
259 171
   : ib_(ib)
260 172
 {
261 173
   lock_guard<infinibuf> _lk (*ib_);

+ 164
- 0
infinibuf.h View File

@@ -0,0 +1,164 @@
1
+// -*- C++ -*-
2
+
3
+#include <condition_variable>
4
+#include <deque>
5
+#include <memory>
6
+#include <mutex>
7
+
8
+
9
+
10
+/** \file infinibuf.h
11
+ *  \brief iostreams-friendly buffers that can grow without bounds
12
+ */
13
+
14
+/**
15
+ * \brief Abstract buffer-management class for unbounded buffers.
16
+ *
17
+ * A derived class must at a minimum override either `notempty()` (for
18
+ * output buffers) or `gwait()` (for input buffers).
19
+ *
20
+ * Most methods are not thread-safe.
21
+ */
22
+class infinibuf {
23
+protected:
24
+  static constexpr int default_startpos_ = 8;
25
+  static constexpr int chunksize_ = 0x10000;
26
+
27
+  std::deque<char *> data_;
28
+  int gpos_;
29
+  int ppos_;
30
+  bool eof_{false};
31
+  int errno_{0};
32
+  const int startpos_;		// For putback
33
+
34
+  /** Called to signal when the buffer transitions from empty to
35
+   *  non-empty. */
36
+  virtual void notempty() {}
37
+
38
+public:
39
+  explicit infinibuf(int sp = default_startpos_)
40
+    : gpos_(sp), ppos_(sp), startpos_(sp) {
41
+    data_.push_back (new char[chunksize_]);
42
+  }
43
+  infinibuf(const infinibuf &) = delete;
44
+  virtual ~infinibuf() = 0;
45
+  infinibuf &operator= (const infinibuf &) = delete;
46
+		   
47
+  // These functions are never thread safe:
48
+
49
+  bool empty() { return data_.size() == 1 && gpos_ == ppos_; }
50
+  bool eof() { return eof_; }
51
+  int err() { return errno_; }
52
+  void err(int num) { errno_ = num; peof(); }
53
+
54
+  char *eback() { return data_.front(); }
55
+  char *gptr() { return eback() + gpos_; }
56
+  int gsize() { return (data_.size() > 1 ? chunksize_ : ppos_) - gpos_; }
57
+  char *egptr() { return gptr() + gsize(); }
58
+  void gbump(int n);
59
+  /** Called to wait for the buffer to be non-empty. */
60
+  virtual void gwait() {}
61
+
62
+  char *pbase() { return data_.back(); }
63
+  char *pptr() { return pbase() + ppos_; }
64
+  int psize() { return chunksize_ - gpos_; }
65
+  char *epptr() { return pptr() + psize(); }
66
+  void pbump(int n);
67
+  void peof() { eof_ = true; if (empty()) notempty(); }
68
+
69
+  // These functions are thread safe for some subtypes:
70
+
71
+  /** By default `lock()` and `unlock()` do nothing, but threadsafe
72
+   *  derived classes must override these functions. */
73
+  virtual void lock() {}
74
+  /** See comment at unlock. */
75
+  virtual void unlock() {}
76
+
77
+  /** Drain the current contents of the buffer.
78
+   * \param fd The file descriptor to write to.
79
+   * \return `false` at EOF if there is no point in ever calling
80
+   * `output` again.
81
+   * \throws runtime_error if the `write` system call fails and
82
+   * `errno` is not `EAGAIN`. */
83
+  bool output(int fd);
84
+
85
+  /** Fill the buffer from a file descriptor.
86
+   * \param fd The file descriptor to read from.
87
+   * \return `false` at EOF if there is no point in ever calling
88
+   * `output` again.
89
+   * \throws runtime_error if the `read` system call fails and
90
+   * `errno` is not `EAGAIN`. */
91
+  bool input(int fd);
92
+
93
+  static void output_loop(std::shared_ptr<infinibuf> ib, int fd);
94
+  static void input_loop(std::shared_ptr<infinibuf> ib, int fd);
95
+};
96
+
97
+/** \brief An `infinibuf` that synchronously reads from a file
98
+ *  descriptor when the buffer underflows.
99
+ *
100
+ *  Closes the file descriptor upon destruction. */
101
+class infinibuf_infd : public infinibuf {
102
+  const int fd_;
103
+public:
104
+  explicit infinibuf_infd (int fd, bool closeit = true,
105
+			 int sp = default_startpos_)
106
+    : infinibuf(sp), fd_(fd) {}
107
+  ~infinibuf_infd();
108
+  void gwait() override { input(fd_); }
109
+};
110
+
111
+/** \brief An `infinibuf` that synchronously writes to a file
112
+ *  descriptor when the buffer overflows or is synced.
113
+ *
114
+ *  Closes the file descriptor upon destruction. */
115
+class infinibuf_outfd : public infinibuf {
116
+  const int fd_;
117
+public:
118
+  explicit infinibuf_outfd (int fd)
119
+    : infinibuf(0), fd_(fd) {}
120
+  ~infinibuf_outfd();
121
+  void notempty() override { output(fd_); }
122
+};
123
+
124
+/** \brief Thread-safe infinibuf.
125
+ *
126
+ * This infinibuf can safely be used in an `iostream` by one thread,
127
+ * while a different thread fills or drains the buffer (for instance
128
+ * executing `infinibuf::output_loop` or `infinibuf::input_loop`).
129
+ */
130
+class infinibuf_mt : public infinibuf {
131
+  std::mutex m_;
132
+  std::condition_variable cv_;
133
+public:
134
+  explicit infinibuf_mt (int sp = default_startpos_) : infinibuf(sp) {}
135
+  void lock() override { m_.lock(); }
136
+  void unlock() override { m_.unlock(); }
137
+  void notempty() override { cv_.notify_all(); }
138
+  void gwait() override {
139
+    if (empty() && !eof()) {
140
+      std::unique_lock<std::mutex> ul (m_, std::adopt_lock);
141
+      cv_.wait(ul);
142
+      ul.release();
143
+    }
144
+  }
145
+};
146
+
147
+/** \brief `infinibuf`-based `streambuf`.
148
+ *
149
+ * This streambuf can make use of any buffer type derived from
150
+ * `infinibuf`.  The `infinibuf` is always converted to a
151
+ * `shared_ptr`, even if it is passed in as a raw `infinibuf*`.
152
+ */
153
+class infinistreambuf : public std::streambuf {
154
+protected:
155
+  std::shared_ptr<infinibuf> ib_;
156
+  int_type underflow() override;
157
+  int_type overflow(int_type ch) override;
158
+  int sync() override;
159
+public:
160
+  explicit infinistreambuf (std::shared_ptr<infinibuf> ib);
161
+  explicit infinistreambuf (infinibuf *ib)
162
+    : infinistreambuf(std::shared_ptr<infinibuf>(ib)) {}
163
+  std::shared_ptr<infinibuf> get_infinibuf() { return ib_; }
164
+};

Loading…
Cancel
Save