// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima). // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. /** * @file RTPSMessageGroup.h * */ #ifndef _FASTDDS_RTPS_RTPSMESSAGEGROUP_H_ #define _FASTDDS_RTPS_RTPSMESSAGEGROUP_H_ #ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC #include <fastdds/rtps/messages/RTPSMessageSenderInterface.hpp> #include <fastdds/rtps/messages/RTPSMessageCreator.h> #include <fastdds/rtps/common/FragmentNumber.h> #include <vector> #include <chrono> #include <cassert> #include <memory> namespace eprosima { namespace fastrtps { namespace rtps { class RTPSParticipantImpl; class Endpoint; class RTPSMessageGroup_t; /** * RTPSMessageGroup Class used to construct a RTPS message. * @ingroup WRITER_MODULE */ class RTPSMessageGroup { public: class timeout : public std::runtime_error { public: timeout() : std::runtime_error("timeout") {} virtual ~timeout() = default; }; /** * Basic constructor. * Constructs a RTPSMessageGroup allowing the destination endpoints to change. * @param participant Pointer to the participant sending data. * @param endpoint Pointer to the endpoint sending data. * @param msg_sender Reference to message sender interface. * @param max_blocking_time_point Future time point where blocking send should end. */ RTPSMessageGroup( RTPSParticipantImpl* participant, Endpoint* endpoint, const RTPSMessageSenderInterface& msg_sender, std::chrono::steady_clock::time_point max_blocking_time_point = std::chrono::steady_clock::now() + std::chrono::hours(24)); ~RTPSMessageGroup() noexcept(false); /** * Adds a DATA message to the group. * @param change Reference to the cache change to send. * @param expects_inline_qos True when one destination is expecting inline QOS. * @return True when message was added to the group. */ bool add_data( const CacheChange_t& change, bool expects_inline_qos); /** * Adds a DATA_FRAG message to the group. * @param change Reference to the cache change to send. * @param fragment_number Index (1 based) of the fragment to send. * @param expects_inline_qos True when one destination is expecting inline QOS. * @return True when message was added to the group. */ bool add_data_frag( const CacheChange_t& change, const uint32_t fragment_number, bool expects_inline_qos); /** * Adds a HEARTBEAT message to the group. * @param first_seq First available sequence number. * @param last_seq Last available sequence number. * @param count Counting identifier. * @param is_final Should final flag be set? * @param liveliness_flag Should liveliness flag be set? * @return True when message was added to the group. */ bool add_heartbeat( const SequenceNumber_t& first_seq, const SequenceNumber_t& last_seq, Count_t count, bool is_final, bool liveliness_flag); /** * Adds one or more GAP messages to the group. * @param changes_seq_numbers Set of missed sequence numbers. * @return True when messages were added to the group. */ bool add_gap( std::set<SequenceNumber_t>& changes_seq_numbers); /** * Adds one GAP message to the group. * @param gap_initial_sequence Start of consecutive sequence numbers. * @param gap_bitmap Bitmap of non-consecutive sequence numbers. * @return True when message was added to the group. */ bool add_gap( const SequenceNumber_t& gap_initial_sequence, const SequenceNumberSet_t& gap_bitmap); /** * Adds one GAP message to the group. * @param gap_initial_sequence Start of consecutive sequence numbers. * @param gap_bitmap Bitmap of non-consecutive sequence numbers. * @param reader_guid GUID of the destination reader. * @return True when message was added to the group. */ bool add_gap( const SequenceNumber_t& gap_initial_sequence, const SequenceNumberSet_t& gap_bitmap, const GUID_t& reader_guid); /** * Adds a ACKNACK message to the group. * @param seq_num_set Set of missing sequence numbers. * @param count Counting identifier. * @param final_flag Should final flag be set? * @return True when message was added to the group. */ bool add_acknack( const SequenceNumberSet_t& seq_num_set, int32_t count, bool final_flag); /** * Adds a NACKFRAG message to the group. * @param seq_number Sequence number being nack'ed. * @param fn_state Set of missing fragment numbers. * @param count Counting identifier. * @return True when message was added to the group. */ bool add_nackfrag( const SequenceNumber_t& seq_number, FragmentNumberSet_t fn_state, int32_t count); inline uint32_t get_current_bytes_processed() const { return currentBytesSent_ + full_msg_->length; } /** * To be used whenever destination locators/guids change between two add_xxx calls. * Automatically called inside add_xxx calls if destinations_have_changed() method of * RTPSMessageSenderInterface returns true. * May become private again with a refactor of RTPSMessageSenderInterface, adding a * group_has_been_flushed() method. */ void flush_and_reset(); //! Maximum fragment size minus the headers static inline constexpr uint32_t get_max_fragment_payload_size() { // Max fragment is 64KBytes_max - header - inlineqos - 3(for better alignment) return std::numeric_limits<uint16_t>::max() - data_frag_header_size_ - max_inline_qos_size_ - 3; } private: static constexpr uint32_t data_frag_header_size_ = 28; static constexpr uint32_t max_inline_qos_size_ = 32; void reset_to_header(); void flush(); void send(); void check_and_maybe_flush() { check_and_maybe_flush(sender_.destination_guid_prefix()); } void check_and_maybe_flush( const GuidPrefix_t& destination_guid_prefix); bool insert_submessage( bool is_big_submessage) { return insert_submessage(sender_.destination_guid_prefix(), is_big_submessage); } bool insert_submessage( const GuidPrefix_t& destination_guid_prefix, bool is_big_submessage); bool add_info_dst_in_buffer( CDRMessage_t* buffer, const GuidPrefix_t& destination_guid_prefix); bool add_info_ts_in_buffer( const Time_t& timestamp); bool create_gap_submessage( const SequenceNumber_t& gap_initial_sequence, const SequenceNumberSet_t& gap_bitmap, const EntityId_t& reader_id); const RTPSMessageSenderInterface& sender_; Endpoint* endpoint_; CDRMessage_t* full_msg_; CDRMessage_t* submessage_msg_; uint32_t currentBytesSent_; GuidPrefix_t current_dst_; RTPSParticipantImpl* participant_; #if HAVE_SECURITY CDRMessage_t* encrypt_msg_; #endif std::chrono::steady_clock::time_point max_blocking_time_point_; std::unique_ptr<RTPSMessageGroup_t> send_buffer_; }; } /* namespace rtps */ } /* namespace fastrtps */ } /* namespace eprosima */ #endif #endif /* _FASTDDS_RTPS_RTPSMESSAGEGROUP_H_ */