RTPSWriter.h 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437
  1. // Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. /**
  15. * @file RTPSWriter.h
  16. */
  17. #ifndef _FASTDDS_RTPS_RTPSWRITER_H_
  18. #define _FASTDDS_RTPS_RTPSWRITER_H_
  19. #include <fastdds/rtps/Endpoint.h>
  20. #include <fastdds/rtps/messages/RTPSMessageGroup.h>
  21. #include <fastdds/rtps/attributes/WriterAttributes.h>
  22. #include <fastrtps/qos/LivelinessLostStatus.h>
  23. #include <fastrtps/utils/collections/ResourceLimitedVector.hpp>
  24. #include <fastdds/rtps/common/LocatorSelector.hpp>
  25. #include <fastdds/rtps/messages/RTPSMessageSenderInterface.hpp>
  26. #include <vector>
  27. #include <memory>
  28. #include <functional>
  29. #include <chrono>
  30. #include <mutex>
  31. namespace eprosima {
  32. namespace fastrtps {
  33. namespace rtps {
  34. class WriterListener;
  35. class WriterHistory;
  36. class FlowController;
  37. struct CacheChange_t;
  38. /**
  39. * Class RTPSWriter, manages the sending of data to the readers. Is always associated with a HistoryCache.
  40. * @ingroup WRITER_MODULE
  41. */
  42. class RTPSWriter : public Endpoint, public RTPSMessageSenderInterface
  43. {
  44. friend class WriterHistory;
  45. friend class RTPSParticipantImpl;
  46. friend class RTPSMessageGroup;
  47. friend class AsyncInterestTree;
  48. protected:
  49. RTPSWriter(
  50. RTPSParticipantImpl*,
  51. const GUID_t& guid,
  52. const WriterAttributes& att,
  53. WriterHistory* hist,
  54. WriterListener* listen = nullptr);
  55. virtual ~RTPSWriter();
  56. public:
  57. /**
  58. * Create a new change based with the provided changeKind.
  59. * @param data Data of the change.
  60. * @param changeKind The type of change.
  61. * @param handle InstanceHandle to assign.
  62. * @return Pointer to the CacheChange or nullptr if incorrect.
  63. */
  64. template<typename T>
  65. CacheChange_t* new_change(
  66. T& data,
  67. ChangeKind_t changeKind,
  68. InstanceHandle_t handle = c_InstanceHandle_Unknown)
  69. {
  70. return new_change([data]() -> uint32_t {
  71. return (uint32_t)T::getCdrSerializedSize(data);
  72. }, changeKind, handle);
  73. }
  74. RTPS_DllAPI CacheChange_t* new_change(
  75. const std::function<uint32_t()>& dataCdrSerializedSize,
  76. ChangeKind_t changeKind,
  77. InstanceHandle_t handle = c_InstanceHandle_Unknown);
  78. /**
  79. * Add a matched reader.
  80. * @param data Pointer to the ReaderProxyData object added.
  81. * @return True if added.
  82. */
  83. RTPS_DllAPI virtual bool matched_reader_add(
  84. const ReaderProxyData& data) = 0;
  85. /**
  86. * Remove a matched reader.
  87. * @param reader_guid GUID of the reader to remove.
  88. * @return True if removed.
  89. */
  90. RTPS_DllAPI virtual bool matched_reader_remove(
  91. const GUID_t& reader_guid) = 0;
  92. /**
  93. * Tells us if a specific Reader is matched against this writer.
  94. * @param reader_guid GUID of the reader to check.
  95. * @return True if it was matched.
  96. */
  97. RTPS_DllAPI virtual bool matched_reader_is_matched(
  98. const GUID_t& reader_guid) = 0;
  99. /**
  100. * Check if a specific change has been acknowledged by all Readers.
  101. * Is only useful in reliable Writer. In BE Writers returns false when pending to be sent.
  102. * @return True if acknowledged by all.
  103. */
  104. RTPS_DllAPI virtual bool is_acked_by_all(
  105. const CacheChange_t* /*a_change*/) const
  106. {
  107. return false;
  108. }
  109. /**
  110. * Waits until all changes were acknowledged or max_wait.
  111. * @return True if all were acknowledged.
  112. */
  113. RTPS_DllAPI virtual bool wait_for_all_acked(
  114. const Duration_t& /*max_wait*/)
  115. {
  116. return true;
  117. }
  118. /**
  119. * Update the Attributes of the Writer.
  120. * @param att New attributes
  121. */
  122. RTPS_DllAPI virtual void updateAttributes(
  123. const WriterAttributes& att) = 0;
  124. /**
  125. * This method triggers the send operation for unsent changes.
  126. * @return number of messages sent
  127. */
  128. RTPS_DllAPI virtual void send_any_unsent_changes() = 0;
  129. /**
  130. * Get Min Seq Num in History.
  131. * @return Minimum sequence number in history
  132. */
  133. RTPS_DllAPI SequenceNumber_t get_seq_num_min();
  134. /**
  135. * Get Max Seq Num in History.
  136. * @return Maximum sequence number in history
  137. */
  138. RTPS_DllAPI SequenceNumber_t get_seq_num_max();
  139. /**
  140. * Get maximum size of the serialized type
  141. * @return Maximum size of the serialized type
  142. */
  143. RTPS_DllAPI uint32_t getTypeMaxSerialized();
  144. //!Get maximum size of the data
  145. uint32_t getMaxDataSize();
  146. //! Calculates the maximum size of the data
  147. uint32_t calculateMaxDataSize(
  148. uint32_t length);
  149. /**
  150. * Get listener
  151. * @return Listener
  152. */
  153. RTPS_DllAPI inline WriterListener* getListener()
  154. {
  155. return mp_listener;
  156. }
  157. RTPS_DllAPI inline bool set_listener(
  158. WriterListener* listener)
  159. {
  160. mp_listener = listener;
  161. return true;
  162. }
  163. /**
  164. * Get the publication mode
  165. * @return publication mode
  166. */
  167. RTPS_DllAPI inline bool isAsync() const
  168. {
  169. return is_async_;
  170. }
  171. /**
  172. * Remove an specified max number of changes
  173. * @param max Maximum number of changes to remove.
  174. * @return at least one change has been removed
  175. */
  176. RTPS_DllAPI bool remove_older_changes(
  177. unsigned int max = 0);
  178. /**
  179. * Tries to remove a change waiting a maximum of the provided microseconds.
  180. * @param max_blocking_time_point Maximum time to wait for.
  181. * @param lock Lock of the Change list.
  182. * @return at least one change has been removed
  183. */
  184. virtual bool try_remove_change(
  185. const std::chrono::steady_clock::time_point& max_blocking_time_point,
  186. std::unique_lock<RecursiveTimedMutex>& lock) = 0;
  187. /*
  188. * Adds a flow controller that will apply to this writer exclusively.
  189. * @param controller
  190. */
  191. virtual void add_flow_controller(
  192. std::unique_ptr<FlowController> controller) = 0;
  193. /**
  194. * Get RTPS participant
  195. * @return RTPS participant
  196. */
  197. inline RTPSParticipantImpl* getRTPSParticipant() const
  198. {
  199. return mp_RTPSParticipant;
  200. }
  201. /**
  202. * Enable or disable sending data to readers separately
  203. * NOTE: This will only work for synchronous writers
  204. * @param enable If separate sending should be enabled
  205. */
  206. void set_separate_sending (
  207. bool enable)
  208. {
  209. m_separateSendingEnabled = enable;
  210. }
  211. /**
  212. * Inform if data is sent to readers separatedly
  213. * @return true if separate sending is enabled
  214. */
  215. bool get_separate_sending () const
  216. {
  217. return m_separateSendingEnabled;
  218. }
  219. /**
  220. * Process an incoming ACKNACK submessage.
  221. * @param[in] writer_guid GUID of the writer the submessage is directed to.
  222. * @param[in] reader_guid GUID of the reader originating the submessage.
  223. * @param[in] ack_count Count field of the submessage.
  224. * @param[in] sn_set Sequence number bitmap field of the submessage.
  225. * @param[in] final_flag Final flag field of the submessage.
  226. * @param[out] result true if the writer could process the submessage.
  227. * Only valid when returned value is true.
  228. * @return true when the submessage was destinated to this writer, false otherwise.
  229. */
  230. virtual bool process_acknack(
  231. const GUID_t& writer_guid,
  232. const GUID_t& reader_guid,
  233. uint32_t ack_count,
  234. const SequenceNumberSet_t& sn_set,
  235. bool final_flag,
  236. bool& result)
  237. {
  238. (void)reader_guid; (void)ack_count; (void)sn_set; (void)final_flag;
  239. result = false;
  240. return writer_guid == m_guid;
  241. }
  242. /**
  243. * Process an incoming NACKFRAG submessage.
  244. * @param[in] writer_guid GUID of the writer the submessage is directed to.
  245. * @param[in] reader_guid GUID of the reader originating the submessage.
  246. * @param[in] ack_count Count field of the submessage.
  247. * @param[in] seq_num Sequence number field of the submessage.
  248. * @param[in] fragments_state Fragment number bitmap field of the submessage.
  249. * @param[out] result true if the writer could process the submessage.
  250. * Only valid when returned value is true.
  251. * @return true when the submessage was destinated to this writer, false otherwise.
  252. */
  253. virtual bool process_nack_frag(
  254. const GUID_t& writer_guid,
  255. const GUID_t& reader_guid,
  256. uint32_t ack_count,
  257. const SequenceNumber_t& seq_num,
  258. const FragmentNumberSet_t fragments_state,
  259. bool& result)
  260. {
  261. (void)reader_guid; (void)ack_count; (void)seq_num; (void)fragments_state;
  262. result = false;
  263. return writer_guid == m_guid;
  264. }
  265. /**
  266. * @brief A method to retrieve the liveliness kind
  267. * @return Liveliness kind
  268. */
  269. const LivelinessQosPolicyKind& get_liveliness_kind() const;
  270. /**
  271. * @brief A method to retrieve the liveliness lease duration
  272. * @return Lease durtation
  273. */
  274. const Duration_t& get_liveliness_lease_duration() const;
  275. /**
  276. * @brief A method to return the liveliness announcement period
  277. * @return The announcement period
  278. */
  279. const Duration_t& get_liveliness_announcement_period() const;
  280. //! Liveliness lost status of this writer
  281. LivelinessLostStatus liveliness_lost_status_;
  282. /**
  283. * Check if the destinations managed by this sender interface have changed.
  284. *
  285. * @return true if destinations have changed, false otherwise.
  286. */
  287. bool destinations_have_changed() const override;
  288. /**
  289. * Get a GUID prefix representing all destinations.
  290. *
  291. * @return When all the destinations share the same prefix (i.e. belong to the same participant)
  292. * that prefix is returned. When there are no destinations, or they belong to different
  293. * participants, c_GuidPrefix_Unknown is returned.
  294. */
  295. GuidPrefix_t destination_guid_prefix() const override;
  296. /**
  297. * Get the GUID prefix of all the destination participants.
  298. *
  299. * @return a const reference to a vector with the GUID prefix of all destination participants.
  300. */
  301. const std::vector<GuidPrefix_t>& remote_participants() const override;
  302. /**
  303. * Get the GUID of all destinations.
  304. *
  305. * @return a const reference to a vector with the GUID of all destinations.
  306. */
  307. const std::vector<GUID_t>& remote_guids() const override;
  308. /**
  309. * Send a message through this interface.
  310. *
  311. * @param message Pointer to the buffer with the message already serialized.
  312. * @param max_blocking_time_point Future timepoint where blocking send should end.
  313. */
  314. bool send(
  315. CDRMessage_t* message,
  316. std::chrono::steady_clock::time_point& max_blocking_time_point) const override;
  317. protected:
  318. //!Is the data sent directly or announced by HB and THEN send to the ones who ask for it?.
  319. bool m_pushMode;
  320. //!WriterHistory
  321. WriterHistory* mp_history;
  322. //!Listener
  323. WriterListener* mp_listener;
  324. //!Asynchronous publication activated
  325. bool is_async_;
  326. //!Separate sending activated
  327. bool m_separateSendingEnabled;
  328. LocatorSelector locator_selector_;
  329. ResourceLimitedVector<GUID_t> all_remote_readers_;
  330. ResourceLimitedVector<GuidPrefix_t> all_remote_participants_;
  331. void add_guid(
  332. const GUID_t& remote_guid);
  333. void compute_selected_guids();
  334. void update_cached_info_nts();
  335. /**
  336. * Initialize the header of hte CDRMessages.
  337. */
  338. void init_header();
  339. /**
  340. * Add a change to the unsent list.
  341. * @param change Pointer to the change to add.
  342. * @param max_blocking_time
  343. */
  344. virtual void unsent_change_added_to_history(
  345. CacheChange_t* change,
  346. const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time) = 0;
  347. /**
  348. * Indicate the writer that a change has been removed by the history due to some HistoryQos requirement.
  349. * @param a_change Pointer to the change that is going to be removed.
  350. * @return True if removed correctly.
  351. */
  352. virtual bool change_removed_by_history(
  353. CacheChange_t* a_change) = 0;
  354. #if HAVE_SECURITY
  355. SerializedPayload_t encrypt_payload_;
  356. bool encrypt_cachechange(
  357. CacheChange_t* change);
  358. #endif
  359. //! The liveliness kind of this writer
  360. LivelinessQosPolicyKind liveliness_kind_;
  361. //! The liveliness lease duration of this writer
  362. Duration_t liveliness_lease_duration_;
  363. //! The liveliness announcement period
  364. Duration_t liveliness_announcement_period_;
  365. private:
  366. RTPSWriter& operator =(
  367. const RTPSWriter&) = delete;
  368. RTPSWriter* next_[2];
  369. };
  370. } /* namespace rtps */
  371. } /* namespace fastrtps */
  372. } /* namespace eprosima */
  373. #endif /* _FASTDDS_RTPS_RTPSWRITER_H_ */