Synchronize notmuch mail across machines
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

infinibuf.h 11KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323
  1. // -*- C++ -*-
  2. #ifndef _INFINIBUF_H_
  3. #define _INFINIBUF_H_ 1
  4. /** \file infinibuf.h
  5. * \brief iostreams-friendly buffers that can grow without bounds.
  6. */
#include <cerrno>
#include <condition_variable>
#include <exception>
#include <functional>
#include <istream>
#include <list>
#include <memory>
#include <mutex>
#include <ostream>
#include <streambuf>
#include <thread>
  11. /**
  12. * \brief Abstract buffer-management class for unbounded buffers.
  13. *
  14. * A derived class must at a minimum override either `notempty()` (for
  15. * output buffers) or `gwait()` (for input buffers).
  16. *
  17. * Most methods are not thread-safe.
  18. */
  19. class infinibuf {
  20. protected:
  21. static constexpr int default_startpos_ = 8;
  22. static constexpr int chunksize_ = 0x10000;
  23. std::list<char *> data_;
  24. int gpos_;
  25. int ppos_;
  26. bool eof_{false};
  27. int errno_{0};
  28. const int startpos_; // For putback
  29. /** Called to signal when the buffer transitions from empty to
  30. * non-empty. */
  31. virtual void notempty() {}
  32. /** Called when sufficient bytes are consumed to free some memory. */
  33. virtual void notfull() {}
  34. public:
  35. explicit infinibuf(int sp = default_startpos_)
  36. : gpos_(sp), ppos_(sp), startpos_(sp) {
  37. data_.push_back(new char[chunksize_]);
  38. }
  39. infinibuf(const infinibuf &) = delete;
  40. virtual ~infinibuf() = 0;
  41. infinibuf &operator= (const infinibuf &) = delete;
  42. // These functions are never thread safe:
  43. bool empty() { return data_.front() == data_.back() && gpos_ == ppos_; }
  44. bool eof() { return eof_; }
  45. std::size_t buffer_size() { return data_.size() * chunksize_; }
  46. int err() { return errno_; }
  47. void err(int num) { if (!errno_) errno_ = num; peof(); }
  48. char *eback() { return data_.front(); }
  49. char *gptr() { return eback() + gpos_; }
  50. int gsize() {
  51. return (data_.front() == data_.back() ? ppos_ : chunksize_) - gpos_;
  52. }
  53. char *egptr() { return gptr() + gsize(); }
  54. void gbump(int n);
  55. /** Called to wait for the buffer to be non-empty. */
  56. virtual void gwait() {}
  57. char *pbase() { return data_.back(); }
  58. char *pptr() { return pbase() + ppos_; }
  59. int psize() { return chunksize_ - ppos_; }
  60. char *epptr() { return pptr() + psize(); }
  61. void pbump(int n);
  62. void peof() { eof_ = true; if (empty()) notempty(); }
  63. /** Called to sleep if the buffer is too full. */
  64. virtual void pwait() {}
  65. // These functions are thread safe for some subtypes:
  66. /** By default `lock()` and `unlock()` do nothing, but threadsafe
  67. * derived classes must override these functions. */
  68. virtual void lock() {}
  69. /** See comment at lock. */
  70. virtual void unlock() {}
  71. /** \brief Drain the current contents of the buffer.
  72. *
  73. * This function is thread safe and must be called *without* locking
  74. * the `infinibuf`. If the `infinibuf` is already locked, deadlock
  75. * will ensue.
  76. *
  77. * \param fd The file descriptor to write to.
  78. * \return 0 at EOF if there is no point in ever calling `output`
  79. * again, -1 after EAGAIN, and 1 after successful output.
  80. * \throws runtime_error if the `write` system call fails and
  81. * `errno` is not `EAGAIN`. */
  82. int output(int fd);
  83. /** Fill the buffer from a file descriptor.
  84. *
  85. * This function is thread safe and must be called *without* locking
  86. * the `infinibuf`.
  87. *
  88. * \param fd The file descriptor to read from.
  89. * \return 0 at EOF if there is no point in ever calling
  90. * `input` again, 1 after successful input, and -1 after EAGAIN.
  91. * \throws runtime_error if the `read` system call fails and
  92. * `errno` is not `EAGAIN`. */
  93. int input(int fd);
  94. /** Calls `output` over and over in a loop on an `infinibuf`.
  95. *
  96. * \param ib The `infinibuf` on which to call `output`.
  97. *
  98. * \param fd The file descriptor to which to write consumed data.
  99. *
  100. * \param oblocked If non-null is called with `true` whenever the
  101. * output is blocked by flow control, and then called again with
  102. * `false` when the output becomes unblocked.
  103. */
  104. static void output_loop(std::shared_ptr<infinibuf> ib, int fd,
  105. std::function<void(bool)> oblocked = nullptr);
  106. static void input_loop(std::shared_ptr<infinibuf> ib, int fd);
  107. };
  108. /** \brief An `infinibuf` that synchronously reads from a file
  109. * descriptor when the buffer underflows.
  110. *
  111. * Closes the file descriptor upon destruction. */
  112. class infinibuf_infd : public infinibuf {
  113. const int fd_;
  114. public:
  115. explicit infinibuf_infd (int fd, int sp = default_startpos_)
  116. : infinibuf(sp), fd_(fd) {}
  117. ~infinibuf_infd();
  118. void gwait() override { input(fd_); }
  119. };
  120. /** \brief An `infinibuf` that synchronously writes to a file
  121. * descriptor when the buffer overflows or is synced.
  122. *
  123. * Closes the file descriptor upon destruction. */
  124. class infinibuf_outfd : public infinibuf {
  125. const int fd_;
  126. std::function<void(bool)> oblocked_;
  127. public:
  128. explicit infinibuf_outfd (int fd,
  129. std::function<void(bool)> oblocked = nullptr);
  130. ~infinibuf_outfd();
  131. void notempty() override;
  132. };
  133. /** \brief Thread-safe infinibuf.
  134. *
  135. * This infinibuf can safely be used in an `iostream` by one thread,
  136. * while a different thread fills or drains the buffer (for instance
  137. * executing `infinibuf::output_loop` or `infinibuf::input_loop`).
  138. */
  139. class infinibuf_mt : public infinibuf {
  140. std::mutex m_;
  141. std::condition_variable cv_;
  142. std::condition_variable flow_ctrl_cv_;
  143. std::size_t max_buf_size_{0};
  144. public:
  145. explicit infinibuf_mt (int sp = default_startpos_) : infinibuf(sp) {}
  146. void lock() override { m_.lock(); }
  147. void unlock() override { m_.unlock(); }
  148. void notempty() override { cv_.notify_all(); }
  149. void notfull() override { flow_ctrl_cv_.notify_all(); }
  150. void set_max_buf_size(std::size_t val) {
  151. std::lock_guard<infinibuf> _lk(*this);
  152. if (!val || val > max_buf_size_)
  153. notfull();
  154. max_buf_size_ = val;
  155. }
  156. void gwait() override {
  157. if (empty() && !eof()) {
  158. std::unique_lock<std::mutex> ul (m_, std::adopt_lock);
  159. while (empty() && !eof())
  160. cv_.wait(ul);
  161. ul.release();
  162. }
  163. }
  164. void pwait() override {
  165. if (max_buf_size_ && buffer_size() > max_buf_size_) {
  166. if (max_buf_size_ && buffer_size() > max_buf_size_) {
  167. std::unique_lock<std::mutex> ul (m_, std::adopt_lock);
  168. flow_ctrl_cv_.wait(ul);
  169. ul.release();
  170. }
  171. }
  172. }
  173. };
  174. /** \brief `infinibuf`-based `streambuf`.
  175. *
  176. * This streambuf can make use of any buffer type derived from
  177. * `infinibuf`. The `infinibuf` is always converted to a
  178. * `shared_ptr`, even if it is passed in as a raw `infinibuf*`.
  179. */
  180. class infinistreambuf : public std::streambuf {
  181. protected:
  182. std::shared_ptr<infinibuf> ib_;
  183. int_type underflow() override;
  184. int_type overflow(int_type ch) override;
  185. int sync() override;
  186. public:
  187. explicit infinistreambuf(std::shared_ptr<infinibuf> ib);
  188. explicit infinistreambuf(infinibuf *ib)
  189. : infinistreambuf(std::shared_ptr<infinibuf>(ib)) {}
  190. infinistreambuf(infinistreambuf &&isb)
  191. : infinistreambuf(isb.ib_) {}
  192. std::shared_ptr<infinibuf> get_infinibuf() { return ib_; }
  193. void sputeof();
  194. };
  195. class ifdstream : public std::istream {
  196. infinistreambuf isb_;
  197. public:
  198. ifdstream(int fd)
  199. : std::istream (nullptr), isb_ (new infinibuf_infd(fd)) {
  200. init(&isb_);
  201. }
  202. ~ifdstream() {
  203. std::lock_guard<infinibuf> _lk (*isb_.get_infinibuf());
  204. isb_.get_infinibuf()->err(EPIPE);
  205. }
  206. };
  207. class ofdstream : public std::ostream {
  208. infinistreambuf isb_;
  209. public:
  210. ofdstream(int fd, std::function<void(bool)> oblocked = nullptr)
  211. : std::ostream (nullptr), isb_(new infinibuf_outfd(fd, oblocked)) {
  212. init(&isb_);
  213. }
  214. ~ofdstream() {
  215. if (std::uncaught_exception())
  216. try { isb_.sputeof(); } catch(...) {}
  217. else
  218. isb_.sputeof();
  219. }
  220. };
  221. /** \brief std::istream from file descriptor with unbounded buffer.
  222. *
  223. * Continously reads from and buffers input from a file descriptor in
  224. * another thread. Closes the file descriptor after receiving EOF.
  225. * Kill the input thread if any further input is received, but the
  226. * input thread could get stuck if no input and no EOF happens.
  227. * Maximum buffer size defaults to infinity but can be adjusted with
  228. * `ifdinfinistream::set_max_buf_size`.
  229. */
  230. class ifdinfinistream : public std::istream {
  231. std::shared_ptr<infinibuf_mt> ib_ { new infinibuf_mt() };
  232. infinistreambuf isb_ { ib_ };
  233. public:
  234. explicit ifdinfinistream (int fd, std::size_t size = 0)
  235. : std::istream (nullptr) {
  236. set_max_buf_size(size);
  237. std::thread t (infinibuf::input_loop, isb_.get_infinibuf(), fd);
  238. t.detach();
  239. init(&isb_);
  240. }
  241. /** Sets maximum buffer size, above which it will stop reading from
  242. * the file descriptor until more is consumed locally.
  243. *
  244. * A value of 0 means no maximum buffer size. */
  245. void set_max_buf_size(std::size_t size) { ib_->set_max_buf_size(size); }
  246. ~ifdinfinistream() {
  247. std::lock_guard<infinibuf> _lk (*isb_.get_infinibuf());
  248. // Sadly, there appears to be no portable way of waking up the
  249. // thread waiting in read. I tried using dup2 to replace the file
  250. // descriptor with /dev/null, or using fcntl to set the O_NONBLOCK
  251. // flag after the read has already started, and neither works on
  252. // linux. What does work is setting an empty function (not
  253. // SIG_IGN) as the signal handler on SIGCONT, then setting
  254. // O_NONBLOCK on the file descriptor, and finally calling
  255. // pthread_kill(t.native_handle(), SIGCONT)--but that could have
  256. // unintended consequences on other parts of the program following
  257. // a Ctrl-Z. The only truly clean solution is to use a
  258. // "self-pipe" to wake up a poll call, thereby using three file
  259. // descriptors for the job of one (yuck). Since we don't really
  260. // need to clean up the file descriptor, I'm not going to add the
  261. // complexity and cost of polling a second "self-pipe" file
  262. // descriptor or dropping down to native_handle.
  263. isb_.get_infinibuf()->err(EPIPE);
  264. }
  265. };
  266. #if 0
  267. /** \brief `ostream` from file descriptor with unbounded buffer.
  268. *
  269. * Buffers unbounded amounts of data which are drained to a file
  270. * descriptor in another thread. The file descriptor is closed when
  271. * the draining thread exits. The class destructor waits for the
  272. * writer thread to flush the buffer and exit.
  273. */
  274. class ofdinfinistream : public std::ostream {
  275. infinistreambuf isb_ { new infinibuf_mt(0) };
  276. std::thread t_;
  277. public:
  278. ofdinfinistream (int fd) {
  279. std::thread t (infinibuf::output_loop, isb_.get_infinibuf(), fd, nullptr);
  280. t_ = std::move(t);
  281. rdbuf(&isb_);
  282. }
  283. // Doesn't work because std::ostream's virtual destructor is noexcept.
  284. ~ofdinfinistream() noexcept(false) {
  285. isb_.sputeof();
  286. if (!std::uncaught_exception()) {
  287. t_.join();
  288. std::lock_guard<infinibuf> lk (*isb_.get_infinibuf());
  289. if (isb_.get_infinibuf()->err())
  290. throw std::runtime_error (std::string("~ofdinfinistream: ") +
  291. strerror(isb_.get_infinibuf()->err()));
  292. }
  293. }
  294. };
  295. #endif
  296. #endif /* !_INFINIBUF_H_ */