StatefulWriter.h 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378
  1. // Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. /**
  15. * @file StatefulWriter.h
  16. */
  17. #ifndef _FASTDDS_RTPS_STATEFULWRITER_H_
  18. #define _FASTDDS_RTPS_STATEFULWRITER_H_
  19. #ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC
  20. #include <fastdds/rtps/writer/RTPSWriter.h>
  21. #include <fastrtps/utils/collections/ResourceLimitedVector.hpp>
  22. #include <condition_variable>
  23. #include <mutex>
  24. namespace eprosima {
  25. namespace fastrtps {
  26. namespace rtps {
  27. class ReaderProxy;
  28. class TimedEvent;
  29. /**
  30. * Class StatefulWriter, specialization of RTPSWriter that maintains information of each matched Reader.
  31. * @ingroup WRITER_MODULE
  32. */
  33. class StatefulWriter : public RTPSWriter
  34. {
  35. friend class RTPSParticipantImpl;
  36. friend class ReaderProxy;
  37. public:
  38. //!Destructor
  39. virtual ~StatefulWriter();
  40. protected:
  41. //!Constructor
  42. StatefulWriter(
  43. RTPSParticipantImpl*,
  44. const GUID_t& guid,
  45. const WriterAttributes& att,
  46. WriterHistory* hist,
  47. WriterListener* listen = nullptr);
  48. private:
  49. //!Timed Event to manage the periodic HB to the Reader.
  50. TimedEvent* periodic_hb_event_;
  51. //! Timed Event to manage the Acknack response delay.
  52. TimedEvent* nack_response_event_;
  53. //! A timed event to mark samples as acknowledget (used only if disable positive ACKs QoS is enabled)
  54. TimedEvent* ack_event_;
  55. //!Count of the sent heartbeats.
  56. Count_t m_heartbeatCount;
  57. //!WriterTimes
  58. WriterTimes m_times;
  59. //! Vector containing all the active ReaderProxies.
  60. ResourceLimitedVector<ReaderProxy*> matched_readers_;
  61. //! Vector containing all the inactive, ready for reuse, ReaderProxies.
  62. ResourceLimitedVector<ReaderProxy*> matched_readers_pool_;
  63. using ReaderProxyIterator = ResourceLimitedVector<ReaderProxy*>::iterator;
  64. using ReaderProxyConstIterator = ResourceLimitedVector<ReaderProxy*>::const_iterator;
  65. //!To avoid notifying twice of the same sequence number
  66. SequenceNumber_t next_all_acked_notify_sequence_;
  67. // TODO Join this mutex when main mutex would not be recursive.
  68. std::mutex all_acked_mutex_;
  69. std::condition_variable all_acked_cond_;
  70. // TODO Also remove when main mutex not recursive.
  71. bool all_acked_;
  72. std::condition_variable_any may_remove_change_cond_;
  73. unsigned int may_remove_change_;
  74. public:
  75. /**
  76. * Add a specific change to all ReaderLocators.
  77. * @param p Pointer to the change.
  78. * @param max_blocking_time
  79. */
  80. void unsent_change_added_to_history(
  81. CacheChange_t* p,
  82. const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time) override;
  83. /**
  84. * Indicate the writer that a change has been removed by the history due to some HistoryQos requirement.
  85. * @param a_change Pointer to the change that is going to be removed.
  86. * @return True if removed correctly.
  87. */
  88. bool change_removed_by_history(
  89. CacheChange_t* a_change) override;
  90. /**
  91. * Method to indicate that there are changes not sent in some of all ReaderProxy.
  92. */
  93. void send_any_unsent_changes() override;
  94. /**
  95. * Sends a change directly to a intraprocess reader.
  96. */
  97. bool intraprocess_delivery(
  98. CacheChange_t* change,
  99. ReaderProxy* reader_proxy);
  100. bool intraprocess_gap(
  101. ReaderProxy* reader_proxy,
  102. const SequenceNumber_t& seq_num);
  103. bool intraprocess_heartbeat(
  104. ReaderProxy* reader_proxy,
  105. bool liveliness = false);
  106. //!Increment the HB count.
  107. inline void incrementHBCount()
  108. {
  109. ++m_heartbeatCount;
  110. }
  111. /**
  112. * Add a matched reader.
  113. * @param data Pointer to the ReaderProxyData object added.
  114. * @return True if added.
  115. */
  116. bool matched_reader_add(
  117. const ReaderProxyData& data) override;
  118. /**
  119. * Remove a matched reader.
  120. * @param reader_guid GUID of the reader to remove.
  121. * @return True if removed.
  122. */
  123. bool matched_reader_remove(
  124. const GUID_t& reader_guid) override;
  125. /**
  126. * Tells us if a specific Reader is matched against this writer
  127. * @param reader_guid GUID of the reader to check.
  128. * @return True if it was matched.
  129. */
  130. bool matched_reader_is_matched(
  131. const GUID_t& reader_guid) override;
  132. bool is_acked_by_all(
  133. const CacheChange_t* a_change) const override;
  134. bool wait_for_all_acked(
  135. const Duration_t& max_wait) override;
  136. bool all_readers_updated();
  137. /**
  138. * Remove the change with the minimum SequenceNumber
  139. * @return True if removed.
  140. */
  141. bool try_remove_change(
  142. const std::chrono::steady_clock::time_point& max_blocking_time_point,
  143. std::unique_lock<RecursiveTimedMutex>& lock) override;
  144. /**
  145. * Update the Attributes of the Writer.
  146. * @param att New attributes
  147. */
  148. void updateAttributes(
  149. const WriterAttributes& att) override;
  150. /**
  151. * Find a Reader Proxy in this writer.
  152. * @param[in] readerGuid The GUID_t of the reader.
  153. * @param[out] RP Pointer to pointer to return the ReaderProxy.
  154. * @return True if correct.
  155. */
  156. bool matched_reader_lookup(
  157. GUID_t& readerGuid,
  158. ReaderProxy** RP);
  159. /** Get count of heartbeats
  160. * @return count of heartbeats
  161. */
  162. inline Count_t getHeartbeatCount() const
  163. {
  164. return this->m_heartbeatCount;
  165. }
  166. /**
  167. * Get the RTPS participant
  168. * @return RTPS participant
  169. */
  170. inline RTPSParticipantImpl* getRTPSParticipant() const
  171. {
  172. return mp_RTPSParticipant;
  173. }
  174. /**
  175. * Get the number of matched readers
  176. * @return Number of the matched readers
  177. */
  178. inline size_t getMatchedReadersSize() const
  179. {
  180. std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
  181. return matched_readers_.size();
  182. }
  183. /**
  184. * @brief Returns true if disable positive ACKs QoS is enabled
  185. * @return True if positive acks are disabled, false otherwise
  186. */
  187. inline bool get_disable_positive_acks() const
  188. {
  189. return disable_positive_acks_;
  190. }
  191. /**
  192. * Update the WriterTimes attributes of all associated ReaderProxy.
  193. * @param times WriterTimes parameter.
  194. */
  195. void updateTimes(
  196. const WriterTimes& times);
  197. void add_flow_controller(
  198. std::unique_ptr<FlowController> controller) override;
  199. SequenceNumber_t next_sequence_number() const;
  200. /**
  201. * @brief Sends a periodic heartbeat
  202. * @param final Final flag
  203. * @param liveliness Liveliness flag
  204. * @return True on success
  205. */
  206. bool send_periodic_heartbeat(
  207. bool final = false,
  208. bool liveliness = false);
  209. /*!
  210. * @brief Sends a heartbeat to a remote reader.
  211. * @remarks This function is non thread-safe.
  212. */
  213. void send_heartbeat_to_nts(
  214. ReaderProxy& remoteReaderProxy,
  215. bool liveliness = false);
  216. void perform_nack_response();
  217. void perform_nack_supression(
  218. const GUID_t& reader_guid);
  219. /**
  220. * Process an incoming ACKNACK submessage.
  221. * @param[in] writer_guid GUID of the writer the submessage is directed to.
  222. * @param[in] reader_guid GUID of the reader originating the submessage.
  223. * @param[in] ack_count Count field of the submessage.
  224. * @param[in] sn_set Sequence number bitmap field of the submessage.
  225. * @param[in] final_flag Final flag field of the submessage.
  226. * @param[out] result true if the writer could process the submessage.
  227. * Only valid when returned value is true.
  228. * @return true when the submessage was destinated to this writer, false otherwise.
  229. */
  230. bool process_acknack(
  231. const GUID_t& writer_guid,
  232. const GUID_t& reader_guid,
  233. uint32_t ack_count,
  234. const SequenceNumberSet_t& sn_set,
  235. bool final_flag,
  236. bool& result) override;
  237. /**
  238. * Process an incoming NACKFRAG submessage.
  239. * @param[in] writer_guid GUID of the writer the submessage is directed to.
  240. * @param[in] reader_guid GUID of the reader originating the submessage.
  241. * @param[in] ack_count Count field of the submessage.
  242. * @param[in] seq_num Sequence number field of the submessage.
  243. * @param[in] fragments_state Sequence number field of the submessage.
  244. * @param[out] result true if the writer could process the submessage.
  245. * Only valid when returned value is true.
  246. * @return true when the submessage was destinated to this writer, false otherwise.
  247. */
  248. virtual bool process_nack_frag(
  249. const GUID_t& writer_guid,
  250. const GUID_t& reader_guid,
  251. uint32_t ack_count,
  252. const SequenceNumber_t& seq_num,
  253. const FragmentNumberSet_t fragments_state,
  254. bool& result) override;
  255. private:
  256. void update_reader_info(
  257. bool create_sender_resources);
  258. void send_heartbeat_piggyback_nts_(
  259. ReaderProxy* reader,
  260. RTPSMessageGroup& message_group,
  261. uint32_t& last_bytes_processed);
  262. void send_heartbeat_nts_(
  263. size_t number_of_readers,
  264. RTPSMessageGroup& message_group,
  265. bool final,
  266. bool liveliness = false);
  267. void check_acked_status();
  268. /**
  269. * @brief A method called when the ack timer expires
  270. * @details Only used if disable positive ACKs QoS is enabled
  271. */
  272. bool ack_timer_expired();
  273. void send_heartbeat_to_all_readers();
  274. void send_changes_separatedly(
  275. SequenceNumber_t max_sequence,
  276. bool& activateHeartbeatPeriod);
  277. void send_all_intraprocess_changes(
  278. SequenceNumber_t max_sequence);
  279. void send_all_unsent_changes(
  280. SequenceNumber_t max_sequence,
  281. bool& activateHeartbeatPeriod);
  282. void send_unsent_changes_with_flow_control(
  283. SequenceNumber_t max_sequence,
  284. bool& activateHeartbeatPeriod);
  285. bool send_hole_gaps_to_group(
  286. RTPSMessageGroup& group);
  287. //! True to disable piggyback heartbeats
  288. bool disable_heartbeat_piggyback_;
  289. //! True to disable positive ACKs
  290. bool disable_positive_acks_;
  291. //! Keep duration for disable positive ACKs QoS, in microseconds
  292. std::chrono::duration<double, std::ratio<1, 1000000> > keep_duration_us_;
  293. //! Last acknowledged cache change (only used if using disable positive ACKs QoS)
  294. SequenceNumber_t last_sequence_number_;
  295. //! Biggest sequence number removed from history
  296. SequenceNumber_t biggest_removed_sequence_number_;
  297. const uint32_t sendBufferSize_;
  298. int32_t currentUsageSendBufferSize_;
  299. std::vector<std::unique_ptr<FlowController> > m_controllers;
  300. bool there_are_remote_readers_ = false;
  301. bool there_are_local_readers_ = false;
  302. StatefulWriter& operator =(
  303. const StatefulWriter&) = delete;
  304. };
  305. } /* namespace rtps */
  306. } /* namespace fastrtps */
  307. } /* namespace eprosima */
  308. #endif
  309. #endif /* _FASTDDS_RTPS_STATEFULWRITER_H_ */