12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994 |
- /*
- * Copyright 2014-present Facebook, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- #include <folly/Memory.h>
- #include <folly/ScopeGuard.h>
- #include <folly/io/async/AsyncTimeout.h>
- #include <folly/io/async/EventBase.h>
- #include <folly/io/async/EventHandler.h>
- #include <folly/io/async/test/SocketPair.h>
- #include <folly/io/async/test/Util.h>
- #include <folly/portability/Unistd.h>
- #include <folly/futures/Promise.h>
- #include <atomic>
- #include <iostream>
- #include <memory>
- #include <thread>
- using std::atomic;
- using std::cerr;
- using std::deque;
- using std::endl;
- using std::make_pair;
- using std::pair;
- using std::thread;
- using std::unique_ptr;
- using std::vector;
- using std::chrono::duration_cast;
- using std::chrono::microseconds;
- using std::chrono::milliseconds;
- using namespace std::chrono_literals;
- using namespace folly;
- ///////////////////////////////////////////////////////////////////////////
- // Tests for read and write events
- ///////////////////////////////////////////////////////////////////////////
/// Size, in bytes, of the stack buffers used by writeUntilFull() and
/// readUntilEmpty().
enum {
  BUF_SIZE = 4096,
};
- ssize_t writeToFD(int fd, size_t length) {
- // write an arbitrary amount of data to the fd
- auto bufv = vector<char>(length);
- auto buf = bufv.data();
- memset(buf, 'a', length);
- ssize_t rc = write(fd, buf, length);
- CHECK_EQ(rc, length);
- return rc;
- }
- size_t writeUntilFull(int fd) {
- // Write to the fd until EAGAIN is returned
- size_t bytesWritten = 0;
- char buf[BUF_SIZE];
- memset(buf, 'a', sizeof(buf));
- while (true) {
- ssize_t rc = write(fd, buf, sizeof(buf));
- if (rc < 0) {
- CHECK_EQ(errno, EAGAIN);
- break;
- } else {
- bytesWritten += rc;
- }
- }
- return bytesWritten;
- }
/**
 * Issue a single read() for at most `length` bytes from `fd`. The data that
 * is read goes into a temporary buffer and is discarded.
 *
 * @param fd      descriptor to read from
 * @param length  maximum number of bytes to request
 * @return the value returned by read()
 */
