EventBaseTest.cpp 55 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994
  1. /*
  2. * Copyright 2014-present Facebook, Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include <folly/Memory.h>
  17. #include <folly/ScopeGuard.h>
  18. #include <folly/io/async/AsyncTimeout.h>
  19. #include <folly/io/async/EventBase.h>
  20. #include <folly/io/async/EventHandler.h>
  21. #include <folly/io/async/test/SocketPair.h>
  22. #include <folly/io/async/test/Util.h>
  23. #include <folly/portability/Unistd.h>
  24. #include <folly/futures/Promise.h>
  25. #include <atomic>
  26. #include <iostream>
  27. #include <memory>
  28. #include <thread>
  29. using std::atomic;
  30. using std::cerr;
  31. using std::deque;
  32. using std::endl;
  33. using std::make_pair;
  34. using std::pair;
  35. using std::thread;
  36. using std::unique_ptr;
  37. using std::vector;
  38. using std::chrono::duration_cast;
  39. using std::chrono::microseconds;
  40. using std::chrono::milliseconds;
  41. using namespace std::chrono_literals;
  42. using namespace folly;
  43. ///////////////////////////////////////////////////////////////////////////
  44. // Tests for read and write events
  45. ///////////////////////////////////////////////////////////////////////////
  46. enum { BUF_SIZE = 4096 };
  47. ssize_t writeToFD(int fd, size_t length) {
  48. // write an arbitrary amount of data to the fd
  49. auto bufv = vector<char>(length);
  50. auto buf = bufv.data();
  51. memset(buf, 'a', length);
  52. ssize_t rc = write(fd, buf, length);
  53. CHECK_EQ(rc, length);
  54. return rc;
  55. }
  56. size_t writeUntilFull(int fd) {
  57. // Write to the fd until EAGAIN is returned
  58. size_t bytesWritten = 0;
  59. char buf[BUF_SIZE];
  60. memset(buf, 'a', sizeof(buf));
  61. while (true) {
  62. ssize_t rc = write(fd, buf, sizeof(buf));
  63. if (rc < 0) {
  64. CHECK_EQ(errno, EAGAIN);
  65. break;
  66. } else {
  67. bytesWritten += rc;
  68. }
  69. }
  70. return bytesWritten;
  71. }
  72. ssize_t readFromFD(int fd, size_t length) {
  73. // write an arbitrary amount of data to the fd
  74. auto buf = vector<char>(length);
  75. return read(fd, buf.data(), length);
  76. }
  77. size_t readUntilEmpty(int fd) {
  78. // Read from the fd until EAGAIN is returned
  79. char buf[BUF_SIZE];
  80. size_t bytesRead = 0;
  81. while (true) {
  82. int rc = read(fd, buf, sizeof(buf));
  83. if (rc == 0) {
  84. CHECK(false) << "unexpected EOF";
  85. } else if (rc < 0) {
  86. CHECK_EQ(errno, EAGAIN);
  87. break;
  88. } else {
  89. bytesRead += rc;
  90. }
  91. }
  92. return bytesRead;
  93. }
  94. void checkReadUntilEmpty(int fd, size_t expectedLength) {
  95. ASSERT_EQ(readUntilEmpty(fd), expectedLength);
  96. }
  97. struct ScheduledEvent {
  98. int milliseconds;
  99. uint16_t events;
  100. size_t length;
  101. ssize_t result;
  102. void perform(int fd) {
  103. if (events & EventHandler::READ) {
  104. if (length == 0) {
  105. result = readUntilEmpty(fd);
  106. } else {
  107. result = readFromFD(fd, length);
  108. }
  109. }
  110. if (events & EventHandler::WRITE) {
  111. if (length == 0) {
  112. result = writeUntilFull(fd);
  113. } else {
  114. result = writeToFD(fd, length);
  115. }
  116. }
  117. }
  118. };
  119. void scheduleEvents(EventBase* eventBase, int fd, ScheduledEvent* events) {
  120. for (ScheduledEvent* ev = events; ev->milliseconds > 0; ++ev) {
  121. eventBase->tryRunAfterDelay(
  122. std::bind(&ScheduledEvent::perform, ev, fd), ev->milliseconds);
  123. }
  124. }
  125. class TestHandler : public EventHandler {
  126. public:
  127. TestHandler(EventBase* eventBase, int fd)
  128. : EventHandler(eventBase, fd), fd_(fd) {}
  129. void handlerReady(uint16_t events) noexcept override {
  130. ssize_t bytesRead = 0;
  131. ssize_t bytesWritten = 0;
  132. if (events & READ) {
  133. // Read all available data, so EventBase will stop calling us
  134. // until new data becomes available
  135. bytesRead = readUntilEmpty(fd_);
  136. }
  137. if (events & WRITE) {
  138. // Write until the pipe buffer is full, so EventBase will stop calling
  139. // us until the other end has read some data
  140. bytesWritten = writeUntilFull(fd_);
  141. }
  142. log.emplace_back(events, bytesRead, bytesWritten);
  143. }
  144. struct EventRecord {
  145. EventRecord(uint16_t events_, size_t bytesRead_, size_t bytesWritten_)
  146. : events(events_),
  147. timestamp(),
  148. bytesRead(bytesRead_),
  149. bytesWritten(bytesWritten_) {}
  150. uint16_t events;
  151. TimePoint timestamp;
  152. ssize_t bytesRead;
  153. ssize_t bytesWritten;
  154. };
  155. deque<EventRecord> log;
  156. private:
  157. int fd_;
  158. };
  159. /**
  160. * Test a READ event
  161. */
  162. TEST(EventBaseTest, ReadEvent) {
  163. EventBase eb;
  164. SocketPair sp;
  165. // Register for read events
  166. TestHandler handler(&eb, sp[0]);
  167. handler.registerHandler(EventHandler::READ);
  168. // Register timeouts to perform two write events
  169. ScheduledEvent events[] = {
  170. {10, EventHandler::WRITE, 2345, 0},
  171. {160, EventHandler::WRITE, 99, 0},
  172. {0, 0, 0, 0},
  173. };
  174. scheduleEvents(&eb, sp[1], events);
  175. // Loop
  176. TimePoint start;
  177. eb.loop();
  178. TimePoint end;
  179. // Since we didn't use the EventHandler::PERSIST flag, the handler should
  180. // have received the first read, then unregistered itself. Check that only
  181. // the first chunk of data was received.
  182. ASSERT_EQ(handler.log.size(), 1);
  183. ASSERT_EQ(handler.log[0].events, EventHandler::READ);
  184. T_CHECK_TIMEOUT(
  185. start,
  186. handler.log[0].timestamp,
  187. milliseconds(events[0].milliseconds),
  188. milliseconds(90));
  189. ASSERT_EQ(handler.log[0].bytesRead, events[0].length);
  190. ASSERT_EQ(handler.log[0].bytesWritten, 0);
  191. T_CHECK_TIMEOUT(
  192. start, end, milliseconds(events[1].milliseconds), milliseconds(30));
  193. // Make sure the second chunk of data is still waiting to be read.
  194. size_t bytesRemaining = readUntilEmpty(sp[0]);
  195. ASSERT_EQ(bytesRemaining, events[1].length);
  196. }
  197. /**
  198. * Test (READ | PERSIST)
  199. */
  200. TEST(EventBaseTest, ReadPersist) {
  201. EventBase eb;
  202. SocketPair sp;
  203. // Register for read events
  204. TestHandler handler(&eb, sp[0]);
  205. handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
  206. // Register several timeouts to perform writes
  207. ScheduledEvent events[] = {
  208. {10, EventHandler::WRITE, 1024, 0},
  209. {20, EventHandler::WRITE, 2211, 0},
  210. {30, EventHandler::WRITE, 4096, 0},
  211. {100, EventHandler::WRITE, 100, 0},
  212. {0, 0, 0, 0},
  213. };
  214. scheduleEvents(&eb, sp[1], events);
  215. // Schedule a timeout to unregister the handler after the third write
  216. eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
  217. // Loop
  218. TimePoint start;
  219. eb.loop();
  220. TimePoint end;
  221. // The handler should have received the first 3 events,
  222. // then been unregistered after that.
  223. ASSERT_EQ(handler.log.size(), 3);
  224. for (int n = 0; n < 3; ++n) {
  225. ASSERT_EQ(handler.log[n].events, EventHandler::READ);
  226. T_CHECK_TIMEOUT(
  227. start, handler.log[n].timestamp, milliseconds(events[n].milliseconds));
  228. ASSERT_EQ(handler.log[n].bytesRead, events[n].length);
  229. ASSERT_EQ(handler.log[n].bytesWritten, 0);
  230. }
  231. T_CHECK_TIMEOUT(start, end, milliseconds(events[3].milliseconds));
  232. // Make sure the data from the last write is still waiting to be read
  233. size_t bytesRemaining = readUntilEmpty(sp[0]);
  234. ASSERT_EQ(bytesRemaining, events[3].length);
  235. }
  236. /**
  237. * Test registering for READ when the socket is immediately readable
  238. */
  239. TEST(EventBaseTest, ReadImmediate) {
  240. EventBase eb;
  241. SocketPair sp;
  242. // Write some data to the socket so the other end will
  243. // be immediately readable
  244. size_t dataLength = 1234;
  245. writeToFD(sp[1], dataLength);
  246. // Register for read events
  247. TestHandler handler(&eb, sp[0]);
  248. handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
  249. // Register a timeout to perform another write
  250. ScheduledEvent events[] = {
  251. {10, EventHandler::WRITE, 2345, 0},
  252. {0, 0, 0, 0},
  253. };
  254. scheduleEvents(&eb, sp[1], events);
  255. // Schedule a timeout to unregister the handler
  256. eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 20);
  257. // Loop
  258. TimePoint start;
  259. eb.loop();
  260. TimePoint end;
  261. ASSERT_EQ(handler.log.size(), 2);
  262. // There should have been 1 event for immediate readability
  263. ASSERT_EQ(handler.log[0].events, EventHandler::READ);
  264. T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
  265. ASSERT_EQ(handler.log[0].bytesRead, dataLength);
  266. ASSERT_EQ(handler.log[0].bytesWritten, 0);
  267. // There should be another event after the timeout wrote more data
  268. ASSERT_EQ(handler.log[1].events, EventHandler::READ);
  269. T_CHECK_TIMEOUT(
  270. start, handler.log[1].timestamp, milliseconds(events[0].milliseconds));
  271. ASSERT_EQ(handler.log[1].bytesRead, events[0].length);
  272. ASSERT_EQ(handler.log[1].bytesWritten, 0);
  273. T_CHECK_TIMEOUT(start, end, milliseconds(20));
  274. }
  275. /**
  276. * Test a WRITE event
  277. */
  278. TEST(EventBaseTest, WriteEvent) {
  279. EventBase eb;
  280. SocketPair sp;
  281. // Fill up the write buffer before starting
  282. size_t initialBytesWritten = writeUntilFull(sp[0]);
  283. // Register for write events
  284. TestHandler handler(&eb, sp[0]);
  285. handler.registerHandler(EventHandler::WRITE);
  286. // Register timeouts to perform two reads
  287. ScheduledEvent events[] = {
  288. {10, EventHandler::READ, 0, 0},
  289. {60, EventHandler::READ, 0, 0},
  290. {0, 0, 0, 0},
  291. };
  292. scheduleEvents(&eb, sp[1], events);
  293. // Loop
  294. TimePoint start;
  295. eb.loop();
  296. TimePoint end;
  297. // Since we didn't use the EventHandler::PERSIST flag, the handler should
  298. // have only been able to write once, then unregistered itself.
  299. ASSERT_EQ(handler.log.size(), 1);
  300. ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
  301. T_CHECK_TIMEOUT(
  302. start, handler.log[0].timestamp, milliseconds(events[0].milliseconds));
  303. ASSERT_EQ(handler.log[0].bytesRead, 0);
  304. ASSERT_GT(handler.log[0].bytesWritten, 0);
  305. T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
  306. ASSERT_EQ(events[0].result, initialBytesWritten);
  307. ASSERT_EQ(events[1].result, handler.log[0].bytesWritten);
  308. }
  309. /**
  310. * Test (WRITE | PERSIST)
  311. */
  312. TEST(EventBaseTest, WritePersist) {
  313. EventBase eb;
  314. SocketPair sp;
  315. // Fill up the write buffer before starting
  316. size_t initialBytesWritten = writeUntilFull(sp[0]);
  317. // Register for write events
  318. TestHandler handler(&eb, sp[0]);
  319. handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
  320. // Register several timeouts to read from the socket at several intervals
  321. ScheduledEvent events[] = {
  322. {10, EventHandler::READ, 0, 0},
  323. {40, EventHandler::READ, 0, 0},
  324. {70, EventHandler::READ, 0, 0},
  325. {100, EventHandler::READ, 0, 0},
  326. {0, 0, 0, 0},
  327. };
  328. scheduleEvents(&eb, sp[1], events);
  329. // Schedule a timeout to unregister the handler after the third read
  330. eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
  331. // Loop
  332. TimePoint start;
  333. eb.loop();
  334. TimePoint end;
  335. // The handler should have received the first 3 events,
  336. // then been unregistered after that.
  337. ASSERT_EQ(handler.log.size(), 3);
  338. ASSERT_EQ(events[0].result, initialBytesWritten);
  339. for (int n = 0; n < 3; ++n) {
  340. ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
  341. T_CHECK_TIMEOUT(
  342. start, handler.log[n].timestamp, milliseconds(events[n].milliseconds));
  343. ASSERT_EQ(handler.log[n].bytesRead, 0);
  344. ASSERT_GT(handler.log[n].bytesWritten, 0);
  345. ASSERT_EQ(handler.log[n].bytesWritten, events[n + 1].result);
  346. }
  347. T_CHECK_TIMEOUT(start, end, milliseconds(events[3].milliseconds));
  348. }
  349. /**
  350. * Test registering for WRITE when the socket is immediately writable
  351. */
  352. TEST(EventBaseTest, WriteImmediate) {
  353. EventBase eb;
  354. SocketPair sp;
  355. // Register for write events
  356. TestHandler handler(&eb, sp[0]);
  357. handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
  358. // Register a timeout to perform a read
  359. ScheduledEvent events[] = {
  360. {10, EventHandler::READ, 0, 0},
  361. {0, 0, 0, 0},
  362. };
  363. scheduleEvents(&eb, sp[1], events);
  364. // Schedule a timeout to unregister the handler
  365. int64_t unregisterTimeout = 40;
  366. eb.tryRunAfterDelay(
  367. std::bind(&TestHandler::unregisterHandler, &handler), unregisterTimeout);
  368. // Loop
  369. TimePoint start;
  370. eb.loop();
  371. TimePoint end;
  372. ASSERT_EQ(handler.log.size(), 2);
  373. // Since the socket buffer was initially empty,
  374. // there should have been 1 event for immediate writability
  375. ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
  376. T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
  377. ASSERT_EQ(handler.log[0].bytesRead, 0);
  378. ASSERT_GT(handler.log[0].bytesWritten, 0);
  379. // There should be another event after the timeout wrote more data
  380. ASSERT_EQ(handler.log[1].events, EventHandler::WRITE);
  381. T_CHECK_TIMEOUT(
  382. start, handler.log[1].timestamp, milliseconds(events[0].milliseconds));
  383. ASSERT_EQ(handler.log[1].bytesRead, 0);
  384. ASSERT_GT(handler.log[1].bytesWritten, 0);
  385. T_CHECK_TIMEOUT(start, end, milliseconds(unregisterTimeout));
  386. }
  387. /**
  388. * Test (READ | WRITE) when the socket becomes readable first
  389. */
  390. TEST(EventBaseTest, ReadWrite) {
  391. EventBase eb;
  392. SocketPair sp;
  393. // Fill up the write buffer before starting
  394. size_t sock0WriteLength = writeUntilFull(sp[0]);
  395. // Register for read and write events
  396. TestHandler handler(&eb, sp[0]);
  397. handler.registerHandler(EventHandler::READ_WRITE);
  398. // Register timeouts to perform a write then a read.
  399. ScheduledEvent events[] = {
  400. {10, EventHandler::WRITE, 2345, 0},
  401. {40, EventHandler::READ, 0, 0},
  402. {0, 0, 0, 0},
  403. };
  404. scheduleEvents(&eb, sp[1], events);
  405. // Loop
  406. TimePoint start;
  407. eb.loop();
  408. TimePoint end;
  409. // Since we didn't use the EventHandler::PERSIST flag, the handler should
  410. // have only noticed readability, then unregistered itself. Check that only
  411. // one event was logged.
  412. ASSERT_EQ(handler.log.size(), 1);
  413. ASSERT_EQ(handler.log[0].events, EventHandler::READ);
  414. T_CHECK_TIMEOUT(
  415. start, handler.log[0].timestamp, milliseconds(events[0].milliseconds));
  416. ASSERT_EQ(handler.log[0].bytesRead, events[0].length);
  417. ASSERT_EQ(handler.log[0].bytesWritten, 0);
  418. ASSERT_EQ(events[1].result, sock0WriteLength);
  419. T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
  420. }
  421. /**
  422. * Test (READ | WRITE) when the socket becomes writable first
  423. */
  424. TEST(EventBaseTest, WriteRead) {
  425. EventBase eb;
  426. SocketPair sp;
  427. // Fill up the write buffer before starting
  428. size_t sock0WriteLength = writeUntilFull(sp[0]);
  429. // Register for read and write events
  430. TestHandler handler(&eb, sp[0]);
  431. handler.registerHandler(EventHandler::READ_WRITE);
  432. // Register timeouts to perform a read then a write.
  433. size_t sock1WriteLength = 2345;
  434. ScheduledEvent events[] = {
  435. {10, EventHandler::READ, 0, 0},
  436. {40, EventHandler::WRITE, sock1WriteLength, 0},
  437. {0, 0, 0, 0},
  438. };
  439. scheduleEvents(&eb, sp[1], events);
  440. // Loop
  441. TimePoint start;
  442. eb.loop();
  443. TimePoint end;
  444. // Since we didn't use the EventHandler::PERSIST flag, the handler should
  445. // have only noticed writability, then unregistered itself. Check that only
  446. // one event was logged.
  447. ASSERT_EQ(handler.log.size(), 1);
  448. ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
  449. T_CHECK_TIMEOUT(
  450. start, handler.log[0].timestamp, milliseconds(events[0].milliseconds));
  451. ASSERT_EQ(handler.log[0].bytesRead, 0);
  452. ASSERT_GT(handler.log[0].bytesWritten, 0);
  453. ASSERT_EQ(events[0].result, sock0WriteLength);
  454. ASSERT_EQ(events[1].result, sock1WriteLength);
  455. T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
  456. // Make sure the written data is still waiting to be read.
  457. size_t bytesRemaining = readUntilEmpty(sp[0]);
  458. ASSERT_EQ(bytesRemaining, events[1].length);
  459. }
  460. /**
  461. * Test (READ | WRITE) when the socket becomes readable and writable
  462. * at the same time.
  463. */
  464. TEST(EventBaseTest, ReadWriteSimultaneous) {
  465. EventBase eb;
  466. SocketPair sp;
  467. // Fill up the write buffer before starting
  468. size_t sock0WriteLength = writeUntilFull(sp[0]);
  469. // Register for read and write events
  470. TestHandler handler(&eb, sp[0]);
  471. handler.registerHandler(EventHandler::READ_WRITE);
  472. // Register a timeout to perform a read and write together
  473. ScheduledEvent events[] = {
  474. {10, EventHandler::READ | EventHandler::WRITE, 0, 0},
  475. {0, 0, 0, 0},
  476. };
  477. scheduleEvents(&eb, sp[1], events);
  478. // Loop
  479. TimePoint start;
  480. eb.loop();
  481. TimePoint end;
  482. // It's not strictly required that the EventBase register us about both
  483. // events in the same call. So, it's possible that if the EventBase
  484. // implementation changes this test could start failing, and it wouldn't be
  485. // considered breaking the API. However for now it's nice to exercise this
  486. // code path.
  487. ASSERT_EQ(handler.log.size(), 1);
  488. ASSERT_EQ(handler.log[0].events, EventHandler::READ | EventHandler::WRITE);
  489. T_CHECK_TIMEOUT(
  490. start, handler.log[0].timestamp, milliseconds(events[0].milliseconds));
  491. ASSERT_EQ(handler.log[0].bytesRead, sock0WriteLength);
  492. ASSERT_GT(handler.log[0].bytesWritten, 0);
  493. T_CHECK_TIMEOUT(start, end, milliseconds(events[0].milliseconds));
  494. }
  495. /**
  496. * Test (READ | WRITE | PERSIST)
  497. */
  498. TEST(EventBaseTest, ReadWritePersist) {
  499. EventBase eb;
  500. SocketPair sp;
  501. // Register for read and write events
  502. TestHandler handler(&eb, sp[0]);
  503. handler.registerHandler(
  504. EventHandler::READ | EventHandler::WRITE | EventHandler::PERSIST);
  505. // Register timeouts to perform several reads and writes
  506. ScheduledEvent events[] = {
  507. {10, EventHandler::WRITE, 2345, 0},
  508. {20, EventHandler::READ, 0, 0},
  509. {35, EventHandler::WRITE, 200, 0},
  510. {45, EventHandler::WRITE, 15, 0},
  511. {55, EventHandler::READ, 0, 0},
  512. {120, EventHandler::WRITE, 2345, 0},
  513. {0, 0, 0, 0},
  514. };
  515. scheduleEvents(&eb, sp[1], events);
  516. // Schedule a timeout to unregister the handler
  517. eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 80);
  518. // Loop
  519. TimePoint start;
  520. eb.loop();
  521. TimePoint end;
  522. ASSERT_EQ(handler.log.size(), 6);
  523. // Since we didn't fill up the write buffer immediately, there should
  524. // be an immediate event for writability.
  525. ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
  526. T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
  527. ASSERT_EQ(handler.log[0].bytesRead, 0);
  528. ASSERT_GT(handler.log[0].bytesWritten, 0);
  529. // Events 1 through 5 should correspond to the scheduled events
  530. for (int n = 1; n < 6; ++n) {
  531. ScheduledEvent* event = &events[n - 1];
  532. T_CHECK_TIMEOUT(
  533. start, handler.log[n].timestamp, milliseconds(event->milliseconds));
  534. if (event->events == EventHandler::READ) {
  535. ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
  536. ASSERT_EQ(handler.log[n].bytesRead, 0);
  537. ASSERT_GT(handler.log[n].bytesWritten, 0);
  538. } else {
  539. ASSERT_EQ(handler.log[n].events, EventHandler::READ);
  540. ASSERT_EQ(handler.log[n].bytesRead, event->length);
  541. ASSERT_EQ(handler.log[n].bytesWritten, 0);
  542. }
  543. }
  544. // The timeout should have unregistered the handler before the last write.
  545. // Make sure that data is still waiting to be read
  546. size_t bytesRemaining = readUntilEmpty(sp[0]);
  547. ASSERT_EQ(bytesRemaining, events[5].length);
  548. }
  549. class PartialReadHandler : public TestHandler {
  550. public:
  551. PartialReadHandler(EventBase* eventBase, int fd, size_t readLength)
  552. : TestHandler(eventBase, fd), fd_(fd), readLength_(readLength) {}
  553. void handlerReady(uint16_t events) noexcept override {
  554. assert(events == EventHandler::READ);
  555. ssize_t bytesRead = readFromFD(fd_, readLength_);
  556. log.emplace_back(events, bytesRead, 0);
  557. }
  558. private:
  559. int fd_;
  560. size_t readLength_;
  561. };
  562. /**
  563. * Test reading only part of the available data when a read event is fired.
  564. * When PERSIST is used, make sure the handler gets notified again the next
  565. * time around the loop.
  566. */
  567. TEST(EventBaseTest, ReadPartial) {
  568. EventBase eb;
  569. SocketPair sp;
  570. // Register for read events
  571. size_t readLength = 100;
  572. PartialReadHandler handler(&eb, sp[0], readLength);
  573. handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
  574. // Register a timeout to perform a single write,
  575. // with more data than PartialReadHandler will read at once
  576. ScheduledEvent events[] = {
  577. {10, EventHandler::WRITE, (3 * readLength) + (readLength / 2), 0},
  578. {0, 0, 0, 0},
  579. };
  580. scheduleEvents(&eb, sp[1], events);
  581. // Schedule a timeout to unregister the handler
  582. eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
  583. // Loop
  584. TimePoint start;
  585. eb.loop();
  586. TimePoint end;
  587. ASSERT_EQ(handler.log.size(), 4);
  588. // The first 3 invocations should read readLength bytes each
  589. for (int n = 0; n < 3; ++n) {
  590. ASSERT_EQ(handler.log[n].events, EventHandler::READ);
  591. T_CHECK_TIMEOUT(
  592. start, handler.log[n].timestamp, milliseconds(events[0].milliseconds));
  593. ASSERT_EQ(handler.log[n].bytesRead, readLength);
  594. ASSERT_EQ(handler.log[n].bytesWritten, 0);
  595. }
  596. // The last read only has readLength/2 bytes
  597. ASSERT_EQ(handler.log[3].events, EventHandler::READ);
  598. T_CHECK_TIMEOUT(
  599. start, handler.log[3].timestamp, milliseconds(events[0].milliseconds));
  600. ASSERT_EQ(handler.log[3].bytesRead, readLength / 2);
  601. ASSERT_EQ(handler.log[3].bytesWritten, 0);
  602. }
  603. class PartialWriteHandler : public TestHandler {
  604. public:
  605. PartialWriteHandler(EventBase* eventBase, int fd, size_t writeLength)
  606. : TestHandler(eventBase, fd), fd_(fd), writeLength_(writeLength) {}
  607. void handlerReady(uint16_t events) noexcept override {
  608. assert(events == EventHandler::WRITE);
  609. ssize_t bytesWritten = writeToFD(fd_, writeLength_);
  610. log.emplace_back(events, 0, bytesWritten);
  611. }
  612. private:
  613. int fd_;
  614. size_t writeLength_;
  615. };
  616. /**
  617. * Test writing without completely filling up the write buffer when the fd
  618. * becomes writable. When PERSIST is used, make sure the handler gets
  619. * notified again the next time around the loop.
  620. */
  621. TEST(EventBaseTest, WritePartial) {
  622. EventBase eb;
  623. SocketPair sp;
  624. // Fill up the write buffer before starting
  625. size_t initialBytesWritten = writeUntilFull(sp[0]);
  626. // Register for write events
  627. size_t writeLength = 100;
  628. PartialWriteHandler handler(&eb, sp[0], writeLength);
  629. handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
  630. // Register a timeout to read, so that more data can be written
  631. ScheduledEvent events[] = {
  632. {10, EventHandler::READ, 0, 0},
  633. {0, 0, 0, 0},
  634. };
  635. scheduleEvents(&eb, sp[1], events);
  636. // Schedule a timeout to unregister the handler
  637. eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
  638. // Loop
  639. TimePoint start;
  640. eb.loop();
  641. TimePoint end;
  642. // Depending on how big the socket buffer is, there will be multiple writes
  643. // Only check the first 5
  644. int numChecked = 5;
  645. ASSERT_GE(handler.log.size(), numChecked);
  646. ASSERT_EQ(events[0].result, initialBytesWritten);
  647. // The first 3 invocations should read writeLength bytes each
  648. for (int n = 0; n < numChecked; ++n) {
  649. ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
  650. T_CHECK_TIMEOUT(
  651. start, handler.log[n].timestamp, milliseconds(events[0].milliseconds));
  652. ASSERT_EQ(handler.log[n].bytesRead, 0);
  653. ASSERT_EQ(handler.log[n].bytesWritten, writeLength);
  654. }
  655. }
  656. /**
  657. * Test destroying a registered EventHandler
  658. */
  659. TEST(EventBaseTest, DestroyHandler) {
  660. class DestroyHandler : public AsyncTimeout {
  661. public:
  662. DestroyHandler(EventBase* eb, EventHandler* h)
  663. : AsyncTimeout(eb), handler_(h) {}
  664. void timeoutExpired() noexcept override {
  665. delete handler_;
  666. }
  667. private:
  668. EventHandler* handler_;
  669. };
  670. EventBase eb;
  671. SocketPair sp;
  672. // Fill up the write buffer before starting
  673. size_t initialBytesWritten = writeUntilFull(sp[0]);
  674. // Register for write events
  675. TestHandler* handler = new TestHandler(&eb, sp[0]);
  676. handler->registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
  677. // After 10ms, read some data, so that the handler
  678. // will be notified that it can write.
  679. eb.tryRunAfterDelay(
  680. std::bind(checkReadUntilEmpty, sp[1], initialBytesWritten), 10);
  681. // Start a timer to destroy the handler after 25ms
  682. // This mainly just makes sure the code doesn't break or assert
  683. DestroyHandler dh(&eb, handler);
  684. dh.scheduleTimeout(25);
  685. TimePoint start;
  686. eb.loop();
  687. TimePoint end;
  688. // Make sure the EventHandler was uninstalled properly when it was
  689. // destroyed, and the EventBase loop exited
  690. T_CHECK_TIMEOUT(start, end, milliseconds(25));
  691. // Make sure that the handler wrote data to the socket
  692. // before it was destroyed
  693. size_t bytesRemaining = readUntilEmpty(sp[1]);
  694. ASSERT_GT(bytesRemaining, 0);
  695. }
  696. ///////////////////////////////////////////////////////////////////////////
  697. // Tests for timeout events
  698. ///////////////////////////////////////////////////////////////////////////
  699. TEST(EventBaseTest, RunAfterDelay) {
  700. EventBase eb;
  701. TimePoint timestamp1(false);
  702. TimePoint timestamp2(false);
  703. TimePoint timestamp3(false);
  704. eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp1), 10);
  705. eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp2), 20);
  706. eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp3), 40);
  707. TimePoint start;
  708. eb.loop();
  709. TimePoint end;
  710. T_CHECK_TIMEOUT(start, timestamp1, milliseconds(10));
  711. T_CHECK_TIMEOUT(start, timestamp2, milliseconds(20));
  712. T_CHECK_TIMEOUT(start, timestamp3, milliseconds(40));
  713. T_CHECK_TIMEOUT(start, end, milliseconds(40));
  714. }
  715. /**
  716. * Test the behavior of tryRunAfterDelay() when some timeouts are
  717. * still scheduled when the EventBase is destroyed.
  718. */
  719. TEST(EventBaseTest, RunAfterDelayDestruction) {
  720. TimePoint timestamp1(false);
  721. TimePoint timestamp2(false);
  722. TimePoint timestamp3(false);
  723. TimePoint timestamp4(false);
  724. TimePoint start(false);
  725. TimePoint end(false);
  726. {
  727. EventBase eb;
  728. // Run two normal timeouts
  729. eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp1), 10);
  730. eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp2), 20);
  731. // Schedule a timeout to stop the event loop after 40ms
  732. eb.tryRunAfterDelay(std::bind(&EventBase::terminateLoopSoon, &eb), 40);
  733. // Schedule 2 timeouts that would fire after the event loop stops
  734. eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp3), 80);
  735. eb.tryRunAfterDelay(std::bind(&TimePoint::reset, &timestamp4), 160);
  736. start.reset();
  737. eb.loop();
  738. end.reset();
  739. }
  740. T_CHECK_TIMEOUT(start, timestamp1, milliseconds(10));
  741. T_CHECK_TIMEOUT(start, timestamp2, milliseconds(20));
  742. T_CHECK_TIMEOUT(start, end, milliseconds(40));
  743. ASSERT_TRUE(timestamp3.isUnset());
  744. ASSERT_TRUE(timestamp4.isUnset());
  745. // Ideally this test should be run under valgrind to ensure that no
  746. // memory is leaked.
  747. }
  748. class TestTimeout : public AsyncTimeout {
  749. public:
  750. explicit TestTimeout(EventBase* eventBase)
  751. : AsyncTimeout(eventBase), timestamp(false) {}
  752. void timeoutExpired() noexcept override {
  753. timestamp.reset();
  754. }
  755. TimePoint timestamp;
  756. };
  757. TEST(EventBaseTest, BasicTimeouts) {
  758. EventBase eb;
  759. TestTimeout t1(&eb);
  760. TestTimeout t2(&eb);
  761. TestTimeout t3(&eb);
  762. t1.scheduleTimeout(10);
  763. t2.scheduleTimeout(20);
  764. t3.scheduleTimeout(40);
  765. TimePoint start;
  766. eb.loop();
  767. TimePoint end;
  768. T_CHECK_TIMEOUT(start, t1.timestamp, milliseconds(10));
  769. T_CHECK_TIMEOUT(start, t2.timestamp, milliseconds(20));
  770. T_CHECK_TIMEOUT(start, t3.timestamp, milliseconds(40));
  771. T_CHECK_TIMEOUT(start, end, milliseconds(40));
  772. }
  773. class ReschedulingTimeout : public AsyncTimeout {
  774. public:
  775. ReschedulingTimeout(EventBase* evb, const vector<uint32_t>& timeouts)
  776. : AsyncTimeout(evb), timeouts_(timeouts), iterator_(timeouts_.begin()) {}
  777. void start() {
  778. reschedule();
  779. }
  780. void timeoutExpired() noexcept override {
  781. timestamps.emplace_back();
  782. reschedule();
  783. }
  784. void reschedule() {
  785. if (iterator_ != timeouts_.end()) {
  786. uint32_t timeout = *iterator_;
  787. ++iterator_;
  788. scheduleTimeout(timeout);
  789. }
  790. }
  791. vector<TimePoint> timestamps;
  792. private:
  793. vector<uint32_t> timeouts_;
  794. vector<uint32_t>::const_iterator iterator_;
  795. };
  796. /**
  797. * Test rescheduling the same timeout multiple times
  798. */
  799. TEST(EventBaseTest, ReuseTimeout) {
  800. EventBase eb;
  801. vector<uint32_t> timeouts;
  802. timeouts.push_back(10);
  803. timeouts.push_back(30);
  804. timeouts.push_back(15);
  805. ReschedulingTimeout t(&eb, timeouts);
  806. t.start();
  807. TimePoint start;
  808. eb.loop();
  809. TimePoint end;
  810. // Use a higher tolerance than usual. We're waiting on 3 timeouts
  811. // consecutively. In general, each timeout may go over by a few
  812. // milliseconds, and we're tripling this error by witing on 3 timeouts.
  813. milliseconds tolerance{6};
  814. ASSERT_EQ(timeouts.size(), t.timestamps.size());
  815. uint32_t total = 0;
  816. for (size_t n = 0; n < timeouts.size(); ++n) {
  817. total += timeouts[n];
  818. T_CHECK_TIMEOUT(start, t.timestamps[n], milliseconds(total), tolerance);
  819. }
  820. T_CHECK_TIMEOUT(start, end, milliseconds(total), tolerance);
  821. }
  822. /**
  823. * Test rescheduling a timeout before it has fired
  824. */
  825. TEST(EventBaseTest, RescheduleTimeout) {
  826. EventBase eb;
  827. TestTimeout t1(&eb);
  828. TestTimeout t2(&eb);
  829. TestTimeout t3(&eb);
  830. t1.scheduleTimeout(15);
  831. t2.scheduleTimeout(30);
  832. t3.scheduleTimeout(30);
  833. auto f = static_cast<bool (AsyncTimeout::*)(uint32_t)>(
  834. &AsyncTimeout::scheduleTimeout);
  835. // after 10ms, reschedule t2 to run sooner than originally scheduled
  836. eb.tryRunAfterDelay(std::bind(f, &t2, 10), 10);
  837. // after 10ms, reschedule t3 to run later than originally scheduled
  838. eb.tryRunAfterDelay(std::bind(f, &t3, 40), 10);
  839. TimePoint start;
  840. eb.loop();
  841. TimePoint end;
  842. T_CHECK_TIMEOUT(start, t1.timestamp, milliseconds(15));
  843. T_CHECK_TIMEOUT(start, t2.timestamp, milliseconds(20));
  844. T_CHECK_TIMEOUT(start, t3.timestamp, milliseconds(50));
  845. T_CHECK_TIMEOUT(start, end, milliseconds(50));
  846. }
  847. /**
  848. * Test cancelling a timeout
  849. */
  850. TEST(EventBaseTest, CancelTimeout) {
  851. EventBase eb;
  852. vector<uint32_t> timeouts;
  853. timeouts.push_back(10);
  854. timeouts.push_back(30);
  855. timeouts.push_back(25);
  856. ReschedulingTimeout t(&eb, timeouts);
  857. t.start();
  858. eb.tryRunAfterDelay(std::bind(&AsyncTimeout::cancelTimeout, &t), 50);
  859. TimePoint start;
  860. eb.loop();
  861. TimePoint end;
  862. ASSERT_EQ(t.timestamps.size(), 2);
  863. T_CHECK_TIMEOUT(start, t.timestamps[0], milliseconds(10));
  864. T_CHECK_TIMEOUT(start, t.timestamps[1], milliseconds(40));
  865. T_CHECK_TIMEOUT(start, end, milliseconds(50));
  866. }
  867. /**
  868. * Test destroying a scheduled timeout object
  869. */
  870. TEST(EventBaseTest, DestroyTimeout) {
  871. class DestroyTimeout : public AsyncTimeout {
  872. public:
  873. DestroyTimeout(EventBase* eb, AsyncTimeout* t)
  874. : AsyncTimeout(eb), timeout_(t) {}
  875. void timeoutExpired() noexcept override {
  876. delete timeout_;
  877. }
  878. private:
  879. AsyncTimeout* timeout_;
  880. };
  881. EventBase eb;
  882. TestTimeout* t1 = new TestTimeout(&eb);
  883. t1->scheduleTimeout(30);
  884. DestroyTimeout dt(&eb, t1);
  885. dt.scheduleTimeout(10);
  886. TimePoint start;
  887. eb.loop();
  888. TimePoint end;
  889. T_CHECK_TIMEOUT(start, end, milliseconds(10));
  890. }
  891. /**
  892. * Test the scheduled executor impl
  893. */
  894. TEST(EventBaseTest, ScheduledFn) {
  895. EventBase eb;
  896. TimePoint timestamp1(false);
  897. TimePoint timestamp2(false);
  898. TimePoint timestamp3(false);
  899. eb.schedule(std::bind(&TimePoint::reset, &timestamp1), milliseconds(9));
  900. eb.schedule(std::bind(&TimePoint::reset, &timestamp2), milliseconds(19));
  901. eb.schedule(std::bind(&TimePoint::reset, &timestamp3), milliseconds(39));
  902. TimePoint start;
  903. eb.loop();
  904. TimePoint end;
  905. T_CHECK_TIMEOUT(start, timestamp1, milliseconds(9));
  906. T_CHECK_TIMEOUT(start, timestamp2, milliseconds(19));
  907. T_CHECK_TIMEOUT(start, timestamp3, milliseconds(39));
  908. T_CHECK_TIMEOUT(start, end, milliseconds(39));
  909. }
  910. TEST(EventBaseTest, ScheduledFnAt) {
  911. EventBase eb;
  912. TimePoint timestamp0(false);
  913. TimePoint timestamp1(false);
  914. TimePoint timestamp2(false);
  915. TimePoint timestamp3(false);
  916. eb.scheduleAt(
  917. std::bind(&TimePoint::reset, &timestamp1), eb.now() - milliseconds(5));
  918. eb.scheduleAt(
  919. std::bind(&TimePoint::reset, &timestamp1), eb.now() + milliseconds(9));
  920. eb.scheduleAt(
  921. std::bind(&TimePoint::reset, &timestamp2), eb.now() + milliseconds(19));
  922. eb.scheduleAt(
  923. std::bind(&TimePoint::reset, &timestamp3), eb.now() + milliseconds(39));
  924. TimePoint start;
  925. eb.loop();
  926. TimePoint end;
  927. T_CHECK_TIME_LT(start, timestamp0, milliseconds(0));
  928. T_CHECK_TIMEOUT(start, timestamp1, milliseconds(9));
  929. T_CHECK_TIMEOUT(start, timestamp2, milliseconds(19));
  930. T_CHECK_TIMEOUT(start, timestamp3, milliseconds(39));
  931. T_CHECK_TIMEOUT(start, end, milliseconds(39));
  932. }
  933. ///////////////////////////////////////////////////////////////////////////
  934. // Test for runInThreadTestFunc()
  935. ///////////////////////////////////////////////////////////////////////////
  936. struct RunInThreadData {
  937. RunInThreadData(int numThreads, int opsPerThread_)
  938. : opsPerThread(opsPerThread_), opsToGo(numThreads * opsPerThread) {}
  939. EventBase evb;
  940. deque<pair<int, int>> values;
  941. int opsPerThread;
  942. int opsToGo;
  943. };
  944. struct RunInThreadArg {
  945. RunInThreadArg(RunInThreadData* data_, int threadId, int value_)
  946. : data(data_), thread(threadId), value(value_) {}
  947. RunInThreadData* data;
  948. int thread;
  949. int value;
  950. };
  951. void runInThreadTestFunc(RunInThreadArg* arg) {
  952. arg->data->values.emplace_back(arg->thread, arg->value);
  953. RunInThreadData* data = arg->data;
  954. delete arg;
  955. if (--data->opsToGo == 0) {
  956. // Break out of the event base loop if we are the last thread running
  957. data->evb.terminateLoopSoon();
  958. }
  959. }
  960. TEST(EventBaseTest, RunInThread) {
  961. constexpr uint32_t numThreads = 50;
  962. constexpr uint32_t opsPerThread = 100;
  963. RunInThreadData data(numThreads, opsPerThread);
  964. deque<std::thread> threads;
  965. SCOPE_EXIT {
  966. // Wait on all of the threads.
  967. for (auto& thread : threads) {
  968. thread.join();
  969. }
  970. };
  971. for (uint32_t i = 0; i < numThreads; ++i) {
  972. threads.emplace_back([i, &data] {
  973. for (int n = 0; n < data.opsPerThread; ++n) {
  974. RunInThreadArg* arg = new RunInThreadArg(&data, i, n);
  975. data.evb.runInEventBaseThread(runInThreadTestFunc, arg);
  976. usleep(10);
  977. }
  978. });
  979. }
  980. // Add a timeout event to run after 3 seconds.
  981. // Otherwise loop() will return immediately since there are no events to run.
  982. // Once the last thread exits, it will stop the loop(). However, this
  983. // timeout also stops the loop in case there is a bug performing the normal
  984. // stop.
  985. data.evb.tryRunAfterDelay(
  986. std::bind(&EventBase::terminateLoopSoon, &data.evb), 3000);
  987. TimePoint start;
  988. data.evb.loop();
  989. TimePoint end;
  990. // Verify that the loop exited because all threads finished and requested it
  991. // to stop. This should happen much sooner than the 3 second timeout.
  992. // Assert that it happens in under a second. (This is still tons of extra
  993. // padding.)
  994. auto timeTaken =
  995. std::chrono::duration_cast<milliseconds>(end.getTime() - start.getTime());
  996. ASSERT_LT(timeTaken.count(), 1000);
  997. VLOG(11) << "Time taken: " << timeTaken.count();
  998. // Verify that we have all of the events from every thread
  999. int expectedValues[numThreads];
  1000. for (uint32_t n = 0; n < numThreads; ++n) {
  1001. expectedValues[n] = 0;
  1002. }
  1003. for (deque<pair<int, int>>::const_iterator it = data.values.begin();
  1004. it != data.values.end();
  1005. ++it) {
  1006. int threadID = it->first;
  1007. int value = it->second;
  1008. ASSERT_EQ(expectedValues[threadID], value);
  1009. ++expectedValues[threadID];
  1010. }
  1011. for (uint32_t n = 0; n < numThreads; ++n) {
  1012. ASSERT_EQ(expectedValues[n], opsPerThread);
  1013. }
  1014. }
  1015. // This test simulates some calls, and verifies that the waiting happens by
  1016. // triggering what otherwise would be race conditions, and trying to detect
  1017. // whether any of the race conditions happened.
  1018. TEST(EventBaseTest, RunInEventBaseThreadAndWait) {
  1019. const size_t c = 256;
  1020. vector<unique_ptr<atomic<size_t>>> atoms(c);
  1021. for (size_t i = 0; i < c; ++i) {
  1022. auto& atom = atoms.at(i);
  1023. atom = std::make_unique<atomic<size_t>>(0);
  1024. }
  1025. vector<thread> threads;
  1026. for (size_t i = 0; i < c; ++i) {
  1027. threads.emplace_back([&atoms, i] {
  1028. EventBase eb;
  1029. auto& atom = *atoms.at(i);
  1030. auto ebth = thread([&] { eb.loopForever(); });
  1031. eb.waitUntilRunning();
  1032. eb.runInEventBaseThreadAndWait([&] {
  1033. size_t x = 0;
  1034. atom.compare_exchange_weak(
  1035. x, 1, std::memory_order_release, std::memory_order_relaxed);
  1036. });
  1037. size_t x = 0;
  1038. atom.compare_exchange_weak(
  1039. x, 2, std::memory_order_release, std::memory_order_relaxed);
  1040. eb.terminateLoopSoon();
  1041. ebth.join();
  1042. });
  1043. }
  1044. for (size_t i = 0; i < c; ++i) {
  1045. auto& th = threads.at(i);
  1046. th.join();
  1047. }
  1048. size_t sum = 0;
  1049. for (auto& atom : atoms) {
  1050. sum += *atom;
  1051. }
  1052. EXPECT_EQ(c, sum);
  1053. }
  1054. TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadAndWaitCross) {
  1055. EventBase eb;
  1056. thread th(&EventBase::loopForever, &eb);
  1057. SCOPE_EXIT {
  1058. eb.terminateLoopSoon();
  1059. th.join();
  1060. };
  1061. auto mutated = false;
  1062. eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] { mutated = true; });
  1063. EXPECT_TRUE(mutated);
  1064. }
  1065. TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadAndWaitWithin) {
  1066. EventBase eb;
  1067. thread th(&EventBase::loopForever, &eb);
  1068. SCOPE_EXIT {
  1069. eb.terminateLoopSoon();
  1070. th.join();
  1071. };
  1072. eb.runInEventBaseThreadAndWait([&] {
  1073. auto mutated = false;
  1074. eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] { mutated = true; });
  1075. EXPECT_TRUE(mutated);
  1076. });
  1077. }
  1078. TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadNotLooping) {
  1079. EventBase eb;
  1080. auto mutated = false;
  1081. eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] { mutated = true; });
  1082. EXPECT_TRUE(mutated);
  1083. }
  1084. ///////////////////////////////////////////////////////////////////////////
  1085. // Tests for runInLoop()
  1086. ///////////////////////////////////////////////////////////////////////////
  1087. class CountedLoopCallback : public EventBase::LoopCallback {
  1088. public:
  1089. CountedLoopCallback(
  1090. EventBase* eventBase,
  1091. unsigned int count,
  1092. std::function<void()> action = std::function<void()>())
  1093. : eventBase_(eventBase), count_(count), action_(action) {}
  1094. void runLoopCallback() noexcept override {
  1095. --count_;
  1096. if (count_ > 0) {
  1097. eventBase_->runInLoop(this);
  1098. } else if (action_) {
  1099. action_();
  1100. }
  1101. }
  1102. unsigned int getCount() const {
  1103. return count_;
  1104. }
  1105. private:
  1106. EventBase* eventBase_;
  1107. unsigned int count_;
  1108. std::function<void()> action_;
  1109. };
  1110. // Test that EventBase::loop() doesn't exit while there are
  1111. // still LoopCallbacks remaining to be invoked.
  1112. TEST(EventBaseTest, RepeatedRunInLoop) {
  1113. EventBase eventBase;
  1114. CountedLoopCallback c(&eventBase, 10);
  1115. eventBase.runInLoop(&c);
  1116. // The callback shouldn't have run immediately
  1117. ASSERT_EQ(c.getCount(), 10);
  1118. eventBase.loop();
  1119. // loop() should loop until the CountedLoopCallback stops
  1120. // re-installing itself.
  1121. ASSERT_EQ(c.getCount(), 0);
  1122. }
  1123. // Test that EventBase::loop() works as expected without time measurements.
  1124. TEST(EventBaseTest, RunInLoopNoTimeMeasurement) {
  1125. EventBase eventBase(false);
  1126. CountedLoopCallback c(&eventBase, 10);
  1127. eventBase.runInLoop(&c);
  1128. // The callback shouldn't have run immediately
  1129. ASSERT_EQ(c.getCount(), 10);
  1130. eventBase.loop();
  1131. // loop() should loop until the CountedLoopCallback stops
  1132. // re-installing itself.
  1133. ASSERT_EQ(c.getCount(), 0);
  1134. }
  1135. // Test runInLoop() calls with terminateLoopSoon()
  1136. TEST(EventBaseTest, RunInLoopStopLoop) {
  1137. EventBase eventBase;
  1138. CountedLoopCallback c1(&eventBase, 20);
  1139. CountedLoopCallback c2(
  1140. &eventBase, 10, std::bind(&EventBase::terminateLoopSoon, &eventBase));
  1141. eventBase.runInLoop(&c1);
  1142. eventBase.runInLoop(&c2);
  1143. ASSERT_EQ(c1.getCount(), 20);
  1144. ASSERT_EQ(c2.getCount(), 10);
  1145. eventBase.loopForever();
  1146. // c2 should have stopped the loop after 10 iterations
  1147. ASSERT_EQ(c2.getCount(), 0);
  1148. // We allow the EventBase to run the loop callbacks in whatever order it
  1149. // chooses. We'll accept c1's count being either 10 (if the loop terminated
  1150. // after c1 ran on the 10th iteration) or 11 (if c2 terminated the loop
  1151. // before c1 ran).
  1152. //
  1153. // (With the current code, c1 will always run 10 times, but we don't consider
  1154. // this a hard API requirement.)
  1155. ASSERT_GE(c1.getCount(), 10);
  1156. ASSERT_LE(c1.getCount(), 11);
  1157. }
  1158. TEST(EventBaseTest, messageAvailableException) {
  1159. auto deadManWalking = [] {
  1160. EventBase eventBase;
  1161. std::thread t([&] {
  1162. // Call this from another thread to force use of NotificationQueue in
  1163. // runInEventBaseThread
  1164. eventBase.runInEventBaseThread(
  1165. []() { throw std::runtime_error("boom"); });
  1166. });
  1167. t.join();
  1168. eventBase.loopForever();
  1169. };
  1170. EXPECT_DEATH(deadManWalking(), ".*");
  1171. }
  1172. TEST(EventBaseTest, TryRunningAfterTerminate) {
  1173. EventBase eventBase;
  1174. CountedLoopCallback c1(
  1175. &eventBase, 1, std::bind(&EventBase::terminateLoopSoon, &eventBase));
  1176. eventBase.runInLoop(&c1);
  1177. eventBase.loopForever();
  1178. bool ran = false;
  1179. eventBase.runInEventBaseThread([&]() { ran = true; });
  1180. ASSERT_FALSE(ran);
  1181. }
  1182. // Test cancelling runInLoop() callbacks
  1183. TEST(EventBaseTest, CancelRunInLoop) {
  1184. EventBase eventBase;
  1185. CountedLoopCallback c1(&eventBase, 20);
  1186. CountedLoopCallback c2(&eventBase, 20);
  1187. CountedLoopCallback c3(&eventBase, 20);
  1188. std::function<void()> cancelC1Action =
  1189. std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c1);
  1190. std::function<void()> cancelC2Action =
  1191. std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c2);
  1192. CountedLoopCallback cancelC1(&eventBase, 10, cancelC1Action);
  1193. CountedLoopCallback cancelC2(&eventBase, 10, cancelC2Action);
  1194. // Install cancelC1 after c1
  1195. eventBase.runInLoop(&c1);
  1196. eventBase.runInLoop(&cancelC1);
  1197. // Install cancelC2 before c2
  1198. eventBase.runInLoop(&cancelC2);
  1199. eventBase.runInLoop(&c2);
  1200. // Install c3
  1201. eventBase.runInLoop(&c3);
  1202. ASSERT_EQ(c1.getCount(), 20);
  1203. ASSERT_EQ(c2.getCount(), 20);
  1204. ASSERT_EQ(c3.getCount(), 20);
  1205. ASSERT_EQ(cancelC1.getCount(), 10);
  1206. ASSERT_EQ(cancelC2.getCount(), 10);
  1207. // Run the loop
  1208. eventBase.loop();
  1209. // cancelC1 and cancelC2 should have both fired after 10 iterations and
  1210. // stopped re-installing themselves
  1211. ASSERT_EQ(cancelC1.getCount(), 0);
  1212. ASSERT_EQ(cancelC2.getCount(), 0);
  1213. // c3 should have continued on for the full 20 iterations
  1214. ASSERT_EQ(c3.getCount(), 0);
  1215. // c1 and c2 should have both been cancelled on the 10th iteration.
  1216. //
  1217. // Callbacks are always run in the order they are installed,
  1218. // so c1 should have fired 10 times, and been canceled after it ran on the
  1219. // 10th iteration. c2 should have only fired 9 times, because cancelC2 will
  1220. // have run before it on the 10th iteration, and cancelled it before it
  1221. // fired.
  1222. ASSERT_EQ(c1.getCount(), 10);
  1223. ASSERT_EQ(c2.getCount(), 11);
  1224. }
  1225. class TerminateTestCallback : public EventBase::LoopCallback,
  1226. public EventHandler {
  1227. public:
  1228. TerminateTestCallback(EventBase* eventBase, int fd)
  1229. : EventHandler(eventBase, fd),
  1230. eventBase_(eventBase),
  1231. loopInvocations_(0),
  1232. maxLoopInvocations_(0),
  1233. eventInvocations_(0),
  1234. maxEventInvocations_(0) {}
  1235. void reset(uint32_t maxLoopInvocations, uint32_t maxEventInvocations) {
  1236. loopInvocations_ = 0;
  1237. maxLoopInvocations_ = maxLoopInvocations;
  1238. eventInvocations_ = 0;
  1239. maxEventInvocations_ = maxEventInvocations;
  1240. cancelLoopCallback();
  1241. unregisterHandler();
  1242. }
  1243. void handlerReady(uint16_t /* events */) noexcept override {
  1244. // We didn't register with PERSIST, so we will have been automatically
  1245. // unregistered already.
  1246. ASSERT_FALSE(isHandlerRegistered());
  1247. ++eventInvocations_;
  1248. if (eventInvocations_ >= maxEventInvocations_) {
  1249. return;
  1250. }
  1251. eventBase_->runInLoop(this);
  1252. }
  1253. void runLoopCallback() noexcept override {
  1254. ++loopInvocations_;
  1255. if (loopInvocations_ >= maxLoopInvocations_) {
  1256. return;
  1257. }
  1258. registerHandler(READ);
  1259. }
  1260. uint32_t getLoopInvocations() const {
  1261. return loopInvocations_;
  1262. }
  1263. uint32_t getEventInvocations() const {
  1264. return eventInvocations_;
  1265. }
  1266. private:
  1267. EventBase* eventBase_;
  1268. uint32_t loopInvocations_;
  1269. uint32_t maxLoopInvocations_;
  1270. uint32_t eventInvocations_;
  1271. uint32_t maxEventInvocations_;
  1272. };
  1273. /**
  1274. * Test that EventBase::loop() correctly detects when there are no more events
  1275. * left to run.
  1276. *
  1277. * This uses a single callback, which alternates registering itself as a loop
  1278. * callback versus a EventHandler callback. This exercises a regression where
  1279. * EventBase::loop() incorrectly exited if there were no more fd handlers
  1280. * registered, but a loop callback installed a new fd handler.
  1281. */
  1282. TEST(EventBaseTest, LoopTermination) {
  1283. EventBase eventBase;
  1284. // Open a pipe and close the write end,
  1285. // so the read endpoint will be readable
  1286. int pipeFds[2];
  1287. int rc = pipe(pipeFds);
  1288. ASSERT_EQ(rc, 0);
  1289. close(pipeFds[1]);
  1290. TerminateTestCallback callback(&eventBase, pipeFds[0]);
  1291. // Test once where the callback will exit after a loop callback
  1292. callback.reset(10, 100);
  1293. eventBase.runInLoop(&callback);
  1294. eventBase.loop();
  1295. ASSERT_EQ(callback.getLoopInvocations(), 10);
  1296. ASSERT_EQ(callback.getEventInvocations(), 9);
  1297. // Test once where the callback will exit after an fd event callback
  1298. callback.reset(100, 7);
  1299. eventBase.runInLoop(&callback);
  1300. eventBase.loop();
  1301. ASSERT_EQ(callback.getLoopInvocations(), 7);
  1302. ASSERT_EQ(callback.getEventInvocations(), 7);
  1303. close(pipeFds[0]);
  1304. }
  1305. ///////////////////////////////////////////////////////////////////////////
  1306. // Tests for latency calculations
  1307. ///////////////////////////////////////////////////////////////////////////
  1308. class IdleTimeTimeoutSeries : public AsyncTimeout {
  1309. public:
  1310. explicit IdleTimeTimeoutSeries(
  1311. EventBase* base,
  1312. std::deque<std::size_t>& timeout)
  1313. : AsyncTimeout(base), timeouts_(0), timeout_(timeout) {
  1314. scheduleTimeout(1);
  1315. }
  1316. ~IdleTimeTimeoutSeries() override {}
  1317. void timeoutExpired() noexcept override {
  1318. ++timeouts_;
  1319. if (timeout_.empty()) {
  1320. cancelTimeout();
  1321. } else {
  1322. std::size_t sleepTime = timeout_.front();
  1323. timeout_.pop_front();
  1324. if (sleepTime) {
  1325. usleep(sleepTime);
  1326. }
  1327. scheduleTimeout(1);
  1328. }
  1329. }
  1330. int getTimeouts() const {
  1331. return timeouts_;
  1332. }
  1333. private:
  1334. int timeouts_;
  1335. std::deque<std::size_t>& timeout_;
  1336. };
  1337. /**
  1338. * Verify that idle time is correctly accounted for when decaying our loop
  1339. * time.
  1340. *
  1341. * This works by creating a high loop time (via usleep), expecting a latency
  1342. * callback with known value, and then scheduling a timeout for later. This
  1343. * later timeout is far enough in the future that the idle time should have
  1344. * caused the loop time to decay.
  1345. */
  1346. TEST(EventBaseTest, IdleTime) {
  1347. EventBase eventBase;
  1348. std::deque<std::size_t> timeouts0(4, 8080);
  1349. timeouts0.push_front(8000);
  1350. timeouts0.push_back(14000);
  1351. IdleTimeTimeoutSeries tos0(&eventBase, timeouts0);
  1352. std::deque<std::size_t> timeouts(20, 20);
  1353. std::unique_ptr<IdleTimeTimeoutSeries> tos;
  1354. bool hostOverloaded = false;
  1355. // Loop once before starting the main test. This will run NotificationQueue
  1356. // callbacks that get automatically installed when the EventBase is first
  1357. // created. We want to make sure they don't interfere with the timing
  1358. // operations below.
  1359. eventBase.loopOnce(EVLOOP_NONBLOCK);
  1360. eventBase.setLoadAvgMsec(1000ms);
  1361. eventBase.resetLoadAvg(5900.0);
  1362. auto testStart = std::chrono::steady_clock::now();
  1363. int latencyCallbacks = 0;
  1364. eventBase.setMaxLatency(6000us, [&]() {
  1365. ++latencyCallbacks;
  1366. if (latencyCallbacks != 1) {
  1367. FAIL() << "Unexpected latency callback";
  1368. }
  1369. if (tos0.getTimeouts() < 6) {
  1370. // This could only happen if the host this test is running
  1371. // on is heavily loaded.
  1372. int64_t usElapsed = duration_cast<microseconds>(
  1373. std::chrono::steady_clock::now() - testStart)
  1374. .count();
  1375. EXPECT_LE(43800, usElapsed);
  1376. hostOverloaded = true;
  1377. return;
  1378. }
  1379. EXPECT_EQ(6, tos0.getTimeouts());
  1380. EXPECT_GE(6100, eventBase.getAvgLoopTime() - 1200);
  1381. EXPECT_LE(6100, eventBase.getAvgLoopTime() + 1200);
  1382. tos = std::make_unique<IdleTimeTimeoutSeries>(&eventBase, timeouts);
  1383. });
  1384. // Kick things off with an "immediate" timeout
  1385. tos0.scheduleTimeout(1);
  1386. eventBase.loop();
  1387. if (hostOverloaded) {
  1388. SKIP() << "host too heavily loaded to execute test";
  1389. }
  1390. ASSERT_EQ(1, latencyCallbacks);
  1391. ASSERT_EQ(7, tos0.getTimeouts());
  1392. ASSERT_GE(5900, eventBase.getAvgLoopTime() - 1200);
  1393. ASSERT_LE(5900, eventBase.getAvgLoopTime() + 1200);
  1394. ASSERT_TRUE(!!tos);
  1395. ASSERT_EQ(21, tos->getTimeouts());
  1396. }
  1397. /**
  1398. * Test that thisLoop functionality works with terminateLoopSoon
  1399. */
  1400. TEST(EventBaseTest, ThisLoop) {
  1401. EventBase eb;
  1402. bool runInLoop = false;
  1403. bool runThisLoop = false;
  1404. eb.runInLoop(
  1405. [&]() {
  1406. eb.terminateLoopSoon();
  1407. eb.runInLoop([&]() { runInLoop = true; });
  1408. eb.runInLoop([&]() { runThisLoop = true; }, true);
  1409. },
  1410. true);
  1411. eb.loopForever();
  1412. // Should not work
  1413. ASSERT_FALSE(runInLoop);
  1414. // Should work with thisLoop
  1415. ASSERT_TRUE(runThisLoop);
  1416. }
  1417. TEST(EventBaseTest, EventBaseThreadLoop) {
  1418. EventBase base;
  1419. bool ran = false;
  1420. base.runInEventBaseThread([&]() { ran = true; });
  1421. base.loop();
  1422. ASSERT_TRUE(ran);
  1423. }
  1424. TEST(EventBaseTest, EventBaseThreadName) {
  1425. EventBase base;
  1426. base.setName("foo");
  1427. base.loop();
  1428. #if (__GLIBC__ >= 2) && (__GLIBC_MINOR__ >= 12)
  1429. char name[16];
  1430. pthread_getname_np(pthread_self(), name, 16);
  1431. ASSERT_EQ(0, strcmp("foo", name));
  1432. #endif
  1433. }
  1434. TEST(EventBaseTest, RunBeforeLoop) {
  1435. EventBase base;
  1436. CountedLoopCallback cb(&base, 1, [&]() { base.terminateLoopSoon(); });
  1437. base.runBeforeLoop(&cb);
  1438. base.loopForever();
  1439. ASSERT_EQ(cb.getCount(), 0);
  1440. }
  1441. TEST(EventBaseTest, RunBeforeLoopWait) {
  1442. EventBase base;
  1443. CountedLoopCallback cb(&base, 1);
  1444. base.tryRunAfterDelay([&]() { base.terminateLoopSoon(); }, 500);
  1445. base.runBeforeLoop(&cb);
  1446. base.loopForever();
  1447. // Check that we only ran once, and did not loop multiple times.
  1448. ASSERT_EQ(cb.getCount(), 0);
  1449. }
  1450. class PipeHandler : public EventHandler {
  1451. public:
  1452. PipeHandler(EventBase* eventBase, int fd) : EventHandler(eventBase, fd) {}
  1453. void handlerReady(uint16_t /* events */) noexcept override {
  1454. abort();
  1455. }
  1456. };
  1457. TEST(EventBaseTest, StopBeforeLoop) {
  1458. EventBase evb;
  1459. // Give the evb something to do.
  1460. int p[2];
  1461. ASSERT_EQ(0, pipe(p));
  1462. PipeHandler handler(&evb, p[0]);
  1463. handler.registerHandler(EventHandler::READ);
  1464. // It's definitely not running yet
  1465. evb.terminateLoopSoon();
  1466. // let it run, it should exit quickly.
  1467. std::thread t([&] { evb.loop(); });
  1468. t.join();
  1469. handler.unregisterHandler();
  1470. close(p[0]);
  1471. close(p[1]);
  1472. SUCCEED();
  1473. }
  1474. TEST(EventBaseTest, RunCallbacksOnDestruction) {
  1475. bool ran = false;
  1476. {
  1477. EventBase base;
  1478. base.runInEventBaseThread([&]() { ran = true; });
  1479. }
  1480. ASSERT_TRUE(ran);
  1481. }
  1482. TEST(EventBaseTest, LoopKeepAlive) {
  1483. EventBase evb;
  1484. bool done = false;
  1485. std::thread t([&, loopKeepAlive = getKeepAliveToken(evb)]() mutable {
  1486. /* sleep override */ std::this_thread::sleep_for(
  1487. std::chrono::milliseconds(100));
  1488. evb.runInEventBaseThread(
  1489. [&done, loopKeepAlive = std::move(loopKeepAlive)] { done = true; });
  1490. });
  1491. evb.loop();
  1492. ASSERT_TRUE(done);
  1493. t.join();
  1494. }
  1495. TEST(EventBaseTest, LoopKeepAliveInLoop) {
  1496. EventBase evb;
  1497. bool done = false;
  1498. std::thread t;
  1499. evb.runInEventBaseThread([&] {
  1500. t = std::thread([&, loopKeepAlive = getKeepAliveToken(evb)]() mutable {
  1501. /* sleep override */ std::this_thread::sleep_for(
  1502. std::chrono::milliseconds(100));
  1503. evb.runInEventBaseThread(
  1504. [&done, loopKeepAlive = std::move(loopKeepAlive)] { done = true; });
  1505. });
  1506. });
  1507. evb.loop();
  1508. ASSERT_TRUE(done);
  1509. t.join();
  1510. }
  1511. TEST(EventBaseTest, LoopKeepAliveWithLoopForever) {
  1512. std::unique_ptr<EventBase> evb = std::make_unique<EventBase>();
  1513. bool done = false;
  1514. std::thread evThread([&] {
  1515. evb->loopForever();
  1516. evb.reset();
  1517. done = true;
  1518. });
  1519. {
  1520. auto* ev = evb.get();
  1521. Executor::KeepAlive<EventBase> keepAlive;
  1522. ev->runInEventBaseThreadAndWait(
  1523. [&ev, &keepAlive] { keepAlive = getKeepAliveToken(ev); });
  1524. ASSERT_FALSE(done) << "Loop finished before we asked it to";
  1525. ev->terminateLoopSoon();
  1526. /* sleep override */
  1527. std::this_thread::sleep_for(std::chrono::milliseconds(30));
  1528. ASSERT_FALSE(done) << "Loop terminated early";
  1529. ev->runInEventBaseThread([keepAlive = std::move(keepAlive)] {});
  1530. }
  1531. evThread.join();
  1532. ASSERT_TRUE(done);
  1533. }
  1534. TEST(EventBaseTest, LoopKeepAliveShutdown) {
  1535. auto evb = std::make_unique<EventBase>();
  1536. bool done = false;
  1537. std::thread t([&done,
  1538. loopKeepAlive = getKeepAliveToken(evb.get()),
  1539. evbPtr = evb.get()]() mutable {
  1540. /* sleep override */ std::this_thread::sleep_for(
  1541. std::chrono::milliseconds(100));
  1542. evbPtr->runInEventBaseThread(
  1543. [&done, loopKeepAlive = std::move(loopKeepAlive)] { done = true; });
  1544. });
  1545. evb.reset();
  1546. ASSERT_TRUE(done);
  1547. t.join();
  1548. }
  1549. TEST(EventBaseTest, LoopKeepAliveAtomic) {
  1550. auto evb = std::make_unique<EventBase>();
  1551. static constexpr size_t kNumThreads = 100;
  1552. static constexpr size_t kNumTasks = 100;
  1553. std::vector<std::thread> ts;
  1554. std::vector<std::unique_ptr<Baton<>>> batons;
  1555. size_t done{0};
  1556. for (size_t i = 0; i < kNumThreads; ++i) {
  1557. batons.emplace_back(std::make_unique<Baton<>>());
  1558. }
  1559. for (size_t i = 0; i < kNumThreads; ++i) {
  1560. ts.emplace_back([evbPtr = evb.get(), batonPtr = batons[i].get(), &done] {
  1561. std::vector<Executor::KeepAlive<EventBase>> keepAlives;
  1562. for (size_t j = 0; j < kNumTasks; ++j) {
  1563. keepAlives.emplace_back(getKeepAliveToken(evbPtr));
  1564. }
  1565. batonPtr->post();
  1566. /* sleep override */ std::this_thread::sleep_for(std::chrono::seconds(1));
  1567. for (auto& keepAlive : keepAlives) {
  1568. evbPtr->runInEventBaseThread(
  1569. [&done, keepAlive = std::move(keepAlive)]() { ++done; });
  1570. }
  1571. });
  1572. }
  1573. for (auto& baton : batons) {
  1574. baton->wait();
  1575. }
  1576. evb.reset();
  1577. EXPECT_EQ(kNumThreads * kNumTasks, done);
  1578. for (auto& t : ts) {
  1579. t.join();
  1580. }
  1581. }
  1582. TEST(EventBaseTest, LoopKeepAliveCast) {
  1583. EventBase evb;
  1584. Executor::KeepAlive<> keepAlive = getKeepAliveToken(evb);
  1585. }
  1586. TEST(EventBaseTest, DrivableExecutorTest) {
  1587. folly::Promise<bool> p;
  1588. auto f = p.getFuture();
  1589. EventBase base;
  1590. bool finished = false;
  1591. std::thread t([&] {
  1592. /* sleep override */
  1593. std::this_thread::sleep_for(std::chrono::microseconds(10));
  1594. finished = true;
  1595. base.runInEventBaseThread([&]() { p.setValue(true); });
  1596. });
  1597. // Ensure drive does not busy wait
  1598. base.drive(); // TODO: fix notification queue init() extra wakeup
  1599. base.drive();
  1600. EXPECT_TRUE(finished);
  1601. folly::Promise<bool> p2;
  1602. auto f2 = p2.getFuture();
  1603. // Ensure waitVia gets woken up properly, even from
  1604. // a separate thread.
  1605. base.runAfterDelay([&]() { p2.setValue(true); }, 10);
  1606. f2.waitVia(&base);
  1607. EXPECT_TRUE(f2.isReady());
  1608. t.join();
  1609. }
  1610. TEST(EventBaseTest, IOExecutorTest) {
  1611. EventBase base;
  1612. // Ensure EventBase manages itself as an IOExecutor.
  1613. EXPECT_EQ(base.getEventBase(), &base);
  1614. }
  1615. TEST(EventBaseTest, RequestContextTest) {
  1616. EventBase evb;
  1617. auto defaultCtx = RequestContext::get();
  1618. std::weak_ptr<RequestContext> rctx_weak_ptr;
  1619. {
  1620. RequestContextScopeGuard rctx;
  1621. rctx_weak_ptr = RequestContext::saveContext();
  1622. auto context = RequestContext::get();
  1623. EXPECT_NE(defaultCtx, context);
  1624. evb.runInLoop([context] { EXPECT_EQ(context, RequestContext::get()); });
  1625. evb.loop();
  1626. }
  1627. // Ensure that RequestContext created for the scope has been released and
  1628. // deleted.
  1629. EXPECT_EQ(rctx_weak_ptr.expired(), true);
  1630. EXPECT_EQ(defaultCtx, RequestContext::get());
  1631. }
  1632. TEST(EventBaseTest, CancelLoopCallbackRequestContextTest) {
  1633. EventBase evb;
  1634. CountedLoopCallback c(&evb, 1);
  1635. auto defaultCtx = RequestContext::get();
  1636. EXPECT_EQ(defaultCtx, RequestContext::get());
  1637. std::weak_ptr<RequestContext> rctx_weak_ptr;
  1638. {
  1639. RequestContextScopeGuard rctx;
  1640. rctx_weak_ptr = RequestContext::saveContext();
  1641. auto context = RequestContext::get();
  1642. EXPECT_NE(defaultCtx, context);
  1643. evb.runInLoop(&c);
  1644. c.cancelLoopCallback();
  1645. }
  1646. // Ensure that RequestContext created for the scope has been released and
  1647. // deleted.
  1648. EXPECT_EQ(rctx_weak_ptr.expired(), true);
  1649. EXPECT_EQ(defaultCtx, RequestContext::get());
  1650. }