123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261 |
- #ifndef _FASTDDS_RTPS_RTPSMESSAGEGROUP_H_
- #define _FASTDDS_RTPS_RTPSMESSAGEGROUP_H_
- #ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC
- #include <fastdds/rtps/messages/RTPSMessageSenderInterface.hpp>
- #include <fastdds/rtps/messages/RTPSMessageCreator.h>
- #include <fastdds/rtps/common/FragmentNumber.h>
- #include <vector>
- #include <chrono>
- #include <cassert>
- #include <memory>
- namespace eprosima {
- namespace fastrtps {
- namespace rtps {
- class RTPSParticipantImpl;
- class Endpoint;
- class RTPSMessageGroup_t;
- class RTPSMessageGroup
- {
- public:
- class timeout : public std::runtime_error
- {
- public:
- timeout()
- : std::runtime_error("timeout") {}
- virtual ~timeout() = default;
- };
-
- RTPSMessageGroup(
- RTPSParticipantImpl* participant,
- Endpoint* endpoint,
- const RTPSMessageSenderInterface& msg_sender,
- std::chrono::steady_clock::time_point max_blocking_time_point =
- std::chrono::steady_clock::now() + std::chrono::hours(24));
- ~RTPSMessageGroup() noexcept(false);
-
- bool add_data(
- const CacheChange_t& change,
- bool expects_inline_qos);
-
- bool add_data_frag(
- const CacheChange_t& change,
- const uint32_t fragment_number,
- bool expects_inline_qos);
-
- bool add_heartbeat(
- const SequenceNumber_t& first_seq,
- const SequenceNumber_t& last_seq,
- Count_t count,
- bool is_final,
- bool liveliness_flag);
-
- bool add_gap(
- std::set<SequenceNumber_t>& changes_seq_numbers);
-
- bool add_gap(
- const SequenceNumber_t& gap_initial_sequence,
- const SequenceNumberSet_t& gap_bitmap);
-
- bool add_gap(
- const SequenceNumber_t& gap_initial_sequence,
- const SequenceNumberSet_t& gap_bitmap,
- const GUID_t& reader_guid);
-
- bool add_acknack(
- const SequenceNumberSet_t& seq_num_set,
- int32_t count,
- bool final_flag);
-
- bool add_nackfrag(
- const SequenceNumber_t& seq_number,
- FragmentNumberSet_t fn_state,
- int32_t count);
- inline uint32_t get_current_bytes_processed() const
- {
- return currentBytesSent_ + full_msg_->length;
- }
-
- void flush_and_reset();
-
- static inline constexpr uint32_t get_max_fragment_payload_size()
- {
-
- return std::numeric_limits<uint16_t>::max() - data_frag_header_size_ - max_inline_qos_size_ - 3;
- }
- private:
- static constexpr uint32_t data_frag_header_size_ = 28;
- static constexpr uint32_t max_inline_qos_size_ = 32;
- void reset_to_header();
- void flush();
- void send();
- void check_and_maybe_flush()
- {
- check_and_maybe_flush(sender_.destination_guid_prefix());
- }
- void check_and_maybe_flush(
- const GuidPrefix_t& destination_guid_prefix);
- bool insert_submessage(
- bool is_big_submessage)
- {
- return insert_submessage(sender_.destination_guid_prefix(), is_big_submessage);
- }
- bool insert_submessage(
- const GuidPrefix_t& destination_guid_prefix,
- bool is_big_submessage);
- bool add_info_dst_in_buffer(
- CDRMessage_t* buffer,
- const GuidPrefix_t& destination_guid_prefix);
- bool add_info_ts_in_buffer(
- const Time_t& timestamp);
- bool create_gap_submessage(
- const SequenceNumber_t& gap_initial_sequence,
- const SequenceNumberSet_t& gap_bitmap,
- const EntityId_t& reader_id);
- const RTPSMessageSenderInterface& sender_;
- Endpoint* endpoint_;
- CDRMessage_t* full_msg_;
- CDRMessage_t* submessage_msg_;
- uint32_t currentBytesSent_;
- GuidPrefix_t current_dst_;
- RTPSParticipantImpl* participant_;
- #if HAVE_SECURITY
-
- CDRMessage_t* encrypt_msg_;
-
- #endif
- std::chrono::steady_clock::time_point max_blocking_time_point_;
- std::unique_ptr<RTPSMessageGroup_t> send_buffer_;
- };
- }
- }
- }
- #endif
- #endif
|