Synchronize notmuch mail across machines
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

infinibuf.cc 5.8KB


  1. #include <array>
  2. #include <cassert>
  3. #include <cstring>
  4. #include <iostream>
  5. #include <functional>
  6. #include <streambuf>
  7. #include <fcntl.h>
  8. #include <unistd.h>
  9. #include <poll.h>
  10. #include <sys/socket.h>
  11. #include "infinibuf.h"
  12. #include <thread>
  13. using namespace std;
  14. infinibuf::~infinibuf()
  15. {
  16. for (char *p : data_) delete[] p;
  17. }
  18. void
  19. infinibuf::gbump(int n)
  20. {
  21. gpos_ += n;
  22. assert (gpos_ > 0 && gpos_ <= chunksize_);
  23. if (gpos_ == chunksize_) {
  24. assert (data_.size() > 1);
  25. delete[] data_.front();
  26. data_.pop_front();
  27. gpos_ = startpos_;
  28. notfull();
  29. }
  30. }
  31. void
  32. infinibuf::pbump(int n)
  33. {
  34. if (n == 0)
  35. return;
  36. assert (n >= 0);
  37. assert (n <= psize());
  38. assert (!eof_);
  39. bool wasempty (empty());
  40. ppos_ += n;
  41. if (ppos_ == chunksize_) {
  42. char *chunk = new char[chunksize_];
  43. memcpy(chunk, data_.back() + chunksize_ - startpos_, startpos_);
  44. data_.push_back(chunk);
  45. ppos_ = startpos_;
  46. }
  47. if (wasempty)
  48. notempty();
  49. }
  50. static int
  51. set_nonblock(int fd)
  52. {
  53. int n;
  54. if ((n = fcntl (fd, F_GETFL)) == -1
  55. || fcntl (fd, F_SETFL, n | O_NONBLOCK) == -1)
  56. return -1;
  57. return 0;
  58. }
  59. static void
  60. waitfd(int fd, int events)
  61. {
  62. struct pollfd pfd;
  63. pfd.fd = fd;
  64. pfd.events = events;
  65. poll(&pfd, 1, -1);
  66. }
  67. int
  68. infinibuf::output(int fd)
  69. {
  70. unique_lock<infinibuf> lk (*this);
  71. for (;;) {
  72. char *p = gptr();
  73. size_t nmax = gsize();
  74. bool iseof = eof();
  75. int error = err();
  76. if (error)
  77. throw runtime_error (string("infinibuf::output: ") + strerror(error));
  78. else if (!nmax && iseof) {
  79. assert (empty());
  80. shutdown(fd, SHUT_WR);
  81. return 0;
  82. }
  83. if (!nmax)
  84. return 1;
  85. lk.unlock();
  86. ssize_t n = write(fd, p, nmax);
  87. lk.lock();
  88. if (n > 0)
  89. gbump(n);
  90. else {
  91. if (errno == EAGAIN)
  92. return -1;
  93. err(errno);
  94. }
  95. }
  96. }
  97. int
  98. infinibuf::input(int fd)
  99. {
  100. unique_lock<infinibuf> lk (*this);
  101. char *p = pptr();
  102. size_t nmax = psize();
  103. if (int error = err())
  104. throw runtime_error (string("infinibuf::input: ") + strerror(error));
  105. lk.unlock();
  106. ssize_t n = read(fd, p, nmax);
  107. lk.lock();
  108. if (n < 0) {
  109. if (errno == EAGAIN)
  110. return -1;
  111. err(errno);
  112. throw runtime_error (string("infinibuf::input: ") + strerror(errno));
  113. }
  114. if (n > 0)
  115. pbump(n);
  116. else
  117. peof();
  118. return n > 0;
  119. }
  120. struct fd_closer {
  121. int fd_;
  122. fd_closer(int fd) : fd_(fd) {}
  123. ~fd_closer() { close(fd_); }
  124. };
  125. void
  126. infinibuf::output_loop(shared_ptr<infinibuf> ib, int fd,
  127. std::function<void(bool)> oblocked)
  128. {
  129. fd_closer _c(fd);
  130. if (oblocked)
  131. set_nonblock(fd);
  132. try {
  133. for (;;) {
  134. int res = ib->output(fd);
  135. if (res > 0) {
  136. lock_guard<infinibuf> _lk (*ib);
  137. ib->gwait();
  138. }
  139. else if (res == 0)
  140. return;
  141. else { // EINTR
  142. if (oblocked)
  143. oblocked(true);
  144. waitfd(fd, POLLOUT);
  145. if (oblocked)
  146. oblocked(false);
  147. }
  148. }
  149. } catch (const runtime_error &) {}
  150. }
  151. void
  152. infinibuf::input_loop(shared_ptr<infinibuf> ib, int fd)
  153. {
  154. fd_closer _c(fd);
  155. try {
  156. for (;;) {
  157. int res = ib->input(fd);
  158. if (res < 0)
  159. waitfd(fd, POLLIN);
  160. else if (res == 0)
  161. return;
  162. // Don't even bother checking flow control if less than 1MB allocated
  163. lock_guard<infinibuf> lk (*ib);
  164. if (ib->buffer_size() >= 100000)
  165. ib->pwait();
  166. }
  167. } catch (const runtime_error &) {}
  168. }
  169. infinibuf_infd::~infinibuf_infd()
  170. {
  171. close(fd_);
  172. }
  173. infinibuf_outfd::infinibuf_outfd (int fd, std::function<void(bool)> oblocked)
  174. : infinibuf(0), fd_(fd), oblocked_(oblocked) {
  175. if (oblocked_)
  176. set_nonblock(fd_);
  177. }
  178. infinibuf_outfd::~infinibuf_outfd()
  179. {
  180. close(fd_);
  181. }
  182. void
  183. infinibuf_outfd::notempty()
  184. {
  185. while (output(fd_) < 0) { // EINTR
  186. if (oblocked_)
  187. oblocked_(true);
  188. waitfd(fd_, POLLOUT);
  189. if (oblocked_)
  190. oblocked_(false);
  191. }
  192. }
  193. infinistreambuf::int_type
  194. infinistreambuf::underflow()
  195. {
  196. lock_guard<infinibuf> _lk (*ib_);
  197. ib_->gbump(gptr() - ib_->gptr());
  198. while (ib_->gsize() == 0 && !ib_->eof())
  199. ib_->gwait();
  200. setg(ib_->eback(), ib_->gptr(), ib_->egptr());
  201. bool eof = ib_->eof() && ib_->gsize() == 0;
  202. return eof ? traits_type::eof() : traits_type::to_int_type (*gptr());
  203. }
  204. infinistreambuf::int_type
  205. infinistreambuf::overflow(int_type ch)
  206. {
  207. if (sync() == -1)
  208. return traits_type::eof();
  209. *pptr() = ch;
  210. pbump(1);
  211. return traits_type::not_eof(ch);
  212. }
  213. int
  214. infinistreambuf::sync()
  215. {
  216. lock_guard<infinibuf> _lk (*ib_);
  217. ib_->pbump(pptr() - ib_->pptr());
  218. setp(ib_->pptr(), ib_->epptr());
  219. int err = ib_->err();
  220. return err ? -1 : 0;
  221. }
  222. infinistreambuf::infinistreambuf(shared_ptr<infinibuf> ib)
  223. : ib_(ib)
  224. {
  225. lock_guard<infinibuf> _lk (*ib_);
  226. setg(ib_->eback(), ib_->gptr(), ib_->egptr());
  227. setp(ib_->pptr(), ib_->epptr());
  228. }
  229. void
  230. infinistreambuf::sputeof()
  231. {
  232. sync();
  233. lock_guard<infinibuf> _lk (*ib_);
  234. ib_->peof();
  235. }
  236. #if 0
  237. int
  238. main (int argc, char **argv)
  239. {
  240. infinistreambuf inb (new infinibuf_mt);
  241. istream xin (&inb);
  242. thread it (infinibuf::input_loop, inb.get_infinibuf(), 0);
  243. infinistreambuf outb (new infinibuf_mt);
  244. ostream xout (&outb);
  245. thread ot (infinibuf::output_loop, outb.get_infinibuf(), 1);
  246. xin.tie (&xout);
  247. #if 0
  248. char c;
  249. long count = 0;
  250. while (xin.get (c)) {
  251. count++;
  252. xout.put (c);
  253. }
  254. cerr << "flushing " << count << " bytes\n";
  255. xout.flush();
  256. #endif
  257. xout << xin.rdbuf() << flush;
  258. /*
  259. xout << "waiting for input\n";
  260. string x;
  261. xin >> x;
  262. xout << "got " << x << "\n" << flush;
  263. */
  264. auto oib = outb.get_infinibuf();
  265. oib->lock();
  266. oib->peof();
  267. oib->unlock();
  268. ot.join();
  269. it.join();
  270. return 0;
  271. }
  272. #endif
  273. #if 0
  274. int
  275. main (int argc, char **argv)
  276. {
  277. ifdstream xin (0);
  278. ofdstream xout (1);
  279. xin.tie(&xout);
  280. //xout << xin.rdbuf();
  281. #if 1
  282. long count = 0;
  283. char c;
  284. while (xin.get (c)) {
  285. xout.put (c);
  286. count++;
  287. }
  288. cerr << "Total count " << count << '\n';
  289. #endif
  290. xout << flush;
  291. }
  292. #endif
  293. /*
  294. c++ -g -std=c++11 -Wall -Werror -pthread infinibuf.cc
  295. */