ssize_t readFromFD(int fd, size_t length) {
  std::vector<char> scratch(length);
  const ssize_t rc = read(fd, scratch.data(), length);
  return rc;
}
- size_t readUntilEmpty(int fd) {
- // Read from the fd until EAGAIN is returned
- char buf[BUF_SIZE];
- size_t bytesRead = 0;
- while (true) {
- int rc = read(fd, buf, sizeof(buf));
- if (rc == 0) {
- CHECK(false) << "unexpected EOF";
- } else if (rc < 0) {
- CHECK_EQ(errno, EAGAIN);
- break;
- } else {
- bytesRead += rc;
- }
- }
- return bytesRead;
- }
- void checkReadUntilEmpty(int fd, size_t expectedLength) {
- ASSERT_EQ(readUntilEmpty(fd), expectedLength);
- }
- struct ScheduledEvent {
- int milliseconds;
- uint16_t events;
- size_t length;
- ssize_t result;
- void perform(int fd) {
- if (events & EventHandler::READ) {
- if (length == 0) {
- result = readUntilEmpty(fd);
- } else {
- result = readFromFD(fd, length);
- }
- }
- if (events & EventHandler::WRITE) {
- if (length == 0) {
- result = writeUntilFull(fd);
- } else {
- result = writeToFD(fd, length);
- }
- }
- }
- };
- void scheduleEvents(EventBase* eventBase, int fd, ScheduledEvent* events) {
- for (ScheduledEvent* ev = events; ev->milliseconds > 0; ++ev) {
- eventBase->tryRunAfterDelay(
- std::bind(&ScheduledEvent::perform, ev, fd), ev->milliseconds);
- }
- }
- class TestHandler : public EventHandler {
- public:
- TestHandler(EventBase* eventBase, int fd)
- : EventHandler(eventBase, fd), fd_(fd) {}
- void handlerReady(uint16_t events) noexcept override {
- ssize_t bytesRead = 0;
- ssize_t bytesWritten = 0;
- if (events & READ) {
- // Read all available data, so EventBase will stop calling us
- // until new data becomes available
- bytesRead = readUntilEmpty(fd_);
- }
- if (events & WRITE) {
- // Write until the pipe buffer is full, so EventBase will stop calling
- // us until the other end has read some data
- bytesWritten = writeUntilFull(fd_);
- }
- log.emplace_back(events, bytesRead, bytesWritten);
- }
- struct EventRecord {
- EventRecord(uint16_t events_, size_t bytesRead_, size_t bytesWritten_)
- : events(events_),
- timestamp(),
- bytesRead(bytesRead_),
- bytesWritten(bytesWritten_) {}
- uint16_t events;
- TimePoint timestamp;
- ssize_t bytesRead;
- ssize_t bytesWritten;
- };
- deque<EventRecord> log;
- private:
- int fd_;
- };
- /**
- * Test a READ event
- */
- TEST(EventBaseTest, ReadEvent) {
- EventBase eb;
- SocketPair sp;
- // Register for read events
- TestHandler handler(&eb, sp[0]);
- handler.registerHandler(EventHandler::READ);
- // Register timeouts to perform two write events
- ScheduledEvent events[] = {
- {10, EventHandler::WRITE, 2345, 0},
- {160, EventHandler::WRITE, 99, 0},
- {0, 0, 0, 0},
- };
- scheduleEvents(&eb, sp[1], events);
- // Loop
- TimePoint start;
- eb.loop();
- TimePoint end;
- // Since we didn't use the EventHandler::PERSIST flag, the handler should
- // have received the first read, then unregistered itself. Check that only
- // the first chunk of data was received.
- ASSERT_EQ(handler.log.size(), 1);
- ASSERT_EQ(handler.log[0].events, EventHandler::READ);
- T_CHECK_TIMEOUT(
- start,
- handler.log[0].timestamp,
- milliseconds(events[0].milliseconds),
- milliseconds(90));
- ASSERT_EQ(handler.log[0].bytesRead, events[0].length);
- ASSERT_EQ(handler.log[0].bytesWritten, 0);
- T_CHECK_TIMEOUT(
- start, end, milliseconds(events[1].milliseconds), milliseconds(30));
- // Make sure the second chunk of data is still waiting to be read.
- size_t bytesRemaining = readUntilEmpty(sp[0]);
- ASSERT_EQ(bytesRemaining, events[1].length);
- }
- /**
- * Test (READ | PERSIST)
- */
- TEST(EventBaseTest, ReadPersist) {
- EventBase eb;
- SocketPair sp;
- // Register for read events
- TestHandler handler(&eb, sp[0]);
- handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
- // Register several timeouts to perform writes
- ScheduledEvent events[] = {
- {10, EventHandler::WRITE, 1024, 0},
- {20, EventHandler::WRITE, 2211, 0},
- {30, EventHandler::WRITE, 4096, 0},
- {100, EventHandler::WRITE, 100, 0},
- {0, 0, 0, 0},
- };
- scheduleEvents(&eb, sp[1], events);
- // Schedule a timeout to unregister the handler after the third write
- eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
- // Loop
- TimePoint start;
- eb.loop();
- TimePoint end;
- // The handler should have received the first 3 events,
- // then been unregistered after that.
- ASSERT_EQ(handler.log.size(), 3);
- for (int n = 0; n < 3; ++n) {
- ASSERT_EQ(handler.log[n].events, EventHandler::READ);
- T_CHECK_TIMEOUT(
- start, handler.log[n].timestamp, milliseconds(events[n].milliseconds));
- ASSERT_EQ(handler.log[n].bytesRead, events[n].length);
- ASSERT_EQ(handler.log[n].bytesWritten, 0);
- }
- T_CHECK_TIMEOUT(start, end, milliseconds(events[3].milliseconds));
- // Make sure the data from the last write is still waiting to be read
- size_t bytesRemaining = readUntilEmpty(sp[0]);
- ASSERT_EQ(bytesRemaining, events[3].length);
- }
- /**
- * Test registering for READ when the socket is immediately readable
- */
- TEST(EventBaseTest, ReadImmediate) {
- EventBase eb;
- SocketPair sp;
- // Write some data to the socket so the other end will
- // be immediately readable
- size_t dataLength = 1234;
- writeToFD(sp[1], dataLength);
- // Register for read events
- TestHandler handler(&eb, sp[0]);
- handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
- // Register a timeout to perform another write
- ScheduledEvent events[] = {
- {10, EventHandler::WRITE, 2345, 0},
- {0, 0, 0, 0},
- };
- scheduleEvents(&eb, sp[1], events);
- // Schedule a timeout to unregister the handler
- eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 20);
- // Loop
- TimePoint start;
- eb.loop();
- TimePoint end;
- ASSERT_EQ(handler.log.size(), 2);
- // There should have been 1 event for immediate readability
- ASSERT_EQ(handler.log[0].events, EventHandler::READ);
- T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
- ASSERT_EQ(handler.log[0].bytesRead, dataLength);
- ASSERT_EQ(handler.log[0].bytesWritten, 0);
- // There should be another event after the timeout wrote more data
- ASSERT_EQ(handler.log[1].events, EventHandler::READ);
- T_CHECK_TIMEOUT(
- start, handler.log[1].timestamp, milliseconds(events[0].milliseconds));
- ASSERT_EQ(handler.log[1].bytesRead, events[0].length);
- ASSERT_EQ(handler.log[1].bytesWritten, 0);
- T_CHECK_TIMEOUT(start, end, milliseconds(20));
- }
- /**
- * Test a WRITE event
- */
- TEST(EventBaseTest, WriteEvent) {
- EventBase eb;
- SocketPair sp;
- // Fill up the write buffer before starting
- size_t initialBytesWritten = writeUntilFull(sp[0]);
- // Register for write events
- TestHandler handler(&eb, sp[0]);
- handler.registerHandler(EventHandler::WRITE);
- // Register timeouts to perform two reads
- ScheduledEvent events[] = {
- {10, EventHandler::READ, 0, 0},
- {60, EventHandler::READ, 0, 0},
- {0, 0, 0, 0},
- };
- scheduleEvents(&eb, sp[1], events);
- // Loop
- TimePoint start;
- eb.loop();
- TimePoint end;
- // Since we didn't use the EventHandler::PERSIST flag, the handler should
- // have only been able to write once, then unregistered itself.
- ASSERT_EQ(handler.log.size(), 1);
- ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
- T_CHECK_TIMEOUT(
- start, handler.log[0].timestamp, milliseconds(events[0].milliseconds));
- ASSERT_EQ(handler.log[0].bytesRead, 0);
- ASSERT_GT(handler.log[0].bytesWritten, 0);
- T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
- ASSERT_EQ(events[0].result, initialBytesWritten);
- ASSERT_EQ(events[1].result, handler.log[0].bytesWritten);
- }
- /**
- * Test (WRITE | PERSIST)
- */
- TEST(EventBaseTest, WritePersist) {
- EventBase eb;
- SocketPair sp;
- // Fill up the write buffer before starting
- size_t initialBytesWritten = writeUntilFull(sp[0]);
- // Register for write events
- TestHandler handler(&eb, sp[0]);
- handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
- // Register several timeouts to read from the socket at several intervals
- ScheduledEvent events[] = {
- {10, EventHandler::READ, 0, 0},
- {40, EventHandler::READ, 0, 0},
- {70, EventHandler::READ, 0, 0},
- {100, EventHandler::READ, 0, 0},
- {0, 0, 0, 0},
- };
- scheduleEvents(&eb, sp[1], events);
- // Schedule a timeout to unregister the handler after the third read
- eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 85);
- // Loop
- TimePoint start;
- eb.loop();
- TimePoint end;
- // The handler should have received the first 3 events,
- // then been unregistered after that.
- ASSERT_EQ(handler.log.size(), 3);
- ASSERT_EQ(events[0].result, initialBytesWritten);
- for (int n = 0; n < 3; ++n) {
- ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
- T_CHECK_TIMEOUT(
- start, handler.log[n].timestamp, milliseconds(events[n].milliseconds));
- ASSERT_EQ(handler.log[n].bytesRead, 0);
- ASSERT_GT(handler.log[n].bytesWritten, 0);
- ASSERT_EQ(handler.log[n].bytesWritten, events[n + 1].result);
- }
- T_CHECK_TIMEOUT(start, end, milliseconds(events[3].milliseconds));
- }
- /**
- * Test registering for WRITE when the socket is immediately writable
- */
- TEST(EventBaseTest, WriteImmediate) {
- EventBase eb;
- SocketPair sp;
- // Register for write events
- TestHandler handler(&eb, sp[0]);
- handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
- // Register a timeout to perform a read
- ScheduledEvent events[] = {
- {10, EventHandler::READ, 0, 0},
- {0, 0, 0, 0},
- };
- scheduleEvents(&eb, sp[1], events);
- // Schedule a timeout to unregister the handler
- int64_t unregisterTimeout = 40;
- eb.tryRunAfterDelay(
- std::bind(&TestHandler::unregisterHandler, &handler), unregisterTimeout);
- // Loop
- TimePoint start;
- eb.loop();
- TimePoint end;
- ASSERT_EQ(handler.log.size(), 2);
- // Since the socket buffer was initially empty,
- // there should have been 1 event for immediate writability
- ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
- T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
- ASSERT_EQ(handler.log[0].bytesRead, 0);
- ASSERT_GT(handler.log[0].bytesWritten, 0);
- // There should be another event after the timeout wrote more data
- ASSERT_EQ(handler.log[1].events, EventHandler::WRITE);
- T_CHECK_TIMEOUT(
- start, handler.log[1].timestamp, milliseconds(events[0].milliseconds));
- ASSERT_EQ(handler.log[1].bytesRead, 0);
- ASSERT_GT(handler.log[1].bytesWritten, 0);
- T_CHECK_TIMEOUT(start, end, milliseconds(unregisterTimeout));
- }
- /**
- * Test (READ | WRITE) when the socket becomes readable first
- */
- TEST(EventBaseTest, ReadWrite) {
- EventBase eb;
- SocketPair sp;
- // Fill up the write buffer before starting
- size_t sock0WriteLength = writeUntilFull(sp[0]);
- // Register for read and write events
- TestHandler handler(&eb, sp[0]);
- handler.registerHandler(EventHandler::READ_WRITE);
- // Register timeouts to perform a write then a read.
- ScheduledEvent events[] = {
- {10, EventHandler::WRITE, 2345, 0},
- {40, EventHandler::READ, 0, 0},
- {0, 0, 0, 0},
- };
- scheduleEvents(&eb, sp[1], events);
- // Loop
- TimePoint start;
- eb.loop();
- TimePoint end;
- // Since we didn't use the EventHandler::PERSIST flag, the handler should
- // have only noticed readability, then unregistered itself. Check that only
- // one event was logged.
- ASSERT_EQ(handler.log.size(), 1);
- ASSERT_EQ(handler.log[0].events, EventHandler::READ);
- T_CHECK_TIMEOUT(
- start, handler.log[0].timestamp, milliseconds(events[0].milliseconds));
- ASSERT_EQ(handler.log[0].bytesRead, events[0].length);
- ASSERT_EQ(handler.log[0].bytesWritten, 0);
- ASSERT_EQ(events[1].result, sock0WriteLength);
- T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
- }
- /**
- * Test (READ | WRITE) when the socket becomes writable first
- */
- TEST(EventBaseTest, WriteRead) {
- EventBase eb;
- SocketPair sp;
- // Fill up the write buffer before starting
- size_t sock0WriteLength = writeUntilFull(sp[0]);
- // Register for read and write events
- TestHandler handler(&eb, sp[0]);
- handler.registerHandler(EventHandler::READ_WRITE);
- // Register timeouts to perform a read then a write.
- size_t sock1WriteLength = 2345;
- ScheduledEvent events[] = {
- {10, EventHandler::READ, 0, 0},
- {40, EventHandler::WRITE, sock1WriteLength, 0},
- {0, 0, 0, 0},
- };
- scheduleEvents(&eb, sp[1], events);
- // Loop
- TimePoint start;
- eb.loop();
- TimePoint end;
- // Since we didn't use the EventHandler::PERSIST flag, the handler should
- // have only noticed writability, then unregistered itself. Check that only
- // one event was logged.
- ASSERT_EQ(handler.log.size(), 1);
- ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
- T_CHECK_TIMEOUT(
- start, handler.log[0].timestamp, milliseconds(events[0].milliseconds));
- ASSERT_EQ(handler.log[0].bytesRead, 0);
- ASSERT_GT(handler.log[0].bytesWritten, 0);
- ASSERT_EQ(events[0].result, sock0WriteLength);
- ASSERT_EQ(events[1].result, sock1WriteLength);
- T_CHECK_TIMEOUT(start, end, milliseconds(events[1].milliseconds));
- // Make sure the written data is still waiting to be read.
- size_t bytesRemaining = readUntilEmpty(sp[0]);
- ASSERT_EQ(bytesRemaining, events[1].length);
- }
- /**
- * Test (READ | WRITE) when the socket becomes readable and writable
- * at the same time.
- */
- TEST(EventBaseTest, ReadWriteSimultaneous) {
- EventBase eb;
- SocketPair sp;
- // Fill up the write buffer before starting
- size_t sock0WriteLength = writeUntilFull(sp[0]);
- // Register for read and write events
- TestHandler handler(&eb, sp[0]);
- handler.registerHandler(EventHandler::READ_WRITE);
- // Register a timeout to perform a read and write together
- ScheduledEvent events[] = {
- {10, EventHandler::READ | EventHandler::WRITE, 0, 0},
- {0, 0, 0, 0},
- };
- scheduleEvents(&eb, sp[1], events);
- // Loop
- TimePoint start;
- eb.loop();
- TimePoint end;
- // It's not strictly required that the EventBase register us about both
- // events in the same call. So, it's possible that if the EventBase
- // implementation changes this test could start failing, and it wouldn't be
- // considered breaking the API. However for now it's nice to exercise this
- // code path.
- ASSERT_EQ(handler.log.size(), 1);
- ASSERT_EQ(handler.log[0].events, EventHandler::READ | EventHandler::WRITE);
- T_CHECK_TIMEOUT(
- start, handler.log[0].timestamp, milliseconds(events[0].milliseconds));
- ASSERT_EQ(handler.log[0].bytesRead, sock0WriteLength);
- ASSERT_GT(handler.log[0].bytesWritten, 0);
- T_CHECK_TIMEOUT(start, end, milliseconds(events[0].milliseconds));
- }
- /**
- * Test (READ | WRITE | PERSIST)
- */
- TEST(EventBaseTest, ReadWritePersist) {
- EventBase eb;
- SocketPair sp;
- // Register for read and write events
- TestHandler handler(&eb, sp[0]);
- handler.registerHandler(
- EventHandler::READ | EventHandler::WRITE | EventHandler::PERSIST);
- // Register timeouts to perform several reads and writes
- ScheduledEvent events[] = {
- {10, EventHandler::WRITE, 2345, 0},
- {20, EventHandler::READ, 0, 0},
- {35, EventHandler::WRITE, 200, 0},
- {45, EventHandler::WRITE, 15, 0},
- {55, EventHandler::READ, 0, 0},
- {120, EventHandler::WRITE, 2345, 0},
- {0, 0, 0, 0},
- };
- scheduleEvents(&eb, sp[1], events);
- // Schedule a timeout to unregister the handler
- eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 80);
- // Loop
- TimePoint start;
- eb.loop();
- TimePoint end;
- ASSERT_EQ(handler.log.size(), 6);
- // Since we didn't fill up the write buffer immediately, there should
- // be an immediate event for writability.
- ASSERT_EQ(handler.log[0].events, EventHandler::WRITE);
- T_CHECK_TIMEOUT(start, handler.log[0].timestamp, milliseconds(0));
- ASSERT_EQ(handler.log[0].bytesRead, 0);
- ASSERT_GT(handler.log[0].bytesWritten, 0);
- // Events 1 through 5 should correspond to the scheduled events
- for (int n = 1; n < 6; ++n) {
- ScheduledEvent* event = &events[n - 1];
- T_CHECK_TIMEOUT(
- start, handler.log[n].timestamp, milliseconds(event->milliseconds));
- if (event->events == EventHandler::READ) {
- ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
- ASSERT_EQ(handler.log[n].bytesRead, 0);
- ASSERT_GT(handler.log[n].bytesWritten, 0);
- } else {
- ASSERT_EQ(handler.log[n].events, EventHandler::READ);
- ASSERT_EQ(handler.log[n].bytesRead, event->length);
- ASSERT_EQ(handler.log[n].bytesWritten, 0);
- }
- }
- // The timeout should have unregistered the handler before the last write.
- // Make sure that data is still waiting to be read
- size_t bytesRemaining = readUntilEmpty(sp[0]);
- ASSERT_EQ(bytesRemaining, events[5].length);
- }
- class PartialReadHandler : public TestHandler {
- public:
- PartialReadHandler(EventBase* eventBase, int fd, size_t readLength)
- : TestHandler(eventBase, fd), fd_(fd), readLength_(readLength) {}
- void handlerReady(uint16_t events) noexcept override {
- assert(events == EventHandler::READ);
- ssize_t bytesRead = readFromFD(fd_, readLength_);
- log.emplace_back(events, bytesRead, 0);
- }
- private:
- int fd_;
- size_t readLength_;
- };
- /**
- * Test reading only part of the available data when a read event is fired.
- * When PERSIST is used, make sure the handler gets notified again the next
- * time around the loop.
- */
- TEST(EventBaseTest, ReadPartial) {
- EventBase eb;
- SocketPair sp;
- // Register for read events
- size_t readLength = 100;
- PartialReadHandler handler(&eb, sp[0], readLength);
- handler.registerHandler(EventHandler::READ | EventHandler::PERSIST);
- // Register a timeout to perform a single write,
- // with more data than PartialReadHandler will read at once
- ScheduledEvent events[] = {
- {10, EventHandler::WRITE, (3 * readLength) + (readLength / 2), 0},
- {0, 0, 0, 0},
- };
- scheduleEvents(&eb, sp[1], events);
- // Schedule a timeout to unregister the handler
- eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
- // Loop
- TimePoint start;
- eb.loop();
- TimePoint end;
- ASSERT_EQ(handler.log.size(), 4);
- // The first 3 invocations should read readLength bytes each
- for (int n = 0; n < 3; ++n) {
- ASSERT_EQ(handler.log[n].events, EventHandler::READ);
- T_CHECK_TIMEOUT(
- start, handler.log[n].timestamp, milliseconds(events[0].milliseconds));
- ASSERT_EQ(handler.log[n].bytesRead, readLength);
- ASSERT_EQ(handler.log[n].bytesWritten, 0);
- }
- // The last read only has readLength/2 bytes
- ASSERT_EQ(handler.log[3].events, EventHandler::READ);
- T_CHECK_TIMEOUT(
- start, handler.log[3].timestamp, milliseconds(events[0].milliseconds));
- ASSERT_EQ(handler.log[3].bytesRead, readLength / 2);
- ASSERT_EQ(handler.log[3].bytesWritten, 0);
- }
- class PartialWriteHandler : public TestHandler {
- public:
- PartialWriteHandler(EventBase* eventBase, int fd, size_t writeLength)
- : TestHandler(eventBase, fd), fd_(fd), writeLength_(writeLength) {}
- void handlerReady(uint16_t events) noexcept override {
- assert(events == EventHandler::WRITE);
- ssize_t bytesWritten = writeToFD(fd_, writeLength_);
- log.emplace_back(events, 0, bytesWritten);
- }
- private:
- int fd_;
- size_t writeLength_;
- };
- /**
- * Test writing without completely filling up the write buffer when the fd
- * becomes writable. When PERSIST is used, make sure the handler gets
- * notified again the next time around the loop.
- */
- TEST(EventBaseTest, WritePartial) {
- EventBase eb;
- SocketPair sp;
- // Fill up the write buffer before starting
- size_t initialBytesWritten = writeUntilFull(sp[0]);
- // Register for write events
- size_t writeLength = 100;
- PartialWriteHandler handler(&eb, sp[0], writeLength);
- handler.registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
- // Register a timeout to read, so that more data can be written
- ScheduledEvent events[] = {
- {10, EventHandler::READ, 0, 0},
- {0, 0, 0, 0},
- };
- scheduleEvents(&eb, sp[1], events);
- // Schedule a timeout to unregister the handler
- eb.tryRunAfterDelay(std::bind(&TestHandler::unregisterHandler, &handler), 30);
- // Loop
- TimePoint start;
- eb.loop();
- TimePoint end;
- // Depending on how big the socket buffer is, there will be multiple writes
- // Only check the first 5
- int numChecked = 5;
- ASSERT_GE(handler.log.size(), numChecked);
- ASSERT_EQ(events[0].result, initialBytesWritten);
- // The first 3 invocations should read writeLength bytes each
- for (int n = 0; n < numChecked; ++n) {
- ASSERT_EQ(handler.log[n].events, EventHandler::WRITE);
- T_CHECK_TIMEOUT(
- start, handler.log[n].timestamp, milliseconds(events[0].milliseconds));
- ASSERT_EQ(handler.log[n].bytesRead, 0);
- ASSERT_EQ(handler.log[n].bytesWritten, writeLength);
- }
- }
- /**
- * Test destroying a registered EventHandler
- */
- TEST(EventBaseTest, DestroyHandler) {
- class DestroyHandler : public AsyncTimeout {
- public:
- DestroyHandler(EventBase* eb, EventHandler* h)
- : AsyncTimeout(eb), handler_(h) {}
- void timeoutExpired() noexcept override {
- delete handler_;
- }
- private:
- EventHandler* handler_;
- };
- EventBase eb;
- SocketPair sp;
- // Fill up the write buffer before starting
- size_t initialBytesWritten = writeUntilFull(sp[0]);
- // Register for write events
- TestHandler* handler = new TestHandler(&eb, sp[0]);
- handler->registerHandler(EventHandler::WRITE | EventHandler::PERSIST);
- // After 10ms, read some data, so that the handler
- // will be notified that it can write.
- eb.tryRunAfterDelay(
- std::bind(checkReadUntilEmpty, sp[1], initialBytesWritten), 10);
- // Start a timer to destroy the handler after 25ms
- // This mainly just makes sure the code doesn't break or assert
- DestroyHandler dh(&eb, handler);
- dh.scheduleTimeout(25);
- TimePoint start;
- eb.loop();
- TimePoint end;
- // Make sure the EventHandler was uninstalled properly when it was
- // destroyed, and the EventBase loop exited
- T_CHECK_TIMEOUT(start, end, milliseconds(25));
- // Make sure that the handler wrote data to the socket
- // before it was destroyed
- size_t bytesRemaining = readUntilEmpty(sp[1]);
- ASSERT_GT(bytesRemaining, 0);
- }
- ///////////////////////////////////////////////////////////////////////////
- // Tests for timeout events
- ///////////////////////////////////////////////////////////////////////////
- TEST(EventBaseTest, RunAfterDelay) {
- EventBase eb;
- TimePoint timestamp1(false);
- TimePoint timestamp2(false);
- TimePoint timestamp3(false);
- eb.tryRunAfterDelay(std::bind(&TimePoint::reset, ×tamp1), 10);
- eb.tryRunAfterDelay(std::bind(&TimePoint::reset, ×tamp2), 20);
- eb.tryRunAfterDelay(std::bind(&TimePoint::reset, ×tamp3), 40);
- TimePoint start;
- eb.loop();
- TimePoint end;
- T_CHECK_TIMEOUT(start, timestamp1, milliseconds(10));
- T_CHECK_TIMEOUT(start, timestamp2, milliseconds(20));
- T_CHECK_TIMEOUT(start, timestamp3, milliseconds(40));
- T_CHECK_TIMEOUT(start, end, milliseconds(40));
- }
- /**
- * Test the behavior of tryRunAfterDelay() when some timeouts are
- * still scheduled when the EventBase is destroyed.
- */
- TEST(EventBaseTest, RunAfterDelayDestruction) {
- TimePoint timestamp1(false);
- TimePoint timestamp2(false);
- TimePoint timestamp3(false);
- TimePoint timestamp4(false);
- TimePoint start(false);
- TimePoint end(false);
- {
- EventBase eb;
- // Run two normal timeouts
- eb.tryRunAfterDelay(std::bind(&TimePoint::reset, ×tamp1), 10);
- eb.tryRunAfterDelay(std::bind(&TimePoint::reset, ×tamp2), 20);
- // Schedule a timeout to stop the event loop after 40ms
- eb.tryRunAfterDelay(std::bind(&EventBase::terminateLoopSoon, &eb), 40);
- // Schedule 2 timeouts that would fire after the event loop stops
- eb.tryRunAfterDelay(std::bind(&TimePoint::reset, ×tamp3), 80);
- eb.tryRunAfterDelay(std::bind(&TimePoint::reset, ×tamp4), 160);
- start.reset();
- eb.loop();
- end.reset();
- }
- T_CHECK_TIMEOUT(start, timestamp1, milliseconds(10));
- T_CHECK_TIMEOUT(start, timestamp2, milliseconds(20));
- T_CHECK_TIMEOUT(start, end, milliseconds(40));
- ASSERT_TRUE(timestamp3.isUnset());
- ASSERT_TRUE(timestamp4.isUnset());
- // Ideally this test should be run under valgrind to ensure that no
- // memory is leaked.
- }
- class TestTimeout : public AsyncTimeout {
- public:
- explicit TestTimeout(EventBase* eventBase)
- : AsyncTimeout(eventBase), timestamp(false) {}
- void timeoutExpired() noexcept override {
- timestamp.reset();
- }
- TimePoint timestamp;
- };
- TEST(EventBaseTest, BasicTimeouts) {
- EventBase eb;
- TestTimeout t1(&eb);
- TestTimeout t2(&eb);
- TestTimeout t3(&eb);
- t1.scheduleTimeout(10);
- t2.scheduleTimeout(20);
- t3.scheduleTimeout(40);
- TimePoint start;
- eb.loop();
- TimePoint end;
- T_CHECK_TIMEOUT(start, t1.timestamp, milliseconds(10));
- T_CHECK_TIMEOUT(start, t2.timestamp, milliseconds(20));
- T_CHECK_TIMEOUT(start, t3.timestamp, milliseconds(40));
- T_CHECK_TIMEOUT(start, end, milliseconds(40));
- }
- class ReschedulingTimeout : public AsyncTimeout {
- public:
- ReschedulingTimeout(EventBase* evb, const vector<uint32_t>& timeouts)
- : AsyncTimeout(evb), timeouts_(timeouts), iterator_(timeouts_.begin()) {}
- void start() {
- reschedule();
- }
- void timeoutExpired() noexcept override {
- timestamps.emplace_back();
- reschedule();
- }
- void reschedule() {
- if (iterator_ != timeouts_.end()) {
- uint32_t timeout = *iterator_;
- ++iterator_;
- scheduleTimeout(timeout);
- }
- }
- vector<TimePoint> timestamps;
- private:
- vector<uint32_t> timeouts_;
- vector<uint32_t>::const_iterator iterator_;
- };
- /**
- * Test rescheduling the same timeout multiple times
- */
- TEST(EventBaseTest, ReuseTimeout) {
- EventBase eb;
- vector<uint32_t> timeouts;
- timeouts.push_back(10);
- timeouts.push_back(30);
- timeouts.push_back(15);
- ReschedulingTimeout t(&eb, timeouts);
- t.start();
- TimePoint start;
- eb.loop();
- TimePoint end;
- // Use a higher tolerance than usual. We're waiting on 3 timeouts
- // consecutively. In general, each timeout may go over by a few
- // milliseconds, and we're tripling this error by witing on 3 timeouts.
- milliseconds tolerance{6};
- ASSERT_EQ(timeouts.size(), t.timestamps.size());
- uint32_t total = 0;
- for (size_t n = 0; n < timeouts.size(); ++n) {
- total += timeouts[n];
- T_CHECK_TIMEOUT(start, t.timestamps[n], milliseconds(total), tolerance);
- }
- T_CHECK_TIMEOUT(start, end, milliseconds(total), tolerance);
- }
- /**
- * Test rescheduling a timeout before it has fired
- */
- TEST(EventBaseTest, RescheduleTimeout) {
- EventBase eb;
- TestTimeout t1(&eb);
- TestTimeout t2(&eb);
- TestTimeout t3(&eb);
- t1.scheduleTimeout(15);
- t2.scheduleTimeout(30);
- t3.scheduleTimeout(30);
- auto f = static_cast<bool (AsyncTimeout::*)(uint32_t)>(
- &AsyncTimeout::scheduleTimeout);
- // after 10ms, reschedule t2 to run sooner than originally scheduled
- eb.tryRunAfterDelay(std::bind(f, &t2, 10), 10);
- // after 10ms, reschedule t3 to run later than originally scheduled
- eb.tryRunAfterDelay(std::bind(f, &t3, 40), 10);
- TimePoint start;
- eb.loop();
- TimePoint end;
- T_CHECK_TIMEOUT(start, t1.timestamp, milliseconds(15));
- T_CHECK_TIMEOUT(start, t2.timestamp, milliseconds(20));
- T_CHECK_TIMEOUT(start, t3.timestamp, milliseconds(50));
- T_CHECK_TIMEOUT(start, end, milliseconds(50));
- }
- /**
- * Test cancelling a timeout
- */
- TEST(EventBaseTest, CancelTimeout) {
- EventBase eb;
- vector<uint32_t> timeouts;
- timeouts.push_back(10);
- timeouts.push_back(30);
- timeouts.push_back(25);
- ReschedulingTimeout t(&eb, timeouts);
- t.start();
- eb.tryRunAfterDelay(std::bind(&AsyncTimeout::cancelTimeout, &t), 50);
- TimePoint start;
- eb.loop();
- TimePoint end;
- ASSERT_EQ(t.timestamps.size(), 2);
- T_CHECK_TIMEOUT(start, t.timestamps[0], milliseconds(10));
- T_CHECK_TIMEOUT(start, t.timestamps[1], milliseconds(40));
- T_CHECK_TIMEOUT(start, end, milliseconds(50));
- }
- /**
- * Test destroying a scheduled timeout object
- */
- TEST(EventBaseTest, DestroyTimeout) {
- class DestroyTimeout : public AsyncTimeout {
- public:
- DestroyTimeout(EventBase* eb, AsyncTimeout* t)
- : AsyncTimeout(eb), timeout_(t) {}
- void timeoutExpired() noexcept override {
- delete timeout_;
- }
- private:
- AsyncTimeout* timeout_;
- };
- EventBase eb;
- TestTimeout* t1 = new TestTimeout(&eb);
- t1->scheduleTimeout(30);
- DestroyTimeout dt(&eb, t1);
- dt.scheduleTimeout(10);
- TimePoint start;
- eb.loop();
- TimePoint end;
- T_CHECK_TIMEOUT(start, end, milliseconds(10));
- }
- /**
- * Test the scheduled executor impl
- */
- TEST(EventBaseTest, ScheduledFn) {
- EventBase eb;
- TimePoint timestamp1(false);
- TimePoint timestamp2(false);
- TimePoint timestamp3(false);
- eb.schedule(std::bind(&TimePoint::reset, ×tamp1), milliseconds(9));
- eb.schedule(std::bind(&TimePoint::reset, ×tamp2), milliseconds(19));
- eb.schedule(std::bind(&TimePoint::reset, ×tamp3), milliseconds(39));
- TimePoint start;
- eb.loop();
- TimePoint end;
- T_CHECK_TIMEOUT(start, timestamp1, milliseconds(9));
- T_CHECK_TIMEOUT(start, timestamp2, milliseconds(19));
- T_CHECK_TIMEOUT(start, timestamp3, milliseconds(39));
- T_CHECK_TIMEOUT(start, end, milliseconds(39));
- }
- TEST(EventBaseTest, ScheduledFnAt) {
- EventBase eb;
- TimePoint timestamp0(false);
- TimePoint timestamp1(false);
- TimePoint timestamp2(false);
- TimePoint timestamp3(false);
- eb.scheduleAt(
- std::bind(&TimePoint::reset, ×tamp1), eb.now() - milliseconds(5));
- eb.scheduleAt(
- std::bind(&TimePoint::reset, ×tamp1), eb.now() + milliseconds(9));
- eb.scheduleAt(
- std::bind(&TimePoint::reset, ×tamp2), eb.now() + milliseconds(19));
- eb.scheduleAt(
- std::bind(&TimePoint::reset, ×tamp3), eb.now() + milliseconds(39));
- TimePoint start;
- eb.loop();
- TimePoint end;
- T_CHECK_TIME_LT(start, timestamp0, milliseconds(0));
- T_CHECK_TIMEOUT(start, timestamp1, milliseconds(9));
- T_CHECK_TIMEOUT(start, timestamp2, milliseconds(19));
- T_CHECK_TIMEOUT(start, timestamp3, milliseconds(39));
- T_CHECK_TIMEOUT(start, end, milliseconds(39));
- }
- ///////////////////////////////////////////////////////////////////////////
- // Test for runInThreadTestFunc()
- ///////////////////////////////////////////////////////////////////////////
- struct RunInThreadData {
- RunInThreadData(int numThreads, int opsPerThread_)
- : opsPerThread(opsPerThread_), opsToGo(numThreads * opsPerThread) {}
- EventBase evb;
- deque<pair<int, int>> values;
- int opsPerThread;
- int opsToGo;
- };
- struct RunInThreadArg {
- RunInThreadArg(RunInThreadData* data_, int threadId, int value_)
- : data(data_), thread(threadId), value(value_) {}
- RunInThreadData* data;
- int thread;
- int value;
- };
- void runInThreadTestFunc(RunInThreadArg* arg) {
- arg->data->values.emplace_back(arg->thread, arg->value);
- RunInThreadData* data = arg->data;
- delete arg;
- if (--data->opsToGo == 0) {
- // Break out of the event base loop if we are the last thread running
- data->evb.terminateLoopSoon();
- }
- }
- TEST(EventBaseTest, RunInThread) {
- constexpr uint32_t numThreads = 50;
- constexpr uint32_t opsPerThread = 100;
- RunInThreadData data(numThreads, opsPerThread);
- deque<std::thread> threads;
- SCOPE_EXIT {
- // Wait on all of the threads.
- for (auto& thread : threads) {
- thread.join();
- }
- };
- for (uint32_t i = 0; i < numThreads; ++i) {
- threads.emplace_back([i, &data] {
- for (int n = 0; n < data.opsPerThread; ++n) {
- RunInThreadArg* arg = new RunInThreadArg(&data, i, n);
- data.evb.runInEventBaseThread(runInThreadTestFunc, arg);
- usleep(10);
- }
- });
- }
- // Add a timeout event to run after 3 seconds.
- // Otherwise loop() will return immediately since there are no events to run.
- // Once the last thread exits, it will stop the loop(). However, this
- // timeout also stops the loop in case there is a bug performing the normal
- // stop.
- data.evb.tryRunAfterDelay(
- std::bind(&EventBase::terminateLoopSoon, &data.evb), 3000);
- TimePoint start;
- data.evb.loop();
- TimePoint end;
- // Verify that the loop exited because all threads finished and requested it
- // to stop. This should happen much sooner than the 3 second timeout.
- // Assert that it happens in under a second. (This is still tons of extra
- // padding.)
- auto timeTaken =
- std::chrono::duration_cast<milliseconds>(end.getTime() - start.getTime());
- ASSERT_LT(timeTaken.count(), 1000);
- VLOG(11) << "Time taken: " << timeTaken.count();
- // Verify that we have all of the events from every thread
- int expectedValues[numThreads];
- for (uint32_t n = 0; n < numThreads; ++n) {
- expectedValues[n] = 0;
- }
- for (deque<pair<int, int>>::const_iterator it = data.values.begin();
- it != data.values.end();
- ++it) {
- int threadID = it->first;
- int value = it->second;
- ASSERT_EQ(expectedValues[threadID], value);
- ++expectedValues[threadID];
- }
- for (uint32_t n = 0; n < numThreads; ++n) {
- ASSERT_EQ(expectedValues[n], opsPerThread);
- }
- }
- // This test simulates some calls, and verifies that the waiting happens by
- // triggering what otherwise would be race conditions, and trying to detect
- // whether any of the race conditions happened.
- TEST(EventBaseTest, RunInEventBaseThreadAndWait) {
- const size_t c = 256;
- vector<unique_ptr<atomic<size_t>>> atoms(c);
- for (size_t i = 0; i < c; ++i) {
- auto& atom = atoms.at(i);
- atom = std::make_unique<atomic<size_t>>(0);
- }
- vector<thread> threads;
- for (size_t i = 0; i < c; ++i) {
- threads.emplace_back([&atoms, i] {
- EventBase eb;
- auto& atom = *atoms.at(i);
- auto ebth = thread([&] { eb.loopForever(); });
- eb.waitUntilRunning();
- eb.runInEventBaseThreadAndWait([&] {
- size_t x = 0;
- atom.compare_exchange_weak(
- x, 1, std::memory_order_release, std::memory_order_relaxed);
- });
- size_t x = 0;
- atom.compare_exchange_weak(
- x, 2, std::memory_order_release, std::memory_order_relaxed);
- eb.terminateLoopSoon();
- ebth.join();
- });
- }
- for (size_t i = 0; i < c; ++i) {
- auto& th = threads.at(i);
- th.join();
- }
- size_t sum = 0;
- for (auto& atom : atoms) {
- sum += *atom;
- }
- EXPECT_EQ(c, sum);
- }
- TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadAndWaitCross) {
- EventBase eb;
- thread th(&EventBase::loopForever, &eb);
- SCOPE_EXIT {
- eb.terminateLoopSoon();
- th.join();
- };
- auto mutated = false;
- eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] { mutated = true; });
- EXPECT_TRUE(mutated);
- }
- TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadAndWaitWithin) {
- EventBase eb;
- thread th(&EventBase::loopForever, &eb);
- SCOPE_EXIT {
- eb.terminateLoopSoon();
- th.join();
- };
- eb.runInEventBaseThreadAndWait([&] {
- auto mutated = false;
- eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] { mutated = true; });
- EXPECT_TRUE(mutated);
- });
- }
- TEST(EventBaseTest, RunImmediatelyOrRunInEventBaseThreadNotLooping) {
- EventBase eb;
- auto mutated = false;
- eb.runImmediatelyOrRunInEventBaseThreadAndWait([&] { mutated = true; });
- EXPECT_TRUE(mutated);
- }
- ///////////////////////////////////////////////////////////////////////////
- // Tests for runInLoop()
- ///////////////////////////////////////////////////////////////////////////
- class CountedLoopCallback : public EventBase::LoopCallback {
- public:
- CountedLoopCallback(
- EventBase* eventBase,
- unsigned int count,
- std::function<void()> action = std::function<void()>())
- : eventBase_(eventBase), count_(count), action_(action) {}
- void runLoopCallback() noexcept override {
- --count_;
- if (count_ > 0) {
- eventBase_->runInLoop(this);
- } else if (action_) {
- action_();
- }
- }
- unsigned int getCount() const {
- return count_;
- }
- private:
- EventBase* eventBase_;
- unsigned int count_;
- std::function<void()> action_;
- };
- // Test that EventBase::loop() doesn't exit while there are
- // still LoopCallbacks remaining to be invoked.
- TEST(EventBaseTest, RepeatedRunInLoop) {
- EventBase eventBase;
- CountedLoopCallback c(&eventBase, 10);
- eventBase.runInLoop(&c);
- // The callback shouldn't have run immediately
- ASSERT_EQ(c.getCount(), 10);
- eventBase.loop();
- // loop() should loop until the CountedLoopCallback stops
- // re-installing itself.
- ASSERT_EQ(c.getCount(), 0);
- }
- // Test that EventBase::loop() works as expected without time measurements.
- TEST(EventBaseTest, RunInLoopNoTimeMeasurement) {
- EventBase eventBase(false);
- CountedLoopCallback c(&eventBase, 10);
- eventBase.runInLoop(&c);
- // The callback shouldn't have run immediately
- ASSERT_EQ(c.getCount(), 10);
- eventBase.loop();
- // loop() should loop until the CountedLoopCallback stops
- // re-installing itself.
- ASSERT_EQ(c.getCount(), 0);
- }
- // Test runInLoop() calls with terminateLoopSoon()
- TEST(EventBaseTest, RunInLoopStopLoop) {
- EventBase eventBase;
- CountedLoopCallback c1(&eventBase, 20);
- CountedLoopCallback c2(
- &eventBase, 10, std::bind(&EventBase::terminateLoopSoon, &eventBase));
- eventBase.runInLoop(&c1);
- eventBase.runInLoop(&c2);
- ASSERT_EQ(c1.getCount(), 20);
- ASSERT_EQ(c2.getCount(), 10);
- eventBase.loopForever();
- // c2 should have stopped the loop after 10 iterations
- ASSERT_EQ(c2.getCount(), 0);
- // We allow the EventBase to run the loop callbacks in whatever order it
- // chooses. We'll accept c1's count being either 10 (if the loop terminated
- // after c1 ran on the 10th iteration) or 11 (if c2 terminated the loop
- // before c1 ran).
- //
- // (With the current code, c1 will always run 10 times, but we don't consider
- // this a hard API requirement.)
- ASSERT_GE(c1.getCount(), 10);
- ASSERT_LE(c1.getCount(), 11);
- }
- TEST(EventBaseTest, messageAvailableException) {
- auto deadManWalking = [] {
- EventBase eventBase;
- std::thread t([&] {
- // Call this from another thread to force use of NotificationQueue in
- // runInEventBaseThread
- eventBase.runInEventBaseThread(
- []() { throw std::runtime_error("boom"); });
- });
- t.join();
- eventBase.loopForever();
- };
- EXPECT_DEATH(deadManWalking(), ".*");
- }
- TEST(EventBaseTest, TryRunningAfterTerminate) {
- EventBase eventBase;
- CountedLoopCallback c1(
- &eventBase, 1, std::bind(&EventBase::terminateLoopSoon, &eventBase));
- eventBase.runInLoop(&c1);
- eventBase.loopForever();
- bool ran = false;
- eventBase.runInEventBaseThread([&]() { ran = true; });
- ASSERT_FALSE(ran);
- }
- // Test cancelling runInLoop() callbacks
- TEST(EventBaseTest, CancelRunInLoop) {
- EventBase eventBase;
- CountedLoopCallback c1(&eventBase, 20);
- CountedLoopCallback c2(&eventBase, 20);
- CountedLoopCallback c3(&eventBase, 20);
- std::function<void()> cancelC1Action =
- std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c1);
- std::function<void()> cancelC2Action =
- std::bind(&EventBase::LoopCallback::cancelLoopCallback, &c2);
- CountedLoopCallback cancelC1(&eventBase, 10, cancelC1Action);
- CountedLoopCallback cancelC2(&eventBase, 10, cancelC2Action);
- // Install cancelC1 after c1
- eventBase.runInLoop(&c1);
- eventBase.runInLoop(&cancelC1);
- // Install cancelC2 before c2
- eventBase.runInLoop(&cancelC2);
- eventBase.runInLoop(&c2);
- // Install c3
- eventBase.runInLoop(&c3);
- ASSERT_EQ(c1.getCount(), 20);
- ASSERT_EQ(c2.getCount(), 20);
- ASSERT_EQ(c3.getCount(), 20);
- ASSERT_EQ(cancelC1.getCount(), 10);
- ASSERT_EQ(cancelC2.getCount(), 10);
- // Run the loop
- eventBase.loop();
- // cancelC1 and cancelC2 should have both fired after 10 iterations and
- // stopped re-installing themselves
- ASSERT_EQ(cancelC1.getCount(), 0);
- ASSERT_EQ(cancelC2.getCount(), 0);
- // c3 should have continued on for the full 20 iterations
- ASSERT_EQ(c3.getCount(), 0);
- // c1 and c2 should have both been cancelled on the 10th iteration.
- //
- // Callbacks are always run in the order they are installed,
- // so c1 should have fired 10 times, and been canceled after it ran on the
- // 10th iteration. c2 should have only fired 9 times, because cancelC2 will
- // have run before it on the 10th iteration, and cancelled it before it
- // fired.
- ASSERT_EQ(c1.getCount(), 10);
- ASSERT_EQ(c2.getCount(), 11);
- }
- class TerminateTestCallback : public EventBase::LoopCallback,
- public EventHandler {
- public:
- TerminateTestCallback(EventBase* eventBase, int fd)
- : EventHandler(eventBase, fd),
- eventBase_(eventBase),
- loopInvocations_(0),
- maxLoopInvocations_(0),
- eventInvocations_(0),
- maxEventInvocations_(0) {}
- void reset(uint32_t maxLoopInvocations, uint32_t maxEventInvocations) {
- loopInvocations_ = 0;
- maxLoopInvocations_ = maxLoopInvocations;
- eventInvocations_ = 0;
- maxEventInvocations_ = maxEventInvocations;
- cancelLoopCallback();
- unregisterHandler();
- }
- void handlerReady(uint16_t /* events */) noexcept override {
- // We didn't register with PERSIST, so we will have been automatically
- // unregistered already.
- ASSERT_FALSE(isHandlerRegistered());
- ++eventInvocations_;
- if (eventInvocations_ >= maxEventInvocations_) {
- return;
- }
- eventBase_->runInLoop(this);
- }
- void runLoopCallback() noexcept override {
- ++loopInvocations_;
- if (loopInvocations_ >= maxLoopInvocations_) {
- return;
- }
- registerHandler(READ);
- }
- uint32_t getLoopInvocations() const {
- return loopInvocations_;
- }
- uint32_t getEventInvocations() const {
- return eventInvocations_;
- }
- private:
- EventBase* eventBase_;
- uint32_t loopInvocations_;
- uint32_t maxLoopInvocations_;
- uint32_t eventInvocations_;
- uint32_t maxEventInvocations_;
- };
- /**
- * Test that EventBase::loop() correctly detects when there are no more events
- * left to run.
- *
- * This uses a single callback, which alternates registering itself as a loop
- * callback versus a EventHandler callback. This exercises a regression where
- * EventBase::loop() incorrectly exited if there were no more fd handlers
- * registered, but a loop callback installed a new fd handler.
- */
- TEST(EventBaseTest, LoopTermination) {
- EventBase eventBase;
- // Open a pipe and close the write end,
- // so the read endpoint will be readable
- int pipeFds[2];
- int rc = pipe(pipeFds);
- ASSERT_EQ(rc, 0);
- close(pipeFds[1]);
- TerminateTestCallback callback(&eventBase, pipeFds[0]);
- // Test once where the callback will exit after a loop callback
- callback.reset(10, 100);
- eventBase.runInLoop(&callback);
- eventBase.loop();
- ASSERT_EQ(callback.getLoopInvocations(), 10);
- ASSERT_EQ(callback.getEventInvocations(), 9);
- // Test once where the callback will exit after an fd event callback
- callback.reset(100, 7);
- eventBase.runInLoop(&callback);
- eventBase.loop();
- ASSERT_EQ(callback.getLoopInvocations(), 7);
- ASSERT_EQ(callback.getEventInvocations(), 7);
- close(pipeFds[0]);
- }
- ///////////////////////////////////////////////////////////////////////////
- // Tests for latency calculations
- ///////////////////////////////////////////////////////////////////////////
- class IdleTimeTimeoutSeries : public AsyncTimeout {
- public:
- explicit IdleTimeTimeoutSeries(
- EventBase* base,
- std::deque<std::size_t>& timeout)
- : AsyncTimeout(base), timeouts_(0), timeout_(timeout) {
- scheduleTimeout(1);
- }
- ~IdleTimeTimeoutSeries() override {}
- void timeoutExpired() noexcept override {
- ++timeouts_;
- if (timeout_.empty()) {
- cancelTimeout();
- } else {
- std::size_t sleepTime = timeout_.front();
- timeout_.pop_front();
- if (sleepTime) {
- usleep(sleepTime);
- }
- scheduleTimeout(1);
- }
- }
- int getTimeouts() const {
- return timeouts_;
- }
- private:
- int timeouts_;
- std::deque<std::size_t>& timeout_;
- };
- /**
- * Verify that idle time is correctly accounted for when decaying our loop
- * time.
- *
- * This works by creating a high loop time (via usleep), expecting a latency
- * callback with known value, and then scheduling a timeout for later. This
- * later timeout is far enough in the future that the idle time should have
- * caused the loop time to decay.
- */
- TEST(EventBaseTest, IdleTime) {
- EventBase eventBase;
- std::deque<std::size_t> timeouts0(4, 8080);
- timeouts0.push_front(8000);
- timeouts0.push_back(14000);
- IdleTimeTimeoutSeries tos0(&eventBase, timeouts0);
- std::deque<std::size_t> timeouts(20, 20);
- std::unique_ptr<IdleTimeTimeoutSeries> tos;
- bool hostOverloaded = false;
- // Loop once before starting the main test. This will run NotificationQueue
- // callbacks that get automatically installed when the EventBase is first
- // created. We want to make sure they don't interfere with the timing
- // operations below.
- eventBase.loopOnce(EVLOOP_NONBLOCK);
- eventBase.setLoadAvgMsec(1000ms);
- eventBase.resetLoadAvg(5900.0);
- auto testStart = std::chrono::steady_clock::now();
- int latencyCallbacks = 0;
- eventBase.setMaxLatency(6000us, [&]() {
- ++latencyCallbacks;
- if (latencyCallbacks != 1) {
- FAIL() << "Unexpected latency callback";
- }
- if (tos0.getTimeouts() < 6) {
- // This could only happen if the host this test is running
- // on is heavily loaded.
- int64_t usElapsed = duration_cast<microseconds>(
- std::chrono::steady_clock::now() - testStart)
- .count();
- EXPECT_LE(43800, usElapsed);
- hostOverloaded = true;
- return;
- }
- EXPECT_EQ(6, tos0.getTimeouts());
- EXPECT_GE(6100, eventBase.getAvgLoopTime() - 1200);
- EXPECT_LE(6100, eventBase.getAvgLoopTime() + 1200);
- tos = std::make_unique<IdleTimeTimeoutSeries>(&eventBase, timeouts);
- });
- // Kick things off with an "immediate" timeout
- tos0.scheduleTimeout(1);
- eventBase.loop();
- if (hostOverloaded) {
- SKIP() << "host too heavily loaded to execute test";
- }
- ASSERT_EQ(1, latencyCallbacks);
- ASSERT_EQ(7, tos0.getTimeouts());
- ASSERT_GE(5900, eventBase.getAvgLoopTime() - 1200);
- ASSERT_LE(5900, eventBase.getAvgLoopTime() + 1200);
- ASSERT_TRUE(!!tos);
- ASSERT_EQ(21, tos->getTimeouts());
- }
- /**
- * Test that thisLoop functionality works with terminateLoopSoon
- */
- TEST(EventBaseTest, ThisLoop) {
- EventBase eb;
- bool runInLoop = false;
- bool runThisLoop = false;
- eb.runInLoop(
- [&]() {
- eb.terminateLoopSoon();
- eb.runInLoop([&]() { runInLoop = true; });
- eb.runInLoop([&]() { runThisLoop = true; }, true);
- },
- true);
- eb.loopForever();
- // Should not work
- ASSERT_FALSE(runInLoop);
- // Should work with thisLoop
- ASSERT_TRUE(runThisLoop);
- }
- TEST(EventBaseTest, EventBaseThreadLoop) {
- EventBase base;
- bool ran = false;
- base.runInEventBaseThread([&]() { ran = true; });
- base.loop();
- ASSERT_TRUE(ran);
- }
- TEST(EventBaseTest, EventBaseThreadName) {
- EventBase base;
- base.setName("foo");
- base.loop();
- #if (__GLIBC__ >= 2) && (__GLIBC_MINOR__ >= 12)
- char name[16];
- pthread_getname_np(pthread_self(), name, 16);
- ASSERT_EQ(0, strcmp("foo", name));
- #endif
- }
- TEST(EventBaseTest, RunBeforeLoop) {
- EventBase base;
- CountedLoopCallback cb(&base, 1, [&]() { base.terminateLoopSoon(); });
- base.runBeforeLoop(&cb);
- base.loopForever();
- ASSERT_EQ(cb.getCount(), 0);
- }
- TEST(EventBaseTest, RunBeforeLoopWait) {
- EventBase base;
- CountedLoopCallback cb(&base, 1);
- base.tryRunAfterDelay([&]() { base.terminateLoopSoon(); }, 500);
- base.runBeforeLoop(&cb);
- base.loopForever();
- // Check that we only ran once, and did not loop multiple times.
- ASSERT_EQ(cb.getCount(), 0);
- }
- class PipeHandler : public EventHandler {
- public:
- PipeHandler(EventBase* eventBase, int fd) : EventHandler(eventBase, fd) {}
- void handlerReady(uint16_t /* events */) noexcept override {
- abort();
- }
- };
- TEST(EventBaseTest, StopBeforeLoop) {
- EventBase evb;
- // Give the evb something to do.
- int p[2];
- ASSERT_EQ(0, pipe(p));
- PipeHandler handler(&evb, p[0]);
- handler.registerHandler(EventHandler::READ);
- // It's definitely not running yet
- evb.terminateLoopSoon();
- // let it run, it should exit quickly.
- std::thread t([&] { evb.loop(); });
- t.join();
- handler.unregisterHandler();
- close(p[0]);
- close(p[1]);
- SUCCEED();
- }
- TEST(EventBaseTest, RunCallbacksOnDestruction) {
- bool ran = false;
- {
- EventBase base;
- base.runInEventBaseThread([&]() { ran = true; });
- }
- ASSERT_TRUE(ran);
- }
- TEST(EventBaseTest, LoopKeepAlive) {
- EventBase evb;
- bool done = false;
- std::thread t([&, loopKeepAlive = getKeepAliveToken(evb)]() mutable {
- /* sleep override */ std::this_thread::sleep_for(
- std::chrono::milliseconds(100));
- evb.runInEventBaseThread(
- [&done, loopKeepAlive = std::move(loopKeepAlive)] { done = true; });
- });
- evb.loop();
- ASSERT_TRUE(done);
- t.join();
- }
- TEST(EventBaseTest, LoopKeepAliveInLoop) {
- EventBase evb;
- bool done = false;
- std::thread t;
- evb.runInEventBaseThread([&] {
- t = std::thread([&, loopKeepAlive = getKeepAliveToken(evb)]() mutable {
- /* sleep override */ std::this_thread::sleep_for(
- std::chrono::milliseconds(100));
- evb.runInEventBaseThread(
- [&done, loopKeepAlive = std::move(loopKeepAlive)] { done = true; });
- });
- });
- evb.loop();
- ASSERT_TRUE(done);
- t.join();
- }
- TEST(EventBaseTest, LoopKeepAliveWithLoopForever) {
- std::unique_ptr<EventBase> evb = std::make_unique<EventBase>();
- bool done = false;
- std::thread evThread([&] {
- evb->loopForever();
- evb.reset();
- done = true;
- });
- {
- auto* ev = evb.get();
- Executor::KeepAlive<EventBase> keepAlive;
- ev->runInEventBaseThreadAndWait(
- [&ev, &keepAlive] { keepAlive = getKeepAliveToken(ev); });
- ASSERT_FALSE(done) << "Loop finished before we asked it to";
- ev->terminateLoopSoon();
- /* sleep override */
- std::this_thread::sleep_for(std::chrono::milliseconds(30));
- ASSERT_FALSE(done) << "Loop terminated early";
- ev->runInEventBaseThread([keepAlive = std::move(keepAlive)] {});
- }
- evThread.join();
- ASSERT_TRUE(done);
- }
- TEST(EventBaseTest, LoopKeepAliveShutdown) {
- auto evb = std::make_unique<EventBase>();
- bool done = false;
- std::thread t([&done,
- loopKeepAlive = getKeepAliveToken(evb.get()),
- evbPtr = evb.get()]() mutable {
- /* sleep override */ std::this_thread::sleep_for(
- std::chrono::milliseconds(100));
- evbPtr->runInEventBaseThread(
- [&done, loopKeepAlive = std::move(loopKeepAlive)] { done = true; });
- });
- evb.reset();
- ASSERT_TRUE(done);
- t.join();
- }
- TEST(EventBaseTest, LoopKeepAliveAtomic) {
- auto evb = std::make_unique<EventBase>();
- static constexpr size_t kNumThreads = 100;
- static constexpr size_t kNumTasks = 100;
- std::vector<std::thread> ts;
- std::vector<std::unique_ptr<Baton<>>> batons;
- size_t done{0};
- for (size_t i = 0; i < kNumThreads; ++i) {
- batons.emplace_back(std::make_unique<Baton<>>());
- }
- for (size_t i = 0; i < kNumThreads; ++i) {
- ts.emplace_back([evbPtr = evb.get(), batonPtr = batons[i].get(), &done] {
- std::vector<Executor::KeepAlive<EventBase>> keepAlives;
- for (size_t j = 0; j < kNumTasks; ++j) {
- keepAlives.emplace_back(getKeepAliveToken(evbPtr));
- }
- batonPtr->post();
- /* sleep override */ std::this_thread::sleep_for(std::chrono::seconds(1));
- for (auto& keepAlive : keepAlives) {
- evbPtr->runInEventBaseThread(
- [&done, keepAlive = std::move(keepAlive)]() { ++done; });
- }
- });
- }
- for (auto& baton : batons) {
- baton->wait();
- }
- evb.reset();
- EXPECT_EQ(kNumThreads * kNumTasks, done);
- for (auto& t : ts) {
- t.join();
- }
- }
- TEST(EventBaseTest, LoopKeepAliveCast) {
- EventBase evb;
- Executor::KeepAlive<> keepAlive = getKeepAliveToken(evb);
- }
- TEST(EventBaseTest, DrivableExecutorTest) {
- folly::Promise<bool> p;
- auto f = p.getFuture();
- EventBase base;
- bool finished = false;
- std::thread t([&] {
- /* sleep override */
- std::this_thread::sleep_for(std::chrono::microseconds(10));
- finished = true;
- base.runInEventBaseThread([&]() { p.setValue(true); });
- });
- // Ensure drive does not busy wait
- base.drive(); // TODO: fix notification queue init() extra wakeup
- base.drive();
- EXPECT_TRUE(finished);
- folly::Promise<bool> p2;
- auto f2 = p2.getFuture();
- // Ensure waitVia gets woken up properly, even from
- // a separate thread.
- base.runAfterDelay([&]() { p2.setValue(true); }, 10);
- f2.waitVia(&base);
- EXPECT_TRUE(f2.isReady());
- t.join();
- }
- TEST(EventBaseTest, IOExecutorTest) {
- EventBase base;
- // Ensure EventBase manages itself as an IOExecutor.
- EXPECT_EQ(base.getEventBase(), &base);
- }
- TEST(EventBaseTest, RequestContextTest) {
- EventBase evb;
- auto defaultCtx = RequestContext::get();
- std::weak_ptr<RequestContext> rctx_weak_ptr;
- {
- RequestContextScopeGuard rctx;
- rctx_weak_ptr = RequestContext::saveContext();
- auto context = RequestContext::get();
- EXPECT_NE(defaultCtx, context);
- evb.runInLoop([context] { EXPECT_EQ(context, RequestContext::get()); });
- evb.loop();
- }
- // Ensure that RequestContext created for the scope has been released and
- // deleted.
- EXPECT_EQ(rctx_weak_ptr.expired(), true);
- EXPECT_EQ(defaultCtx, RequestContext::get());
- }
- TEST(EventBaseTest, CancelLoopCallbackRequestContextTest) {
- EventBase evb;
- CountedLoopCallback c(&evb, 1);
- auto defaultCtx = RequestContext::get();
- EXPECT_EQ(defaultCtx, RequestContext::get());
- std::weak_ptr<RequestContext> rctx_weak_ptr;
- {
- RequestContextScopeGuard rctx;
- rctx_weak_ptr = RequestContext::saveContext();
- auto context = RequestContext::get();
- EXPECT_NE(defaultCtx, context);
- evb.runInLoop(&c);
- c.cancelLoopCallback();
- }
- // Ensure that RequestContext created for the scope has been released and
- // deleted.
- EXPECT_EQ(rctx_weak_ptr.expired(), true);
- EXPECT_EQ(defaultCtx, RequestContext::get());
- }
|