123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261 |
- // Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- /**
- * @file RTPSMessageGroup.h
- *
- */
- #ifndef _FASTDDS_RTPS_RTPSMESSAGEGROUP_H_
- #define _FASTDDS_RTPS_RTPSMESSAGEGROUP_H_
- #ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC
- #include <fastdds/rtps/messages/RTPSMessageSenderInterface.hpp>
- #include <fastdds/rtps/messages/RTPSMessageCreator.h>
- #include <fastdds/rtps/common/FragmentNumber.h>
- #include <vector>
- #include <chrono>
- #include <cassert>
- #include <memory>
- namespace eprosima {
- namespace fastrtps {
- namespace rtps {
- class RTPSParticipantImpl;
- class Endpoint;
- class RTPSMessageGroup_t;
- /**
- * RTPSMessageGroup Class used to construct a RTPS message.
- * @ingroup WRITER_MODULE
- */
- class RTPSMessageGroup
- {
- public:
- class timeout : public std::runtime_error
- {
- public:
- timeout()
- : std::runtime_error("timeout") {}
- virtual ~timeout() = default;
- };
- /**
- * Basic constructor.
- * Constructs a RTPSMessageGroup allowing the destination endpoints to change.
- * @param participant Pointer to the participant sending data.
- * @param endpoint Pointer to the endpoint sending data.
- * @param msg_sender Reference to message sender interface.
- * @param max_blocking_time_point Future time point where blocking send should end.
- */
- RTPSMessageGroup(
- RTPSParticipantImpl* participant,
- Endpoint* endpoint,
- const RTPSMessageSenderInterface& msg_sender,
- std::chrono::steady_clock::time_point max_blocking_time_point =
- std::chrono::steady_clock::now() + std::chrono::hours(24));
- ~RTPSMessageGroup() noexcept(false);
- /**
- * Adds a DATA message to the group.
- * @param change Reference to the cache change to send.
- * @param expects_inline_qos True when one destination is expecting inline QOS.
- * @return True when message was added to the group.
- */
- bool add_data(
- const CacheChange_t& change,
- bool expects_inline_qos);
- /**
- * Adds a DATA_FRAG message to the group.
- * @param change Reference to the cache change to send.
- * @param fragment_number Index (1 based) of the fragment to send.
- * @param expects_inline_qos True when one destination is expecting inline QOS.
- * @return True when message was added to the group.
- */
- bool add_data_frag(
- const CacheChange_t& change,
- const uint32_t fragment_number,
- bool expects_inline_qos);
- /**
- * Adds a HEARTBEAT message to the group.
- * @param first_seq First available sequence number.
- * @param last_seq Last available sequence number.
- * @param count Counting identifier.
- * @param is_final Should final flag be set?
- * @param liveliness_flag Should liveliness flag be set?
- * @return True when message was added to the group.
- */
- bool add_heartbeat(
- const SequenceNumber_t& first_seq,
- const SequenceNumber_t& last_seq,
- Count_t count,
- bool is_final,
- bool liveliness_flag);
- /**
- * Adds one or more GAP messages to the group.
- * @param changes_seq_numbers Set of missed sequence numbers.
- * @return True when messages were added to the group.
- */
- bool add_gap(
- std::set<SequenceNumber_t>& changes_seq_numbers);
- /**
- * Adds one GAP message to the group.
- * @param gap_initial_sequence Start of consecutive sequence numbers.
- * @param gap_bitmap Bitmap of non-consecutive sequence numbers.
- * @return True when message was added to the group.
- */
- bool add_gap(
- const SequenceNumber_t& gap_initial_sequence,
- const SequenceNumberSet_t& gap_bitmap);
- /**
- * Adds one GAP message to the group.
- * @param gap_initial_sequence Start of consecutive sequence numbers.
- * @param gap_bitmap Bitmap of non-consecutive sequence numbers.
- * @param reader_guid GUID of the destination reader.
- * @return True when message was added to the group.
- */
- bool add_gap(
- const SequenceNumber_t& gap_initial_sequence,
- const SequenceNumberSet_t& gap_bitmap,
- const GUID_t& reader_guid);
- /**
- * Adds a ACKNACK message to the group.
- * @param seq_num_set Set of missing sequence numbers.
- * @param count Counting identifier.
- * @param final_flag Should final flag be set?
- * @return True when message was added to the group.
- */
- bool add_acknack(
- const SequenceNumberSet_t& seq_num_set,
- int32_t count,
- bool final_flag);
- /**
- * Adds a NACKFRAG message to the group.
- * @param seq_number Sequence number being nack'ed.
- * @param fn_state Set of missing fragment numbers.
- * @param count Counting identifier.
- * @return True when message was added to the group.
- */
- bool add_nackfrag(
- const SequenceNumber_t& seq_number,
- FragmentNumberSet_t fn_state,
- int32_t count);
- inline uint32_t get_current_bytes_processed() const
- {
- return currentBytesSent_ + full_msg_->length;
- }
- /**
- * To be used whenever destination locators/guids change between two add_xxx calls.
- * Automatically called inside add_xxx calls if destinations_have_changed() method of
- * RTPSMessageSenderInterface returns true.
- * May become private again with a refactor of RTPSMessageSenderInterface, adding a
- * group_has_been_flushed() method.
- */
- void flush_and_reset();
- //! Maximum fragment size minus the headers
- static inline constexpr uint32_t get_max_fragment_payload_size()
- {
- // Max fragment is 64KBytes_max - header - inlineqos - 3(for better alignment)
- return std::numeric_limits<uint16_t>::max() - data_frag_header_size_ - max_inline_qos_size_ - 3;
- }
- private:
- static constexpr uint32_t data_frag_header_size_ = 28;
- static constexpr uint32_t max_inline_qos_size_ = 32;
- void reset_to_header();
- void flush();
- void send();
- void check_and_maybe_flush()
- {
- check_and_maybe_flush(sender_.destination_guid_prefix());
- }
- void check_and_maybe_flush(
- const GuidPrefix_t& destination_guid_prefix);
- bool insert_submessage(
- bool is_big_submessage)
- {
- return insert_submessage(sender_.destination_guid_prefix(), is_big_submessage);
- }
- bool insert_submessage(
- const GuidPrefix_t& destination_guid_prefix,
- bool is_big_submessage);
- bool add_info_dst_in_buffer(
- CDRMessage_t* buffer,
- const GuidPrefix_t& destination_guid_prefix);
- bool add_info_ts_in_buffer(
- const Time_t& timestamp);
- bool create_gap_submessage(
- const SequenceNumber_t& gap_initial_sequence,
- const SequenceNumberSet_t& gap_bitmap,
- const EntityId_t& reader_id);
- const RTPSMessageSenderInterface& sender_;
- Endpoint* endpoint_;
- CDRMessage_t* full_msg_;
- CDRMessage_t* submessage_msg_;
- uint32_t currentBytesSent_;
- GuidPrefix_t current_dst_;
- RTPSParticipantImpl* participant_;
- #if HAVE_SECURITY
-
- CDRMessage_t* encrypt_msg_;
-
- #endif
- std::chrono::steady_clock::time_point max_blocking_time_point_;
- std::unique_ptr<RTPSMessageGroup_t> send_buffer_;
- };
- } /* namespace rtps */
- } /* namespace fastrtps */
- } /* namespace eprosima */
- #endif
- #endif /* _FASTDDS_RTPS_RTPSMESSAGEGROUP_H_ */
|