StatelessReader.h 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235
  1. // Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. /**
  15. * @file StatelessReader.h
  16. */
  17. #ifndef _FASTDDS_RTPS_READER_STATELESSREADER_H_
  18. #define _FASTDDS_RTPS_READER_STATELESSREADER_H_
  19. #ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC
  20. #include <fastdds/rtps/reader/RTPSReader.h>
  21. #include <fastrtps/utils/collections/ResourceLimitedVector.hpp>
  22. #include <mutex>
  23. #include <map>
  24. namespace eprosima {
  25. namespace fastrtps {
  26. namespace rtps {
  27. /**
  28. * Class StatelessReader, specialization of the RTPSReader for Best Effort Readers.
  29. * @ingroup READER_MODULE
  30. */
  31. class StatelessReader : public RTPSReader
  32. {
  33. friend class RTPSParticipantImpl;
  34. public:
  35. virtual ~StatelessReader();
  36. protected:
  37. StatelessReader(
  38. RTPSParticipantImpl* pimpl,
  39. const GUID_t& guid,
  40. const ReaderAttributes& att,
  41. ReaderHistory* hist,
  42. ReaderListener* listen = nullptr);
  43. public:
  44. /**
  45. * Add a matched writer represented by a WriterProxyData object.
  46. * @param wdata Pointer to the WPD object to add.
  47. * @return True if correctly added.
  48. */
  49. bool matched_writer_add(
  50. const WriterProxyData& wdata) override;
  51. /**
  52. * Remove a WriterProxyData from the matached writers.
  53. * @param writer_guid GUID of the writer to remove.
  54. * @return True if correct.
  55. */
  56. bool matched_writer_remove(
  57. const GUID_t& writer_guid) override;
  58. /**
  59. * Tells us if a specific Writer is matched against this reader.
  60. * @param writer_guid GUID of the writer to check.
  61. * @return True if it is matched.
  62. */
  63. bool matched_writer_is_matched(
  64. const GUID_t& writer_guid) override;
  65. /**
  66. * Method to indicate the reader that some change has been removed due to HistoryQos requirements.
  67. * @param change Pointer to the CacheChange_t.
  68. * @param prox Pointer to the WriterProxy.
  69. * @return True if correctly removed.
  70. */
  71. bool change_removed_by_history(
  72. CacheChange_t* change,
  73. WriterProxy* prox = nullptr) override;
  74. /**
  75. * Processes a new DATA message.
  76. *
  77. * @param change Pointer to the CacheChange_t.
  78. * @return true if the reader accepts messages from the.
  79. */
  80. bool processDataMsg(
  81. CacheChange_t* change) override;
  82. /**
  83. * Processes a new DATA FRAG message.
  84. *
  85. * @param change Pointer to the CacheChange_t.
  86. * @param sampleSize Size of the complete, assembled message.
  87. * @param fragmentStartingNum Starting number of this particular message.
  88. * @param fragmentsInSubmessage Number of fragments on this particular message.
  89. * @return true if the reader accepts message.
  90. */
  91. bool processDataFragMsg(
  92. CacheChange_t* change,
  93. uint32_t sampleSize,
  94. uint32_t fragmentStartingNum,
  95. uint16_t fragmentsInSubmessage) override;
  96. /**
  97. * Processes a new HEARTBEAT message.
  98. *
  99. * @return true if the reader accepts messages from the.
  100. */
  101. bool processHeartbeatMsg(
  102. const GUID_t& writerGUID,
  103. uint32_t hbCount,
  104. const SequenceNumber_t& firstSN,
  105. const SequenceNumber_t& lastSN,
  106. bool finalFlag,
  107. bool livelinessFlag) override;
  108. bool processGapMsg(
  109. const GUID_t& writerGUID,
  110. const SequenceNumber_t& gapStart,
  111. const SequenceNumberSet_t& gapList) override;
  112. /**
  113. * This method is called when a new change is received. This method calls the received_change of the History
  114. * and depending on the implementation performs different actions.
  115. * @param a_change Pointer of the change to add.
  116. * @return True if added.
  117. */
  118. bool change_received(
  119. CacheChange_t* a_change);
  120. /**
  121. * Read the next unread CacheChange_t from the history
  122. * @param change Pointer to pointer of CacheChange_t
  123. * @param wpout Pointer to pointer of the matched writer proxy
  124. * @return True if read.
  125. */
  126. bool nextUnreadCache(
  127. CacheChange_t** change,
  128. WriterProxy** wpout = nullptr) override;
  129. /**
  130. * Take the next CacheChange_t from the history;
  131. * @param change Pointer to pointer of CacheChange_t
  132. * @param wpout Pointer to pointer of the matched writer proxy
  133. * @return True if read.
  134. */
  135. bool nextUntakenCache(
  136. CacheChange_t** change,
  137. WriterProxy** wpout = nullptr) override;
  138. /**
  139. * Get the number of matched writers
  140. * @return Number of matched writers
  141. */
  142. inline size_t getMatchedWritersSize() const
  143. {
  144. return matched_writers_.size();
  145. }
  146. /*!
  147. * @brief Returns there is a clean state with all Writers.
  148. * StatelessReader allways return true;
  149. * @return true
  150. */
  151. bool isInCleanState() override
  152. {
  153. return true;
  154. }
  155. /**
  156. * Get the RTPS participant
  157. * @return Associated RTPS participant
  158. */
  159. inline RTPSParticipantImpl* getRTPSParticipant() const
  160. {
  161. return mp_RTPSParticipant;
  162. }
  163. private:
  164. struct RemoteWriterInfo_t
  165. {
  166. GUID_t guid;
  167. GUID_t persistence_guid;
  168. bool has_manual_topic_liveliness = false;
  169. CacheChange_t* fragmented_change = nullptr;
  170. };
  171. bool acceptMsgFrom(
  172. const GUID_t& entityId,
  173. ChangeKind_t change_kind);
  174. bool thereIsUpperRecordOf(
  175. const GUID_t& guid,
  176. const SequenceNumber_t& seq);
  177. /**
  178. * @brief Assert liveliness of remote writer
  179. * @param guid The guid of the remote writer
  180. */
  181. void assert_writer_liveliness(
  182. const GUID_t& guid);
  183. /**
  184. * @brief A method to check if a matched writer has manual_by_topic liveliness
  185. * @param guid The guid of the remote writer
  186. * @return True if writer has manual_by_topic livelinesss
  187. */
  188. bool writer_has_manual_liveliness(
  189. const GUID_t& guid);
  190. //!List of GUID_t os matched writers.
  191. //!Is only used in the Discovery, to correctly notify the user using SubscriptionListener::onSubscriptionMatched();
  192. ResourceLimitedVector<RemoteWriterInfo_t> matched_writers_;
  193. };
  194. } /* namespace rtps */
  195. } /* namespace fastrtps */
  196. } /* namespace eprosima */
  197. #endif
  198. #endif /* _FASTDDS_RTPS_READER_STATELESSREADER_H_ */