Synchronize notmuch mail across machines
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

xapian_sync.cc 20KB


  1. #include <cstring>
  2. #include <functional>
  3. #include <iomanip>
  4. #include <iostream>
  5. #include <sstream>
  6. #include <unistd.h>
  7. #include <dirent.h>
  8. #include <fcntl.h>
  9. #include <sys/stat.h>
  10. #include <sys/types.h>
  11. #include <xapian.h>
  12. #include "muchsync.h"
  13. #include "misc.h"
  14. using namespace std;
  // Xapian value slots and term prefixes that this file looks up in the
  // notmuch database.
  // XXX - these things have to match notmuch-private.h
  constexpr int NOTMUCH_VALUE_TIMESTAMP {0};
  constexpr int NOTMUCH_VALUE_MESSAGE_ID {1};
  const std::string notmuch_ghost_term {"Tghost"};
  const std::string notmuch_tag_prefix {"K"};
  const std::string notmuch_directory_prefix {"XDIRECTORY"};
  const std::string notmuch_file_direntry_prefix {"XFDIRENTRY"};
  22. static void
  23. drop_triggers(sqlite3 *db)
  24. {
  25. for (const char *trigger
  26. : { "tag_delete", "tag_insert", "link_delete", "link_insert" })
  27. sqlexec (db, "DROP TRIGGER IF EXISTS %s;", trigger);
  28. for (const char *table
  29. : { "modified_docids", "modified_xapian_dirs", "modified_hashes" })
  30. sqlexec(db, "DROP TABLE IF EXISTS %s;", table);
  31. }
  32. static void
  33. set_triggers(sqlite3 *db)
  34. {
  35. drop_triggers (db);
  36. sqlexec(db, R"(
  37. CREATE TEMP TABLE IF NOT EXISTS modified_docids (
  38. docid INTEGER PRIMARY KEY,
  39. new INTEGER);
  40. CREATE TEMP TRIGGER tag_delete AFTER DELETE ON main.tags
  41. WHEN old.docid NOT IN (SELECT docid FROM modified_docids)
  42. BEGIN INSERT INTO modified_docids (docid, new) VALUES (old.docid, 0); END;
  43. CREATE TEMP TRIGGER tag_insert AFTER INSERT ON main.tags
  44. WHEN new.docid NOT IN (SELECT docid FROM modified_docids)
  45. BEGIN INSERT INTO modified_docids (docid, new) VALUES (new.docid, 0); END;
  46. CREATE TEMP TABLE IF NOT EXISTS modified_xapian_dirs (
  47. dir_docid INTEGER PRIMARY KEY);
  48. CREATE TEMP TABLE IF NOT EXISTS modified_hashes (hash_id INTEGER PRIMARY KEY);
  49. CREATE TEMP TRIGGER link_delete AFTER DELETE ON xapian_files
  50. WHEN old.hash_id NOT IN (SELECT hash_id FROM modified_hashes)
  51. BEGIN INSERT INTO modified_hashes (hash_id) VALUES (old.hash_id); END;
  52. CREATE TEMP TRIGGER link_insert AFTER INSERT ON xapian_files
  53. WHEN new.hash_id NOT IN (SELECT hash_id FROM modified_hashes)
  54. BEGIN INSERT INTO modified_hashes (hash_id) VALUES (new.hash_id); END;
  55. )");
  56. }
  // Non-thread-safe utility to work around missing openat & friends.
  //
  // Makes the directory open on dfd the current working directory, runs
  // work(), switches back to the previous working directory, and returns
  // work()'s result.  If the current directory cannot be opened or dfd
  // cannot be entered, returns errval with errno describing the failure
  // and without calling work().
  template<typename R> R
  with_cwd(int dfd, R errval, std::function<R()> work)
  {
    int dot = open(".", O_RDONLY);
    if (dot < 0)
      return errval;
    if (fchdir(dfd) < 0) {
      // Release the descriptor for the old directory instead of leaking
      // it, while keeping fchdir's errno for the caller.
      int saved_errno = errno;
      close(dot);
      errno = saved_errno;
      return errval;
    }

    // Switch back on every exit path (including exceptions).  errno is
    // saved and restored so that callers see the value left by work(),
    // not one produced by the bookkeeping below.
    struct restore_cwd {
      int fd;
      ~restore_cwd() {
        int saved_errno = errno;
        if (fchdir(fd) < 0) {
          // Nothing useful can be done about this from a destructor.
        }
        close(fd);
        errno = saved_errno;
      }
    } restore {dot};

    return work();
  }
  67. #if !HAVE_OPENAT
  68. #define openat fake_openat
  69. static int
  70. openat(int dfd, const char *entry, int mode)
  71. {
  72. return with_cwd<int>(dfd, -1, [=]() { return open(entry, mode); });
  73. }
  74. #define fstatat fake_fstatat
  75. static int
  76. fstatat(int dfd, const char *entry, struct stat *buf, int flag)
  77. {
  78. return with_cwd<int>(dfd, -1, [=]() { return stat(entry, buf); });
  79. }
  80. #endif // !HAVE_OPENAT
  81. #if !HAVE_FDOPENDIR
  82. #define fdopendir fake_fdopendir
  83. static DIR *
  84. fdopendir(int dfd)
  85. {
  86. return with_cwd<DIR *>(dfd, nullptr, []() { return opendir("."); });
  87. }
  88. #endif // !HAVE_FDOPENDIR
  89. static string
  90. get_sha (int dfd, const char *direntry, i64 *sizep)
  91. {
  92. int fd = openat(dfd, direntry, O_RDONLY);
  93. if (fd < 0)
  94. throw runtime_error (string() + direntry + ": " + strerror (errno));
  95. cleanup _c (close, fd);
  96. hash_ctx ctx;
  97. char buf[32768];
  98. int n;
  99. i64 sz = 0;
  100. while ((n = read (fd, buf, sizeof (buf))) > 0) {
  101. ctx.update (buf, n);
  102. sz += n;
  103. }
  104. if (n < 0)
  105. throw runtime_error (string() + direntry + ": " + strerror (errno));
  106. if (sizep)
  107. *sizep = sz;
  108. return ctx.final();
  109. }
  110. template<typename T> void
  111. sync_table (sqlstmt_t &s, T &t, T &te,
  112. function<int(sqlstmt_t &s, T &t)> cmpfn,
  113. function<void(sqlstmt_t *s, T *t)> update)
  114. {
  115. s.step();
  116. while (s.row()) {
  117. int cmp {t == te ? -1 : cmpfn (s, t)};
  118. if (cmp == 0) {
  119. update (&s, &t);
  120. s.step();
  121. ++t;
  122. }
  123. else if (cmp < 0) {
  124. update (&s, nullptr);
  125. s.step();
  126. }
  127. else {
  128. update (nullptr, &t);
  129. ++t;
  130. }
  131. }
  132. while (t != te) {
  133. update (nullptr, &t);
  134. ++t;
  135. }
  136. }
  137. static string
  138. tag_from_term (const string &term)
  139. {
  140. assert(!strncmp(term.c_str(), notmuch_tag_prefix.c_str(),
  141. notmuch_tag_prefix.length()));
  142. return term.substr(notmuch_tag_prefix.length());
  143. }
  144. static void
  145. xapian_scan_tags (sqlite3 *sqldb, Xapian::Database &xdb, const writestamp &ws)
  146. {
  147. sqlexec(sqldb, "DROP TABLE IF EXISTS dead_tags; "
  148. "CREATE TEMP TABLE dead_tags (tag TEXT PRIMARY KEY); "
  149. "INSERT INTO dead_tags SELECT DISTINCT tag FROM tags;");
  150. sqlstmt_t
  151. scan (sqldb, "SELECT docid, rowid FROM tags"
  152. " WHERE tag = ? ORDER BY docid ASC;"),
  153. add_tag (sqldb, "INSERT INTO tags (docid, tag) VALUES (?, ?);"),
  154. del_tag (sqldb, "DELETE FROM tags WHERE rowid = ?;"),
  155. record_tag (sqldb, "DELETE FROM dead_tags WHERE tag = ?;");
  156. for (Xapian::TermIterator ti = xdb.allterms_begin(notmuch_tag_prefix),
  157. te = xdb.allterms_end(notmuch_tag_prefix); ti != te; ti++) {
  158. string tag = tag_from_term (*ti);
  159. if (opt_verbose > 1)
  160. cerr << " " << tag << "\n";
  161. record_tag.reset().param(tag).step();
  162. scan.reset().bind_text(1, tag);
  163. add_tag.reset().bind_text(2, tag);
  164. Xapian::PostingIterator pi = xdb.postlist_begin (*ti),
  165. pe = xdb.postlist_end (*ti);
  166. sync_table<Xapian::PostingIterator>
  167. (scan, pi, pe,
  168. [] (sqlstmt_t &s, Xapian::PostingIterator &p) -> int {
  169. return s.integer(0) - *p;
  170. },
  171. [&] (sqlstmt_t *sp, Xapian::PostingIterator *pp) {
  172. if (!sp)
  173. add_tag.reset().bind_int(1, **pp).step();
  174. else if (!pp)
  175. del_tag.reset().bind_value(1, sp->value(1)).step();
  176. });
  177. }
  178. sqlexec(sqldb, "DELETE FROM tags WHERE tag IN (SELECT * FROM dead_tags);");
  179. sqlexec(sqldb, "UPDATE message_ids SET replica = %lld, version = %lld"
  180. " WHERE docid IN (SELECT docid FROM modified_docids WHERE new = 0);",
  181. ws.first, ws.second);
  182. }
  183. static void
  184. xapian_scan_message_ids (sqlite3 *sqldb, const writestamp &ws,
  185. Xapian::Database xdb)
  186. {
  187. sqlstmt_t
  188. scan(sqldb,
  189. "SELECT message_id, docid FROM message_ids ORDER BY docid ASC;"),
  190. add_message(sqldb,
  191. "INSERT INTO message_ids (message_id, docid, replica, version)"
  192. " VALUES (?, ?, %lld, %lld);", ws.first, ws.second),
  193. flag_new_message(sqldb, "INSERT INTO modified_docids (docid, new)"
  194. " VALUES (?, 1);"),
  195. del_message(sqldb, "DELETE FROM message_ids WHERE docid = ?;");
  196. Xapian::PostingIterator
  197. gi = xdb.postlist_begin(notmuch_ghost_term),
  198. ge = xdb.postlist_end(notmuch_ghost_term);
  199. Xapian::ValueIterator
  200. vi = xdb.valuestream_begin (NOTMUCH_VALUE_MESSAGE_ID),
  201. ve = xdb.valuestream_end (NOTMUCH_VALUE_MESSAGE_ID);
  202. sync_table<Xapian::ValueIterator>
  203. (scan, vi, ve,
  204. [] (sqlstmt_t &s, Xapian::ValueIterator &vi) -> int {
  205. return s.integer(1) - vi.get_docid();
  206. },
  207. [&add_message,&del_message,&flag_new_message,&gi,&ge,&ve]
  208. (sqlstmt_t *sp, Xapian::ValueIterator *vip) {
  209. if (vip) {
  210. while (gi != ge && *gi < vip->get_docid())
  211. ++gi;
  212. if (gi != ge && *gi == vip->get_docid()) {
  213. if (!sp)
  214. return;
  215. vip = nullptr;
  216. }
  217. }
  218. if (!sp) {
  219. i64 docid = vip->get_docid();
  220. add_message.reset().param(**vip, docid).step();
  221. flag_new_message.reset().param(docid).step();
  222. }
  223. else if (!vip)
  224. del_message.reset().param(sp->value(1)).step();
  225. else if (sp->str(0) != **vip) {
  226. // This should be really unusual
  227. cerr << "warning: message id changed from <"
  228. << sp->str(0) << "> to <" << **vip << ">\n";
  229. del_message.reset().param(sp->value(1)).step();
  230. add_message.reset().param(**vip, i64(vip->get_docid())).step();
  231. }
  232. });
  233. }
  234. static Xapian::docid
  235. xapian_get_unique_posting (const Xapian::Database &xdb, const string &term)
  236. {
  237. Xapian::PostingIterator pi = xdb.postlist_begin (term),
  238. pe = xdb.postlist_end (term);
  239. if (pi == pe)
  240. throw range_error (string("xapian term ") + term + " has no postings");
  241. i64 ret = *pi;
  242. if (++pi != pe)
  243. cerr << "warning: xapian term " << term << " has multiple postings\n";
  244. return ret;
  245. }
  246. static void
  247. xapian_scan_directories (sqlite3 *sqldb, Xapian::Database &xdb)
  248. {
  249. sqlstmt_t
  250. scandirs(sqldb, "SELECT dir_path, dir_docid, dir_mtime FROM xapian_dirs"
  251. " ORDER BY dir_path;"),
  252. deldir(sqldb, "DELETE FROM xapian_dirs WHERE dir_docid = ?;"),
  253. delfiles(sqldb, "DELETE FROM xapian_files WHERE dir_docid = ?;"),
  254. adddir(sqldb, "INSERT INTO xapian_dirs (dir_path, dir_docid, dir_mtime)"
  255. " VALUES (?, ?, ?);"),
  256. upddir(sqldb, "UPDATE xapian_dirs SET dir_mtime = ? WHERE dir_docid = ?;"),
  257. flagdir(sqldb, "INSERT INTO modified_xapian_dirs (dir_docid) VALUES (?);");
  258. Xapian::TermIterator
  259. ti = xdb.allterms_begin(notmuch_directory_prefix),
  260. te = xdb.allterms_end(notmuch_directory_prefix);
  261. scandirs.step();
  262. while (ti != te || scandirs.row()) {
  263. int d; // >0 if only sqlite valid, <0 if only xapian valid
  264. string dir;
  265. if (!scandirs.row()) {
  266. dir = (*ti).substr(notmuch_directory_prefix.length());
  267. d = -1;
  268. }
  269. else if (ti == te)
  270. d = 1;
  271. else {
  272. dir = (*ti).substr(notmuch_directory_prefix.length());
  273. d = dir.compare(scandirs.c_str(0));
  274. }
  275. if (d > 0) {
  276. deldir.reset().param(scandirs.value(1)).step();
  277. delfiles.reset().param(scandirs.value(1)).step();
  278. scandirs.step();
  279. continue;
  280. }
  281. if (dir.empty())
  282. dir = ".";
  283. Xapian::docid dir_docid = xapian_get_unique_posting(xdb, *ti);
  284. if (d == 0 && dir_docid != scandirs.integer(1)) {
  285. deldir.reset().param(scandirs.value(1)).step();
  286. delfiles.reset().param(scandirs.value(1)).step();
  287. scandirs.step();
  288. continue;
  289. }
  290. time_t mtime = Xapian::sortable_unserialise
  291. (xdb.get_document(dir_docid).get_value(NOTMUCH_VALUE_TIMESTAMP));
  292. if (d < 0) {
  293. deldir.reset().param(i64(dir_docid)).step();
  294. delfiles.reset().param(i64(dir_docid)).step();
  295. adddir.reset().param(dir, i64(dir_docid), i64(mtime)).step();
  296. flagdir.reset().param(i64(dir_docid)).step();
  297. ++ti;
  298. continue;
  299. }
  300. if (mtime != scandirs.integer(2)) {
  301. flagdir.reset().param(i64(dir_docid)).step();
  302. upddir.reset().param(i64(mtime), i64(dir_docid)).step();
  303. }
  304. ++ti;
  305. scandirs.step();
  306. }
  307. }
  308. class fileops {
  309. public:
  310. sqlstmt_t scan_dir_;
  311. private:
  312. sqlstmt_t get_msgid_;
  313. sqlstmt_t del_file_;
  314. sqlstmt_t add_file_;
  315. sqlstmt_t upd_file_;
  316. sqlstmt_t get_hashid_;
  317. sqlstmt_t get_hash_;
  318. sqlstmt_t add_hash_;
  319. sqlstmt_t upd_hash_;
  320. string get_msgid(i64 docid);
  321. i64 get_file_hash_id(int dfd, const string &file, i64 docid);
  322. public:
  323. fileops(sqlite3 *db, const writestamp &ws);
  324. void del_file(i64 rowid) { del_file_.reset().param(rowid).step(); }
  325. void add_file(const string &dir, int dfd, i64 dir_docid,
  326. string name, i64 docid);
  327. void check_file(const string &dir, int dfd, i64 dir_docid);
  328. };
  329. fileops::fileops(sqlite3 *db, const writestamp &ws)
  330. : scan_dir_(db, "SELECT rowid, name, docid%s"
  331. " FROM xapian_files WHERE dir_docid = ? ORDER BY name;",
  332. opt_fullscan ? ", mtime, inode, hash_id" : ""),
  333. get_msgid_(db, "SELECT message_id FROM message_ids WHERE docid = ?;"),
  334. del_file_(db, "DELETE FROM xapian_files WHERE rowid = ?;"),
  335. add_file_(db, "INSERT INTO xapian_files"
  336. " (dir_docid, name, docid, mtime, inode, hash_id)"
  337. " VALUES (?, ?, ?, ?, ?, ?);"),
  338. upd_file_(db, "UPDATE xapian_files SET mtime = ?, inode = ?"
  339. " WHERE rowid = ?;"),
  340. get_hashid_(db, opt_fullscan
  341. ? "SELECT hash_id, size, message_id FROM maildir_hashes"
  342. " WHERE hash = ?;"
  343. : "SELECT hash_id FROM maildir_hashes WHERE hash = ?;"),
  344. get_hash_(db, "SELECT hash, size FROM maildir_hashes WHERE hash_id = ?;"),
  345. add_hash_(db, "INSERT OR REPLACE INTO maildir_hashes "
  346. " (hash, size, message_id, replica, version)"
  347. " VALUES (?, ?, ?, %lld, %lld);", ws.first, ws.second),
  348. upd_hash_(db, "UPDATE maildir_hashes SET size = ?, message_id = ?"
  349. " WHERE hash_id = ?;",
  350. ws.first, ws.second)
  351. {
  352. }
  353. string
  354. fileops::get_msgid(i64 docid)
  355. {
  356. get_msgid_.reset().param(docid).step();
  357. if (!get_msgid_.row())
  358. throw runtime_error ("xapian_fileops: unknown docid " + to_string(docid));
  359. return get_msgid_.str(0);
  360. }
  361. i64
  362. fileops::get_file_hash_id(int dfd, const string &name, i64 docid)
  363. {
  364. i64 sz;
  365. if (opt_verbose > 2)
  366. cerr << " " << name << '\n';
  367. string hash = get_sha(dfd, name.c_str(), &sz);
  368. if (get_hashid_.reset().param(hash).step().row()) {
  369. i64 hash_id = get_hashid_.integer(0);
  370. if (!opt_fullscan)
  371. return hash_id;
  372. string msgid = get_msgid(docid);
  373. if (sz == get_hashid_.integer(1) && msgid == get_hashid_.str(2))
  374. return hash_id;
  375. // This should almost never happen
  376. cerr << "size or message-id changed for hash " << hash << '\n';
  377. upd_hash_.reset().param(sz, msgid, hash_id).step();
  378. return hash_id;
  379. }
  380. add_hash_.reset().param(hash, sz, get_msgid(docid)).step();
  381. return sqlite3_last_insert_rowid(add_hash_.getdb());
  382. }
  383. void
  384. fileops::add_file(const string &dir, int dfd, i64 dir_docid,
  385. string name, i64 docid)
  386. {
  387. struct stat sb;
  388. if (fstatat(dfd, name.c_str(), &sb, 0)) {
  389. if (errno == ENOENT)
  390. return;
  391. throw runtime_error (dir + ": " + strerror(errno));
  392. }
  393. if (!S_ISREG(sb.st_mode))
  394. return;
  395. i64 hash_id = get_file_hash_id(dfd, name, docid);
  396. add_file_.reset()
  397. .param(dir_docid, name, docid, ts_to_double(sb.ST_MTIM),
  398. i64(sb.st_ino), hash_id).step();
  399. }
  400. void
  401. fileops::check_file(const string &dir, int dfd, i64 dir_docid)
  402. {
  403. if (!opt_fullscan)
  404. return;
  405. string name = scan_dir_.str(1);
  406. struct stat sb;
  407. if (fstatat(dfd, name.c_str(), &sb, 0)) {
  408. if (errno == ENOENT)
  409. return;
  410. throw runtime_error (dir + ": " + strerror(errno));
  411. }
  412. if (!S_ISREG(sb.st_mode))
  413. return;
  414. double fs_mtim = ts_to_double(sb.ST_MTIM);
  415. i64 fs_inode = sb.st_ino, fs_size = sb.st_size;
  416. double db_mtim = scan_dir_.real(3);
  417. i64 db_inode = scan_dir_.integer(4);
  418. i64 db_hashid = scan_dir_.integer(5);
  419. if (!get_hash_.reset().param(db_hashid).step().row())
  420. throw runtime_error ("invalid hash_id: " + to_string(db_hashid));
  421. i64 db_size = get_hash_.integer(1);
  422. if (fs_mtim == db_mtim && fs_inode == db_inode && fs_size == db_size)
  423. return;
  424. i64 rowid = scan_dir_.integer(0), docid = scan_dir_.integer(2);
  425. i64 fs_hashid = get_file_hash_id(dfd, name, docid);
  426. if (db_hashid == fs_hashid)
  427. upd_file_.reset().param(fs_mtim, fs_inode, rowid).step();
  428. else {
  429. del_file_.reset().param(rowid).step();
  430. add_file_.reset().param(dir_docid, name, docid, fs_mtim, fs_inode,
  431. fs_hashid);
  432. }
  433. }
  434. static void
  435. xapian_scan_filenames (sqlite3 *db, const string &maildir,
  436. const writestamp &ws, Xapian::Database xdb)
  437. {
  438. sqlstmt_t dirscan (db, "SELECT dir_path, dir_docid FROM xapian_dirs%s;",
  439. opt_fullscan ? ""
  440. : " NATURAL JOIN modified_xapian_dirs");
  441. fileops f (db, ws);
  442. while (dirscan.step().row()) {
  443. string dir = dirscan.str(0);
  444. if (opt_verbose > 1)
  445. cerr << " " << dir << '\n';
  446. string dirpath = maildir + "/" + dir;
  447. int dfd = open(dirpath.c_str(), O_RDONLY);
  448. if (dfd == -1 && errno != ENOENT) {
  449. cerr << dirpath << ": " << strerror (errno) << '\n';
  450. continue;
  451. }
  452. cleanup _close (close, dfd);
  453. i64 dir_docid = dirscan.integer(1);
  454. f.scan_dir_.reset().param(dir_docid).step();
  455. string dirtermprefix = (notmuch_file_direntry_prefix
  456. + to_string (dir_docid) + ":");
  457. Xapian::TermIterator ti = xdb.allterms_begin(dirtermprefix),
  458. te = xdb.allterms_end(dirtermprefix);
  459. size_t dirtermprefixlen = dirtermprefix.size();
  460. unordered_map<string,Xapian::docid> to_add;
  461. while (f.scan_dir_.row() && ti != te) {
  462. const char *dbname = f.scan_dir_.c_str(1);
  463. string term = *ti;
  464. const char *xname = &term[dirtermprefixlen];
  465. int cmp = strcmp(dbname,xname);
  466. if (!cmp) {
  467. if (opt_fullscan)
  468. f.check_file(dir, dfd, dir_docid);
  469. f.scan_dir_.step();
  470. ++ti;
  471. }
  472. else if (cmp < 0) {
  473. f.del_file(f.scan_dir_.integer(0));
  474. f.scan_dir_.step();
  475. }
  476. else {
  477. to_add.emplace(term.substr(dirtermprefixlen),
  478. xapian_get_unique_posting(xdb, term));
  479. ++ti;
  480. }
  481. }
  482. while (f.scan_dir_.row()) {
  483. f.del_file(f.scan_dir_.integer(0));
  484. f.scan_dir_.step();
  485. }
  486. while (ti != te) {
  487. string term = *ti;
  488. to_add.emplace(term.substr(dirtermprefixlen),
  489. xapian_get_unique_posting(xdb, term));
  490. ++ti;
  491. }
  492. // With a cold buffer cache, reading files to compute hashes goes
  493. // shockingly faster in the order of directory entries.
  494. if (!to_add.empty()) {
  495. _close.release();
  496. DIR *d = fdopendir(dfd);
  497. cleanup _closedir (closedir, d);
  498. struct dirent *e;
  499. auto notfound = to_add.end();
  500. while ((e = readdir(d)) && !to_add.empty()) {
  501. string name (e->d_name);
  502. auto action = to_add.find(name);
  503. if (action != notfound) {
  504. f.add_file(dir, dfd, dir_docid, action->first, action->second);
  505. to_add.erase(action);
  506. }
  507. }
  508. }
  509. }
  510. }
  511. static void
  512. xapian_adjust_nlinks(sqlite3 *db, writestamp ws)
  513. {
  514. sqlstmt_t
  515. newcount(db, "SELECT hash_id, dir_docid, count(*)"
  516. " FROM xapian_files NATURAL JOIN modified_hashes"
  517. " GROUP BY hash_id, dir_docid ORDER BY hash_id, dir_docid;"),
  518. oldcount(db, "SELECT hash_id, dir_docid, link_count, xapian_nlinks.rowid"
  519. " FROM xapian_nlinks NATURAL JOIN modified_hashes"
  520. " ORDER BY hash_id, dir_docid;"),
  521. updcount(db, "UPDATE xapian_nlinks SET link_count = ? WHERE rowid = ?;"),
  522. delcount(db, "DELETE FROM xapian_nlinks WHERE rowid = ?;"),
  523. addcount(db, "INSERT INTO xapian_nlinks (hash_id, dir_docid, link_count)"
  524. " VALUES (?, ?, ?);"),
  525. updhash(db, "UPDATE maildir_hashes SET replica = %lld, version = %lld"
  526. " WHERE hash_id = ?;", ws.first, ws.second);
  527. newcount.step();
  528. oldcount.step();
  529. while (newcount.row() || oldcount.row()) {
  530. i64 d; // < 0 only oldcount valid, > 0 only newcount valid
  531. if (!newcount.row())
  532. d = -1;
  533. else if (!oldcount.row())
  534. d = 1;
  535. else if (!(d = oldcount.integer(0) - newcount.integer(0)))
  536. d = oldcount.integer(1) - newcount.integer(1);
  537. if (d == 0) {
  538. i64 cnt = newcount.integer(2);
  539. if (cnt != oldcount.integer(2)) {
  540. updhash.reset().param(newcount.value(0)).step();
  541. updcount.reset().param(cnt, oldcount.value(3)).step();
  542. }
  543. oldcount.step();
  544. newcount.step();
  545. }
  546. else if (d < 0) {
  547. // file deleted and (hash_id, dir_id) not present newcount
  548. if (oldcount.integer(2))
  549. updhash.reset().param(oldcount.value(0)).step();
  550. delcount.reset().param(oldcount.value(3)).step();
  551. oldcount.step();
  552. }
  553. else {
  554. // file added and (hash_id, dir_id) not present in oldcount
  555. updhash.reset().param(newcount.value(0)).step();
  556. addcount.reset().param(newcount.value(0), newcount.value(1),
  557. newcount.value(2)).step();
  558. newcount.step();
  559. }
  560. }
  561. }
  562. void
  563. xapian_scan(sqlite3 *sqldb, writestamp ws, string maildir)
  564. {
  565. while (maildir.size() > 1 && maildir.back() == '/')
  566. maildir.resize (maildir.size() - 1);
  567. if (maildir.empty())
  568. maildir = ".";
  569. print_time ("starting scan of Xapian database");
  570. Xapian::Database xdb (maildir + "/.notmuch/xapian");
  571. set_triggers(sqldb);
  572. print_time ("opened Xapian");
  573. xapian_scan_message_ids (sqldb, ws, xdb);
  574. print_time ("scanned message IDs");
  575. xapian_scan_tags (sqldb, xdb, ws);
  576. print_time ("scanned tags");
  577. xapian_scan_directories (sqldb, xdb);
  578. print_time ("scanned directories in xapian");
  579. xapian_scan_filenames (sqldb, maildir, ws, xdb);
  580. print_time ("scanned filenames in xapian");
  581. xapian_adjust_nlinks(sqldb, ws);
  582. print_time ("adjusted link counts");
  583. }
  584. void
  585. sync_local_data (sqlite3 *sqldb, const string &maildir)
  586. {
  587. print_time ("synchronizing muchsync database with Xapian");
  588. sqlexec (sqldb, "SAVEPOINT localsync;");
  589. try {
  590. i64 self = getconfig<i64>(sqldb, "self");
  591. sqlexec (sqldb, "UPDATE sync_vector"
  592. " SET version = version + 1 WHERE replica = %lld;", self);
  593. if (sqlite3_changes (sqldb) != 1)
  594. throw runtime_error ("My replica id (" + to_string (self)
  595. + ") not in sync vector");
  596. versvector vv = get_sync_vector (sqldb);
  597. i64 vers = vv.at(self);
  598. writestamp ws { self, vers };
  599. xapian_scan (sqldb, ws, maildir);
  600. }
  601. catch (exception &e) {
  602. sqlexec (sqldb, "ROLLBACK TO localsync;");
  603. throw;
  604. }
  605. sqlexec (sqldb, "RELEASE localsync;");
  606. print_time ("finished synchronizing muchsync database with Xapian");
  607. }