ShutdownSocketSet.cpp 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. /*
  2. * Copyright 2014-present Facebook, Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include <folly/io/ShutdownSocketSet.h>
  17. #include <chrono>
  18. #include <thread>
  19. #include <glog/logging.h>
  20. #include <folly/FileUtil.h>
  21. #include <folly/portability/Sockets.h>
  22. namespace folly {
  23. ShutdownSocketSet::ShutdownSocketSet(int maxFd)
  24. : maxFd_(maxFd),
  25. data_(static_cast<std::atomic<uint8_t>*>(
  26. folly::checkedCalloc(size_t(maxFd), sizeof(std::atomic<uint8_t>)))),
  27. nullFile_("/dev/null", O_RDWR) {}
  28. void ShutdownSocketSet::add(int fd) {
  29. // Silently ignore any fds >= maxFd_, very unlikely
  30. DCHECK_GE(fd, 0);
  31. if (fd >= maxFd_) {
  32. return;
  33. }
  34. auto& sref = data_[size_t(fd)];
  35. uint8_t prevState = FREE;
  36. CHECK(sref.compare_exchange_strong(
  37. prevState, IN_USE, std::memory_order_relaxed))
  38. << "Invalid prev state for fd " << fd << ": " << int(prevState);
  39. }
  40. void ShutdownSocketSet::remove(int fd) {
  41. DCHECK_GE(fd, 0);
  42. if (fd >= maxFd_) {
  43. return;
  44. }
  45. auto& sref = data_[size_t(fd)];
  46. uint8_t prevState = 0;
  47. prevState = sref.load(std::memory_order_relaxed);
  48. do {
  49. switch (prevState) {
  50. case IN_SHUTDOWN:
  51. std::this_thread::sleep_for(std::chrono::milliseconds(1));
  52. prevState = sref.load(std::memory_order_relaxed);
  53. continue;
  54. case FREE:
  55. LOG(FATAL) << "Invalid prev state for fd " << fd << ": "
  56. << int(prevState);
  57. }
  58. } while (
  59. !sref.compare_exchange_weak(prevState, FREE, std::memory_order_relaxed));
  60. }
  61. int ShutdownSocketSet::close(int fd) {
  62. DCHECK_GE(fd, 0);
  63. if (fd >= maxFd_) {
  64. return folly::closeNoInt(fd);
  65. }
  66. auto& sref = data_[size_t(fd)];
  67. uint8_t prevState = sref.load(std::memory_order_relaxed);
  68. uint8_t newState = 0;
  69. do {
  70. switch (prevState) {
  71. case IN_USE:
  72. case SHUT_DOWN:
  73. newState = FREE;
  74. break;
  75. case IN_SHUTDOWN:
  76. newState = MUST_CLOSE;
  77. break;
  78. default:
  79. LOG(FATAL) << "Invalid prev state for fd " << fd << ": "
  80. << int(prevState);
  81. }
  82. } while (!sref.compare_exchange_weak(
  83. prevState, newState, std::memory_order_relaxed));
  84. return newState == FREE ? folly::closeNoInt(fd) : 0;
  85. }
  86. void ShutdownSocketSet::shutdown(int fd, bool abortive) {
  87. DCHECK_GE(fd, 0);
  88. if (fd >= maxFd_) {
  89. doShutdown(fd, abortive);
  90. return;
  91. }
  92. auto& sref = data_[size_t(fd)];
  93. uint8_t prevState = IN_USE;
  94. if (!sref.compare_exchange_strong(
  95. prevState, IN_SHUTDOWN, std::memory_order_relaxed)) {
  96. return;
  97. }
  98. doShutdown(fd, abortive);
  99. prevState = IN_SHUTDOWN;
  100. if (sref.compare_exchange_strong(
  101. prevState, SHUT_DOWN, std::memory_order_relaxed)) {
  102. return;
  103. }
  104. CHECK_EQ(prevState, MUST_CLOSE)
  105. << "Invalid prev state for fd " << fd << ": " << int(prevState);
  106. folly::closeNoInt(fd); // ignore errors, nothing to do
  107. CHECK(
  108. sref.compare_exchange_strong(prevState, FREE, std::memory_order_relaxed))
  109. << "Invalid prev state for fd " << fd << ": " << int(prevState);
  110. }
  111. void ShutdownSocketSet::shutdownAll(bool abortive) {
  112. for (int i = 0; i < maxFd_; ++i) {
  113. auto& sref = data_[size_t(i)];
  114. if (sref.load(std::memory_order_relaxed) == IN_USE) {
  115. shutdown(i, abortive);
  116. }
  117. }
  118. }
  119. void ShutdownSocketSet::doShutdown(int fd, bool abortive) {
  120. // shutdown() the socket first, to awaken any threads blocked on the fd
  121. // (subsequent IO will fail because it's been shutdown); close()ing the
  122. // socket does not wake up blockers, see
  123. // http://stackoverflow.com/a/3624545/1736339
  124. folly::shutdownNoInt(fd, SHUT_RDWR);
  125. // If abortive shutdown is desired, we'll set the SO_LINGER option on
  126. // the socket with a timeout of 0; this will cause RST to be sent on
  127. // close.
  128. if (abortive) {
  129. struct linger l = {1, 0};
  130. if (setsockopt(fd, SOL_SOCKET, SO_LINGER, &l, sizeof(l)) != 0) {
  131. // Probably not a socket, ignore.
  132. return;
  133. }
  134. }
  135. // We can't close() the socket, as that would be dangerous; a new file
  136. // could be opened and get the same file descriptor, and then code assuming
  137. // the old fd would do IO in the wrong place. We'll (atomically) dup2
  138. // /dev/null onto the fd instead.
  139. folly::dup2NoInt(nullFile_.fd(), fd);
  140. }
  141. } // namespace folly