RTPSMessageGroup.h 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261
  1. // Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. /**
  15. * @file RTPSMessageGroup.h
  16. *
  17. */
  18. #ifndef _FASTDDS_RTPS_RTPSMESSAGEGROUP_H_
  19. #define _FASTDDS_RTPS_RTPSMESSAGEGROUP_H_
  20. #ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC
#include <fastdds/rtps/messages/RTPSMessageSenderInterface.hpp>
#include <fastdds/rtps/messages/RTPSMessageCreator.h>
#include <fastdds/rtps/common/FragmentNumber.h>

#include <cassert>
#include <chrono>
#include <cstdint>
#include <limits>
#include <memory>
#include <set>
#include <stdexcept>
#include <vector>
  28. namespace eprosima {
  29. namespace fastrtps {
  30. namespace rtps {
  31. class RTPSParticipantImpl;
  32. class Endpoint;
  33. class RTPSMessageGroup_t;
  34. /**
  35. * RTPSMessageGroup Class used to construct a RTPS message.
  36. * @ingroup WRITER_MODULE
  37. */
  38. class RTPSMessageGroup
  39. {
  40. public:
  41. class timeout : public std::runtime_error
  42. {
  43. public:
  44. timeout()
  45. : std::runtime_error("timeout") {}
  46. virtual ~timeout() = default;
  47. };
  48. /**
  49. * Basic constructor.
  50. * Constructs a RTPSMessageGroup allowing the destination endpoints to change.
  51. * @param participant Pointer to the participant sending data.
  52. * @param endpoint Pointer to the endpoint sending data.
  53. * @param msg_sender Reference to message sender interface.
  54. * @param max_blocking_time_point Future time point where blocking send should end.
  55. */
  56. RTPSMessageGroup(
  57. RTPSParticipantImpl* participant,
  58. Endpoint* endpoint,
  59. const RTPSMessageSenderInterface& msg_sender,
  60. std::chrono::steady_clock::time_point max_blocking_time_point =
  61. std::chrono::steady_clock::now() + std::chrono::hours(24));
  62. ~RTPSMessageGroup() noexcept(false);
  63. /**
  64. * Adds a DATA message to the group.
  65. * @param change Reference to the cache change to send.
  66. * @param expects_inline_qos True when one destination is expecting inline QOS.
  67. * @return True when message was added to the group.
  68. */
  69. bool add_data(
  70. const CacheChange_t& change,
  71. bool expects_inline_qos);
  72. /**
  73. * Adds a DATA_FRAG message to the group.
  74. * @param change Reference to the cache change to send.
  75. * @param fragment_number Index (1 based) of the fragment to send.
  76. * @param expects_inline_qos True when one destination is expecting inline QOS.
  77. * @return True when message was added to the group.
  78. */
  79. bool add_data_frag(
  80. const CacheChange_t& change,
  81. const uint32_t fragment_number,
  82. bool expects_inline_qos);
  83. /**
  84. * Adds a HEARTBEAT message to the group.
  85. * @param first_seq First available sequence number.
  86. * @param last_seq Last available sequence number.
  87. * @param count Counting identifier.
  88. * @param is_final Should final flag be set?
  89. * @param liveliness_flag Should liveliness flag be set?
  90. * @return True when message was added to the group.
  91. */
  92. bool add_heartbeat(
  93. const SequenceNumber_t& first_seq,
  94. const SequenceNumber_t& last_seq,
  95. Count_t count,
  96. bool is_final,
  97. bool liveliness_flag);
  98. /**
  99. * Adds one or more GAP messages to the group.
  100. * @param changes_seq_numbers Set of missed sequence numbers.
  101. * @return True when messages were added to the group.
  102. */
  103. bool add_gap(
  104. std::set<SequenceNumber_t>& changes_seq_numbers);
  105. /**
  106. * Adds one GAP message to the group.
  107. * @param gap_initial_sequence Start of consecutive sequence numbers.
  108. * @param gap_bitmap Bitmap of non-consecutive sequence numbers.
  109. * @return True when message was added to the group.
  110. */
  111. bool add_gap(
  112. const SequenceNumber_t& gap_initial_sequence,
  113. const SequenceNumberSet_t& gap_bitmap);
  114. /**
  115. * Adds one GAP message to the group.
  116. * @param gap_initial_sequence Start of consecutive sequence numbers.
  117. * @param gap_bitmap Bitmap of non-consecutive sequence numbers.
  118. * @param reader_guid GUID of the destination reader.
  119. * @return True when message was added to the group.
  120. */
  121. bool add_gap(
  122. const SequenceNumber_t& gap_initial_sequence,
  123. const SequenceNumberSet_t& gap_bitmap,
  124. const GUID_t& reader_guid);
  125. /**
  126. * Adds a ACKNACK message to the group.
  127. * @param seq_num_set Set of missing sequence numbers.
  128. * @param count Counting identifier.
  129. * @param final_flag Should final flag be set?
  130. * @return True when message was added to the group.
  131. */
  132. bool add_acknack(
  133. const SequenceNumberSet_t& seq_num_set,
  134. int32_t count,
  135. bool final_flag);
  136. /**
  137. * Adds a NACKFRAG message to the group.
  138. * @param seq_number Sequence number being nack'ed.
  139. * @param fn_state Set of missing fragment numbers.
  140. * @param count Counting identifier.
  141. * @return True when message was added to the group.
  142. */
  143. bool add_nackfrag(
  144. const SequenceNumber_t& seq_number,
  145. FragmentNumberSet_t fn_state,
  146. int32_t count);
  147. inline uint32_t get_current_bytes_processed() const
  148. {
  149. return currentBytesSent_ + full_msg_->length;
  150. }
  151. /**
  152. * To be used whenever destination locators/guids change between two add_xxx calls.
  153. * Automatically called inside add_xxx calls if destinations_have_changed() method of
  154. * RTPSMessageSenderInterface returns true.
  155. * May become private again with a refactor of RTPSMessageSenderInterface, adding a
  156. * group_has_been_flushed() method.
  157. */
  158. void flush_and_reset();
  159. //! Maximum fragment size minus the headers
  160. static inline constexpr uint32_t get_max_fragment_payload_size()
  161. {
  162. // Max fragment is 64KBytes_max - header - inlineqos - 3(for better alignment)
  163. return std::numeric_limits<uint16_t>::max() - data_frag_header_size_ - max_inline_qos_size_ - 3;
  164. }
  165. private:
  166. static constexpr uint32_t data_frag_header_size_ = 28;
  167. static constexpr uint32_t max_inline_qos_size_ = 32;
  168. void reset_to_header();
  169. void flush();
  170. void send();
  171. void check_and_maybe_flush()
  172. {
  173. check_and_maybe_flush(sender_.destination_guid_prefix());
  174. }
  175. void check_and_maybe_flush(
  176. const GuidPrefix_t& destination_guid_prefix);
  177. bool insert_submessage(
  178. bool is_big_submessage)
  179. {
  180. return insert_submessage(sender_.destination_guid_prefix(), is_big_submessage);
  181. }
  182. bool insert_submessage(
  183. const GuidPrefix_t& destination_guid_prefix,
  184. bool is_big_submessage);
  185. bool add_info_dst_in_buffer(
  186. CDRMessage_t* buffer,
  187. const GuidPrefix_t& destination_guid_prefix);
  188. bool add_info_ts_in_buffer(
  189. const Time_t& timestamp);
  190. bool create_gap_submessage(
  191. const SequenceNumber_t& gap_initial_sequence,
  192. const SequenceNumberSet_t& gap_bitmap,
  193. const EntityId_t& reader_id);
  194. const RTPSMessageSenderInterface& sender_;
  195. Endpoint* endpoint_;
  196. CDRMessage_t* full_msg_;
  197. CDRMessage_t* submessage_msg_;
  198. uint32_t currentBytesSent_;
  199. GuidPrefix_t current_dst_;
  200. RTPSParticipantImpl* participant_;
  201. #if HAVE_SECURITY
  202. CDRMessage_t* encrypt_msg_;
  203. #endif
  204. std::chrono::steady_clock::time_point max_blocking_time_point_;
  205. std::unique_ptr<RTPSMessageGroup_t> send_buffer_;
  206. };
  207. } /* namespace rtps */
  208. } /* namespace fastrtps */
  209. } /* namespace eprosima */
  210. #endif
  211. #endif /* _FASTDDS_RTPS_RTPSMESSAGEGROUP_H_ */