IOBufQueue.cpp 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368
  1. /*
  2. * Copyright 2013-present Facebook, Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include <folly/io/IOBufQueue.h>
  17. #include <string.h>
  18. #include <stdexcept>
  19. using std::make_pair;
  20. using std::pair;
  21. using std::unique_ptr;
  22. namespace {
  23. using folly::IOBuf;
  24. const size_t MIN_ALLOC_SIZE = 2000;
  25. const size_t MAX_ALLOC_SIZE = 8000;
  26. const size_t MAX_PACK_COPY = 4096;
  27. /**
  28. * Convenience function to append chain src to chain dst.
  29. */
  30. void appendToChain(unique_ptr<IOBuf>& dst, unique_ptr<IOBuf>&& src, bool pack) {
  31. if (dst == nullptr) {
  32. dst = std::move(src);
  33. } else {
  34. IOBuf* tail = dst->prev();
  35. if (pack) {
  36. // Copy up to MAX_PACK_COPY bytes if we can free buffers; this helps
  37. // reduce wastage (the tail's tailroom and the head's headroom) when
  38. // joining two IOBufQueues together.
  39. size_t copyRemaining = MAX_PACK_COPY;
  40. std::size_t n;
  41. while (src && (n = src->length()) < copyRemaining &&
  42. n < tail->tailroom() && n > 0) {
  43. memcpy(tail->writableTail(), src->data(), n);
  44. tail->append(n);
  45. copyRemaining -= n;
  46. src = src->pop();
  47. }
  48. }
  49. if (src) {
  50. tail->appendChain(std::move(src));
  51. }
  52. }
  53. }
  54. } // namespace
  55. namespace folly {
  56. IOBufQueue::IOBufQueue(const Options& options)
  57. : options_(options), cachePtr_(&localCache_) {
  58. localCache_.attached = true;
  59. }
  60. IOBufQueue::~IOBufQueue() {
  61. clearWritableRangeCache();
  62. }
  63. IOBufQueue::IOBufQueue(IOBufQueue&& other) noexcept
  64. : options_(other.options_), cachePtr_(&localCache_) {
  65. other.clearWritableRangeCache();
  66. head_ = std::move(other.head_);
  67. chainLength_ = other.chainLength_;
  68. tailStart_ = other.tailStart_;
  69. localCache_.cachedRange = other.localCache_.cachedRange;
  70. localCache_.attached = true;
  71. other.chainLength_ = 0;
  72. other.tailStart_ = nullptr;
  73. other.localCache_.cachedRange = {nullptr, nullptr};
  74. }
  75. IOBufQueue& IOBufQueue::operator=(IOBufQueue&& other) {
  76. if (&other != this) {
  77. other.clearWritableRangeCache();
  78. clearWritableRangeCache();
  79. options_ = other.options_;
  80. head_ = std::move(other.head_);
  81. chainLength_ = other.chainLength_;
  82. tailStart_ = other.tailStart_;
  83. localCache_.cachedRange = other.localCache_.cachedRange;
  84. localCache_.attached = true;
  85. other.chainLength_ = 0;
  86. other.tailStart_ = nullptr;
  87. other.localCache_.cachedRange = {nullptr, nullptr};
  88. }
  89. return *this;
  90. }
  91. std::pair<void*, std::size_t> IOBufQueue::headroom() {
  92. // Note, headroom is independent from the tail, so we don't need to flush the
  93. // cache.
  94. if (head_) {
  95. return std::make_pair(head_->writableBuffer(), head_->headroom());
  96. } else {
  97. return std::make_pair(nullptr, 0);
  98. }
  99. }
  100. void IOBufQueue::markPrepended(std::size_t n) {
  101. if (n == 0) {
  102. return;
  103. }
  104. // Note, headroom is independent from the tail, so we don't need to flush the
  105. // cache.
  106. assert(head_);
  107. head_->prepend(n);
  108. chainLength_ += n;
  109. }
  110. void IOBufQueue::prepend(const void* buf, std::size_t n) {
  111. // We're not touching the tail, so we don't need to flush the cache.
  112. auto hroom = head_->headroom();
  113. if (!head_ || hroom < n) {
  114. throw std::overflow_error("Not enough room to prepend");
  115. }
  116. memcpy(head_->writableBuffer() + hroom - n, buf, n);
  117. head_->prepend(n);
  118. chainLength_ += n;
  119. }
  120. void IOBufQueue::append(unique_ptr<IOBuf>&& buf, bool pack) {
  121. if (!buf) {
  122. return;
  123. }
  124. auto guard = updateGuard();
  125. if (options_.cacheChainLength) {
  126. chainLength_ += buf->computeChainDataLength();
  127. }
  128. appendToChain(head_, std::move(buf), pack);
  129. }
  130. void IOBufQueue::append(IOBufQueue& other, bool pack) {
  131. if (!other.head_) {
  132. return;
  133. }
  134. // We're going to chain other, thus we need to grab both guards.
  135. auto otherGuard = other.updateGuard();
  136. auto guard = updateGuard();
  137. if (options_.cacheChainLength) {
  138. if (other.options_.cacheChainLength) {
  139. chainLength_ += other.chainLength_;
  140. } else {
  141. chainLength_ += other.head_->computeChainDataLength();
  142. }
  143. }
  144. appendToChain(head_, std::move(other.head_), pack);
  145. other.chainLength_ = 0;
  146. }
  147. void IOBufQueue::append(const void* buf, size_t len) {
  148. auto guard = updateGuard();
  149. auto src = static_cast<const uint8_t*>(buf);
  150. while (len != 0) {
  151. if ((head_ == nullptr) || head_->prev()->isSharedOne() ||
  152. (head_->prev()->tailroom() == 0)) {
  153. appendToChain(
  154. head_,
  155. IOBuf::create(
  156. std::max(MIN_ALLOC_SIZE, std::min(len, MAX_ALLOC_SIZE))),
  157. false);
  158. }
  159. IOBuf* last = head_->prev();
  160. std::size_t copyLen = std::min(len, (size_t)last->tailroom());
  161. memcpy(last->writableTail(), src, copyLen);
  162. src += copyLen;
  163. last->append(copyLen);
  164. chainLength_ += copyLen;
  165. len -= copyLen;
  166. }
  167. }
  168. void IOBufQueue::wrapBuffer(
  169. const void* buf,
  170. size_t len,
  171. std::size_t blockSize) {
  172. auto src = static_cast<const uint8_t*>(buf);
  173. while (len != 0) {
  174. size_t n = std::min(len, size_t(blockSize));
  175. append(IOBuf::wrapBuffer(src, n));
  176. src += n;
  177. len -= n;
  178. }
  179. }
  180. pair<void*, std::size_t> IOBufQueue::preallocateSlow(
  181. std::size_t min,
  182. std::size_t newAllocationSize,
  183. std::size_t max) {
  184. // Avoid grabbing update guard, since we're manually setting the cache ptrs.
  185. flushCache();
  186. // Allocate a new buffer of the requested max size.
  187. unique_ptr<IOBuf> newBuf(IOBuf::create(std::max(min, newAllocationSize)));
  188. tailStart_ = newBuf->writableTail();
  189. cachePtr_->cachedRange = std::pair<uint8_t*, uint8_t*>(
  190. tailStart_, tailStart_ + newBuf->tailroom());
  191. appendToChain(head_, std::move(newBuf), false);
  192. return make_pair(writableTail(), std::min<std::size_t>(max, tailroom()));
  193. }
  194. unique_ptr<IOBuf> IOBufQueue::split(size_t n, bool throwOnUnderflow) {
  195. auto guard = updateGuard();
  196. unique_ptr<IOBuf> result;
  197. while (n != 0) {
  198. if (head_ == nullptr) {
  199. if (throwOnUnderflow) {
  200. throw std::underflow_error(
  201. "Attempt to remove more bytes than are present in IOBufQueue");
  202. } else {
  203. break;
  204. }
  205. } else if (head_->length() <= n) {
  206. n -= head_->length();
  207. chainLength_ -= head_->length();
  208. unique_ptr<IOBuf> remainder = head_->pop();
  209. appendToChain(result, std::move(head_), false);
  210. head_ = std::move(remainder);
  211. } else {
  212. unique_ptr<IOBuf> clone = head_->cloneOne();
  213. clone->trimEnd(clone->length() - n);
  214. appendToChain(result, std::move(clone), false);
  215. head_->trimStart(n);
  216. chainLength_ -= n;
  217. break;
  218. }
  219. }
  220. if (UNLIKELY(result == nullptr)) {
  221. return IOBuf::create(0);
  222. }
  223. return result;
  224. }
  225. void IOBufQueue::trimStart(size_t amount) {
  226. auto trimmed = trimStartAtMost(amount);
  227. if (trimmed != amount) {
  228. throw std::underflow_error(
  229. "Attempt to trim more bytes than are present in IOBufQueue");
  230. }
  231. }
  232. size_t IOBufQueue::trimStartAtMost(size_t amount) {
  233. auto guard = updateGuard();
  234. auto original = amount;
  235. while (amount > 0) {
  236. if (!head_) {
  237. break;
  238. }
  239. if (head_->length() > amount) {
  240. head_->trimStart(amount);
  241. chainLength_ -= amount;
  242. amount = 0;
  243. break;
  244. }
  245. amount -= head_->length();
  246. chainLength_ -= head_->length();
  247. head_ = head_->pop();
  248. }
  249. return original - amount;
  250. }
  251. void IOBufQueue::trimEnd(size_t amount) {
  252. auto trimmed = trimEndAtMost(amount);
  253. if (trimmed != amount) {
  254. throw std::underflow_error(
  255. "Attempt to trim more bytes than are present in IOBufQueue");
  256. }
  257. }
  258. size_t IOBufQueue::trimEndAtMost(size_t amount) {
  259. auto guard = updateGuard();
  260. auto original = amount;
  261. while (amount > 0) {
  262. if (!head_) {
  263. break;
  264. }
  265. if (head_->prev()->length() > amount) {
  266. head_->prev()->trimEnd(amount);
  267. chainLength_ -= amount;
  268. amount = 0;
  269. break;
  270. }
  271. amount -= head_->prev()->length();
  272. chainLength_ -= head_->prev()->length();
  273. if (head_->isChained()) {
  274. head_->prev()->unlink();
  275. } else {
  276. head_.reset();
  277. }
  278. }
  279. return original - amount;
  280. }
  281. std::unique_ptr<folly::IOBuf> IOBufQueue::pop_front() {
  282. auto guard = updateGuard();
  283. if (!head_) {
  284. return nullptr;
  285. }
  286. chainLength_ -= head_->length();
  287. std::unique_ptr<folly::IOBuf> retBuf = std::move(head_);
  288. head_ = retBuf->pop();
  289. return retBuf;
  290. }
  291. void IOBufQueue::clear() {
  292. if (!head_) {
  293. return;
  294. }
  295. auto guard = updateGuard();
  296. IOBuf* buf = head_.get();
  297. do {
  298. buf->clear();
  299. buf = buf->next();
  300. } while (buf != head_.get());
  301. chainLength_ = 0;
  302. }
  303. void IOBufQueue::appendToString(std::string& out) const {
  304. if (!head_) {
  305. return;
  306. }
  307. auto len = options_.cacheChainLength
  308. ? chainLength_ + (cachePtr_->cachedRange.first - tailStart_)
  309. : head_->computeChainDataLength() +
  310. (cachePtr_->cachedRange.first - tailStart_);
  311. out.reserve(out.size() + len);
  312. for (auto range : *head_) {
  313. out.append(reinterpret_cast<const char*>(range.data()), range.size());
  314. }
  315. if (tailStart_ != cachePtr_->cachedRange.first) {
  316. out.append(
  317. reinterpret_cast<const char*>(tailStart_),
  318. cachePtr_->cachedRange.first - tailStart_);
  319. }
  320. }
  321. void IOBufQueue::gather(std::size_t maxLength) {
  322. auto guard = updateGuard();
  323. if (head_ != nullptr) {
  324. head_->gather(maxLength);
  325. }
  326. }
  327. } // namespace folly