AsyncFileWriter.h 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. /*
  2. * Copyright 2017-present Facebook, Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #pragma once
  17. #include <condition_variable>
  18. #include <mutex>
  19. #include <thread>
  20. #include <folly/File.h>
  21. #include <folly/Range.h>
  22. #include <folly/Synchronized.h>
  23. #include <folly/logging/LogWriter.h>
  24. namespace folly {
  25. /**
  26. * A LogWriter implementation that asynchronously writes to a file descriptor.
  27. *
  28. * This class performs the log I/O in a separate thread.
  29. *
  30. * The advantage of this class over ImmediateFileWriter is that logging I/O can
  31. * never slow down or block your normal program operation. If log messages are
  32. * generated faster than they can be written, messages will be dropped (and an
  33. * indication of how many messages were dropped will be written to the log file
  34. * when we are able to catch up a bit.)
  35. *
  36. * However, one downside is that if your program crashes, not all log messages
  37. * may have been written, so you may lose messages generated immediately before
  38. * the crash.
  39. */
  40. class AsyncFileWriter : public LogWriter {
  41. public:
  42. /**
  43. * The default maximum buffer size, in bytes (1024 * 1024, i.e. 1 MiB).
  44. *
  45. * The comments for setMaxBufferSize() explain how this parameter is used.
  46. */
  47. static constexpr size_t kDefaultMaxBufferSize = 1024 * 1024;
  48. /**
  49. * Construct an AsyncFileWriter that appends to the file at the specified
  50. * path.
  51. */
  52. explicit AsyncFileWriter(folly::StringPiece path);
  53. /**
  54. * Construct an AsyncFileWriter that writes to the specified File object.
  * The File is taken by rvalue reference.
  * NOTE(review): presumably it is moved into file_ -- confirm in the .cpp.
  55. */
  56. explicit AsyncFileWriter(folly::File&& file);
  // Destructor. Per the FLAG_DESTROYING / FLAG_STOP notes in Flags below, the
  // destructor asks the I/O thread to stop and destroys it.
  57. ~AsyncFileWriter();
  // Submit a log message to this writer (LogWriter overrides).
  // NOTE(review): presumably the message is appended to the current queue and
  // messageReady_ is signalled, and the message is dropped if it would exceed
  // the buffer limit described on setMaxBufferSize() -- confirm in the .cpp.
  // The meaning of `flags` is defined by LogWriter and is not visible here.
  58. void writeMessage(folly::StringPiece buffer, uint32_t flags = 0) override;
  59. void writeMessage(std::string&& buffer, uint32_t flags = 0) override;
  60. /**
  61. * Block until the I/O thread has finished writing all messages that
  62. * were already enqueued when flush() was called.
  63. */
  64. void flush() override;
  65. /**
  66. * Returns true if the output stream is a tty.
  67. */
  68. bool ttyOutput() const override;
  69. /**
  70. * Set the maximum buffer size for this AsyncFileWriter, in bytes.
  71. *
  72. * This controls the upper bound on how much unwritten data will be buffered
  73. * in memory. If messages are being logged faster than they can be written
  74. * to the output file, new messages will be discarded if they would cause the
  75. * amount of buffered data to exceed this limit.
  76. */
  77. void setMaxBufferSize(size_t size);
  78. /**
  79. * Get the maximum buffer size for this AsyncFileWriter, in bytes.
  80. */
  81. size_t getMaxBufferSize() const;
  82. /**
  83. * Get the output file.
  * Returns a const reference to the file_ member.
  84. */
  85. const folly::File& getFile() const {
  86. return file_;
  87. }
  88. private:
  // Bit values describing the lifecycle of the I/O thread.
  // NOTE(review): presumably these are OR-ed together into Data::flags (both
  // are uint32_t) -- confirm in the .cpp.
  89. enum Flags : uint32_t {
  90. // FLAG_IO_THREAD_STARTED indicates that the constructor has started the
  91. // I/O thread.
  92. FLAG_IO_THREAD_STARTED = 0x01,
  93. // FLAG_DESTROYING indicates that the destructor is running and destroying
  94. // the I/O thread.
  95. FLAG_DESTROYING = 0x02,
  96. // FLAG_STOP indicates that the I/O thread has been asked to stop.
  97. // This is set either by the destructor or by preFork().
  98. FLAG_STOP = 0x04,
  99. // FLAG_IO_THREAD_STOPPED indicates that the I/O thread is about to return
  100. // and can now be joined. ioCV_ will be signalled when this flag is set.
  101. FLAG_IO_THREAD_STOPPED = 0x08,
  102. // FLAG_IO_THREAD_JOINED indicates that the I/O thread has been joined.
  103. FLAG_IO_THREAD_JOINED = 0x10,
  104. };
  105. /*
  106. * A simple implementation using two queues.
  107. * All writer threads enqueue into one queue while the I/O thread is
  108. * processing the other.
  109. *
  110. * We could potentially also provide an implementation using folly::MPMCQueue
  111. * in the future, which may improve contention under very high write loads.
  *
  * This state is held in data_ below, a folly::Synchronized<Data, std::mutex>.
  112. */
  113. struct Data {
  // The two message queues described above.
  // NOTE(review): std::array, std::vector and std::string (and uint32_t /
  // std::exception elsewhere in this class) are used without <array>,
  // <vector>, <string>, <cstdint> or <exception> appearing in this header's
  // visible include list; they are presumably pulled in transitively.
  114. std::array<std::vector<std::string>, 2> queues;
  // NOTE(review): presumably a bitmask of Flags values -- confirm in the .cpp.
  115. uint32_t flags{0};
  // Incremented by the I/O thread once each time around its loop (see ioCV_).
  // Its low bit selects the current queue in getCurrentQueue().
  116. uint64_t ioThreadCounter{0};
  // Buffer limit; starts at kDefaultMaxBufferSize.
  // NOTE(review): presumably the value behind set/getMaxBufferSize().
  117. size_t maxBufferBytes{kDefaultMaxBufferSize};
  // NOTE(review): presumably the number of bytes currently buffered and not
  // yet written -- confirm in the .cpp.
  118. size_t currentBufferSize{0};
  // NOTE(review): presumably the count of messages dropped since the last
  // report to the log file -- confirm in the .cpp.
  119. size_t numDiscarded{0};
  // Handle for the I/O thread.
  120. std::thread ioThread;
  // Returns a pointer to the queue at index (ioThreadCounter & 0x1), so the
  // selected queue alternates each time ioThreadCounter is incremented.
  121. std::vector<std::string>* getCurrentQueue() {
  122. return &queues[ioThreadCounter & 0x1];
  123. }
  124. };
  // NOTE(review): presumably the entry point run on Data::ioThread.
  125. void ioThread();
  // NOTE(review): presumably writes the messages in ioQueue to file_ and
  // reports numDiscarded dropped messages -- confirm in the .cpp.
  126. void performIO(std::vector<std::string>* ioQueue, size_t numDiscarded);
  // NOTE(review): presumably invoked when writing to the file fails with ex.
  127. void onIoError(const std::exception& ex);
  // NOTE(review): presumably builds the text written to the log to indicate
  // how many messages were dropped (see the class-level description).
  128. std::string getNumDiscardedMsg(size_t numDiscarded);
  // Fork handlers. preFork() sets FLAG_STOP (see Flags), and the lock on
  // data_ is passed from preFork() to postForkParent()/postForkChild() via
  // lockedData_ (see below).
  // NOTE(review): how these are registered, and the meaning of preFork()'s
  // bool result, are not visible in this header.
  129. bool preFork();
  130. void postForkParent();
  131. void postForkChild();
  // Takes the caller's lock on data_ plus additional flag bits.
  // NOTE(review): presumably sets FLAG_STOP | extraFlags and waits for and
  // joins the I/O thread -- confirm in the .cpp.
  132. void stopIoThread(
  133. folly::Synchronized<Data, std::mutex>::LockedPtr& data,
  134. uint32_t extraFlags);
  // NOTE(review): presumably starts a new I/O thread after it was stopped
  // (e.g. around fork) -- confirm in the .cpp.
  135. void restartThread();
  // The output file; exposed read-only through getFile().
  136. folly::File file_;
  // Queue and I/O-thread state, guarded by a std::mutex.
  137. folly::Synchronized<Data, std::mutex> data_;
  138. /**
  139. * messageReady_ is signaled by writer threads whenever they add a new
  140. * message to the current queue.
  141. */
  142. std::condition_variable messageReady_;
  143. /**
  144. * ioCV_ is signaled by the I/O thread each time it increments
  145. * the ioThreadCounter (once each time around its loop).
  * It is also signalled when FLAG_IO_THREAD_STOPPED is set (see Flags).
  146. */
  147. std::condition_variable ioCV_;
  148. /**
  149. * lockedData_ exists only to help pass the lock between preFork() and
  150. * postForkParent()/postForkChild(). We potentially could add some new
  151. * low-level methods to Synchronized to allow manually locking and unlocking
  152. * to avoid having to store this object as a member variable.
  153. */
  154. folly::Synchronized<Data, std::mutex>::LockedPtr lockedData_;
  155. };
  156. } // namespace folly