RTPSReader.h 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355
  1. // Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. /**
  15. * @file RTPSReader.h
  16. */
  17. #ifndef _FASTDDS_RTPS_READER_RTPSREADER_H_
  18. #define _FASTDDS_RTPS_READER_RTPSREADER_H_
  19. #include <fastdds/rtps/Endpoint.h>
  20. #include <fastdds/rtps/attributes/ReaderAttributes.h>
  21. #include <fastdds/rtps/common/SequenceNumber.h>
  22. #include <fastrtps/qos/LivelinessChangedStatus.h>
  23. #include <fastdds/rtps/common/Time_t.h>
  24. #include <fastdds/rtps/builtin/data/WriterProxyData.h>
  25. #include <fastrtps/utils/TimedConditionVariable.hpp>
  26. #include "../history/ReaderHistory.h"
  27. namespace eprosima {
  28. namespace fastrtps {
  29. namespace rtps {
  30. // Forward declarations. Within this header, ReaderListener, WriterProxy, CacheChange_t,
       // ReaderHistoryState and WriterProxyData appear only as pointer or reference types;
       // LivelinessManager is not referenced by any declaration in this header.
  31. class LivelinessManager;
  32. class ReaderListener;
  33. class WriterProxy;
  34. struct CacheChange_t;
  35. struct ReaderHistoryState;
  36. class WriterProxyData; // NOTE(review): WriterProxyData.h is also included above, so this is likely redundant.
  37. /**
  38. * Class RTPSReader, manages the reception of data from its matched writers.
       * It is an abstract base class: the writer-matching and message-processing methods are
       * pure virtual, and the constructor and destructor are protected.
  39. * @ingroup READER_MODULE
  40. */
  41. class RTPSReader : public Endpoint
  42. {
  43. friend class ReaderHistory;
  44. friend class RTPSParticipantImpl;
  45. friend class MessageReceiver;
  46. friend class EDP;
  47. friend class WLP;
  48. protected:
       /**
       * Protected constructor, reachable only from derived classes and the friend classes above.
       * The first (unnamed) parameter is a pointer to an RTPSParticipantImpl, presumably the
       * participant this reader belongs to (confirm in the implementation).
       * @param guid GUID of this reader.
       * @param att Attributes of this reader.
       * @param hist Pointer to the ReaderHistory (presumably stored in mp_history; confirm in the implementation).
       * @param listen Pointer to the ReaderListener; defaults to nullptr.
       */
  49. RTPSReader(
  50. RTPSParticipantImpl*,
  51. const GUID_t& guid,
  52. const ReaderAttributes& att,
  53. ReaderHistory* hist,
  54. ReaderListener* listen = nullptr);
       //! Virtual destructor. It is protected, so it is only reachable from derived and friend classes.
  55. virtual ~RTPSReader();
  56. public:
  57. /**
  58. * Add a matched writer represented by its attributes.
  59. * @param wdata Attributes of the writer to add.
  60. * @return True if correctly added.
  61. */
  62. RTPS_DllAPI virtual bool matched_writer_add(
  63. const WriterProxyData& wdata) = 0;
  64. /**
  65. * Remove a writer, identified by its GUID, from the matched writers.
  66. * @param writer_guid GUID of the writer to remove.
  67. * @return True if correctly removed.
  68. */
  69. RTPS_DllAPI virtual bool matched_writer_remove(
  70. const GUID_t& writer_guid) = 0;
  71. /**
  72. * Tells us if a specific Writer is matched against this reader.
  73. * @param writer_guid GUID of the writer to check.
  74. * @return True if it is matched.
  75. */
  76. RTPS_DllAPI virtual bool matched_writer_is_matched(
  77. const GUID_t& writer_guid) = 0;
  78. /**
  79. * Processes a new DATA message. Previously the message must have been accepted by function acceptMsgDirectedTo.
  80. *
  81. * @param change Pointer to the CacheChange_t.
  82. * @return true if the reader accepts messages from the writer (original sentence was truncated; confirm).
  83. */
  84. RTPS_DllAPI virtual bool processDataMsg(
  85. CacheChange_t* change) = 0;
  86. /**
  87. * Processes a new DATA FRAG message.
  88. *
  89. * @param change Pointer to the CacheChange_t.
  90. * @param sampleSize Size of the complete, assembled message.
  91. * @param fragmentStartingNum Starting number of this particular message.
  92. * @param fragmentsInSubmessage Number of fragments on this particular message.
  93. * @return true if the reader accepts the message.
  94. */
  95. RTPS_DllAPI virtual bool processDataFragMsg(
  96. CacheChange_t* change,
  97. uint32_t sampleSize,
  98. uint32_t fragmentStartingNum,
  99. uint16_t fragmentsInSubmessage) = 0;
  100. /**
  101. * Processes a new HEARTBEAT message.
        * NOTE(review): the parameter descriptions below are inferred from the names of the RTPS
        * HEARTBEAT submessage fields they mirror; confirm against the implementations.
  102. * @param writerGUID GUID of the writer the HEARTBEAT comes from.
  103. * @param hbCount Count value of the HEARTBEAT.
  104. * @param firstSN First sequence number announced in the HEARTBEAT.
  105. * @param lastSN Last sequence number announced in the HEARTBEAT.
  106. * @param finalFlag Final flag of the HEARTBEAT.
  107. * @param livelinessFlag Liveliness flag of the HEARTBEAT.
  108. * @return true if the reader accepts messages from the writer (original sentence was truncated; confirm).
  109. */
  110. RTPS_DllAPI virtual bool processHeartbeatMsg(
  111. const GUID_t& writerGUID,
  112. uint32_t hbCount,
  113. const SequenceNumber_t& firstSN,
  114. const SequenceNumber_t& lastSN,
  115. bool finalFlag,
  116. bool livelinessFlag) = 0;
  117. /**
  118. * Processes a new GAP message.
        * NOTE(review): the parameter descriptions below are inferred from the names of the RTPS
        * GAP submessage fields they mirror; confirm against the implementations.
  119. * @param writerGUID GUID of the writer the GAP comes from.
  120. * @param gapStart Starting sequence number of the GAP.
  121. * @param gapList Sequence number set carried by the GAP.
  122. * @return true if the reader accepts messages from the writer (original sentence was truncated; confirm).
  123. */
  124. RTPS_DllAPI virtual bool processGapMsg(
  125. const GUID_t& writerGUID,
  126. const SequenceNumber_t& gapStart,
  127. const SequenceNumberSet_t& gapList) = 0;
  128. /**
  129. * Method to indicate to the reader that some change has been removed due to HistoryQos requirements.
  130. * @param change Pointer to the CacheChange_t.
  131. * @param prox Pointer to the WriterProxy (defaults to nullptr).
  132. * @return True if correctly removed.
  133. */
  134. RTPS_DllAPI virtual bool change_removed_by_history(
  135. CacheChange_t* change,
  136. WriterProxy* prox = nullptr) = 0;
  137. /**
  138. * Get the associated listener, or the secondary attached listener in case it is of compound type.
  139. * @return Pointer to the associated reader listener.
  140. */
  141. RTPS_DllAPI ReaderListener* getListener() const;
  142. /**
  143. * Switch the ReaderListener kind for the Reader.
  144. * If the RTPSReader does not belong to the built-in protocols it switches out the old one.
  145. * If it belongs to the built-in protocols, it sets the new ReaderListener callbacks to be called after the
  146. * built-in ReaderListener ones.
  147. * @param target Pointer to the ReaderListener to attach.
  148. * @return True if correctly set.
  149. */
  150. RTPS_DllAPI bool setListener(
  151. ReaderListener* target);
  152. /**
  153. * Reserve a CacheChange_t.
  154. * @param change Pointer to pointer to the Cache.
  155. * @param dataCdrSerializedSize Size of the Cache.
  156. * @return True if correctly reserved.
  157. */
  158. RTPS_DllAPI bool reserveCache(
  159. CacheChange_t** change,
  160. uint32_t dataCdrSerializedSize);
  161. /**
  162. * Release a CacheChange_t.
        * @param change Pointer to the CacheChange_t to release.
  163. */
  164. RTPS_DllAPI void releaseCache(
  165. CacheChange_t* change);
  166. /**
  167. * Read the next unread CacheChange_t from the history.
  168. * @param change Pointer to pointer of CacheChange_t.
  169. * @param wp Pointer to pointer to the WriterProxy.
  170. * @return True if read.
  171. */
  172. RTPS_DllAPI virtual bool nextUnreadCache(
  173. CacheChange_t** change,
  174. WriterProxy** wp) = 0;
  175. /**
  176. * Get the next CacheChange_t from the history to take.
  177. * @param change Pointer to pointer of CacheChange_t.
  178. * @param wp Pointer to pointer to the WriterProxy.
  179. * @return True if read.
  180. */
  181. RTPS_DllAPI virtual bool nextUntakenCache(
  182. CacheChange_t** change,
  183. WriterProxy** wp) = 0;
        /**
        * Wait for an unread CacheChange_t.
        * NOTE(review): the implementation is not in this header; presumably it blocks on
        * new_notification_cv_ until total_unread_ is non-zero or the timeout expires. Confirm.
        * @param timeout Maximum time to wait.
        * @return Presumably true if there is unread data before the timeout expires; confirm.
        */
  184. RTPS_DllAPI bool wait_for_unread_cache(
  185. const eprosima::fastrtps::Duration_t& timeout);
        //! Get the unread count (presumably the value of total_unread_; confirm in the implementation).
  186. RTPS_DllAPI uint64_t get_unread_count() const;
  187. /**
  188. * @return True if the reader expects Inline QOS (value of m_expectsInlineQos).
  189. */
  190. RTPS_DllAPI inline bool expectsInlineQos()
  191. {
  192. return m_expectsInlineQos;
  193. }
  194. //! Returns a pointer to the associated History (mp_history).
        // NOTE(review): this getter and expectsInlineQos() do not modify the object but are not declared const.
  195. RTPS_DllAPI inline ReaderHistory* getHistory()
  196. {
  197. return mp_history;
  198. }
  199. /*!
  200. * @brief Returns whether there is a clean state with all Writers.
  201. * It occurs when the Reader received all samples sent by Writers. In other words,
  202. * its WriterProxies are up to date.
  203. * @return True if there is a clean state with all Writers.
  204. */
  205. virtual bool isInCleanState() = 0;
  206. //! The liveliness changed status struct as defined in the DDS. Note that this is a public data member.
  207. LivelinessChangedStatus liveliness_changed_status_;
        /**
        * Enable or disable the acceptance of messages from unknown writers.
        * @param enable New value for m_acceptMessagesFromUnkownWriters.
        */
  208. inline void enableMessagesFromUnkownWriters(
  209. bool enable)
  210. {
  211. m_acceptMessagesFromUnkownWriters = enable;
  212. }
        /**
        * Set the trusted writer of this reader.
        * Besides storing the entity id, this disables the acceptance of messages from unknown writers.
        * @param writer EntityId_t of the trusted writer.
        */
  213. void setTrustedWriter(
  214. const EntityId_t& writer)
  215. {
  216. m_acceptMessagesFromUnkownWriters = false;
  217. m_trustedWriterEntityId = writer;
  218. }
  219. protected:
  220. /*!
  221. * @brief Add a remote writer to the persistence_guid map
  222. * @param guid GUID of the remote writer
  223. * @param persistence_guid Persistence GUID of the remote writer
  224. */
  225. void add_persistence_guid(
  226. const GUID_t& guid,
  227. const GUID_t& persistence_guid);
  228. /*!
  229. * @brief Remove a remote writer from the persistence_guid map
  230. * @param guid GUID of the remote writer
  231. * @param persistence_guid Persistence GUID of the remote writer
  232. */
  233. void remove_persistence_guid(
  234. const GUID_t& guid,
  235. const GUID_t& persistence_guid);
  236. /*!
  237. * @brief Get the last notified sequence for a RTPS guid
  238. * @param guid The RTPS guid to query
  239. * @return Last notified sequence number for input guid
  240. * @remarks Takes persistence_guid into consideration
  241. */
  242. SequenceNumber_t get_last_notified(
  243. const GUID_t& guid);
  244. /*!
  245. * @brief Update the last notified sequence for a RTPS guid
  246. * @param guid The RTPS guid of the writer
  247. * @param seq Max sequence number available on writer
  248. * @return Previous value of last notified sequence number for input guid
  249. * @remarks Takes persistence_guid into consideration
  250. */
  251. SequenceNumber_t update_last_notified(
  252. const GUID_t& guid,
  253. const SequenceNumber_t& seq);
  254. /*!
  255. * @brief Set the last notified sequence for a persistence guid
  256. * @param persistence_guid The persistence guid to update
  257. * @param seq Sequence number to set for input guid
  258. * @remarks Persistent readers will write to DB. This method is virtual so derived readers can override it.
  259. */
  260. virtual void set_last_notified(
  261. const GUID_t& persistence_guid,
  262. const SequenceNumber_t& seq);
  263. /*!
  264. * @brief Search for a CacheChange_t, given its SequenceNumber_t and writer GUID_t, that is
  265. * waiting to be completed because it is fragmented.
  266. * @param sequence_number SequenceNumber_t of the searched CacheChange_t.
  267. * @param writer_guid writer GUID_t of the searched CacheChange_t.
  268. * @param change If a CacheChange_t was found, this argument is filled with its pointer.
  269. * Otherwise nullptr is returned in it.
  270. * @param hint Iterator from which the search will start.
  271. * Used to improve the search.
  272. * @return Iterator pointing to the position where the CacheChange_t was found.
  273. * It can be used to improve the next search.
  274. */
  275. History::const_iterator findCacheInFragmentedProcess(
  276. const SequenceNumber_t& sequence_number,
  277. const GUID_t& writer_guid,
  278. CacheChange_t** change,
  279. History::const_iterator hint) const;
  280. //!ReaderHistory
  281. ReaderHistory* mp_history;
  282. //!Listener
  283. ReaderListener* mp_listener;
  284. //!Accept msg to unknown readers (default=true)
  285. bool m_acceptMessagesToUnknownReaders;
  286. //!Accept msg from unknown writers (BE-true,RE-false; presumably best-effort/reliable)
  287. bool m_acceptMessagesFromUnkownWriters;
  288. //!Trusted writer (for Builtin)
  289. EntityId_t m_trustedWriterEntityId;
  290. //!Expects Inline Qos.
  291. bool m_expectsInlineQos;
  292. //!ReaderHistoryState
  293. ReaderHistoryState* history_state_;
        //! Unread counter, initialized to 0 (presumably the value reported by get_unread_count(); confirm).
  294. uint64_t total_unread_ = 0;
        //! Condition variable for new notifications (presumably the one wait_for_unread_cache() waits on; confirm).
  295. TimedConditionVariable new_notification_cv_;
  296. //! The liveliness kind of this reader
  297. LivelinessQosPolicyKind liveliness_kind_;
  298. //! The liveliness lease duration of this reader
  299. Duration_t liveliness_lease_duration_;
  300. private:
        //! Copy assignment is deleted: an RTPSReader cannot be assigned from another one.
  301. RTPSReader& operator =(
  302. const RTPSReader&) = delete;
  303. };
  304. } /* namespace rtps */
  305. } /* namespace fastrtps */
  306. } /* namespace eprosima */
  307. #endif /* _FASTDDS_RTPS_READER_RTPSREADER_H_ */