ReaderProxy.h 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473
  1. // Copyright 2016-2019 Proyectos y Sistemas de Mantenimiento SL (eProsima).
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. /**
  15. * @file ReaderProxy.h
  16. */
  17. #ifndef _FASTDDS_RTPS_WRITER_READERPROXY_H_
  18. #define _FASTDDS_RTPS_WRITER_READERPROXY_H_
  19. #ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC
  20. #include <algorithm>
  21. #include <mutex>
  22. #include <set>
  23. #include <atomic>
  24. #include <fastdds/rtps/builtin/data/ReaderProxyData.h>
  25. #include <fastdds/rtps/writer/ReaderLocator.h>
  26. #include <fastdds/rtps/common/Types.h>
  27. #include <fastdds/rtps/common/Locator.h>
  28. #include <fastdds/rtps/common/SequenceNumber.h>
  29. #include <fastdds/rtps/common/CacheChange.h>
  30. #include <fastdds/rtps/common/FragmentNumber.h>
  31. #include <fastdds/rtps/attributes/WriterAttributes.h>
  32. #include <fastdds/rtps/attributes/RTPSParticipantAllocationAttributes.hpp>
  33. #include <fastrtps/utils/collections/ResourceLimitedVector.hpp>
  34. namespace eprosima {
  35. namespace fastrtps {
  36. namespace rtps {
  37. class StatefulWriter;
  38. class TimedEvent;
  39. class RTPSReader;
  40. /**
  41. * ReaderProxy class that helps to keep the state of a specific Reader with respect to the RTPSWriter.
  42. * @ingroup WRITER_MODULE
  43. */
  44. class ReaderProxy
  45. {
  46. public:
  47. ~ReaderProxy();
  48. /**
  49. * Constructor.
  50. * @param times WriterTimes to use in the ReaderProxy.
  51. * @param loc_alloc Maximum number of remote locators to keep in the ReaderProxy.
  52. * @param writer Pointer to the StatefulWriter creating the reader proxy.
  53. */
  54. ReaderProxy(
  55. const WriterTimes& times,
  56. const RemoteLocatorsAllocationAttributes& loc_alloc,
  57. StatefulWriter* writer);
  58. /**
  59. * Activate this proxy associating it to a remote reader.
  60. * @param reader_attributes ReaderProxyData of the reader for which to keep state.
  61. */
  62. void start(
  63. const ReaderProxyData& reader_attributes);
  64. /**
  65. * Update information about the remote reader.
  66. * @param reader_attributes ReaderProxyData with updated information of the reader.
  67. * @return true if data was modified, false otherwise.
  68. */
  69. bool update(
  70. const ReaderProxyData& reader_attributes);
  71. /**
  72. * Disable this proxy.
  73. */
  74. void stop();
  75. /**
  76. * Called when a change is added to the writer's history.
  77. * @param change Information regarding the change added.
  78. * @param restart_nack_supression Whether nack-supression event should be restarted.
  79. */
  80. void add_change(
  81. const ChangeForReader_t& change,
  82. bool restart_nack_supression);
  83. void add_change(
  84. const ChangeForReader_t& change,
  85. bool restart_nack_supression,
  86. const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time);
  87. /**
  88. * Check if there are changes pending for this reader.
  89. * @return true when there are pending changes, false otherwise.
  90. */
  91. bool has_changes() const;
  92. /**
  93. * Check if a specific change has been already acknowledged for this reader.
  94. * @param seq_num Sequence number of the change to be checked.
  95. * @return true when the change is irrelevant or has been already acknowledged, false otherwise.
  96. */
  97. bool change_is_acked(
  98. const SequenceNumber_t& seq_num) const;
  99. /**
  100. * Check if a specific change is marked to be sent to this reader.
  101. * @param[in] seq_num Sequence number of the change to be checked.
  102. * @param[out] is_irrelevant Will be forced to true if change is irrelevant for this reader.
  103. * @return true when the change is marked to be sent, false otherwise.
  104. */
  105. bool change_is_unsent(
  106. const SequenceNumber_t& seq_num,
  107. bool& is_irrelevant) const;
  108. /**
  109. * Mark all changes up to the one indicated by seq_num as Acknowledged.
  110. * For instance, when seq_num is 30, changes 1-29 are marked as acknowledged.
  111. * @param seq_num Sequence number of the first change not to be marked as acknowledged.
  112. */
  113. void acked_changes_set(
  114. const SequenceNumber_t& seq_num);
  115. /**
  116. * Mark all changes in the vector as requested.
  117. * @param seq_num_set Bitmap of sequence numbers.
  118. * @return true if at least one change has been marked as REQUESTED, false otherwise.
  119. */
  120. bool requested_changes_set(
  121. const SequenceNumberSet_t& seq_num_set);
  122. /**
  123. * Performs processing of preemptive acknack
  124. * @return true if a heartbeat should be sent, false otherwise.
  125. */
  126. bool process_initial_acknack();
  127. /**
  128. * Applies the given function object to every unsent change.
  129. * @param max_seq Maximum sequence number to be considered without including it.
  130. * @param f Function to apply.
  131. * Will receive a SequenceNumber_t and a ChangeForReader_t*.
  132. * The second argument may be nullptr for irrelevant changes.
  133. */
  134. template <class BinaryFunction>
  135. void for_each_unsent_change(
  136. const SequenceNumber_t& max_seq,
  137. BinaryFunction f) const
  138. {
  139. if (!changes_for_reader_.empty())
  140. {
  141. SequenceNumber_t current_seq = changes_low_mark_ + 1;
  142. ChangeConstIterator it = changes_for_reader_.begin();
  143. while (it != changes_for_reader_.end())
  144. {
  145. // Holes before this change are informed as irrelevant.
  146. SequenceNumber_t change_seq = it->getSequenceNumber();
  147. for (; current_seq < change_seq; ++current_seq)
  148. {
  149. f(current_seq, nullptr);
  150. }
  151. // We then inform of this change if it is unsent, and go to the next one.
  152. if (it->getStatus() == UNSENT)
  153. {
  154. f(current_seq, &(*it));
  155. }
  156. ++current_seq;
  157. ++it;
  158. }
  159. // After the last change has been checked, there may be a hole at the end.
  160. for (; current_seq < max_seq; ++current_seq)
  161. {
  162. f(current_seq, nullptr);
  163. }
  164. }
  165. else
  166. {
  167. // This may be entered if all changes where removed before being acknowledged.
  168. for (SequenceNumber_t seq = changes_low_mark_ + 1; seq < max_seq; ++seq)
  169. {
  170. f(seq, nullptr);
  171. }
  172. }
  173. }
  174. /*!
  175. * @brief Sets a change to a particular status (if present in the ReaderProxy)
  176. * @param seq_num Sequence number of the change to update.
  177. * @param status Status to apply.
  178. * @param restart_nack_supression Whether nack supression event should be restarted or not.
  179. * @return true when a status has changed, false otherwise.
  180. */
  181. bool set_change_to_status(
  182. const SequenceNumber_t& seq_num,
  183. ChangeForReaderStatus_t status,
  184. bool restart_nack_supression);
  185. /**
  186. * @brief Mark a particular fragment as sent.
  187. * @param[in] seq_num Sequence number of the change to update.
  188. * @param[in] frag_num Fragment number to mark as sent.
  189. * @param[out] was_last_fragment Indicates if the fragment was the last one pending.
  190. * @return true when the change was found, false otherwise.
  191. */
  192. bool mark_fragment_as_sent_for_change(
  193. const SequenceNumber_t& seq_num,
  194. FragmentNumber_t frag_num,
  195. bool& was_last_fragment);
  196. /**
  197. * Turns all UNDERWAY changes into UNACKNOWLEDGED.
  198. * @return true if at least one change changed its status, false otherwise.
  199. */
  200. bool perform_nack_supression();
  201. /**
  202. * Turns all REQUESTED changes into UNSENT.
  203. * @return true if at least one change changed its status, false otherwise.
  204. */
  205. bool perform_acknack_response();
  206. /**
  207. * Call this to inform a change was removed from history.
  208. * @param seq_num Sequence number of the removed change.
  209. */
  210. void change_has_been_removed(
  211. const SequenceNumber_t& seq_num);
  212. /*!
  213. * @brief Returns there is some UNACKNOWLEDGED change.
  214. * @return There is some UNACKNOWLEDGED change.
  215. */
  216. bool has_unacknowledged() const;
  217. /**
  218. * Get the GUID of the reader represented by this proxy.
  219. * @return the GUID of the reader represented by this proxy.
  220. */
  221. inline const GUID_t& guid() const
  222. {
  223. return locator_info_.remote_guid();
  224. }
  225. /**
  226. * Get the durability of the reader represented by this proxy.
  227. * @return the durability of the reader represented by this proxy.
  228. */
  229. inline DurabilityKind_t durability_kind() const
  230. {
  231. return durability_kind_;
  232. }
  233. /**
  234. * Check if the reader represented by this proxy expexts inline QOS to be received.
  235. * @return true if the reader represented by this proxy expexts inline QOS to be received.
  236. */
  237. inline bool expects_inline_qos() const
  238. {
  239. return expects_inline_qos_;
  240. }
  241. /**
  242. * Check if the reader represented by this proxy is reliable.
  243. * @return true if the reader represented by this proxy is reliable.
  244. */
  245. inline bool is_reliable() const
  246. {
  247. return is_reliable_;
  248. }
  249. inline bool disable_positive_acks() const
  250. {
  251. return disable_positive_acks_;
  252. }
  253. /**
  254. * Check if the reader represented by this proxy is remote and reliable.
  255. * @return true if the reader represented by this proxy is remote and reliable.
  256. */
  257. inline bool is_remote_and_reliable() const
  258. {
  259. return !locator_info_.is_local_reader() && is_reliable_;
  260. }
  261. /**
  262. * Check if the reader is on the same process.
  263. * @return true if the reader is no the same process.
  264. */
  265. inline bool is_local_reader()
  266. {
  267. return locator_info_.is_local_reader();
  268. }
  269. /**
  270. * Get the local reader on the same process (if any).
  271. * @return The local reader on the same process.
  272. */
  273. inline RTPSReader* local_reader()
  274. {
  275. return locator_info_.local_reader();
  276. }
  277. /**
  278. * Called when an ACKNACK is received to set a new value for the count of the last received ACKNACK.
  279. * @param acknack_count The count of the received ACKNACK.
  280. * @return true if internal count changed (i.e. new ACKNACK is accepted)
  281. */
  282. bool check_and_set_acknack_count(
  283. uint32_t acknack_count)
  284. {
  285. if (last_acknack_count_ < acknack_count)
  286. {
  287. last_acknack_count_ = acknack_count;
  288. return true;
  289. }
  290. return false;
  291. }
  292. /**
  293. * Process an incoming NACKFRAG submessage.
  294. * @param reader_guid Destination guid of the submessage.
  295. * @param nack_count Counter field of the submessage.
  296. * @param seq_num Sequence number field of the submessage.
  297. * @param fragments_state Bitmap indicating the requested fragments.
  298. * @return true if a change was modified, false otherwise.
  299. */
  300. bool process_nack_frag(
  301. const GUID_t& reader_guid,
  302. uint32_t nack_count,
  303. const SequenceNumber_t& seq_num,
  304. const FragmentNumberSet_t& fragments_state);
  305. /**
  306. * Filter a CacheChange_t, in this version always returns true.
  307. * @param change
  308. * @return true if the change is relevant, false otherwise.
  309. */
  310. inline bool rtps_is_relevant(
  311. CacheChange_t* change)
  312. {
  313. (void)change; return true;
  314. }
  315. /**
  316. * Get the highest fully acknowledged sequence number.
  317. * @return the highest fully acknowledged sequence number.
  318. */
  319. SequenceNumber_t changes_low_mark() const
  320. {
  321. return changes_low_mark_;
  322. }
  323. /**
  324. * Change the interval of nack-supression event.
  325. * @param interval Time from data sending to acknack processing.
  326. */
  327. void update_nack_supression_interval(
  328. const Duration_t& interval);
  329. /**
  330. * Check if there are gaps in the list of ChangeForReader_t.
  331. * @return True if there are gaps, else false.
  332. */
  333. bool are_there_gaps();
  334. LocatorSelectorEntry* locator_selector_entry()
  335. {
  336. return locator_info_.locator_selector_entry();
  337. }
  338. const RTPSMessageSenderInterface& message_sender() const
  339. {
  340. return locator_info_;
  341. }
  342. private:
  343. //!Is this proxy active? I.e. does it have a remote reader associated?
  344. bool is_active_;
  345. //!Reader locator information
  346. ReaderLocator locator_info_;
  347. //!Taken from QoS
  348. DurabilityKind_t durability_kind_;
  349. //!Taken from QoS
  350. bool expects_inline_qos_;
  351. //!Taken from QoS
  352. bool is_reliable_;
  353. //!Taken from QoS
  354. bool disable_positive_acks_;
  355. //!Pointer to the associated StatefulWriter.
  356. StatefulWriter* writer_;
  357. //!Set of the changes and its state.
  358. ResourceLimitedVector<ChangeForReader_t, std::true_type> changes_for_reader_;
  359. //! Timed Event to manage the delay to mark a change as UNACKED after sending it.
  360. TimedEvent* nack_supression_event_;
  361. TimedEvent* initial_heartbeat_event_;
  362. //! Are timed events enabled?
  363. std::atomic_bool timers_enabled_;
  364. //! Last ack/nack count
  365. uint32_t last_acknack_count_;
  366. //! Last NACKFRAG count.
  367. uint32_t last_nackfrag_count_;
  368. SequenceNumber_t changes_low_mark_;
  369. using ChangeIterator = ResourceLimitedVector<ChangeForReader_t, std::true_type>::iterator;
  370. using ChangeConstIterator = ResourceLimitedVector<ChangeForReader_t, std::true_type>::const_iterator;
  371. void disable_timers();
  372. /*
  373. * Converts all changes with a given status to a different status.
  374. * @param previous Status to change.
  375. * @param next Status to adopt.
  376. * @return true when at least one change has been modified, false otherwise.
  377. */
  378. bool convert_status_on_all_changes(
  379. ChangeForReaderStatus_t previous,
  380. ChangeForReaderStatus_t next);
  381. /*!
  382. * @brief Adds requested fragments. These fragments will be sent in next NackResponseDelay.
  383. * @param[in] seq_num Sequence number to be paired with the requested fragments.
  384. * @param[in] frag_set set containing the requested fragments to be sent.
  385. * @return True if there is at least one requested fragment. False in other case.
  386. */
  387. bool requested_fragment_set(
  388. const SequenceNumber_t& seq_num,
  389. const FragmentNumberSet_t& frag_set);
  390. void add_change(
  391. const ChangeForReader_t& change);
  392. /**
  393. * @brief Find a change with the specified sequence number.
  394. * @param seq_num Sequence number to find.
  395. * @param exact When false, the first change with a sequence number not less than seq_num will be returned.
  396. * When true, the change with a sequence number value of seq_num will be returned.
  397. * @return Iterator pointing to the change, changes_for_reader_.end() if not found.
  398. */
  399. ChangeIterator find_change(
  400. const SequenceNumber_t& seq_num,
  401. bool exact);
  402. /**
  403. * @brief Find a change with the specified sequence number.
  404. * @param seq_num Sequence number to find.
  405. * @return Iterator pointing to the change, changes_for_reader_.end() if not found.
  406. */
  407. ChangeConstIterator find_change(
  408. const SequenceNumber_t& seq_num) const;
  409. };
  410. } /* namespace rtps */
  411. } /* namespace fastrtps */
  412. } /* namespace eprosima */
  413. #endif
  414. #endif /* _FASTDDS_RTPS_WRITER_READERPROXY_H_ */