StatefulReader.h 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291
  1. // Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. /**
  15. * @file StatefulReader.h
  16. */
  17. #ifndef _FASTDDS_RTPS_READER_STATEFULREADER_H_
  18. #define _FASTDDS_RTPS_READER_STATEFULREADER_H_
  19. #ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC
  20. #include <fastdds/rtps/reader/RTPSReader.h>
  21. #include <fastrtps/utils/collections/ResourceLimitedVector.hpp>
  22. #include <fastdds/rtps/common/CDRMessage_t.h>
  23. #include <fastdds/rtps/messages/RTPSMessageGroup.h>
  24. #include <mutex>
  25. namespace eprosima {
  26. namespace fastrtps {
  27. namespace rtps {
  28. class WriterProxy;
  29. class RTPSMessageSenderInterface;
  30. /**
  31. * Class StatefulReader, specialization of RTPSReader than stores the state of the matched writers.
  32. * @ingroup READER_MODULE
  33. */
  34. class StatefulReader : public RTPSReader
  35. {
  36. public:
  37. friend class RTPSParticipantImpl;
  38. virtual ~StatefulReader();
  39. protected:
  40. StatefulReader(
  41. RTPSParticipantImpl*,
  42. const GUID_t& guid,
  43. const ReaderAttributes& att,
  44. ReaderHistory* hist,
  45. ReaderListener* listen = nullptr);
  46. public:
  47. /**
  48. * Add a matched writer represented by its attributes.
  49. * @param wdata Attributes of the writer to add.
  50. * @return True if correctly added.
  51. */
  52. bool matched_writer_add(const WriterProxyData& wdata) override;
  53. /**
  54. * Remove a WriterProxyData from the matached writers.
  55. * @param writer_guid GUID of the writer to remove.
  56. * @return True if correct.
  57. */
  58. bool matched_writer_remove(const GUID_t& writer_guid) override;
  59. /**
  60. * Tells us if a specific Writer is matched against this reader.
  61. * @param writer_guid GUID of the writer to check.
  62. * @return True if it is matched.
  63. */
  64. bool matched_writer_is_matched(const GUID_t& writer_guid) override;
  65. /**
  66. * Look for a specific WriterProxy.
  67. * @param writerGUID GUID_t of the writer we are looking for.
  68. * @param WP Pointer to pointer to a WriterProxy.
  69. * @return True if found.
  70. */
  71. bool matched_writer_lookup(
  72. const GUID_t& writerGUID,
  73. WriterProxy** WP);
  74. /**
  75. * Processes a new DATA message.
  76. * @param change Pointer to the CacheChange_t.
  77. * @return true if the reader accepts messages.
  78. */
  79. bool processDataMsg(CacheChange_t* change) override;
  80. /**
  81. * Processes a new DATA FRAG message.
  82. *
  83. * @param change Pointer to the CacheChange_t.
  84. * @param sampleSize Size of the complete, assembled message.
  85. * @param fragmentStartingNum Starting number of this particular message.
  86. * @param fragmentsInSubmessage Number of fragments on this particular message.
  87. * @return true if the reader accepts message.
  88. */
  89. bool processDataFragMsg(
  90. CacheChange_t* change,
  91. uint32_t sampleSize,
  92. uint32_t fragmentStartingNum,
  93. uint16_t fragmentsInSubmessage) override;
  94. /**
  95. * Processes a new HEARTBEAT message.
  96. *
  97. * @return true if the reader accepts messages.
  98. */
  99. bool processHeartbeatMsg(
  100. const GUID_t& writerGUID,
  101. uint32_t hbCount,
  102. const SequenceNumber_t& firstSN,
  103. const SequenceNumber_t& lastSN,
  104. bool finalFlag,
  105. bool livelinessFlag) override;
  106. bool processGapMsg(
  107. const GUID_t& writerGUID,
  108. const SequenceNumber_t& gapStart,
  109. const SequenceNumberSet_t& gapList) override;
  110. /**
  111. * Method to indicate the reader that some change has been removed due to HistoryQos requirements.
  112. * @param change Pointer to the CacheChange_t.
  113. * @param prox Pointer to the WriterProxy.
  114. * @return True if correctly removed.
  115. */
  116. bool change_removed_by_history(
  117. CacheChange_t* change,
  118. WriterProxy* prox = nullptr) override;
  119. /**
  120. * This method is called when a new change is received. This method calls the received_change of the History
  121. * and depending on the implementation performs different actions.
  122. * @param a_change Pointer of the change to add.
  123. * @param prox Pointer to the WriterProxy that adds the Change.
  124. * @return True if added.
  125. */
  126. bool change_received(
  127. CacheChange_t* a_change,
  128. WriterProxy* prox);
  129. /**
  130. * Get the RTPS participant
  131. * @return Associated RTPS participant
  132. */
  133. inline RTPSParticipantImpl* getRTPSParticipant() const
  134. {
  135. return mp_RTPSParticipant;
  136. }
  137. /**
  138. * Read the next unread CacheChange_t from the history
  139. * @param change Pointer to pointer of CacheChange_t
  140. * @param wpout Pointer to pointer the matched writer proxy
  141. * @return True if read.
  142. */
  143. bool nextUnreadCache(
  144. CacheChange_t** change,
  145. WriterProxy** wpout = nullptr) override;
  146. /**
  147. * Take the next CacheChange_t from the history;
  148. * @param change Pointer to pointer of CacheChange_t
  149. * @param wpout Pointer to pointer the matched writer proxy
  150. * @return True if read.
  151. */
  152. bool nextUntakenCache(
  153. CacheChange_t** change,
  154. WriterProxy** wpout = nullptr) override;
  155. /**
  156. * Update the times parameters of the Reader.
  157. * @param times ReaderTimes reference.
  158. * @return True if correctly updated.
  159. */
  160. bool updateTimes(const ReaderTimes& times);
  161. /**
  162. *
  163. * @return Reference to the ReaderTimes.
  164. */
  165. inline ReaderTimes& getTimes()
  166. {
  167. return times_;
  168. }
  169. /**
  170. * Get the number of matched writers
  171. * @return Number of matched writers
  172. */
  173. inline size_t getMatchedWritersSize() const
  174. {
  175. return matched_writers_.size();
  176. }
  177. /*!
  178. * @brief Returns there is a clean state with all Writers.
  179. * It occurs when the Reader received all samples sent by Writers. In other words,
  180. * its WriterProxies are up to date.
  181. * @return There is a clean state with all Writers.
  182. */
  183. bool isInCleanState() override;
  184. /**
  185. * Sends an acknack message from this reader.
  186. * @param writer Pointer to the info of the remote writer.
  187. * @param sns Sequence number bitmap with the acknack information.
  188. * @param sender Message sender interface.
  189. * @param is_final Value for final flag.
  190. */
  191. void send_acknack(
  192. const WriterProxy* writer,
  193. const SequenceNumberSet_t& sns,
  194. const RTPSMessageSenderInterface& sender,
  195. bool is_final);
  196. /**
  197. * Sends an acknack message from this reader in response to a heartbeat.
  198. * @param writer Pointer to the proxy representing the writer to send the acknack to.
  199. * @param sender Message sender interface.
  200. * @param heartbeat_was_final Final flag of the last received heartbeat.
  201. */
  202. void send_acknack(
  203. const WriterProxy* writer,
  204. const RTPSMessageSenderInterface& sender,
  205. bool heartbeat_was_final);
  206. /**
  207. *Use the participant of this reader to send a message to certain locator.
  208. *@param message Message to be sent.
  209. *@param locators_begin Destination locators iterator begin.
  210. *@param locators_end Destination locators iterator end.
  211. *@param max_blocking_time_point Future time point where any blocking should end.
  212. */
  213. bool send_sync_nts(
  214. CDRMessage_t* message,
  215. const Locators& locators_begin,
  216. const Locators& locators_end,
  217. std::chrono::steady_clock::time_point& max_blocking_time_point);
  218. private:
  219. bool acceptMsgFrom(
  220. const GUID_t& entityGUID,
  221. WriterProxy** wp) const;
  222. /*!
  223. * @remarks Nn thread-safe.
  224. */
  225. bool findWriterProxy(
  226. const GUID_t& writerGUID,
  227. WriterProxy** wp) const;
  228. void NotifyChanges(WriterProxy* wp);
  229. //! Acknack Count
  230. uint32_t acknack_count_;
  231. //! NACKFRAG Count
  232. uint32_t nackfrag_count_;
  233. //!ReaderTimes of the StatefulReader.
  234. ReaderTimes times_;
  235. //! Vector containing pointers to all the active WriterProxies.
  236. ResourceLimitedVector<WriterProxy*> matched_writers_;
  237. //! Vector containing pointers to all the inactive, ready for reuse, WriterProxies.
  238. ResourceLimitedVector<WriterProxy*> matched_writers_pool_;
  239. //!
  240. ResourceLimitedContainerConfig proxy_changes_config_;
  241. //! True to disable positive ACKs
  242. bool disable_positive_acks_;
  243. //! False when being destroyed
  244. bool is_alive_;
  245. };
  246. } /* namespace rtps */
  247. } /* namespace fastrtps */
  248. } /* namespace eprosima */
  249. #endif
  250. #endif // _FASTDDS_RTPS_READER_STATEFULREADER_H_