Compression.cpp 61 KB


  1. /*
  2. * Copyright 2013-present Facebook, Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include <folly/compression/Compression.h>
  17. #if FOLLY_HAVE_LIBLZ4
  18. #include <lz4.h>
  19. #include <lz4hc.h>
  20. #if LZ4_VERSION_NUMBER >= 10301
  21. #include <lz4frame.h>
  22. #endif
  23. #endif
  24. #include <glog/logging.h>
  25. #if FOLLY_HAVE_LIBSNAPPY
  26. #include <snappy-sinksource.h>
  27. #include <snappy.h>
  28. #endif
  29. #if FOLLY_HAVE_LIBZ
  30. #include <folly/compression/Zlib.h>
  31. #endif
  32. #if FOLLY_HAVE_LIBLZMA
  33. #include <lzma.h>
  34. #endif
  35. #if FOLLY_HAVE_LIBZSTD
  36. #include <folly/compression/Zstd.h>
  37. #endif
  38. #if FOLLY_HAVE_LIBBZ2
  39. #include <folly/portability/Windows.h>
  40. #include <bzlib.h>
  41. #endif
  42. #include <folly/Conv.h>
  43. #include <folly/Memory.h>
  44. #include <folly/Portability.h>
  45. #include <folly/Random.h>
  46. #include <folly/ScopeGuard.h>
  47. #include <folly/Varint.h>
  48. #include <folly/compression/Utils.h>
  49. #include <folly/io/Cursor.h>
  50. #include <folly/lang/Bits.h>
  51. #include <folly/stop_watch.h>
  52. #include <algorithm>
  53. #include <unordered_set>
  54. using folly::io::compression::detail::dataStartsWithLE;
  55. using folly::io::compression::detail::prefixToStringLE;
  56. namespace folly {
  57. namespace io {
  58. Codec::Codec(
  59. CodecType type,
  60. Optional<int> level,
  61. StringPiece name,
  62. bool counters)
  63. : type_(type) {
  64. if (counters) {
  65. bytesBeforeCompression_ = {type,
  66. name,
  67. level,
  68. CompressionCounterKey::BYTES_BEFORE_COMPRESSION,
  69. CompressionCounterType::SUM};
  70. bytesAfterCompression_ = {type,
  71. name,
  72. level,
  73. CompressionCounterKey::BYTES_AFTER_COMPRESSION,
  74. CompressionCounterType::SUM};
  75. bytesBeforeDecompression_ = {
  76. type,
  77. name,
  78. level,
  79. CompressionCounterKey::BYTES_BEFORE_DECOMPRESSION,
  80. CompressionCounterType::SUM};
  81. bytesAfterDecompression_ = {
  82. type,
  83. name,
  84. level,
  85. CompressionCounterKey::BYTES_AFTER_DECOMPRESSION,
  86. CompressionCounterType::SUM};
  87. compressions_ = {type,
  88. name,
  89. level,
  90. CompressionCounterKey::COMPRESSIONS,
  91. CompressionCounterType::SUM};
  92. decompressions_ = {type,
  93. name,
  94. level,
  95. CompressionCounterKey::DECOMPRESSIONS,
  96. CompressionCounterType::SUM};
  97. compressionMilliseconds_ = {type,
  98. name,
  99. level,
  100. CompressionCounterKey::COMPRESSION_MILLISECONDS,
  101. CompressionCounterType::SUM};
  102. decompressionMilliseconds_ = {
  103. type,
  104. name,
  105. level,
  106. CompressionCounterKey::DECOMPRESSION_MILLISECONDS,
  107. CompressionCounterType::SUM};
  108. }
  109. }
  110. namespace {
  111. constexpr uint32_t kLoggingRate = 50;
  112. class Timer {
  113. public:
  114. explicit Timer(folly::detail::CompressionCounter& counter)
  115. : counter_(&counter) {}
  116. ~Timer() {
  117. *counter_ += timer_.elapsed().count();
  118. }
  119. private:
  120. folly::detail::CompressionCounter* counter_;
  121. stop_watch<std::chrono::milliseconds> timer_;
  122. };
  123. } // namespace
  124. // Ensure consistent behavior in the nullptr case
  125. std::unique_ptr<IOBuf> Codec::compress(const IOBuf* data) {
  126. if (data == nullptr) {
  127. throw std::invalid_argument("Codec: data must not be nullptr");
  128. }
  129. const uint64_t len = data->computeChainDataLength();
  130. if (len > maxUncompressedLength()) {
  131. throw std::runtime_error("Codec: uncompressed length too large");
  132. }
  133. bool const logging = folly::Random::oneIn(kLoggingRate);
  134. folly::Optional<Timer> const timer =
  135. logging ? Timer(compressionMilliseconds_) : folly::Optional<Timer>();
  136. auto result = doCompress(data);
  137. if (logging) {
  138. compressions_++;
  139. bytesBeforeCompression_ += len;
  140. bytesAfterCompression_ += result->computeChainDataLength();
  141. }
  142. return result;
  143. }
  144. std::string Codec::compress(const StringPiece data) {
  145. const uint64_t len = data.size();
  146. if (len > maxUncompressedLength()) {
  147. throw std::runtime_error("Codec: uncompressed length too large");
  148. }
  149. bool const logging = folly::Random::oneIn(kLoggingRate);
  150. folly::Optional<Timer> const timer =
  151. logging ? Timer(compressionMilliseconds_) : folly::Optional<Timer>();
  152. auto result = doCompressString(data);
  153. if (logging) {
  154. compressions_++;
  155. bytesBeforeCompression_ += len;
  156. bytesAfterCompression_ += result.size();
  157. }
  158. return result;
  159. }
  160. std::unique_ptr<IOBuf> Codec::uncompress(
  161. const IOBuf* data,
  162. Optional<uint64_t> uncompressedLength) {
  163. if (data == nullptr) {
  164. throw std::invalid_argument("Codec: data must not be nullptr");
  165. }
  166. if (!uncompressedLength) {
  167. if (needsUncompressedLength()) {
  168. throw std::invalid_argument("Codec: uncompressed length required");
  169. }
  170. } else if (*uncompressedLength > maxUncompressedLength()) {
  171. throw std::runtime_error("Codec: uncompressed length too large");
  172. }
  173. if (data->empty()) {
  174. if (uncompressedLength.value_or(0) != 0) {
  175. throw std::runtime_error("Codec: invalid uncompressed length");
  176. }
  177. return IOBuf::create(0);
  178. }
  179. bool const logging = folly::Random::oneIn(kLoggingRate);
  180. folly::Optional<Timer> const timer =
  181. logging ? Timer(decompressionMilliseconds_) : folly::Optional<Timer>();
  182. auto result = doUncompress(data, uncompressedLength);
  183. if (logging) {
  184. decompressions_++;
  185. bytesBeforeDecompression_ += data->computeChainDataLength();
  186. bytesAfterDecompression_ += result->computeChainDataLength();
  187. }
  188. return result;
  189. }
  190. std::string Codec::uncompress(
  191. const StringPiece data,
  192. Optional<uint64_t> uncompressedLength) {
  193. if (!uncompressedLength) {
  194. if (needsUncompressedLength()) {
  195. throw std::invalid_argument("Codec: uncompressed length required");
  196. }
  197. } else if (*uncompressedLength > maxUncompressedLength()) {
  198. throw std::runtime_error("Codec: uncompressed length too large");
  199. }
  200. if (data.empty()) {
  201. if (uncompressedLength.value_or(0) != 0) {
  202. throw std::runtime_error("Codec: invalid uncompressed length");
  203. }
  204. return "";
  205. }
  206. bool const logging = folly::Random::oneIn(kLoggingRate);
  207. folly::Optional<Timer> const timer =
  208. logging ? Timer(decompressionMilliseconds_) : folly::Optional<Timer>();
  209. auto result = doUncompressString(data, uncompressedLength);
  210. if (logging) {
  211. decompressions_++;
  212. bytesBeforeDecompression_ += data.size();
  213. bytesAfterDecompression_ += result.size();
  214. }
  215. return result;
  216. }
  217. bool Codec::needsUncompressedLength() const {
  218. return doNeedsUncompressedLength();
  219. }
  220. uint64_t Codec::maxUncompressedLength() const {
  221. return doMaxUncompressedLength();
  222. }
  223. bool Codec::doNeedsUncompressedLength() const {
  224. return false;
  225. }
  226. uint64_t Codec::doMaxUncompressedLength() const {
  227. return UNLIMITED_UNCOMPRESSED_LENGTH;
  228. }
  229. std::vector<std::string> Codec::validPrefixes() const {
  230. return {};
  231. }
  232. bool Codec::canUncompress(const IOBuf*, Optional<uint64_t>) const {
  233. return false;
  234. }
  235. std::string Codec::doCompressString(const StringPiece data) {
  236. const IOBuf inputBuffer{IOBuf::WRAP_BUFFER, data};
  237. auto outputBuffer = doCompress(&inputBuffer);
  238. std::string output;
  239. output.reserve(outputBuffer->computeChainDataLength());
  240. for (auto range : *outputBuffer) {
  241. output.append(reinterpret_cast<const char*>(range.data()), range.size());
  242. }
  243. return output;
  244. }
  245. std::string Codec::doUncompressString(
  246. const StringPiece data,
  247. Optional<uint64_t> uncompressedLength) {
  248. const IOBuf inputBuffer{IOBuf::WRAP_BUFFER, data};
  249. auto outputBuffer = doUncompress(&inputBuffer, uncompressedLength);
  250. std::string output;
  251. output.reserve(outputBuffer->computeChainDataLength());
  252. for (auto range : *outputBuffer) {
  253. output.append(reinterpret_cast<const char*>(range.data()), range.size());
  254. }
  255. return output;
  256. }
  257. uint64_t Codec::maxCompressedLength(uint64_t uncompressedLength) const {
  258. return doMaxCompressedLength(uncompressedLength);
  259. }
  260. Optional<uint64_t> Codec::getUncompressedLength(
  261. const folly::IOBuf* data,
  262. Optional<uint64_t> uncompressedLength) const {
  263. auto const compressedLength = data->computeChainDataLength();
  264. if (compressedLength == 0) {
  265. if (uncompressedLength.value_or(0) != 0) {
  266. throw std::runtime_error("Invalid uncompressed length");
  267. }
  268. return 0;
  269. }
  270. return doGetUncompressedLength(data, uncompressedLength);
  271. }
  272. Optional<uint64_t> Codec::doGetUncompressedLength(
  273. const folly::IOBuf*,
  274. Optional<uint64_t> uncompressedLength) const {
  275. return uncompressedLength;
  276. }
  277. bool StreamCodec::needsDataLength() const {
  278. return doNeedsDataLength();
  279. }
  280. bool StreamCodec::doNeedsDataLength() const {
  281. return false;
  282. }
  283. void StreamCodec::assertStateIs(State expected) const {
  284. if (state_ != expected) {
  285. throw std::logic_error(folly::to<std::string>(
  286. "Codec: state is ", state_, "; expected state ", expected));
  287. }
  288. }
  289. void StreamCodec::resetStream(Optional<uint64_t> uncompressedLength) {
  290. state_ = State::RESET;
  291. uncompressedLength_ = uncompressedLength;
  292. progressMade_ = true;
  293. doResetStream();
  294. }
  295. bool StreamCodec::compressStream(
  296. ByteRange& input,
  297. MutableByteRange& output,
  298. StreamCodec::FlushOp flushOp) {
  299. if (state_ == State::RESET && input.empty() &&
  300. flushOp == StreamCodec::FlushOp::END &&
  301. uncompressedLength().value_or(0) != 0) {
  302. throw std::runtime_error("Codec: invalid uncompressed length");
  303. }
  304. if (!uncompressedLength() && needsDataLength()) {
  305. throw std::runtime_error("Codec: uncompressed length required");
  306. }
  307. if (state_ == State::RESET && !input.empty() &&
  308. uncompressedLength() == uint64_t(0)) {
  309. throw std::runtime_error("Codec: invalid uncompressed length");
  310. }
  311. // Handle input state transitions
  312. switch (flushOp) {
  313. case StreamCodec::FlushOp::NONE:
  314. if (state_ == State::RESET) {
  315. state_ = State::COMPRESS;
  316. }
  317. assertStateIs(State::COMPRESS);
  318. break;
  319. case StreamCodec::FlushOp::FLUSH:
  320. if (state_ == State::RESET || state_ == State::COMPRESS) {
  321. state_ = State::COMPRESS_FLUSH;
  322. }
  323. assertStateIs(State::COMPRESS_FLUSH);
  324. break;
  325. case StreamCodec::FlushOp::END:
  326. if (state_ == State::RESET || state_ == State::COMPRESS) {
  327. state_ = State::COMPRESS_END;
  328. }
  329. assertStateIs(State::COMPRESS_END);
  330. break;
  331. }
  332. size_t const inputSize = input.size();
  333. size_t const outputSize = output.size();
  334. bool const done = doCompressStream(input, output, flushOp);
  335. if (!done && inputSize == input.size() && outputSize == output.size()) {
  336. if (!progressMade_) {
  337. throw std::runtime_error("Codec: No forward progress made");
  338. }
  339. // Throw an exception if there is no progress again next time
  340. progressMade_ = false;
  341. } else {
  342. progressMade_ = true;
  343. }
  344. // Handle output state transitions
  345. if (done) {
  346. if (state_ == State::COMPRESS_FLUSH) {
  347. state_ = State::COMPRESS;
  348. } else if (state_ == State::COMPRESS_END) {
  349. state_ = State::END;
  350. }
  351. // Check internal invariants
  352. DCHECK(input.empty());
  353. DCHECK(flushOp != StreamCodec::FlushOp::NONE);
  354. }
  355. return done;
  356. }
  357. bool StreamCodec::uncompressStream(
  358. ByteRange& input,
  359. MutableByteRange& output,
  360. StreamCodec::FlushOp flushOp) {
  361. if (state_ == State::RESET && input.empty()) {
  362. if (uncompressedLength().value_or(0) == 0) {
  363. return true;
  364. }
  365. return false;
  366. }
  367. // Handle input state transitions
  368. if (state_ == State::RESET) {
  369. state_ = State::UNCOMPRESS;
  370. }
  371. assertStateIs(State::UNCOMPRESS);
  372. size_t const inputSize = input.size();
  373. size_t const outputSize = output.size();
  374. bool const done = doUncompressStream(input, output, flushOp);
  375. if (!done && inputSize == input.size() && outputSize == output.size()) {
  376. if (!progressMade_) {
  377. throw std::runtime_error("Codec: no forward progress made");
  378. }
  379. // Throw an exception if there is no progress again next time
  380. progressMade_ = false;
  381. } else {
  382. progressMade_ = true;
  383. }
  384. // Handle output state transitions
  385. if (done) {
  386. state_ = State::END;
  387. }
  388. return done;
  389. }
  390. static std::unique_ptr<IOBuf> addOutputBuffer(
  391. MutableByteRange& output,
  392. uint64_t size) {
  393. DCHECK(output.empty());
  394. auto buffer = IOBuf::create(size);
  395. buffer->append(buffer->capacity());
  396. output = {buffer->writableData(), buffer->length()};
  397. return buffer;
  398. }
  399. std::unique_ptr<IOBuf> StreamCodec::doCompress(IOBuf const* data) {
  400. uint64_t const uncompressedLength = data->computeChainDataLength();
  401. resetStream(uncompressedLength);
  402. uint64_t const maxCompressedLen = maxCompressedLength(uncompressedLength);
  403. auto constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MB
  404. auto constexpr kDefaultBufferLength = uint64_t(4) << 20; // 4 MB
  405. MutableByteRange output;
  406. auto buffer = addOutputBuffer(
  407. output,
  408. maxCompressedLen <= kMaxSingleStepLength ? maxCompressedLen
  409. : kDefaultBufferLength);
  410. // Compress the entire IOBuf chain into the IOBuf chain pointed to by buffer
  411. IOBuf const* current = data;
  412. ByteRange input{current->data(), current->length()};
  413. StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE;
  414. bool done = false;
  415. while (!done) {
  416. while (input.empty() && current->next() != data) {
  417. current = current->next();
  418. input = {current->data(), current->length()};
  419. }
  420. if (current->next() == data) {
  421. // This is the last input buffer so end the stream
  422. flushOp = StreamCodec::FlushOp::END;
  423. }
  424. if (output.empty()) {
  425. buffer->prependChain(addOutputBuffer(output, kDefaultBufferLength));
  426. }
  427. done = compressStream(input, output, flushOp);
  428. if (done) {
  429. DCHECK(input.empty());
  430. DCHECK(flushOp == StreamCodec::FlushOp::END);
  431. DCHECK_EQ(current->next(), data);
  432. }
  433. }
  434. buffer->prev()->trimEnd(output.size());
  435. return buffer;
  436. }
  437. static uint64_t computeBufferLength(
  438. uint64_t const compressedLength,
  439. uint64_t const blockSize) {
  440. uint64_t constexpr kMaxBufferLength = uint64_t(4) << 20; // 4 MiB
  441. uint64_t const goodBufferSize = 4 * std::max(blockSize, compressedLength);
  442. return std::min(goodBufferSize, kMaxBufferLength);
  443. }
  444. std::unique_ptr<IOBuf> StreamCodec::doUncompress(
  445. IOBuf const* data,
  446. Optional<uint64_t> uncompressedLength) {
  447. auto constexpr kMaxSingleStepLength = uint64_t(64) << 20; // 64 MB
  448. auto constexpr kBlockSize = uint64_t(128) << 10;
  449. auto const defaultBufferLength =
  450. computeBufferLength(data->computeChainDataLength(), kBlockSize);
  451. uncompressedLength = getUncompressedLength(data, uncompressedLength);
  452. resetStream(uncompressedLength);
  453. MutableByteRange output;
  454. auto buffer = addOutputBuffer(
  455. output,
  456. (uncompressedLength && *uncompressedLength <= kMaxSingleStepLength
  457. ? *uncompressedLength
  458. : defaultBufferLength));
  459. // Uncompress the entire IOBuf chain into the IOBuf chain pointed to by buffer
  460. IOBuf const* current = data;
  461. ByteRange input{current->data(), current->length()};
  462. StreamCodec::FlushOp flushOp = StreamCodec::FlushOp::NONE;
  463. bool done = false;
  464. while (!done) {
  465. while (input.empty() && current->next() != data) {
  466. current = current->next();
  467. input = {current->data(), current->length()};
  468. }
  469. if (current->next() == data) {
  470. // Tell the uncompressor there is no more input (it may optimize)
  471. flushOp = StreamCodec::FlushOp::END;
  472. }
  473. if (output.empty()) {
  474. buffer->prependChain(addOutputBuffer(output, defaultBufferLength));
  475. }
  476. done = uncompressStream(input, output, flushOp);
  477. }
  478. if (!input.empty()) {
  479. throw std::runtime_error("Codec: Junk after end of data");
  480. }
  481. buffer->prev()->trimEnd(output.size());
  482. if (uncompressedLength &&
  483. *uncompressedLength != buffer->computeChainDataLength()) {
  484. throw std::runtime_error("Codec: invalid uncompressed length");
  485. }
  486. return buffer;
  487. }
  488. namespace {
  489. /**
  490. * No compression
  491. */
  492. class NoCompressionCodec final : public Codec {
  493. public:
  494. static std::unique_ptr<Codec> create(int level, CodecType type);
  495. explicit NoCompressionCodec(int level, CodecType type);
  496. private:
  497. uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
  498. std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
  499. std::unique_ptr<IOBuf> doUncompress(
  500. const IOBuf* data,
  501. Optional<uint64_t> uncompressedLength) override;
  502. };
  503. std::unique_ptr<Codec> NoCompressionCodec::create(int level, CodecType type) {
  504. return std::make_unique<NoCompressionCodec>(level, type);
  505. }
  506. NoCompressionCodec::NoCompressionCodec(int level, CodecType type)
  507. : Codec(type) {
  508. DCHECK(type == CodecType::NO_COMPRESSION);
  509. switch (level) {
  510. case COMPRESSION_LEVEL_DEFAULT:
  511. case COMPRESSION_LEVEL_FASTEST:
  512. case COMPRESSION_LEVEL_BEST:
  513. level = 0;
  514. }
  515. if (level != 0) {
  516. throw std::invalid_argument(
  517. to<std::string>("NoCompressionCodec: invalid level ", level));
  518. }
  519. }
  520. uint64_t NoCompressionCodec::doMaxCompressedLength(
  521. uint64_t uncompressedLength) const {
  522. return uncompressedLength;
  523. }
  524. std::unique_ptr<IOBuf> NoCompressionCodec::doCompress(const IOBuf* data) {
  525. return data->clone();
  526. }
  527. std::unique_ptr<IOBuf> NoCompressionCodec::doUncompress(
  528. const IOBuf* data,
  529. Optional<uint64_t> uncompressedLength) {
  530. if (uncompressedLength &&
  531. data->computeChainDataLength() != *uncompressedLength) {
  532. throw std::runtime_error(
  533. to<std::string>("NoCompressionCodec: invalid uncompressed length"));
  534. }
  535. return data->clone();
  536. }
  537. #if (FOLLY_HAVE_LIBLZ4 || FOLLY_HAVE_LIBLZMA)
  538. namespace {
  539. void encodeVarintToIOBuf(uint64_t val, folly::IOBuf* out) {
  540. DCHECK_GE(out->tailroom(), kMaxVarintLength64);
  541. out->append(encodeVarint(val, out->writableTail()));
  542. }
  543. inline uint64_t decodeVarintFromCursor(folly::io::Cursor& cursor) {
  544. uint64_t val = 0;
  545. int8_t b = 0;
  546. for (int shift = 0; shift <= 63; shift += 7) {
  547. b = cursor.read<int8_t>();
  548. val |= static_cast<uint64_t>(b & 0x7f) << shift;
  549. if (b >= 0) {
  550. break;
  551. }
  552. }
  553. if (b < 0) {
  554. throw std::invalid_argument("Invalid varint value. Too big.");
  555. }
  556. return val;
  557. }
  558. } // namespace
  559. #endif // FOLLY_HAVE_LIBLZ4 || FOLLY_HAVE_LIBLZMA
  560. #if FOLLY_HAVE_LIBLZ4
  561. #if LZ4_VERSION_NUMBER >= 10802 && defined(LZ4_STATIC_LINKING_ONLY) && \
  562. defined(LZ4_HC_STATIC_LINKING_ONLY) && !defined(FOLLY_USE_LZ4_FAST_RESET)
  563. #define FOLLY_USE_LZ4_FAST_RESET
  564. #endif
  565. #ifdef FOLLY_USE_LZ4_FAST_RESET
  566. namespace {
  567. void lz4_stream_t_deleter(LZ4_stream_t* ctx) {
  568. LZ4_freeStream(ctx);
  569. }
  570. void lz4_streamhc_t_deleter(LZ4_streamHC_t* ctx) {
  571. LZ4_freeStreamHC(ctx);
  572. }
  573. } // namespace
  574. #endif
  575. /**
  576. * LZ4 compression
  577. */
  578. class LZ4Codec final : public Codec {
  579. public:
  580. static std::unique_ptr<Codec> create(int level, CodecType type);
  581. explicit LZ4Codec(int level, CodecType type);
  582. private:
  583. bool doNeedsUncompressedLength() const override;
  584. uint64_t doMaxUncompressedLength() const override;
  585. uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
  586. bool encodeSize() const {
  587. return type() == CodecType::LZ4_VARINT_SIZE;
  588. }
  589. std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
  590. std::unique_ptr<IOBuf> doUncompress(
  591. const IOBuf* data,
  592. Optional<uint64_t> uncompressedLength) override;
  593. #ifdef FOLLY_USE_LZ4_FAST_RESET
  594. std::unique_ptr<
  595. LZ4_stream_t,
  596. folly::static_function_deleter<LZ4_stream_t, lz4_stream_t_deleter>>
  597. ctx;
  598. std::unique_ptr<
  599. LZ4_streamHC_t,
  600. folly::static_function_deleter<LZ4_streamHC_t, lz4_streamhc_t_deleter>>
  601. hcctx;
  602. #endif
  603. bool highCompression_;
  604. };
  605. std::unique_ptr<Codec> LZ4Codec::create(int level, CodecType type) {
  606. return std::make_unique<LZ4Codec>(level, type);
  607. }
  608. static int lz4ConvertLevel(int level) {
  609. switch (level) {
  610. case 1:
  611. case COMPRESSION_LEVEL_FASTEST:
  612. case COMPRESSION_LEVEL_DEFAULT:
  613. return 1;
  614. case 2:
  615. case COMPRESSION_LEVEL_BEST:
  616. return 2;
  617. }
  618. throw std::invalid_argument(
  619. to<std::string>("LZ4Codec: invalid level: ", level));
  620. }
  621. LZ4Codec::LZ4Codec(int level, CodecType type)
  622. : Codec(type, lz4ConvertLevel(level)),
  623. highCompression_(lz4ConvertLevel(level) > 1) {
  624. DCHECK(type == CodecType::LZ4 || type == CodecType::LZ4_VARINT_SIZE);
  625. }
  626. bool LZ4Codec::doNeedsUncompressedLength() const {
  627. return !encodeSize();
  628. }
  629. // The value comes from lz4.h in lz4-r117, but older versions of lz4 don't
  630. // define LZ4_MAX_INPUT_SIZE (even though the max size is the same), so do it
  631. // here.
  632. #ifndef LZ4_MAX_INPUT_SIZE
  633. #define LZ4_MAX_INPUT_SIZE 0x7E000000
  634. #endif
  635. uint64_t LZ4Codec::doMaxUncompressedLength() const {
  636. return LZ4_MAX_INPUT_SIZE;
  637. }
  638. uint64_t LZ4Codec::doMaxCompressedLength(uint64_t uncompressedLength) const {
  639. return LZ4_compressBound(uncompressedLength) +
  640. (encodeSize() ? kMaxVarintLength64 : 0);
  641. }
  642. std::unique_ptr<IOBuf> LZ4Codec::doCompress(const IOBuf* data) {
  643. IOBuf clone;
  644. if (data->isChained()) {
  645. // LZ4 doesn't support streaming, so we have to coalesce
  646. clone = data->cloneCoalescedAsValue();
  647. data = &clone;
  648. }
  649. auto out = IOBuf::create(maxCompressedLength(data->length()));
  650. if (encodeSize()) {
  651. encodeVarintToIOBuf(data->length(), out.get());
  652. }
  653. int n;
  654. auto input = reinterpret_cast<const char*>(data->data());
  655. auto output = reinterpret_cast<char*>(out->writableTail());
  656. const auto inputLength = data->length();
  657. #ifdef FOLLY_USE_LZ4_FAST_RESET
  658. if (!highCompression_ && !ctx) {
  659. ctx.reset(LZ4_createStream());
  660. }
  661. if (highCompression_ && !hcctx) {
  662. hcctx.reset(LZ4_createStreamHC());
  663. }
  664. if (highCompression_) {
  665. n = LZ4_compress_HC_extStateHC_fastReset(
  666. hcctx.get(), input, output, inputLength, out->tailroom(), 0);
  667. } else {
  668. n = LZ4_compress_fast_extState_fastReset(
  669. ctx.get(), input, output, inputLength, out->tailroom(), 1);
  670. }
  671. #elif LZ4_VERSION_NUMBER >= 10700
  672. if (highCompression_) {
  673. n = LZ4_compress_HC(input, output, inputLength, out->tailroom(), 0);
  674. } else {
  675. n = LZ4_compress_default(input, output, inputLength, out->tailroom());
  676. }
  677. #else
  678. if (highCompression_) {
  679. n = LZ4_compressHC(input, output, inputLength);
  680. } else {
  681. n = LZ4_compress(input, output, inputLength);
  682. }
  683. #endif
  684. CHECK_GE(n, 0);
  685. CHECK_LE(n, out->capacity());
  686. out->append(n);
  687. return out;
  688. }
  689. std::unique_ptr<IOBuf> LZ4Codec::doUncompress(
  690. const IOBuf* data,
  691. Optional<uint64_t> uncompressedLength) {
  692. IOBuf clone;
  693. if (data->isChained()) {
  694. // LZ4 doesn't support streaming, so we have to coalesce
  695. clone = data->cloneCoalescedAsValue();
  696. data = &clone;
  697. }
  698. folly::io::Cursor cursor(data);
  699. uint64_t actualUncompressedLength;
  700. if (encodeSize()) {
  701. actualUncompressedLength = decodeVarintFromCursor(cursor);
  702. if (uncompressedLength && *uncompressedLength != actualUncompressedLength) {
  703. throw std::runtime_error("LZ4Codec: invalid uncompressed length");
  704. }
  705. } else {
  706. // Invariants
  707. DCHECK(uncompressedLength.hasValue());
  708. DCHECK(*uncompressedLength <= maxUncompressedLength());
  709. actualUncompressedLength = *uncompressedLength;
  710. }
  711. auto sp = StringPiece{cursor.peekBytes()};
  712. auto out = IOBuf::create(actualUncompressedLength);
  713. int n = LZ4_decompress_safe(
  714. sp.data(),
  715. reinterpret_cast<char*>(out->writableTail()),
  716. sp.size(),
  717. actualUncompressedLength);
  718. if (n < 0 || uint64_t(n) != actualUncompressedLength) {
  719. throw std::runtime_error(
  720. to<std::string>("LZ4 decompression returned invalid value ", n));
  721. }
  722. out->append(actualUncompressedLength);
  723. return out;
  724. }
  725. #if LZ4_VERSION_NUMBER >= 10301
  726. class LZ4FrameCodec final : public Codec {
  727. public:
  728. static std::unique_ptr<Codec> create(int level, CodecType type);
  729. explicit LZ4FrameCodec(int level, CodecType type);
  730. ~LZ4FrameCodec() override;
  731. std::vector<std::string> validPrefixes() const override;
  732. bool canUncompress(const IOBuf* data, Optional<uint64_t> uncompressedLength)
  733. const override;
  734. private:
  735. uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
  736. std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
  737. std::unique_ptr<IOBuf> doUncompress(
  738. const IOBuf* data,
  739. Optional<uint64_t> uncompressedLength) override;
  740. // Reset the dctx_ if it is dirty or null.
  741. void resetDCtx();
  742. int level_;
  743. #ifdef FOLLY_USE_LZ4_FAST_RESET
  744. LZ4F_compressionContext_t cctx_{nullptr};
  745. #endif
  746. LZ4F_decompressionContext_t dctx_{nullptr};
  747. bool dirty_{false};
  748. };
  749. /* static */ std::unique_ptr<Codec> LZ4FrameCodec::create(
  750. int level,
  751. CodecType type) {
  752. return std::make_unique<LZ4FrameCodec>(level, type);
  753. }
  754. static constexpr uint32_t kLZ4FrameMagicLE = 0x184D2204;
  755. std::vector<std::string> LZ4FrameCodec::validPrefixes() const {
  756. return {prefixToStringLE(kLZ4FrameMagicLE)};
  757. }
  758. bool LZ4FrameCodec::canUncompress(const IOBuf* data, Optional<uint64_t>) const {
  759. return dataStartsWithLE(data, kLZ4FrameMagicLE);
  760. }
  761. uint64_t LZ4FrameCodec::doMaxCompressedLength(
  762. uint64_t uncompressedLength) const {
  763. LZ4F_preferences_t prefs{};
  764. prefs.compressionLevel = level_;
  765. prefs.frameInfo.contentSize = uncompressedLength;
  766. return LZ4F_compressFrameBound(uncompressedLength, &prefs);
  767. }
  768. static size_t lz4FrameThrowOnError(size_t code) {
  769. if (LZ4F_isError(code)) {
  770. throw std::runtime_error(
  771. to<std::string>("LZ4Frame error: ", LZ4F_getErrorName(code)));
  772. }
  773. return code;
  774. }
  775. void LZ4FrameCodec::resetDCtx() {
  776. if (dctx_ && !dirty_) {
  777. return;
  778. }
  779. if (dctx_) {
  780. LZ4F_freeDecompressionContext(dctx_);
  781. }
  782. lz4FrameThrowOnError(LZ4F_createDecompressionContext(&dctx_, 100));
  783. dirty_ = false;
  784. }
  785. static int lz4fConvertLevel(int level) {
  786. switch (level) {
  787. case COMPRESSION_LEVEL_FASTEST:
  788. case COMPRESSION_LEVEL_DEFAULT:
  789. return 0;
  790. case COMPRESSION_LEVEL_BEST:
  791. return 16;
  792. }
  793. return level;
  794. }
  795. LZ4FrameCodec::LZ4FrameCodec(int level, CodecType type)
  796. : Codec(type, lz4fConvertLevel(level)), level_(lz4fConvertLevel(level)) {
  797. DCHECK(type == CodecType::LZ4_FRAME);
  798. }
  799. LZ4FrameCodec::~LZ4FrameCodec() {
  800. if (dctx_) {
  801. LZ4F_freeDecompressionContext(dctx_);
  802. }
  803. #ifdef FOLLY_USE_LZ4_FAST_RESET
  804. if (cctx_) {
  805. LZ4F_freeCompressionContext(cctx_);
  806. }
  807. #endif
  808. }
  809. std::unique_ptr<IOBuf> LZ4FrameCodec::doCompress(const IOBuf* data) {
  810. // LZ4 Frame compression doesn't support streaming so we have to coalesce
  811. IOBuf clone;
  812. if (data->isChained()) {
  813. clone = data->cloneCoalescedAsValue();
  814. data = &clone;
  815. }
  816. #ifdef FOLLY_USE_LZ4_FAST_RESET
  817. if (!cctx_) {
  818. lz4FrameThrowOnError(LZ4F_createCompressionContext(&cctx_, LZ4F_VERSION));
  819. }
  820. #endif
  821. // Set preferences
  822. const auto uncompressedLength = data->length();
  823. LZ4F_preferences_t prefs{};
  824. prefs.compressionLevel = level_;
  825. prefs.frameInfo.contentSize = uncompressedLength;
  826. // Compress
  827. auto buf = IOBuf::create(maxCompressedLength(uncompressedLength));
  828. const size_t written = lz4FrameThrowOnError(
  829. #ifdef FOLLY_USE_LZ4_FAST_RESET
  830. LZ4F_compressFrame_usingCDict(
  831. cctx_,
  832. buf->writableTail(),
  833. buf->tailroom(),
  834. data->data(),
  835. data->length(),
  836. nullptr,
  837. &prefs)
  838. #else
  839. LZ4F_compressFrame(
  840. buf->writableTail(),
  841. buf->tailroom(),
  842. data->data(),
  843. data->length(),
  844. &prefs)
  845. #endif
  846. );
  847. buf->append(written);
  848. return buf;
  849. }
  850. std::unique_ptr<IOBuf> LZ4FrameCodec::doUncompress(
  851. const IOBuf* data,
  852. Optional<uint64_t> uncompressedLength) {
  853. // Reset the dctx if any errors have occurred
  854. resetDCtx();
  855. // Coalesce the data
  856. ByteRange in = *data->begin();
  857. IOBuf clone;
  858. if (data->isChained()) {
  859. clone = data->cloneCoalescedAsValue();
  860. in = clone.coalesce();
  861. }
  862. data = nullptr;
  863. // Select decompression options
  864. LZ4F_decompressOptions_t options;
  865. options.stableDst = 1;
  866. // Select blockSize and growthSize for the IOBufQueue
  867. IOBufQueue queue(IOBufQueue::cacheChainLength());
  868. auto blockSize = uint64_t{64} << 10;
  869. auto growthSize = uint64_t{4} << 20;
  870. if (uncompressedLength) {
  871. // Allocate uncompressedLength in one chunk (up to 64 MB)
  872. const auto allocateSize = std::min(*uncompressedLength, uint64_t{64} << 20);
  873. queue.preallocate(allocateSize, allocateSize);
  874. blockSize = std::min(*uncompressedLength, blockSize);
  875. growthSize = std::min(*uncompressedLength, growthSize);
  876. } else {
  877. // Reduce growthSize for small data
  878. const auto guessUncompressedLen =
  879. 4 * std::max<uint64_t>(blockSize, in.size());
  880. growthSize = std::min(guessUncompressedLen, growthSize);
  881. }
  882. // Once LZ4_decompress() is called, the dctx_ cannot be reused until it
  883. // returns 0
  884. dirty_ = true;
  885. // Decompress until the frame is over
  886. size_t code = 0;
  887. do {
  888. // Allocate enough space to decompress at least a block
  889. void* out;
  890. size_t outSize;
  891. std::tie(out, outSize) = queue.preallocate(blockSize, growthSize);
  892. // Decompress
  893. size_t inSize = in.size();
  894. code = lz4FrameThrowOnError(
  895. LZ4F_decompress(dctx_, out, &outSize, in.data(), &inSize, &options));
  896. if (in.empty() && outSize == 0 && code != 0) {
  897. // We passed no input, no output was produced, and the frame isn't over
  898. // No more forward progress is possible
  899. throw std::runtime_error("LZ4Frame error: Incomplete frame");
  900. }
  901. in.uncheckedAdvance(inSize);
  902. queue.postallocate(outSize);
  903. } while (code != 0);
  904. // At this point the decompression context can be reused
  905. dirty_ = false;
  906. if (uncompressedLength && queue.chainLength() != *uncompressedLength) {
  907. throw std::runtime_error("LZ4Frame error: Invalid uncompressedLength");
  908. }
  909. return queue.move();
  910. }
  911. #endif // LZ4_VERSION_NUMBER >= 10301
  912. #endif // FOLLY_HAVE_LIBLZ4
  913. #if FOLLY_HAVE_LIBSNAPPY
  914. /**
  915. * Snappy compression
  916. */
  917. /**
  918. * Implementation of snappy::Source that reads from a IOBuf chain.
  919. */
  920. class IOBufSnappySource final : public snappy::Source {
  921. public:
  922. explicit IOBufSnappySource(const IOBuf* data);
  923. size_t Available() const override;
  924. const char* Peek(size_t* len) override;
  925. void Skip(size_t n) override;
  926. private:
  927. size_t available_;
  928. io::Cursor cursor_;
  929. };
  930. IOBufSnappySource::IOBufSnappySource(const IOBuf* data)
  931. : available_(data->computeChainDataLength()), cursor_(data) {}
  932. size_t IOBufSnappySource::Available() const {
  933. return available_;
  934. }
  935. const char* IOBufSnappySource::Peek(size_t* len) {
  936. auto sp = StringPiece{cursor_.peekBytes()};
  937. *len = sp.size();
  938. return sp.data();
  939. }
  940. void IOBufSnappySource::Skip(size_t n) {
  941. CHECK_LE(n, available_);
  942. cursor_.skip(n);
  943. available_ -= n;
  944. }
  945. class SnappyCodec final : public Codec {
  946. public:
  947. static std::unique_ptr<Codec> create(int level, CodecType type);
  948. explicit SnappyCodec(int level, CodecType type);
  949. private:
  950. uint64_t doMaxUncompressedLength() const override;
  951. uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
  952. std::unique_ptr<IOBuf> doCompress(const IOBuf* data) override;
  953. std::unique_ptr<IOBuf> doUncompress(
  954. const IOBuf* data,
  955. Optional<uint64_t> uncompressedLength) override;
  956. };
  957. std::unique_ptr<Codec> SnappyCodec::create(int level, CodecType type) {
  958. return std::make_unique<SnappyCodec>(level, type);
  959. }
  960. SnappyCodec::SnappyCodec(int level, CodecType type) : Codec(type) {
  961. DCHECK(type == CodecType::SNAPPY);
  962. switch (level) {
  963. case COMPRESSION_LEVEL_FASTEST:
  964. case COMPRESSION_LEVEL_DEFAULT:
  965. case COMPRESSION_LEVEL_BEST:
  966. level = 1;
  967. }
  968. if (level != 1) {
  969. throw std::invalid_argument(
  970. to<std::string>("SnappyCodec: invalid level: ", level));
  971. }
  972. }
  973. uint64_t SnappyCodec::doMaxUncompressedLength() const {
  974. // snappy.h uses uint32_t for lengths, so there's that.
  975. return std::numeric_limits<uint32_t>::max();
  976. }
  977. uint64_t SnappyCodec::doMaxCompressedLength(uint64_t uncompressedLength) const {
  978. return snappy::MaxCompressedLength(uncompressedLength);
  979. }
  980. std::unique_ptr<IOBuf> SnappyCodec::doCompress(const IOBuf* data) {
  981. IOBufSnappySource source(data);
  982. auto out = IOBuf::create(maxCompressedLength(source.Available()));
  983. snappy::UncheckedByteArraySink sink(
  984. reinterpret_cast<char*>(out->writableTail()));
  985. size_t n = snappy::Compress(&source, &sink);
  986. CHECK_LE(n, out->capacity());
  987. out->append(n);
  988. return out;
  989. }
  990. std::unique_ptr<IOBuf> SnappyCodec::doUncompress(
  991. const IOBuf* data,
  992. Optional<uint64_t> uncompressedLength) {
  993. uint32_t actualUncompressedLength = 0;
  994. {
  995. IOBufSnappySource source(data);
  996. if (!snappy::GetUncompressedLength(&source, &actualUncompressedLength)) {
  997. throw std::runtime_error("snappy::GetUncompressedLength failed");
  998. }
  999. if (uncompressedLength && *uncompressedLength != actualUncompressedLength) {
  1000. throw std::runtime_error("snappy: invalid uncompressed length");
  1001. }
  1002. }
  1003. auto out = IOBuf::create(actualUncompressedLength);
  1004. {
  1005. IOBufSnappySource source(data);
  1006. if (!snappy::RawUncompress(
  1007. &source, reinterpret_cast<char*>(out->writableTail()))) {
  1008. throw std::runtime_error("snappy::RawUncompress failed");
  1009. }
  1010. }
  1011. out->append(actualUncompressedLength);
  1012. return out;
  1013. }
  1014. #endif // FOLLY_HAVE_LIBSNAPPY
  1015. #if FOLLY_HAVE_LIBLZMA
  1016. /**
  1017. * LZMA2 compression
  1018. */
  1019. class LZMA2StreamCodec final : public StreamCodec {
  1020. public:
  1021. static std::unique_ptr<Codec> createCodec(int level, CodecType type);
  1022. static std::unique_ptr<StreamCodec> createStream(int level, CodecType type);
  1023. explicit LZMA2StreamCodec(int level, CodecType type);
  1024. ~LZMA2StreamCodec() override;
  1025. std::vector<std::string> validPrefixes() const override;
  1026. bool canUncompress(const IOBuf* data, Optional<uint64_t> uncompressedLength)
  1027. const override;
  1028. private:
  1029. bool doNeedsDataLength() const override;
  1030. uint64_t doMaxUncompressedLength() const override;
  1031. uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
  1032. bool encodeSize() const {
  1033. return type() == CodecType::LZMA2_VARINT_SIZE;
  1034. }
  1035. void doResetStream() override;
  1036. bool doCompressStream(
  1037. ByteRange& input,
  1038. MutableByteRange& output,
  1039. StreamCodec::FlushOp flushOp) override;
  1040. bool doUncompressStream(
  1041. ByteRange& input,
  1042. MutableByteRange& output,
  1043. StreamCodec::FlushOp flushOp) override;
  1044. void resetCStream();
  1045. void resetDStream();
  1046. bool decodeAndCheckVarint(ByteRange& input);
  1047. bool flushVarintBuffer(MutableByteRange& output);
  1048. void resetVarintBuffer();
  1049. Optional<lzma_stream> cstream_{};
  1050. Optional<lzma_stream> dstream_{};
  1051. std::array<uint8_t, kMaxVarintLength64> varintBuffer_;
  1052. ByteRange varintToEncode_;
  1053. size_t varintBufferPos_{0};
  1054. int level_;
  1055. bool needReset_{true};
  1056. bool needDecodeSize_{false};
  1057. };
  1058. static constexpr uint64_t kLZMA2MagicLE = 0x005A587A37FD;
  1059. static constexpr unsigned kLZMA2MagicBytes = 6;
  1060. std::vector<std::string> LZMA2StreamCodec::validPrefixes() const {
  1061. if (type() == CodecType::LZMA2_VARINT_SIZE) {
  1062. return {};
  1063. }
  1064. return {prefixToStringLE(kLZMA2MagicLE, kLZMA2MagicBytes)};
  1065. }
  1066. bool LZMA2StreamCodec::doNeedsDataLength() const {
  1067. return encodeSize();
  1068. }
  1069. bool LZMA2StreamCodec::canUncompress(const IOBuf* data, Optional<uint64_t>)
  1070. const {
  1071. if (type() == CodecType::LZMA2_VARINT_SIZE) {
  1072. return false;
  1073. }
  1074. // Returns false for all inputs less than 8 bytes.
  1075. // This is okay, because no valid LZMA2 streams are less than 8 bytes.
  1076. return dataStartsWithLE(data, kLZMA2MagicLE, kLZMA2MagicBytes);
  1077. }
  1078. std::unique_ptr<Codec> LZMA2StreamCodec::createCodec(
  1079. int level,
  1080. CodecType type) {
  1081. return make_unique<LZMA2StreamCodec>(level, type);
  1082. }
  1083. std::unique_ptr<StreamCodec> LZMA2StreamCodec::createStream(
  1084. int level,
  1085. CodecType type) {
  1086. return make_unique<LZMA2StreamCodec>(level, type);
  1087. }
  1088. LZMA2StreamCodec::LZMA2StreamCodec(int level, CodecType type)
  1089. : StreamCodec(type) {
  1090. DCHECK(type == CodecType::LZMA2 || type == CodecType::LZMA2_VARINT_SIZE);
  1091. switch (level) {
  1092. case COMPRESSION_LEVEL_FASTEST:
  1093. level = 0;
  1094. break;
  1095. case COMPRESSION_LEVEL_DEFAULT:
  1096. level = LZMA_PRESET_DEFAULT;
  1097. break;
  1098. case COMPRESSION_LEVEL_BEST:
  1099. level = 9;
  1100. break;
  1101. }
  1102. if (level < 0 || level > 9) {
  1103. throw std::invalid_argument(
  1104. to<std::string>("LZMA2Codec: invalid level: ", level));
  1105. }
  1106. level_ = level;
  1107. }
  1108. LZMA2StreamCodec::~LZMA2StreamCodec() {
  1109. if (cstream_) {
  1110. lzma_end(cstream_.get_pointer());
  1111. cstream_.clear();
  1112. }
  1113. if (dstream_) {
  1114. lzma_end(dstream_.get_pointer());
  1115. dstream_.clear();
  1116. }
  1117. }
  1118. uint64_t LZMA2StreamCodec::doMaxUncompressedLength() const {
  1119. // From lzma/base.h: "Stream is roughly 8 EiB (2^63 bytes)"
  1120. return uint64_t(1) << 63;
  1121. }
  1122. uint64_t LZMA2StreamCodec::doMaxCompressedLength(
  1123. uint64_t uncompressedLength) const {
  1124. return lzma_stream_buffer_bound(uncompressedLength) +
  1125. (encodeSize() ? kMaxVarintLength64 : 0);
  1126. }
  1127. void LZMA2StreamCodec::doResetStream() {
  1128. needReset_ = true;
  1129. }
  1130. void LZMA2StreamCodec::resetCStream() {
  1131. if (!cstream_) {
  1132. cstream_.assign(LZMA_STREAM_INIT);
  1133. }
  1134. lzma_ret const rc =
  1135. lzma_easy_encoder(cstream_.get_pointer(), level_, LZMA_CHECK_NONE);
  1136. if (rc != LZMA_OK) {
  1137. throw std::runtime_error(folly::to<std::string>(
  1138. "LZMA2StreamCodec: lzma_easy_encoder error: ", rc));
  1139. }
  1140. }
  1141. void LZMA2StreamCodec::resetDStream() {
  1142. if (!dstream_) {
  1143. dstream_.assign(LZMA_STREAM_INIT);
  1144. }
  1145. lzma_ret const rc = lzma_auto_decoder(
  1146. dstream_.get_pointer(), std::numeric_limits<uint64_t>::max(), 0);
  1147. if (rc != LZMA_OK) {
  1148. throw std::runtime_error(folly::to<std::string>(
  1149. "LZMA2StreamCodec: lzma_auto_decoder error: ", rc));
  1150. }
  1151. }
  1152. static lzma_ret lzmaThrowOnError(lzma_ret const rc) {
  1153. switch (rc) {
  1154. case LZMA_OK:
  1155. case LZMA_STREAM_END:
  1156. case LZMA_BUF_ERROR: // not fatal: returned if no progress was made twice
  1157. return rc;
  1158. default:
  1159. throw std::runtime_error(
  1160. to<std::string>("LZMA2StreamCodec: error: ", rc));
  1161. }
  1162. }
  1163. static lzma_action lzmaTranslateFlush(StreamCodec::FlushOp flush) {
  1164. switch (flush) {
  1165. case StreamCodec::FlushOp::NONE:
  1166. return LZMA_RUN;
  1167. case StreamCodec::FlushOp::FLUSH:
  1168. return LZMA_SYNC_FLUSH;
  1169. case StreamCodec::FlushOp::END:
  1170. return LZMA_FINISH;
  1171. default:
  1172. throw std::invalid_argument("LZMA2StreamCodec: Invalid flush");
  1173. }
  1174. }
  1175. /**
  1176. * Flushes the varint buffer.
  1177. * Advances output by the number of bytes written.
  1178. * Returns true when flushing is complete.
  1179. */
  1180. bool LZMA2StreamCodec::flushVarintBuffer(MutableByteRange& output) {
  1181. if (varintToEncode_.empty()) {
  1182. return true;
  1183. }
  1184. const size_t numBytesToCopy = std::min(varintToEncode_.size(), output.size());
  1185. if (numBytesToCopy > 0) {
  1186. memcpy(output.data(), varintToEncode_.data(), numBytesToCopy);
  1187. }
  1188. varintToEncode_.advance(numBytesToCopy);
  1189. output.advance(numBytesToCopy);
  1190. return varintToEncode_.empty();
  1191. }
  1192. bool LZMA2StreamCodec::doCompressStream(
  1193. ByteRange& input,
  1194. MutableByteRange& output,
  1195. StreamCodec::FlushOp flushOp) {
  1196. if (needReset_) {
  1197. resetCStream();
  1198. if (encodeSize()) {
  1199. varintBufferPos_ = 0;
  1200. size_t const varintSize =
  1201. encodeVarint(*uncompressedLength(), varintBuffer_.data());
  1202. varintToEncode_ = {varintBuffer_.data(), varintSize};
  1203. }
  1204. needReset_ = false;
  1205. }
  1206. if (!flushVarintBuffer(output)) {
  1207. return false;
  1208. }
  1209. cstream_->next_in = const_cast<uint8_t*>(input.data());
  1210. cstream_->avail_in = input.size();
  1211. cstream_->next_out = output.data();
  1212. cstream_->avail_out = output.size();
  1213. SCOPE_EXIT {
  1214. input.uncheckedAdvance(input.size() - cstream_->avail_in);
  1215. output.uncheckedAdvance(output.size() - cstream_->avail_out);
  1216. };
  1217. lzma_ret const rc = lzmaThrowOnError(
  1218. lzma_code(cstream_.get_pointer(), lzmaTranslateFlush(flushOp)));
  1219. switch (flushOp) {
  1220. case StreamCodec::FlushOp::NONE:
  1221. return false;
  1222. case StreamCodec::FlushOp::FLUSH:
  1223. return cstream_->avail_in == 0 && cstream_->avail_out != 0;
  1224. case StreamCodec::FlushOp::END:
  1225. return rc == LZMA_STREAM_END;
  1226. default:
  1227. throw std::invalid_argument("LZMA2StreamCodec: invalid FlushOp");
  1228. }
  1229. }
  1230. /**
  1231. * Attempts to decode a varint from input.
  1232. * The function advances input by the number of bytes read.
  1233. *
  1234. * If there are too many bytes and the varint is not valid, throw a
  1235. * runtime_error.
  1236. *
  1237. * If the uncompressed length was provided and a decoded varint does not match
  1238. * the provided length, throw a runtime_error.
  1239. *
  1240. * Returns true if the varint was successfully decoded and matches the
  1241. * uncompressed length if provided, and false if more bytes are needed.
  1242. */
  1243. bool LZMA2StreamCodec::decodeAndCheckVarint(ByteRange& input) {
  1244. if (input.empty()) {
  1245. return false;
  1246. }
  1247. size_t const numBytesToCopy =
  1248. std::min(kMaxVarintLength64 - varintBufferPos_, input.size());
  1249. memcpy(varintBuffer_.data() + varintBufferPos_, input.data(), numBytesToCopy);
  1250. size_t const rangeSize = varintBufferPos_ + numBytesToCopy;
  1251. ByteRange range{varintBuffer_.data(), rangeSize};
  1252. auto const ret = tryDecodeVarint(range);
  1253. if (ret.hasValue()) {
  1254. size_t const varintSize = rangeSize - range.size();
  1255. input.advance(varintSize - varintBufferPos_);
  1256. if (uncompressedLength() && *uncompressedLength() != ret.value()) {
  1257. throw std::runtime_error("LZMA2StreamCodec: invalid uncompressed length");
  1258. }
  1259. return true;
  1260. } else if (ret.error() == DecodeVarintError::TooManyBytes) {
  1261. throw std::runtime_error("LZMA2StreamCodec: invalid uncompressed length");
  1262. } else {
  1263. // Too few bytes
  1264. input.advance(numBytesToCopy);
  1265. varintBufferPos_ += numBytesToCopy;
  1266. return false;
  1267. }
  1268. }
  1269. bool LZMA2StreamCodec::doUncompressStream(
  1270. ByteRange& input,
  1271. MutableByteRange& output,
  1272. StreamCodec::FlushOp flushOp) {
  1273. if (needReset_) {
  1274. resetDStream();
  1275. needReset_ = false;
  1276. needDecodeSize_ = encodeSize();
  1277. if (encodeSize()) {
  1278. // Reset buffer
  1279. varintBufferPos_ = 0;
  1280. }
  1281. }
  1282. if (needDecodeSize_) {
  1283. // Try decoding the varint. If the input does not contain the entire varint,
  1284. // buffer the input. If the varint can not be decoded, fail.
  1285. if (!decodeAndCheckVarint(input)) {
  1286. return false;
  1287. }
  1288. needDecodeSize_ = false;
  1289. }
  1290. dstream_->next_in = const_cast<uint8_t*>(input.data());
  1291. dstream_->avail_in = input.size();
  1292. dstream_->next_out = output.data();
  1293. dstream_->avail_out = output.size();
  1294. SCOPE_EXIT {
  1295. input.advance(input.size() - dstream_->avail_in);
  1296. output.advance(output.size() - dstream_->avail_out);
  1297. };
  1298. lzma_ret rc;
  1299. switch (flushOp) {
  1300. case StreamCodec::FlushOp::NONE:
  1301. case StreamCodec::FlushOp::FLUSH:
  1302. rc = lzmaThrowOnError(lzma_code(dstream_.get_pointer(), LZMA_RUN));
  1303. break;
  1304. case StreamCodec::FlushOp::END:
  1305. rc = lzmaThrowOnError(lzma_code(dstream_.get_pointer(), LZMA_FINISH));
  1306. break;
  1307. default:
  1308. throw std::invalid_argument("LZMA2StreamCodec: invalid flush");
  1309. }
  1310. return rc == LZMA_STREAM_END;
  1311. }
  1312. #endif // FOLLY_HAVE_LIBLZMA
  1313. #if FOLLY_HAVE_LIBZSTD
  1314. static int zstdConvertLevel(int level) {
  1315. switch (level) {
  1316. case COMPRESSION_LEVEL_FASTEST:
  1317. return 1;
  1318. case COMPRESSION_LEVEL_DEFAULT:
  1319. return 1;
  1320. case COMPRESSION_LEVEL_BEST:
  1321. return 19;
  1322. }
  1323. if (level < 1 || level > ZSTD_maxCLevel()) {
  1324. throw std::invalid_argument(
  1325. to<std::string>("ZSTD: invalid level: ", level));
  1326. }
  1327. return level;
  1328. }
  1329. static int zstdFastConvertLevel(int level) {
  1330. switch (level) {
  1331. case COMPRESSION_LEVEL_FASTEST:
  1332. return -5;
  1333. case COMPRESSION_LEVEL_DEFAULT:
  1334. return -1;
  1335. case COMPRESSION_LEVEL_BEST:
  1336. return -1;
  1337. }
  1338. if (level < 1) {
  1339. throw std::invalid_argument(
  1340. to<std::string>("ZSTD: invalid level: ", level));
  1341. }
  1342. return -level;
  1343. }
  1344. std::unique_ptr<Codec> getZstdCodec(int level, CodecType type) {
  1345. DCHECK(type == CodecType::ZSTD);
  1346. return zstd::getCodec(zstd::Options(zstdConvertLevel(level)));
  1347. }
  1348. std::unique_ptr<StreamCodec> getZstdStreamCodec(int level, CodecType type) {
  1349. DCHECK(type == CodecType::ZSTD);
  1350. return zstd::getStreamCodec(zstd::Options(zstdConvertLevel(level)));
  1351. }
  1352. std::unique_ptr<Codec> getZstdFastCodec(int level, CodecType type) {
  1353. DCHECK(type == CodecType::ZSTD_FAST);
  1354. return zstd::getCodec(zstd::Options(zstdFastConvertLevel(level)));
  1355. }
  1356. std::unique_ptr<StreamCodec> getZstdFastStreamCodec(int level, CodecType type) {
  1357. DCHECK(type == CodecType::ZSTD_FAST);
  1358. return zstd::getStreamCodec(zstd::Options(zstdFastConvertLevel(level)));
  1359. }
  1360. #endif // FOLLY_HAVE_LIBZSTD
  1361. #if FOLLY_HAVE_LIBBZ2
  1362. class Bzip2StreamCodec final : public StreamCodec {
  1363. public:
  1364. static std::unique_ptr<Codec> createCodec(int level, CodecType type);
  1365. static std::unique_ptr<StreamCodec> createStream(int level, CodecType type);
  1366. explicit Bzip2StreamCodec(int level, CodecType type);
  1367. ~Bzip2StreamCodec() override;
  1368. std::vector<std::string> validPrefixes() const override;
  1369. bool canUncompress(IOBuf const* data, Optional<uint64_t> uncompressedLength)
  1370. const override;
  1371. private:
  1372. uint64_t doMaxCompressedLength(uint64_t uncompressedLength) const override;
  1373. void doResetStream() override;
  1374. bool doCompressStream(
  1375. ByteRange& input,
  1376. MutableByteRange& output,
  1377. StreamCodec::FlushOp flushOp) override;
  1378. bool doUncompressStream(
  1379. ByteRange& input,
  1380. MutableByteRange& output,
  1381. StreamCodec::FlushOp flushOp) override;
  1382. void resetCStream();
  1383. void resetDStream();
  1384. Optional<bz_stream> cstream_{};
  1385. Optional<bz_stream> dstream_{};
  1386. int level_;
  1387. bool needReset_{true};
  1388. };
  1389. /* static */ std::unique_ptr<Codec> Bzip2StreamCodec::createCodec(
  1390. int level,
  1391. CodecType type) {
  1392. return createStream(level, type);
  1393. }
  1394. /* static */ std::unique_ptr<StreamCodec> Bzip2StreamCodec::createStream(
  1395. int level,
  1396. CodecType type) {
  1397. return std::make_unique<Bzip2StreamCodec>(level, type);
  1398. }
  1399. Bzip2StreamCodec::Bzip2StreamCodec(int level, CodecType type)
  1400. : StreamCodec(type) {
  1401. DCHECK(type == CodecType::BZIP2);
  1402. switch (level) {
  1403. case COMPRESSION_LEVEL_FASTEST:
  1404. level = 1;
  1405. break;
  1406. case COMPRESSION_LEVEL_DEFAULT:
  1407. level = 9;
  1408. break;
  1409. case COMPRESSION_LEVEL_BEST:
  1410. level = 9;
  1411. break;
  1412. }
  1413. if (level < 1 || level > 9) {
  1414. throw std::invalid_argument(
  1415. to<std::string>("Bzip2: invalid level: ", level));
  1416. }
  1417. level_ = level;
  1418. }
  1419. static uint32_t constexpr kBzip2MagicLE = 0x685a42;
  1420. static uint64_t constexpr kBzip2MagicBytes = 3;
  1421. std::vector<std::string> Bzip2StreamCodec::validPrefixes() const {
  1422. return {prefixToStringLE(kBzip2MagicLE, kBzip2MagicBytes)};
  1423. }
  1424. bool Bzip2StreamCodec::canUncompress(IOBuf const* data, Optional<uint64_t>)
  1425. const {
  1426. return dataStartsWithLE(data, kBzip2MagicLE, kBzip2MagicBytes);
  1427. }
  1428. uint64_t Bzip2StreamCodec::doMaxCompressedLength(
  1429. uint64_t uncompressedLength) const {
  1430. // http://www.bzip.org/1.0.5/bzip2-manual-1.0.5.html#bzbufftobuffcompress
  1431. // To guarantee that the compressed data will fit in its buffer, allocate an
  1432. // output buffer of size 1% larger than the uncompressed data, plus six
  1433. // hundred extra bytes.
  1434. return uncompressedLength + uncompressedLength / 100 + 600;
  1435. }
  1436. static bz_stream createBzStream() {
  1437. bz_stream stream;
  1438. stream.bzalloc = nullptr;
  1439. stream.bzfree = nullptr;
  1440. stream.opaque = nullptr;
  1441. stream.next_in = stream.next_out = nullptr;
  1442. stream.avail_in = stream.avail_out = 0;
  1443. return stream;
  1444. }
  1445. // Throws on error condition, otherwise returns the code.
  1446. static int bzCheck(int const rc) {
  1447. switch (rc) {
  1448. case BZ_OK:
  1449. case BZ_RUN_OK:
  1450. case BZ_FLUSH_OK:
  1451. case BZ_FINISH_OK:
  1452. case BZ_STREAM_END:
  1453. // Allow BZ_PARAM_ERROR.
  1454. // It can get returned if no progress is made, but we handle that.
  1455. case BZ_PARAM_ERROR:
  1456. return rc;
  1457. default:
  1458. throw std::runtime_error(to<std::string>("Bzip2 error: ", rc));
  1459. }
  1460. }
  1461. Bzip2StreamCodec::~Bzip2StreamCodec() {
  1462. if (cstream_) {
  1463. BZ2_bzCompressEnd(cstream_.get_pointer());
  1464. cstream_.clear();
  1465. }
  1466. if (dstream_) {
  1467. BZ2_bzDecompressEnd(dstream_.get_pointer());
  1468. dstream_.clear();
  1469. }
  1470. }
  1471. void Bzip2StreamCodec::doResetStream() {
  1472. needReset_ = true;
  1473. }
  1474. void Bzip2StreamCodec::resetCStream() {
  1475. if (cstream_) {
  1476. BZ2_bzCompressEnd(cstream_.get_pointer());
  1477. }
  1478. cstream_ = createBzStream();
  1479. bzCheck(BZ2_bzCompressInit(cstream_.get_pointer(), level_, 0, 0));
  1480. }
  1481. int bzip2TranslateFlush(StreamCodec::FlushOp flushOp) {
  1482. switch (flushOp) {
  1483. case StreamCodec::FlushOp::NONE:
  1484. return BZ_RUN;
  1485. case StreamCodec::FlushOp::END:
  1486. return BZ_FINISH;
  1487. case StreamCodec::FlushOp::FLUSH:
  1488. throw std::invalid_argument(
  1489. "Bzip2StreamCodec: FlushOp::FLUSH not supported");
  1490. default:
  1491. throw std::invalid_argument("Bzip2StreamCodec: Invalid flush");
  1492. }
  1493. }
  1494. bool Bzip2StreamCodec::doCompressStream(
  1495. ByteRange& input,
  1496. MutableByteRange& output,
  1497. StreamCodec::FlushOp flushOp) {
  1498. if (needReset_) {
  1499. resetCStream();
  1500. needReset_ = false;
  1501. }
  1502. if (input.empty() && output.empty()) {
  1503. return false;
  1504. }
  1505. cstream_->next_in =
  1506. const_cast<char*>(reinterpret_cast<const char*>(input.data()));
  1507. cstream_->avail_in = input.size();
  1508. cstream_->next_out = reinterpret_cast<char*>(output.data());
  1509. cstream_->avail_out = output.size();
  1510. SCOPE_EXIT {
  1511. input.uncheckedAdvance(input.size() - cstream_->avail_in);
  1512. output.uncheckedAdvance(output.size() - cstream_->avail_out);
  1513. };
  1514. int const rc = bzCheck(
  1515. BZ2_bzCompress(cstream_.get_pointer(), bzip2TranslateFlush(flushOp)));
  1516. switch (flushOp) {
  1517. case StreamCodec::FlushOp::NONE:
  1518. return false;
  1519. case StreamCodec::FlushOp::FLUSH:
  1520. if (rc == BZ_RUN_OK) {
  1521. DCHECK_EQ(cstream_->avail_in, 0);
  1522. DCHECK(input.size() == 0 || cstream_->avail_out != output.size());
  1523. return true;
  1524. }
  1525. return false;
  1526. case StreamCodec::FlushOp::END:
  1527. return rc == BZ_STREAM_END;
  1528. default:
  1529. throw std::invalid_argument("Bzip2StreamCodec: invalid FlushOp");
  1530. }
  1531. return false;
  1532. }
  1533. void Bzip2StreamCodec::resetDStream() {
  1534. if (dstream_) {
  1535. BZ2_bzDecompressEnd(dstream_.get_pointer());
  1536. }
  1537. dstream_ = createBzStream();
  1538. bzCheck(BZ2_bzDecompressInit(dstream_.get_pointer(), 0, 0));
  1539. }
  1540. bool Bzip2StreamCodec::doUncompressStream(
  1541. ByteRange& input,
  1542. MutableByteRange& output,
  1543. StreamCodec::FlushOp flushOp) {
  1544. if (flushOp == StreamCodec::FlushOp::FLUSH) {
  1545. throw std::invalid_argument(
  1546. "Bzip2StreamCodec: FlushOp::FLUSH not supported");
  1547. }
  1548. if (needReset_) {
  1549. resetDStream();
  1550. needReset_ = false;
  1551. }
  1552. dstream_->next_in =
  1553. const_cast<char*>(reinterpret_cast<const char*>(input.data()));
  1554. dstream_->avail_in = input.size();
  1555. dstream_->next_out = reinterpret_cast<char*>(output.data());
  1556. dstream_->avail_out = output.size();
  1557. SCOPE_EXIT {
  1558. input.uncheckedAdvance(input.size() - dstream_->avail_in);
  1559. output.uncheckedAdvance(output.size() - dstream_->avail_out);
  1560. };
  1561. int const rc = bzCheck(BZ2_bzDecompress(dstream_.get_pointer()));
  1562. return rc == BZ_STREAM_END;
  1563. }
  1564. #endif // FOLLY_HAVE_LIBBZ2
  1565. #if FOLLY_HAVE_LIBZ
  1566. zlib::Options getZlibOptions(CodecType type) {
  1567. DCHECK(type == CodecType::GZIP || type == CodecType::ZLIB);
  1568. return type == CodecType::GZIP ? zlib::defaultGzipOptions()
  1569. : zlib::defaultZlibOptions();
  1570. }
  1571. std::unique_ptr<Codec> getZlibCodec(int level, CodecType type) {
  1572. return zlib::getCodec(getZlibOptions(type), level);
  1573. }
  1574. std::unique_ptr<StreamCodec> getZlibStreamCodec(int level, CodecType type) {
  1575. return zlib::getStreamCodec(getZlibOptions(type), level);
  1576. }
  1577. #endif // FOLLY_HAVE_LIBZ
  1578. /**
  1579. * Automatic decompression
  1580. */
  1581. class AutomaticCodec final : public Codec {
  1582. public:
  1583. static std::unique_ptr<Codec> create(
  1584. std::vector<std::unique_ptr<Codec>> customCodecs,
  1585. std::unique_ptr<Codec> terminalCodec);
  1586. explicit AutomaticCodec(
  1587. std::vector<std::unique_ptr<Codec>> customCodecs,
  1588. std::unique_ptr<Codec> terminalCodec);
  1589. std::vector<std::string> validPrefixes() const override;
  1590. bool canUncompress(const IOBuf* data, Optional<uint64_t> uncompressedLength)
  1591. const override;
  1592. private:
  1593. bool doNeedsUncompressedLength() const override;
  1594. uint64_t doMaxUncompressedLength() const override;
  1595. uint64_t doMaxCompressedLength(uint64_t) const override {
  1596. throw std::runtime_error(
  1597. "AutomaticCodec error: maxCompressedLength() not supported.");
  1598. }
  1599. std::unique_ptr<IOBuf> doCompress(const IOBuf*) override {
  1600. throw std::runtime_error("AutomaticCodec error: compress() not supported.");
  1601. }
  1602. std::unique_ptr<IOBuf> doUncompress(
  1603. const IOBuf* data,
  1604. Optional<uint64_t> uncompressedLength) override;
  1605. void addCodecIfSupported(CodecType type);
  1606. // Throws iff the codecs aren't compatible (very slow)
  1607. void checkCompatibleCodecs() const;
  1608. std::vector<std::unique_ptr<Codec>> codecs_;
  1609. std::unique_ptr<Codec> terminalCodec_;
  1610. bool needsUncompressedLength_;
  1611. uint64_t maxUncompressedLength_;
  1612. };
  1613. std::vector<std::string> AutomaticCodec::validPrefixes() const {
  1614. std::unordered_set<std::string> prefixes;
  1615. for (const auto& codec : codecs_) {
  1616. const auto codecPrefixes = codec->validPrefixes();
  1617. prefixes.insert(codecPrefixes.begin(), codecPrefixes.end());
  1618. }
  1619. return std::vector<std::string>{prefixes.begin(), prefixes.end()};
  1620. }
  1621. bool AutomaticCodec::canUncompress(
  1622. const IOBuf* data,
  1623. Optional<uint64_t> uncompressedLength) const {
  1624. return std::any_of(
  1625. codecs_.begin(),
  1626. codecs_.end(),
  1627. [data, uncompressedLength](std::unique_ptr<Codec> const& codec) {
  1628. return codec->canUncompress(data, uncompressedLength);
  1629. });
  1630. }
  1631. void AutomaticCodec::addCodecIfSupported(CodecType type) {
  1632. const bool present = std::any_of(
  1633. codecs_.begin(),
  1634. codecs_.end(),
  1635. [&type](std::unique_ptr<Codec> const& codec) {
  1636. return codec->type() == type;
  1637. });
  1638. bool const isTerminalType = terminalCodec_ && terminalCodec_->type() == type;
  1639. if (hasCodec(type) && !present && !isTerminalType) {
  1640. codecs_.push_back(getCodec(type));
  1641. }
  1642. }
  1643. /* static */ std::unique_ptr<Codec> AutomaticCodec::create(
  1644. std::vector<std::unique_ptr<Codec>> customCodecs,
  1645. std::unique_ptr<Codec> terminalCodec) {
  1646. return std::make_unique<AutomaticCodec>(
  1647. std::move(customCodecs), std::move(terminalCodec));
  1648. }
  1649. AutomaticCodec::AutomaticCodec(
  1650. std::vector<std::unique_ptr<Codec>> customCodecs,
  1651. std::unique_ptr<Codec> terminalCodec)
  1652. : Codec(CodecType::USER_DEFINED, folly::none, "auto"),
  1653. codecs_(std::move(customCodecs)),
  1654. terminalCodec_(std::move(terminalCodec)) {
  1655. // Fastest -> slowest
  1656. std::array<CodecType, 6> defaultTypes{{
  1657. CodecType::LZ4_FRAME,
  1658. CodecType::ZSTD,
  1659. CodecType::ZLIB,
  1660. CodecType::GZIP,
  1661. CodecType::LZMA2,
  1662. CodecType::BZIP2,
  1663. }};
  1664. for (auto type : defaultTypes) {
  1665. addCodecIfSupported(type);
  1666. }
  1667. if (kIsDebug) {
  1668. checkCompatibleCodecs();
  1669. }
  1670. // Check that none of the codecs are null
  1671. DCHECK(std::none_of(
  1672. codecs_.begin(), codecs_.end(), [](std::unique_ptr<Codec> const& codec) {
  1673. return codec == nullptr;
  1674. }));
  1675. // Check that the terminal codec's type is not duplicated (with the exception
  1676. // of USER_DEFINED).
  1677. if (terminalCodec_) {
  1678. DCHECK(std::none_of(
  1679. codecs_.begin(),
  1680. codecs_.end(),
  1681. [&](std::unique_ptr<Codec> const& codec) {
  1682. return codec->type() != CodecType::USER_DEFINED &&
  1683. codec->type() == terminalCodec_->type();
  1684. }));
  1685. }
  1686. bool const terminalNeedsUncompressedLength =
  1687. terminalCodec_ && terminalCodec_->needsUncompressedLength();
  1688. needsUncompressedLength_ = std::any_of(
  1689. codecs_.begin(),
  1690. codecs_.end(),
  1691. [](std::unique_ptr<Codec> const& codec) {
  1692. return codec->needsUncompressedLength();
  1693. }) ||
  1694. terminalNeedsUncompressedLength;
  1695. const auto it = std::max_element(
  1696. codecs_.begin(),
  1697. codecs_.end(),
  1698. [](std::unique_ptr<Codec> const& lhs, std::unique_ptr<Codec> const& rhs) {
  1699. return lhs->maxUncompressedLength() < rhs->maxUncompressedLength();
  1700. });
  1701. DCHECK(it != codecs_.end());
  1702. auto const terminalMaxUncompressedLength =
  1703. terminalCodec_ ? terminalCodec_->maxUncompressedLength() : 0;
  1704. maxUncompressedLength_ =
  1705. std::max((*it)->maxUncompressedLength(), terminalMaxUncompressedLength);
  1706. }
  1707. void AutomaticCodec::checkCompatibleCodecs() const {
  1708. // Keep track of all the possible headers.
  1709. std::unordered_set<std::string> headers;
  1710. // The empty header is not allowed.
  1711. headers.insert("");
  1712. // Step 1:
  1713. // Construct a set of headers and check that none of the headers occur twice.
  1714. // Eliminate edge cases.
  1715. for (auto&& codec : codecs_) {
  1716. const auto codecHeaders = codec->validPrefixes();
  1717. // Codecs without any valid headers are not allowed.
  1718. if (codecHeaders.empty()) {
  1719. throw std::invalid_argument{
  1720. "AutomaticCodec: validPrefixes() must not be empty."};
  1721. }
  1722. // Insert all the headers for the current codec.
  1723. const size_t beforeSize = headers.size();
  1724. headers.insert(codecHeaders.begin(), codecHeaders.end());
  1725. // Codecs are not compatible if any header occurred twice.
  1726. if (beforeSize + codecHeaders.size() != headers.size()) {
  1727. throw std::invalid_argument{
  1728. "AutomaticCodec: Two valid prefixes collide."};
  1729. }
  1730. }
  1731. // Step 2:
  1732. // Check if any strict non-empty prefix of any header is a header.
  1733. for (const auto& header : headers) {
  1734. for (size_t i = 1; i < header.size(); ++i) {
  1735. if (headers.count(header.substr(0, i))) {
  1736. throw std::invalid_argument{
  1737. "AutomaticCodec: One valid prefix is a prefix of another valid "
  1738. "prefix."};
  1739. }
  1740. }
  1741. }
  1742. }
  1743. bool AutomaticCodec::doNeedsUncompressedLength() const {
  1744. return needsUncompressedLength_;
  1745. }
  1746. uint64_t AutomaticCodec::doMaxUncompressedLength() const {
  1747. return maxUncompressedLength_;
  1748. }
  1749. std::unique_ptr<IOBuf> AutomaticCodec::doUncompress(
  1750. const IOBuf* data,
  1751. Optional<uint64_t> uncompressedLength) {
  1752. try {
  1753. for (auto&& codec : codecs_) {
  1754. if (codec->canUncompress(data, uncompressedLength)) {
  1755. return codec->uncompress(data, uncompressedLength);
  1756. }
  1757. }
  1758. } catch (std::exception const& e) {
  1759. if (!terminalCodec_) {
  1760. throw e;
  1761. }
  1762. }
  1763. // Try terminal codec
  1764. if (terminalCodec_) {
  1765. return terminalCodec_->uncompress(data, uncompressedLength);
  1766. }
  1767. throw std::runtime_error("AutomaticCodec error: Unknown compressed data");
  1768. }
  1769. using CodecFactory = std::unique_ptr<Codec> (*)(int, CodecType);
  1770. using StreamCodecFactory = std::unique_ptr<StreamCodec> (*)(int, CodecType);
  1771. struct Factory {
  1772. CodecFactory codec;
  1773. StreamCodecFactory stream;
  1774. };
  1775. constexpr Factory
  1776. codecFactories[static_cast<size_t>(CodecType::NUM_CODEC_TYPES)] = {
  1777. {}, // USER_DEFINED
  1778. {NoCompressionCodec::create, nullptr},
  1779. #if FOLLY_HAVE_LIBLZ4
  1780. {LZ4Codec::create, nullptr},
  1781. #else
  1782. {},
  1783. #endif
  1784. #if FOLLY_HAVE_LIBSNAPPY
  1785. {SnappyCodec::create, nullptr},
  1786. #else
  1787. {},
  1788. #endif
  1789. #if FOLLY_HAVE_LIBZ
  1790. {getZlibCodec, getZlibStreamCodec},
  1791. #else
  1792. {},
  1793. #endif
  1794. #if FOLLY_HAVE_LIBLZ4
  1795. {LZ4Codec::create, nullptr},
  1796. #else
  1797. {},
  1798. #endif
  1799. #if FOLLY_HAVE_LIBLZMA
  1800. {LZMA2StreamCodec::createCodec, LZMA2StreamCodec::createStream},
  1801. {LZMA2StreamCodec::createCodec, LZMA2StreamCodec::createStream},
  1802. #else
  1803. {},
  1804. {},
  1805. #endif
  1806. #if FOLLY_HAVE_LIBZSTD
  1807. {getZstdCodec, getZstdStreamCodec},
  1808. #else
  1809. {},
  1810. #endif
  1811. #if FOLLY_HAVE_LIBZ
  1812. {getZlibCodec, getZlibStreamCodec},
  1813. #else
  1814. {},
  1815. #endif
  1816. #if (FOLLY_HAVE_LIBLZ4 && LZ4_VERSION_NUMBER >= 10301)
  1817. {LZ4FrameCodec::create, nullptr},
  1818. #else
  1819. {},
  1820. #endif
  1821. #if FOLLY_HAVE_LIBBZ2
  1822. {Bzip2StreamCodec::createCodec, Bzip2StreamCodec::createStream},
  1823. #else
  1824. {},
  1825. #endif
  1826. #if FOLLY_HAVE_LIBZSTD
  1827. {getZstdFastCodec, getZstdFastStreamCodec},
  1828. #else
  1829. {},
  1830. #endif
  1831. };
  1832. Factory const& getFactory(CodecType type) {
  1833. size_t const idx = static_cast<size_t>(type);
  1834. if (idx >= static_cast<size_t>(CodecType::NUM_CODEC_TYPES)) {
  1835. throw std::invalid_argument(
  1836. to<std::string>("Compression type ", idx, " invalid"));
  1837. }
  1838. return codecFactories[idx];
  1839. }
  1840. } // namespace
  1841. bool hasCodec(CodecType type) {
  1842. return getFactory(type).codec != nullptr;
  1843. }
  1844. std::unique_ptr<Codec> getCodec(CodecType type, int level) {
  1845. auto const factory = getFactory(type).codec;
  1846. if (!factory) {
  1847. throw std::invalid_argument(
  1848. to<std::string>("Compression type ", type, " not supported"));
  1849. }
  1850. auto codec = (*factory)(level, type);
  1851. DCHECK(codec->type() == type);
  1852. return codec;
  1853. }
  1854. bool hasStreamCodec(CodecType type) {
  1855. return getFactory(type).stream != nullptr;
  1856. }
  1857. std::unique_ptr<StreamCodec> getStreamCodec(CodecType type, int level) {
  1858. auto const factory = getFactory(type).stream;
  1859. if (!factory) {
  1860. throw std::invalid_argument(
  1861. to<std::string>("Compression type ", type, " not supported"));
  1862. }
  1863. auto codec = (*factory)(level, type);
  1864. DCHECK(codec->type() == type);
  1865. return codec;
  1866. }
  1867. std::unique_ptr<Codec> getAutoUncompressionCodec(
  1868. std::vector<std::unique_ptr<Codec>> customCodecs,
  1869. std::unique_ptr<Codec> terminalCodec) {
  1870. return AutomaticCodec::create(
  1871. std::move(customCodecs), std::move(terminalCodec));
  1872. }
  1873. } // namespace io
  1874. } // namespace folly