CacheChange.h 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596
  1. // Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. /**
  15. * @file CacheChange.h
  16. */
  17. #ifndef _FASTDDS_RTPS_CACHECHANGE_H_
  18. #define _FASTDDS_RTPS_CACHECHANGE_H_
#include <fastdds/rtps/common/Types.h>
#include <fastdds/rtps/common/WriteParams.h>
#include <fastdds/rtps/common/SerializedPayload.h>
#include <fastdds/rtps/common/Time_t.h>
#include <fastdds/rtps/common/InstanceHandle.h>
#include <fastdds/rtps/common/FragmentNumber.h>

#include <cassert>
#include <cstdint>
#include <cstring>
#include <vector>
  26. namespace eprosima {
  27. namespace fastrtps {
  28. namespace rtps {
  29. /**
  30. * @enum ChangeKind_t, different types of CacheChange_t.
  31. * @ingroup COMMON_MODULE
  32. */
  33. enum RTPS_DllAPI ChangeKind_t
  34. {
  35. ALIVE, //!< ALIVE
  36. NOT_ALIVE_DISPOSED, //!< NOT_ALIVE_DISPOSED
  37. NOT_ALIVE_UNREGISTERED, //!< NOT_ALIVE_UNREGISTERED
  38. NOT_ALIVE_DISPOSED_UNREGISTERED //!< NOT_ALIVE_DISPOSED_UNREGISTERED
  39. };
  40. /**
  41. * Structure CacheChange_t, contains information on a specific CacheChange.
  42. * @ingroup COMMON_MODULE
  43. */
  44. struct RTPS_DllAPI CacheChange_t
  45. {
  46. //!Kind of change, default value ALIVE.
  47. ChangeKind_t kind = ALIVE;
  48. //!GUID_t of the writer that generated this change.
  49. GUID_t writerGUID;
  50. //!Handle of the data associated wiht this change.
  51. InstanceHandle_t instanceHandle;
  52. //!SequenceNumber of the change
  53. SequenceNumber_t sequenceNumber;
  54. //!Serialized Payload associated with the change.
  55. SerializedPayload_t serializedPayload;
  56. //!Indicates if the cache has been read (only used in READERS)
  57. bool isRead = false;
  58. //!Source TimeStamp (only used in Readers)
  59. Time_t sourceTimestamp;
  60. //!Reception TimeStamp (only used in Readers)
  61. Time_t receptionTimestamp;
  62. WriteParams write_params;
  63. bool is_untyped_ = true;
  64. /*!
  65. * @brief Default constructor.
  66. * Creates an empty CacheChange_t.
  67. */
  68. CacheChange_t()
  69. {
  70. }
  71. CacheChange_t(
  72. const CacheChange_t&) = delete;
  73. const CacheChange_t& operator =(
  74. const CacheChange_t&) = delete;
  75. /**
  76. * Constructor with payload size
  77. * @param payload_size Serialized payload size
  78. * @param is_untyped Flag to mark the change as untyped.
  79. */
  80. CacheChange_t(
  81. uint32_t payload_size,
  82. bool is_untyped = false)
  83. : serializedPayload(payload_size)
  84. , is_untyped_(is_untyped)
  85. {
  86. }
  87. /*!
  88. * Copy a different change into this one. All the elements are copied, included the data, allocating new memory.
  89. * @param[in] ch_ptr Pointer to the change.
  90. * @return True if correct.
  91. */
  92. bool copy(
  93. const CacheChange_t* ch_ptr)
  94. {
  95. kind = ch_ptr->kind;
  96. writerGUID = ch_ptr->writerGUID;
  97. instanceHandle = ch_ptr->instanceHandle;
  98. sequenceNumber = ch_ptr->sequenceNumber;
  99. sourceTimestamp = ch_ptr->sourceTimestamp;
  100. write_params = ch_ptr->write_params;
  101. isRead = ch_ptr->isRead;
  102. fragment_size_ = ch_ptr->fragment_size_;
  103. fragment_count_ = ch_ptr->fragment_count_;
  104. first_missing_fragment_ = ch_ptr->first_missing_fragment_;
  105. return serializedPayload.copy(&ch_ptr->serializedPayload, !ch_ptr->is_untyped_);
  106. }
  107. /*!
  108. * Copy information form a different change into this one.
  109. * All the elements are copied except data.
  110. * @param[in] ch_ptr Pointer to the change.
  111. */
  112. void copy_not_memcpy(
  113. const CacheChange_t* ch_ptr)
  114. {
  115. kind = ch_ptr->kind;
  116. writerGUID = ch_ptr->writerGUID;
  117. instanceHandle = ch_ptr->instanceHandle;
  118. sequenceNumber = ch_ptr->sequenceNumber;
  119. sourceTimestamp = ch_ptr->sourceTimestamp;
  120. write_params = ch_ptr->write_params;
  121. isRead = ch_ptr->isRead;
  122. // Copy certain values from serializedPayload
  123. serializedPayload.encapsulation = ch_ptr->serializedPayload.encapsulation;
  124. // Copy fragment size and calculate fragment count
  125. setFragmentSize(ch_ptr->fragment_size_, false);
  126. }
  127. ~CacheChange_t()
  128. {
  129. }
  130. /*!
  131. * Get the number of fragments this change is split into.
  132. * @return number of fragments.
  133. */
  134. uint32_t getFragmentCount() const
  135. {
  136. return fragment_count_;
  137. }
  138. /*!
  139. * Get the size of each fragment this change is split into.
  140. * @return size of fragment (0 means change is not fragmented).
  141. */
  142. uint16_t getFragmentSize() const
  143. {
  144. return fragment_size_;
  145. }
  146. /*!
  147. * Checks if all fragments have been received.
  148. * @return true when change is fully assembled (i.e. no missing fragments).
  149. */
  150. bool is_fully_assembled()
  151. {
  152. return first_missing_fragment_ >= fragment_count_;
  153. }
  154. /*!
  155. * Fills a FragmentNumberSet_t with the list of missing fragments.
  156. * @param [out] frag_sns FragmentNumberSet_t where result is stored.
  157. */
  158. void get_missing_fragments(
  159. FragmentNumberSet_t& frag_sns)
  160. {
  161. // Note: Fragment numbers are 1-based but we keep them 0 based.
  162. frag_sns.base(first_missing_fragment_ + 1);
  163. // Traverse list of missing fragments, adding them to frag_sns
  164. uint32_t current_frag = first_missing_fragment_;
  165. while (current_frag < fragment_count_)
  166. {
  167. frag_sns.add(current_frag + 1);
  168. current_frag = get_next_missing_fragment(current_frag);
  169. }
  170. }
  171. /*!
  172. * Set fragment size for this change.
  173. *
  174. * @param fragment_size Size of fragments.
  175. * @param create_fragment_list Whether to create missing fragments list or not.
  176. *
  177. * @remarks Parameter create_fragment_list should only be true when receiving the first
  178. * fragment of a change.
  179. */
  180. void setFragmentSize(
  181. uint16_t fragment_size,
  182. bool create_fragment_list = false)
  183. {
  184. fragment_size_ = fragment_size;
  185. fragment_count_ = 0;
  186. first_missing_fragment_ = 0;
  187. if (fragment_size > 0)
  188. {
  189. // This follows RTPS 8.3.7.3.5
  190. fragment_count_ = (serializedPayload.length + fragment_size - 1) / fragment_size;
  191. if (create_fragment_list)
  192. {
  193. // Keep index of next fragment on the payload portion at the beginning of each fragment. Last
  194. // fragment will have fragment_count_ as 'next fragment index'
  195. size_t offset = 0;
  196. for (uint32_t i = 1; i <= fragment_count_; i++, offset += fragment_size_)
  197. {
  198. set_next_missing_fragment(i - 1, i); // index to next fragment in missing list
  199. }
  200. }
  201. else
  202. {
  203. // List not created. This means we are going to send this change fragmented, so it is already
  204. // assembled, and the missing list is empty (i.e. first missing points to fragment count)
  205. first_missing_fragment_ = fragment_count_;
  206. }
  207. }
  208. }
  209. bool add_fragments(
  210. const SerializedPayload_t& incoming_data,
  211. uint32_t fragment_starting_num,
  212. uint32_t fragments_in_submessage)
  213. {
  214. uint32_t original_offset = (fragment_starting_num - 1) * fragment_size_;
  215. uint32_t incoming_length = fragment_size_ * fragments_in_submessage;
  216. uint32_t last_fragment_index = fragment_starting_num + fragments_in_submessage - 1;
  217. // Validate fragment indexes
  218. if (last_fragment_index > fragment_count_)
  219. {
  220. return false;
  221. }
  222. // validate lengths
  223. if (last_fragment_index < fragment_count_)
  224. {
  225. if (incoming_data.length < incoming_length)
  226. {
  227. return false;
  228. }
  229. }
  230. else
  231. {
  232. incoming_length = serializedPayload.length - original_offset;
  233. }
  234. if (original_offset + incoming_length > serializedPayload.length)
  235. {
  236. return false;
  237. }
  238. if (received_fragments(fragment_starting_num - 1, fragments_in_submessage))
  239. {
  240. memcpy(
  241. &serializedPayload.data[original_offset],
  242. incoming_data.data, incoming_length);
  243. }
  244. return is_fully_assembled();
  245. }
  246. private:
  247. // Fragment size
  248. uint16_t fragment_size_ = 0;
  249. // Number of fragments
  250. uint32_t fragment_count_ = 0;
  251. // First fragment in missing list
  252. uint32_t first_missing_fragment_ = 0;
  253. uint32_t get_next_missing_fragment(
  254. uint32_t fragment_index)
  255. {
  256. uint32_t* ptr = next_fragment_pointer(fragment_index);
  257. return *ptr;
  258. }
  259. void set_next_missing_fragment(
  260. uint32_t fragment_index,
  261. uint32_t next_fragment_index)
  262. {
  263. uint32_t* ptr = next_fragment_pointer(fragment_index);
  264. *ptr = next_fragment_index;
  265. }
  266. uint32_t* next_fragment_pointer(
  267. uint32_t fragment_index)
  268. {
  269. size_t offset = fragment_size_;
  270. offset *= fragment_index;
  271. offset = (offset + 3) & ~3;
  272. return reinterpret_cast<uint32_t*>(&serializedPayload.data[offset]);
  273. }
  274. /*!
  275. * Mark a set of consecutive fragments as received.
  276. * This will remove a set of consecutive fragments from the missing list.
  277. * Should be called BEFORE copying the received data into the serialized payload.
  278. *
  279. * @param initial_fragment Index (0-based) of first received fragment.
  280. * @param num_of_fragments Number of received fragments. Should be strictly positive.
  281. * @return true if the list of missing fragments was modified, false otherwise.
  282. */
  283. bool received_fragments(
  284. uint32_t initial_fragment,
  285. uint32_t num_of_fragments)
  286. {
  287. bool at_least_one_changed = false;
  288. if ( (fragment_size_ > 0) && (initial_fragment < fragment_count_) )
  289. {
  290. uint32_t last_fragment = initial_fragment + num_of_fragments;
  291. if (last_fragment > fragment_count_)
  292. {
  293. last_fragment = fragment_count_;
  294. }
  295. if (initial_fragment <= first_missing_fragment_)
  296. {
  297. // Perform first = *first until first >= last_received
  298. while (first_missing_fragment_ < last_fragment)
  299. {
  300. first_missing_fragment_ = get_next_missing_fragment(first_missing_fragment_);
  301. at_least_one_changed = true;
  302. }
  303. }
  304. else
  305. {
  306. // Find prev in missing list
  307. uint32_t current_frag = first_missing_fragment_;
  308. while (current_frag < initial_fragment)
  309. {
  310. uint32_t next_frag = get_next_missing_fragment(current_frag);
  311. if (next_frag >= initial_fragment)
  312. {
  313. // This is the fragment previous to initial_fragment.
  314. // Find future value for next by repeating next = *next until next >= last_fragment.
  315. uint32_t next_missing_fragment = next_frag;
  316. while (next_missing_fragment < last_fragment)
  317. {
  318. next_missing_fragment = get_next_missing_fragment(next_missing_fragment);
  319. at_least_one_changed = true;
  320. }
  321. // Update next and finish loop
  322. if (at_least_one_changed)
  323. {
  324. set_next_missing_fragment(current_frag, next_missing_fragment);
  325. }
  326. break;
  327. }
  328. current_frag = next_frag;
  329. }
  330. }
  331. }
  332. return at_least_one_changed;
  333. }
  334. };
  335. #ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC
  336. /**
  337. * Enum ChangeForReaderStatus_t, possible states for a CacheChange_t in a ReaderProxy.
  338. * @ingroup COMMON_MODULE
  339. */
  340. enum ChangeForReaderStatus_t
  341. {
  342. UNSENT = 0, //!< UNSENT
  343. REQUESTED = 1, //!< REQUESTED
  344. UNACKNOWLEDGED = 2, //!< UNACKNOWLEDGED
  345. ACKNOWLEDGED = 3, //!< ACKNOWLEDGED
  346. UNDERWAY = 4 //!< UNDERWAY
  347. };
  348. /**
  349. * Struct ChangeForReader_t used to represent the state of a specific change with respect to a specific reader, as well as its relevance.
  350. * @ingroup COMMON_MODULE
  351. */
  352. class ChangeForReader_t
  353. {
  354. friend struct ChangeForReaderCmp;
  355. public:
  356. ChangeForReader_t()
  357. : status_(UNSENT)
  358. , is_relevant_(true)
  359. , change_(nullptr)
  360. {
  361. }
  362. ChangeForReader_t(
  363. const ChangeForReader_t& ch)
  364. : status_(ch.status_)
  365. , is_relevant_(ch.is_relevant_)
  366. , seq_num_(ch.seq_num_)
  367. , change_(ch.change_)
  368. , unsent_fragments_(ch.unsent_fragments_)
  369. {
  370. }
  371. //TODO(Ricardo) Temporal
  372. //ChangeForReader_t(const CacheChange_t* change) : status_(UNSENT),
  373. ChangeForReader_t(
  374. CacheChange_t* change)
  375. : status_(UNSENT)
  376. , is_relevant_(true)
  377. , seq_num_(change->sequenceNumber)
  378. , change_(change)
  379. {
  380. if (change->getFragmentSize() != 0)
  381. {
  382. unsent_fragments_.base(1u);
  383. unsent_fragments_.add_range(1u, change->getFragmentCount() + 1u);
  384. }
  385. }
  386. ChangeForReader_t(
  387. const SequenceNumber_t& seq_num)
  388. : status_(UNSENT)
  389. , is_relevant_(true)
  390. , seq_num_(seq_num)
  391. , change_(nullptr)
  392. {
  393. }
  394. ~ChangeForReader_t()
  395. {
  396. }
  397. ChangeForReader_t& operator =(
  398. const ChangeForReader_t& ch)
  399. {
  400. status_ = ch.status_;
  401. is_relevant_ = ch.is_relevant_;
  402. seq_num_ = ch.seq_num_;
  403. change_ = ch.change_;
  404. unsent_fragments_ = ch.unsent_fragments_;
  405. return *this;
  406. }
  407. /**
  408. * Get the cache change
  409. * @return Cache change
  410. */
  411. // TODO(Ricardo) Temporal
  412. //const CacheChange_t* getChange() const
  413. CacheChange_t* getChange() const
  414. {
  415. return change_;
  416. }
  417. void setStatus(
  418. const ChangeForReaderStatus_t status)
  419. {
  420. status_ = status;
  421. }
  422. ChangeForReaderStatus_t getStatus() const
  423. {
  424. return status_;
  425. }
  426. void setRelevance(
  427. const bool relevance)
  428. {
  429. is_relevant_ = relevance;
  430. }
  431. bool isRelevant() const
  432. {
  433. return is_relevant_;
  434. }
  435. const SequenceNumber_t getSequenceNumber() const
  436. {
  437. return seq_num_;
  438. }
  439. //! Set change as not valid
  440. void notValid()
  441. {
  442. is_relevant_ = false;
  443. change_ = nullptr;
  444. }
  445. //! Set change as valid
  446. bool isValid() const
  447. {
  448. return change_ != nullptr;
  449. }
  450. FragmentNumberSet_t getUnsentFragments() const
  451. {
  452. return unsent_fragments_;
  453. }
  454. void markAllFragmentsAsUnsent()
  455. {
  456. if (change_ != nullptr && change_->getFragmentSize() != 0)
  457. {
  458. unsent_fragments_.base(1u);
  459. unsent_fragments_.add_range(1u, change_->getFragmentCount() + 1u);
  460. }
  461. }
  462. void markFragmentsAsSent(
  463. const FragmentNumber_t& sentFragment)
  464. {
  465. unsent_fragments_.remove(sentFragment);
  466. if (!unsent_fragments_.empty() && unsent_fragments_.max() < change_->getFragmentCount())
  467. {
  468. FragmentNumber_t base = unsent_fragments_.base();
  469. FragmentNumber_t max = unsent_fragments_.max();
  470. assert(!unsent_fragments_.is_set(base));
  471. // Update base to first bit set
  472. base = unsent_fragments_.min();
  473. unsent_fragments_.base_update(base);
  474. // Add all possible fragments
  475. unsent_fragments_.add_range(max + 1u, change_->getFragmentCount() + 1u);
  476. }
  477. }
  478. void markFragmentsAsUnsent(
  479. const FragmentNumberSet_t& unsentFragments)
  480. {
  481. FragmentNumber_t other_base = unsentFragments.base();
  482. if (other_base < unsent_fragments_.base())
  483. {
  484. unsent_fragments_.base_update(other_base);
  485. }
  486. unsentFragments.for_each(
  487. [this](
  488. FragmentNumber_t element)
  489. {
  490. unsent_fragments_.add(element);
  491. });
  492. }
  493. private:
  494. //!Status
  495. ChangeForReaderStatus_t status_;
  496. //!Boolean specifying if this change is relevant
  497. bool is_relevant_;
  498. //!Sequence number
  499. SequenceNumber_t seq_num_;
  500. // TODO(Ricardo) Temporal
  501. //const CacheChange_t* change_;
  502. CacheChange_t* change_;
  503. FragmentNumberSet_t unsent_fragments_;
  504. };
  505. struct ChangeForReaderCmp
  506. {
  507. bool operator ()(
  508. const ChangeForReader_t& a,
  509. const ChangeForReader_t& b) const
  510. {
  511. return a.seq_num_ < b.seq_num_;
  512. }
  513. };
  514. #endif
  515. }
  516. }
  517. }
  518. #endif /* _FASTDDS_RTPS_CACHECHANGE_H_ */