EventBase.cpp 24 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781
  1. /*
  2. * Copyright 2014-present Facebook, Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #ifndef __STDC_FORMAT_MACROS
  17. #define __STDC_FORMAT_MACROS
  18. #endif
  19. #include <folly/io/async/EventBase.h>
  20. #include <fcntl.h>
  21. #include <memory>
  22. #include <mutex>
  23. #include <thread>
  24. #include <folly/Memory.h>
  25. #include <folly/String.h>
  26. #include <folly/io/async/NotificationQueue.h>
  27. #include <folly/io/async/VirtualEventBase.h>
  28. #include <folly/portability/Unistd.h>
  29. #include <folly/synchronization/Baton.h>
  30. #include <folly/system/ThreadName.h>
  31. namespace folly {
  32. /*
  33. * EventBase::FunctionRunner
  34. */
  35. class EventBase::FunctionRunner
  36. : public NotificationQueue<EventBase::Func>::Consumer {
  37. public:
  38. void messageAvailable(Func&& msg) noexcept override {
  39. // In libevent2, internal events do not break the loop.
  40. // Most users would expect loop(), followed by runInEventBaseThread(),
  41. // to break the loop and check if it should exit or not.
  42. // To have similar bejaviour to libevent1.4, tell the loop to break here.
  43. // Note that loop() may still continue to loop, but it will also check the
  44. // stop_ flag as well as runInLoop callbacks, etc.
  45. event_base_loopbreak(getEventBase()->evb_);
  46. if (!msg) {
  47. // terminateLoopSoon() sends a null message just to
  48. // wake up the loop. We can ignore these messages.
  49. return;
  50. }
  51. msg();
  52. }
  53. };
  // libevent's interface is not thread-safe: event_init() and
  // event_base_free() both touch an internal global 'current_base', so every
  // call to them in this file is made while holding this mutex.
  //
  // event_init() must be called only once; later bases have to come from
  // event_base_new(). Whether event_init() already ran can be detected by
  // inspecting current_base.
  namespace {
  std::mutex libevent_mutex_;
  } // namespace
  62. /*
  63. * EventBase methods
  64. */
  65. EventBase::EventBase(bool enableTimeMeasurement)
  66. : runOnceCallbacks_(nullptr),
  67. stop_(false),
  68. loopThread_(),
  69. queue_(nullptr),
  70. fnRunner_(nullptr),
  71. maxLatency_(0),
  72. avgLoopTime_(std::chrono::seconds(2)),
  73. maxLatencyLoopTime_(avgLoopTime_),
  74. enableTimeMeasurement_(enableTimeMeasurement),
  75. nextLoopCnt_(
  76. std::size_t(-40)) // Early wrap-around so bugs will manifest soon
  77. ,
  78. latestLoopCnt_(nextLoopCnt_),
  79. startWork_(),
  80. observer_(nullptr),
  81. observerSampleCount_(0),
  82. executionObserver_(nullptr) {
  83. struct event ev;
  84. {
  85. std::lock_guard<std::mutex> lock(libevent_mutex_);
  86. // The value 'current_base' (libevent 1) or
  87. // 'event_global_current_base_' (libevent 2) is filled in by event_set(),
  88. // allowing examination of its value without an explicit reference here.
  89. // If ev.ev_base is nullptr, then event_init() must be called, otherwise
  90. // call event_base_new().
  91. event_set(&ev, 0, 0, nullptr, nullptr);
  92. if (!ev.ev_base) {
  93. evb_ = event_init();
  94. }
  95. }
  96. if (ev.ev_base) {
  97. evb_ = event_base_new();
  98. }
  99. if (UNLIKELY(evb_ == nullptr)) {
  100. LOG(ERROR) << "EventBase(): Failed to init event base.";
  101. folly::throwSystemError("error in EventBase::EventBase()");
  102. }
  103. VLOG(5) << "EventBase(): Created.";
  104. initNotificationQueue();
  105. }
  106. // takes ownership of the event_base
  107. EventBase::EventBase(event_base* evb, bool enableTimeMeasurement)
  108. : runOnceCallbacks_(nullptr),
  109. stop_(false),
  110. loopThread_(),
  111. evb_(evb),
  112. queue_(nullptr),
  113. fnRunner_(nullptr),
  114. maxLatency_(0),
  115. avgLoopTime_(std::chrono::seconds(2)),
  116. maxLatencyLoopTime_(avgLoopTime_),
  117. enableTimeMeasurement_(enableTimeMeasurement),
  118. nextLoopCnt_(
  119. std::size_t(-40)) // Early wrap-around so bugs will manifest soon
  120. ,
  121. latestLoopCnt_(nextLoopCnt_),
  122. startWork_(),
  123. observer_(nullptr),
  124. observerSampleCount_(0),
  125. executionObserver_(nullptr) {
  126. if (UNLIKELY(evb_ == nullptr)) {
  127. LOG(ERROR) << "EventBase(): Pass nullptr as event base.";
  128. throw std::invalid_argument("EventBase(): event base cannot be nullptr");
  129. }
  130. initNotificationQueue();
  131. }
  132. EventBase::~EventBase() {
  133. std::future<void> virtualEventBaseDestroyFuture;
  134. if (virtualEventBase_) {
  135. virtualEventBaseDestroyFuture = virtualEventBase_->destroy();
  136. }
  137. // Keep looping until all keep-alive handles are released. Each keep-alive
  138. // handle signals that some external code will still schedule some work on
  139. // this EventBase (so it's not safe to destroy it).
  140. while (loopKeepAliveCount() > 0) {
  141. applyLoopKeepAlive();
  142. loopOnce();
  143. }
  144. if (virtualEventBaseDestroyFuture.valid()) {
  145. virtualEventBaseDestroyFuture.get();
  146. }
  147. // Call all destruction callbacks, before we start cleaning up our state.
  148. while (!onDestructionCallbacks_.empty()) {
  149. LoopCallback* callback = &onDestructionCallbacks_.front();
  150. onDestructionCallbacks_.pop_front();
  151. callback->runLoopCallback();
  152. }
  153. clearCobTimeouts();
  154. DCHECK_EQ(0u, runBeforeLoopCallbacks_.size());
  155. (void)runLoopCallbacks();
  156. if (!fnRunner_->consumeUntilDrained()) {
  157. LOG(ERROR) << "~EventBase(): Unable to drain notification queue";
  158. }
  159. // Stop consumer before deleting NotificationQueue
  160. fnRunner_->stopConsuming();
  161. {
  162. std::lock_guard<std::mutex> lock(libevent_mutex_);
  163. event_base_free(evb_);
  164. }
  165. for (auto storage : localStorageToDtor_) {
  166. storage->onEventBaseDestruction(*this);
  167. }
  168. VLOG(5) << "EventBase(): Destroyed.";
  169. }
  170. size_t EventBase::getNotificationQueueSize() const {
  171. return queue_->size();
  172. }
  173. void EventBase::setMaxReadAtOnce(uint32_t maxAtOnce) {
  174. fnRunner_->setMaxReadAtOnce(maxAtOnce);
  175. }
  176. void EventBase::checkIsInEventBaseThread() const {
  177. auto evbTid = loopThread_.load(std::memory_order_relaxed);
  178. if (evbTid == std::thread::id()) {
  179. return;
  180. }
  181. // Using getThreadName(evbTid) instead of name_ will work also if
  182. // the thread name is set outside of EventBase (and name_ is empty).
  183. auto curTid = std::this_thread::get_id();
  184. CHECK(evbTid == curTid)
  185. << "This logic must be executed in the event base thread. "
  186. << "Event base thread name: \""
  187. << folly::getThreadName(evbTid).value_or("")
  188. << "\", current thread name: \""
  189. << folly::getThreadName(curTid).value_or("") << "\"";
  190. }
  191. // Set smoothing coefficient for loop load average; input is # of milliseconds
  192. // for exp(-1) decay.
  193. void EventBase::setLoadAvgMsec(std::chrono::milliseconds ms) {
  194. assert(enableTimeMeasurement_);
  195. std::chrono::microseconds us = std::chrono::milliseconds(ms);
  196. if (ms > std::chrono::milliseconds::zero()) {
  197. maxLatencyLoopTime_.setTimeInterval(us);
  198. avgLoopTime_.setTimeInterval(us);
  199. } else {
  200. LOG(ERROR) << "non-positive arg to setLoadAvgMsec()";
  201. }
  202. }
  203. void EventBase::resetLoadAvg(double value) {
  204. assert(enableTimeMeasurement_);
  205. avgLoopTime_.reset(value);
  206. maxLatencyLoopTime_.reset(value);
  207. }
  /**
   * Returns the time elapsed since `*prev`, truncated to milliseconds, and
   * advances `*prev` to the current steady-clock time.
   *
   * The clock is sampled exactly once so that the returned delta and the new
   * `*prev` refer to the same instant. Reading it twice would leave the time
   * between the two reads out of every delta.
   *
   * @param prev  In/out: the previous sample point; must not be null.
   * @return      now - *prev (old value), as whole milliseconds.
   */
  static std::chrono::milliseconds getTimeDelta(
      std::chrono::steady_clock::time_point* prev) {
    const auto now = std::chrono::steady_clock::now();
    const auto elapsed = now - *prev;
    *prev = now;
    return std::chrono::duration_cast<std::chrono::milliseconds>(elapsed);
  }
  214. void EventBase::waitUntilRunning() {
  215. while (!isRunning()) {
  216. std::this_thread::yield();
  217. }
  218. }
  219. // enters the event_base loop -- will only exit when forced to
  220. bool EventBase::loop() {
  221. return loopBody();
  222. }
  223. bool EventBase::loopIgnoreKeepAlive() {
  224. if (loopKeepAliveActive_) {
  225. // Make sure NotificationQueue is not counted as one of the readers
  226. // (otherwise loopBody won't return until terminateLoopSoon is called).
  227. fnRunner_->stopConsuming();
  228. fnRunner_->startConsumingInternal(this, queue_.get());
  229. loopKeepAliveActive_ = false;
  230. }
  231. return loopBody(0, true);
  232. }
  233. bool EventBase::loopOnce(int flags) {
  234. return loopBody(flags | EVLOOP_ONCE);
  235. }
  236. bool EventBase::loopBody(int flags, bool ignoreKeepAlive) {
  237. VLOG(5) << "EventBase(): Starting loop.";
  238. DCHECK(!invokingLoop_)
  239. << "Your code just tried to loop over an event base from inside another "
  240. << "event base loop. Since libevent is not reentrant, this leads to "
  241. << "undefined behavior in opt builds. Please fix immediately. For the "
  242. << "common case of an inner function that needs to do some synchronous "
  243. << "computation on an event-base, replace getEventBase() by a new, "
  244. << "stack-allocated EvenBase.";
  245. invokingLoop_ = true;
  246. SCOPE_EXIT {
  247. invokingLoop_ = false;
  248. };
  249. int res = 0;
  250. bool ranLoopCallbacks;
  251. bool blocking = !(flags & EVLOOP_NONBLOCK);
  252. bool once = (flags & EVLOOP_ONCE);
  253. // time-measurement variables.
  254. std::chrono::steady_clock::time_point prev;
  255. std::chrono::steady_clock::time_point idleStart = {};
  256. std::chrono::microseconds busy;
  257. std::chrono::microseconds idle;
  258. loopThread_.store(std::this_thread::get_id(), std::memory_order_release);
  259. if (!name_.empty()) {
  260. setThreadName(name_);
  261. }
  262. if (enableTimeMeasurement_) {
  263. prev = std::chrono::steady_clock::now();
  264. idleStart = prev;
  265. }
  266. while (!stop_.load(std::memory_order_relaxed)) {
  267. if (!ignoreKeepAlive) {
  268. applyLoopKeepAlive();
  269. }
  270. ++nextLoopCnt_;
  271. // Run the before loop callbacks
  272. LoopCallbackList callbacks;
  273. callbacks.swap(runBeforeLoopCallbacks_);
  274. while (!callbacks.empty()) {
  275. auto* item = &callbacks.front();
  276. callbacks.pop_front();
  277. item->runLoopCallback();
  278. }
  279. // nobody can add loop callbacks from within this thread if
  280. // we don't have to handle anything to start with...
  281. if (blocking && loopCallbacks_.empty()) {
  282. res = event_base_loop(evb_, EVLOOP_ONCE);
  283. } else {
  284. res = event_base_loop(evb_, EVLOOP_ONCE | EVLOOP_NONBLOCK);
  285. }
  286. ranLoopCallbacks = runLoopCallbacks();
  287. if (enableTimeMeasurement_) {
  288. auto now = std::chrono::steady_clock::now();
  289. busy = std::chrono::duration_cast<std::chrono::microseconds>(
  290. now - startWork_);
  291. idle = std::chrono::duration_cast<std::chrono::microseconds>(
  292. startWork_ - idleStart);
  293. auto loop_time = busy + idle;
  294. avgLoopTime_.addSample(loop_time, busy);
  295. maxLatencyLoopTime_.addSample(loop_time, busy);
  296. if (observer_) {
  297. if (observerSampleCount_++ == observer_->getSampleRate()) {
  298. observerSampleCount_ = 0;
  299. observer_->loopSample(busy.count(), idle.count());
  300. }
  301. }
  302. VLOG(11) << "EventBase " << this << " did not timeout "
  303. << " loop time guess: " << loop_time.count()
  304. << " idle time: " << idle.count()
  305. << " busy time: " << busy.count()
  306. << " avgLoopTime: " << avgLoopTime_.get()
  307. << " maxLatencyLoopTime: " << maxLatencyLoopTime_.get()
  308. << " maxLatency_: " << maxLatency_.count() << "us"
  309. << " notificationQueueSize: " << getNotificationQueueSize()
  310. << " nothingHandledYet(): " << nothingHandledYet();
  311. // see if our average loop time has exceeded our limit
  312. if ((maxLatency_ > std::chrono::microseconds::zero()) &&
  313. (maxLatencyLoopTime_.get() > double(maxLatency_.count()))) {
  314. maxLatencyCob_();
  315. // back off temporarily -- don't keep spamming maxLatencyCob_
  316. // if we're only a bit over the limit
  317. maxLatencyLoopTime_.dampen(0.9);
  318. }
  319. // Our loop run did real work; reset the idle timer
  320. idleStart = now;
  321. } else {
  322. VLOG(11) << "EventBase " << this << " did not timeout";
  323. }
  324. // If the event loop indicate that there were no more events, and
  325. // we also didn't have any loop callbacks to run, there is nothing left to
  326. // do.
  327. if (res != 0 && !ranLoopCallbacks) {
  328. // Since Notification Queue is marked 'internal' some events may not have
  329. // run. Run them manually if so, and continue looping.
  330. //
  331. if (getNotificationQueueSize() > 0) {
  332. fnRunner_->handlerReady(0);
  333. } else {
  334. break;
  335. }
  336. }
  337. if (enableTimeMeasurement_) {
  338. VLOG(11) << "EventBase " << this
  339. << " loop time: " << getTimeDelta(&prev).count();
  340. }
  341. if (once) {
  342. break;
  343. }
  344. }
  345. // Reset stop_ so loop() can be called again
  346. stop_.store(false, std::memory_order_relaxed);
  347. if (res < 0) {
  348. LOG(ERROR) << "EventBase: -- error in event loop, res = " << res;
  349. return false;
  350. } else if (res == 1) {
  351. VLOG(5) << "EventBase: ran out of events (exiting loop)!";
  352. } else if (res > 1) {
  353. LOG(ERROR) << "EventBase: unknown event loop result = " << res;
  354. return false;
  355. }
  356. loopThread_.store({}, std::memory_order_release);
  357. VLOG(5) << "EventBase(): Done with loop.";
  358. return true;
  359. }
  360. ssize_t EventBase::loopKeepAliveCount() {
  361. if (loopKeepAliveCountAtomic_.load(std::memory_order_relaxed)) {
  362. loopKeepAliveCount_ +=
  363. loopKeepAliveCountAtomic_.exchange(0, std::memory_order_relaxed);
  364. }
  365. DCHECK_GE(loopKeepAliveCount_, 0);
  366. return loopKeepAliveCount_;
  367. }
  368. void EventBase::applyLoopKeepAlive() {
  369. auto keepAliveCount = loopKeepAliveCount();
  370. // Make sure default VirtualEventBase won't hold EventBase::loop() forever.
  371. if (virtualEventBase_ && virtualEventBase_->keepAliveCount() == 1) {
  372. --keepAliveCount;
  373. }
  374. if (loopKeepAliveActive_ && keepAliveCount == 0) {
  375. // Restore the notification queue internal flag
  376. fnRunner_->stopConsuming();
  377. fnRunner_->startConsumingInternal(this, queue_.get());
  378. loopKeepAliveActive_ = false;
  379. } else if (!loopKeepAliveActive_ && keepAliveCount > 0) {
  380. // Update the notification queue event to treat it as a normal
  381. // (non-internal) event. The notification queue event always remains
  382. // installed, and the main loop won't exit with it installed.
  383. fnRunner_->stopConsuming();
  384. fnRunner_->startConsuming(this, queue_.get());
  385. loopKeepAliveActive_ = true;
  386. }
  387. }
  388. void EventBase::loopForever() {
  389. bool ret;
  390. {
  391. SCOPE_EXIT {
  392. applyLoopKeepAlive();
  393. };
  394. // Make sure notification queue events are treated as normal events.
  395. // We can't use loopKeepAlive() here since LoopKeepAlive token can only be
  396. // released inside a loop.
  397. ++loopKeepAliveCount_;
  398. SCOPE_EXIT {
  399. --loopKeepAliveCount_;
  400. };
  401. ret = loop();
  402. }
  403. if (!ret) {
  404. folly::throwSystemError("error in EventBase::loopForever()");
  405. }
  406. }
  407. void EventBase::bumpHandlingTime() {
  408. if (!enableTimeMeasurement_) {
  409. return;
  410. }
  411. VLOG(11) << "EventBase " << this << " " << __PRETTY_FUNCTION__
  412. << " (loop) latest " << latestLoopCnt_ << " next " << nextLoopCnt_;
  413. if (nothingHandledYet()) {
  414. latestLoopCnt_ = nextLoopCnt_;
  415. // set the time
  416. startWork_ = std::chrono::steady_clock::now();
  417. VLOG(11) << "EventBase " << this << " " << __PRETTY_FUNCTION__
  418. << " (loop) startWork_ " << startWork_.time_since_epoch().count();
  419. }
  420. }
  421. void EventBase::terminateLoopSoon() {
  422. VLOG(5) << "EventBase(): Received terminateLoopSoon() command.";
  423. // Set stop to true, so the event loop will know to exit.
  424. stop_.store(true, std::memory_order_relaxed);
  425. // Call event_base_loopbreak() so that libevent will exit the next time
  426. // around the loop.
  427. event_base_loopbreak(evb_);
  428. // If terminateLoopSoon() is called from another thread,
  429. // the EventBase thread might be stuck waiting for events.
  430. // In this case, it won't wake up and notice that stop_ is set until it
  431. // receives another event. Send an empty frame to the notification queue
  432. // so that the event loop will wake up even if there are no other events.
  433. //
  434. // We don't care about the return value of trySendFrame(). If it fails
  435. // this likely means the EventBase already has lots of events waiting
  436. // anyway.
  437. try {
  438. queue_->putMessage(nullptr);
  439. } catch (...) {
  440. // We don't care if putMessage() fails. This likely means
  441. // the EventBase already has lots of events waiting anyway.
  442. }
  443. }
  444. void EventBase::runInLoop(LoopCallback* callback, bool thisIteration) {
  445. dcheckIsInEventBaseThread();
  446. callback->cancelLoopCallback();
  447. callback->context_ = RequestContext::saveContext();
  448. if (runOnceCallbacks_ != nullptr && thisIteration) {
  449. runOnceCallbacks_->push_back(*callback);
  450. } else {
  451. loopCallbacks_.push_back(*callback);
  452. }
  453. }
  454. void EventBase::runInLoop(Func cob, bool thisIteration) {
  455. dcheckIsInEventBaseThread();
  456. auto wrapper = new FunctionLoopCallback(std::move(cob));
  457. wrapper->context_ = RequestContext::saveContext();
  458. if (runOnceCallbacks_ != nullptr && thisIteration) {
  459. runOnceCallbacks_->push_back(*wrapper);
  460. } else {
  461. loopCallbacks_.push_back(*wrapper);
  462. }
  463. }
  464. void EventBase::runOnDestruction(LoopCallback* callback) {
  465. std::lock_guard<std::mutex> lg(onDestructionCallbacksMutex_);
  466. callback->cancelLoopCallback();
  467. onDestructionCallbacks_.push_back(*callback);
  468. }
  469. void EventBase::runBeforeLoop(LoopCallback* callback) {
  470. dcheckIsInEventBaseThread();
  471. callback->cancelLoopCallback();
  472. runBeforeLoopCallbacks_.push_back(*callback);
  473. }
  474. bool EventBase::runInEventBaseThread(Func fn) {
  475. // Send the message.
  476. // It will be received by the FunctionRunner in the EventBase's thread.
  477. // We try not to schedule nullptr callbacks
  478. if (!fn) {
  479. LOG(ERROR) << "EventBase " << this
  480. << ": Scheduling nullptr callbacks is not allowed";
  481. return false;
  482. }
  483. // Short-circuit if we are already in our event base
  484. if (inRunningEventBaseThread()) {
  485. runInLoop(std::move(fn));
  486. return true;
  487. }
  488. try {
  489. queue_->putMessage(std::move(fn));
  490. } catch (const std::exception& ex) {
  491. LOG(ERROR) << "EventBase " << this << ": failed to schedule function "
  492. << "for EventBase thread: " << ex.what();
  493. return false;
  494. }
  495. return true;
  496. }
  497. bool EventBase::runInEventBaseThreadAndWait(Func fn) {
  498. if (inRunningEventBaseThread()) {
  499. LOG(ERROR) << "EventBase " << this << ": Waiting in the event loop is not "
  500. << "allowed";
  501. return false;
  502. }
  503. Baton<> ready;
  504. runInEventBaseThread([&ready, fn = std::move(fn)]() mutable {
  505. SCOPE_EXIT {
  506. ready.post();
  507. };
  508. // A trick to force the stored functor to be executed and then destructed
  509. // before posting the baton and waking the waiting thread.
  510. copy(std::move(fn))();
  511. });
  512. ready.wait();
  513. return true;
  514. }
  515. bool EventBase::runImmediatelyOrRunInEventBaseThreadAndWait(Func fn) {
  516. if (isInEventBaseThread()) {
  517. fn();
  518. return true;
  519. } else {
  520. return runInEventBaseThreadAndWait(std::move(fn));
  521. }
  522. }
  523. bool EventBase::runLoopCallbacks() {
  524. bumpHandlingTime();
  525. if (!loopCallbacks_.empty()) {
  526. // Swap the loopCallbacks_ list with a temporary list on our stack.
  527. // This way we will only run callbacks scheduled at the time
  528. // runLoopCallbacks() was invoked.
  529. //
  530. // If any of these callbacks in turn call runInLoop() to schedule more
  531. // callbacks, those new callbacks won't be run until the next iteration
  532. // around the event loop. This prevents runInLoop() callbacks from being
  533. // able to start file descriptor and timeout based events.
  534. LoopCallbackList currentCallbacks;
  535. currentCallbacks.swap(loopCallbacks_);
  536. runOnceCallbacks_ = &currentCallbacks;
  537. while (!currentCallbacks.empty()) {
  538. LoopCallback* callback = &currentCallbacks.front();
  539. currentCallbacks.pop_front();
  540. folly::RequestContextScopeGuard rctx(std::move(callback->context_));
  541. callback->runLoopCallback();
  542. }
  543. runOnceCallbacks_ = nullptr;
  544. return true;
  545. }
  546. return false;
  547. }
  548. void EventBase::initNotificationQueue() {
  549. // Infinite size queue
  550. queue_ = std::make_unique<NotificationQueue<Func>>();
  551. // We allocate fnRunner_ separately, rather than declaring it directly
  552. // as a member of EventBase solely so that we don't need to include
  553. // NotificationQueue.h from EventBase.h
  554. fnRunner_ = std::make_unique<FunctionRunner>();
  555. // Mark this as an internal event, so event_base_loop() will return if
  556. // there are no other events besides this one installed.
  557. //
  558. // Most callers don't care about the internal notification queue used by
  559. // EventBase. The queue is always installed, so if we did count the queue as
  560. // an active event, loop() would never exit with no more events to process.
  561. // Users can use loopForever() if they do care about the notification queue.
  562. // (This is useful for EventBase threads that do nothing but process
  563. // runInEventBaseThread() notifications.)
  564. fnRunner_->startConsumingInternal(this, queue_.get());
  565. }
  566. void EventBase::SmoothLoopTime::setTimeInterval(
  567. std::chrono::microseconds timeInterval) {
  568. expCoeff_ = -1.0 / timeInterval.count();
  569. VLOG(11) << "expCoeff_ " << expCoeff_ << " " << __PRETTY_FUNCTION__;
  570. }
  571. void EventBase::SmoothLoopTime::reset(double value) {
  572. value_ = value;
  573. }
  574. void EventBase::SmoothLoopTime::addSample(
  575. std::chrono::microseconds total,
  576. std::chrono::microseconds busy) {
  577. if ((buffer_time_ + total) > buffer_interval_ && buffer_cnt_ > 0) {
  578. // See https://en.wikipedia.org/wiki/Exponential_smoothing for
  579. // more info on this calculation.
  580. double coeff = exp(buffer_time_.count() * expCoeff_);
  581. value_ =
  582. value_ * coeff + (1.0 - coeff) * (busy_buffer_.count() / buffer_cnt_);
  583. buffer_time_ = std::chrono::microseconds{0};
  584. busy_buffer_ = std::chrono::microseconds{0};
  585. buffer_cnt_ = 0;
  586. }
  587. buffer_time_ += total;
  588. busy_buffer_ += busy;
  589. buffer_cnt_++;
  590. }
  591. bool EventBase::nothingHandledYet() const noexcept {
  592. VLOG(11) << "latest " << latestLoopCnt_ << " next " << nextLoopCnt_;
  593. return (nextLoopCnt_ != latestLoopCnt_);
  594. }
  595. void EventBase::attachTimeoutManager(AsyncTimeout* obj, InternalEnum internal) {
  596. struct event* ev = obj->getEvent();
  597. assert(ev->ev_base == nullptr);
  598. event_base_set(getLibeventBase(), ev);
  599. if (internal == AsyncTimeout::InternalEnum::INTERNAL) {
  600. // Set the EVLIST_INTERNAL flag
  601. event_ref_flags(ev) |= EVLIST_INTERNAL;
  602. }
  603. }
  604. void EventBase::detachTimeoutManager(AsyncTimeout* obj) {
  605. cancelTimeout(obj);
  606. struct event* ev = obj->getEvent();
  607. ev->ev_base = nullptr;
  608. }
  609. bool EventBase::scheduleTimeout(
  610. AsyncTimeout* obj,
  611. TimeoutManager::timeout_type timeout) {
  612. dcheckIsInEventBaseThread();
  613. // Set up the timeval and add the event
  614. struct timeval tv;
  615. tv.tv_sec = long(timeout.count() / 1000LL);
  616. tv.tv_usec = long((timeout.count() % 1000LL) * 1000LL);
  617. struct event* ev = obj->getEvent();
  618. DCHECK(ev->ev_base);
  619. if (event_add(ev, &tv) < 0) {
  620. LOG(ERROR) << "EventBase: failed to schedule timeout: " << errnoStr(errno);
  621. return false;
  622. }
  623. return true;
  624. }
  625. void EventBase::cancelTimeout(AsyncTimeout* obj) {
  626. dcheckIsInEventBaseThread();
  627. struct event* ev = obj->getEvent();
  628. if (EventUtil::isEventRegistered(ev)) {
  629. event_del(ev);
  630. }
  631. }
  632. void EventBase::setName(const std::string& name) {
  633. dcheckIsInEventBaseThread();
  634. name_ = name;
  635. if (isRunning()) {
  636. setThreadName(loopThread_.load(std::memory_order_relaxed), name_);
  637. }
  638. }
  639. const std::string& EventBase::getName() {
  640. dcheckIsInEventBaseThread();
  641. return name_;
  642. }
  643. void EventBase::scheduleAt(Func&& fn, TimePoint const& timeout) {
  644. auto duration = timeout - now();
  645. timer().scheduleTimeoutFn(
  646. std::move(fn),
  647. std::chrono::duration_cast<std::chrono::milliseconds>(duration));
  648. }
  649. const char* EventBase::getLibeventVersion() {
  650. return event_get_version();
  651. }
  652. const char* EventBase::getLibeventMethod() {
  653. return event_get_method();
  654. }
  655. VirtualEventBase& EventBase::getVirtualEventBase() {
  656. folly::call_once(virtualEventBaseInitFlag_, [&] {
  657. virtualEventBase_ = std::make_unique<VirtualEventBase>(*this);
  658. });
  659. return *virtualEventBase_;
  660. }
  661. EventBase* EventBase::getEventBase() {
  662. return this;
  663. }
  664. constexpr std::chrono::milliseconds EventBase::SmoothLoopTime::buffer_interval_;
  665. } // namespace folly