|
- #ifndef _FASTDDS_RTPS_WRITER_READERPROXY_H_
- #define _FASTDDS_RTPS_WRITER_READERPROXY_H_
- #ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC
- #include <algorithm>
- #include <mutex>
- #include <set>
- #include <atomic>
- #include <fastdds/rtps/builtin/data/ReaderProxyData.h>
- #include <fastdds/rtps/writer/ReaderLocator.h>
- #include <fastdds/rtps/common/Types.h>
- #include <fastdds/rtps/common/Locator.h>
- #include <fastdds/rtps/common/SequenceNumber.h>
- #include <fastdds/rtps/common/CacheChange.h>
- #include <fastdds/rtps/common/FragmentNumber.h>
- #include <fastdds/rtps/attributes/WriterAttributes.h>
- #include <fastdds/rtps/attributes/RTPSParticipantAllocationAttributes.hpp>
- #include <fastrtps/utils/collections/ResourceLimitedVector.hpp>
- namespace eprosima {
- namespace fastrtps {
- namespace rtps {
- class StatefulWriter;
- class TimedEvent;
- class RTPSReader;
- class ReaderProxy
- {
- public:
- ~ReaderProxy();
-
- ReaderProxy(
- const WriterTimes& times,
- const RemoteLocatorsAllocationAttributes& loc_alloc,
- StatefulWriter* writer);
-
- void start(
- const ReaderProxyData& reader_attributes);
-
- bool update(
- const ReaderProxyData& reader_attributes);
-
- void stop();
-
- void add_change(
- const ChangeForReader_t& change,
- bool restart_nack_supression);
- void add_change(
- const ChangeForReader_t& change,
- bool restart_nack_supression,
- const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time);
-
- bool has_changes() const;
-
- bool change_is_acked(
- const SequenceNumber_t& seq_num) const;
-
- bool change_is_unsent(
- const SequenceNumber_t& seq_num,
- bool& is_irrelevant) const;
-
- void acked_changes_set(
- const SequenceNumber_t& seq_num);
-
- bool requested_changes_set(
- const SequenceNumberSet_t& seq_num_set);
-
- bool process_initial_acknack();
-
- template <class BinaryFunction>
- void for_each_unsent_change(
- const SequenceNumber_t& max_seq,
- BinaryFunction f) const
- {
- if (!changes_for_reader_.empty())
- {
- SequenceNumber_t current_seq = changes_low_mark_ + 1;
- ChangeConstIterator it = changes_for_reader_.begin();
- while (it != changes_for_reader_.end())
- {
-
- SequenceNumber_t change_seq = it->getSequenceNumber();
- for (; current_seq < change_seq; ++current_seq)
- {
- f(current_seq, nullptr);
- }
-
- if (it->getStatus() == UNSENT)
- {
- f(current_seq, &(*it));
- }
- ++current_seq;
- ++it;
- }
-
- for (; current_seq < max_seq; ++current_seq)
- {
- f(current_seq, nullptr);
- }
- }
- else
- {
-
- for (SequenceNumber_t seq = changes_low_mark_ + 1; seq < max_seq; ++seq)
- {
- f(seq, nullptr);
- }
- }
- }
-
- bool set_change_to_status(
- const SequenceNumber_t& seq_num,
- ChangeForReaderStatus_t status,
- bool restart_nack_supression);
-
- bool mark_fragment_as_sent_for_change(
- const SequenceNumber_t& seq_num,
- FragmentNumber_t frag_num,
- bool& was_last_fragment);
-
- bool perform_nack_supression();
-
- bool perform_acknack_response();
-
- void change_has_been_removed(
- const SequenceNumber_t& seq_num);
-
- bool has_unacknowledged() const;
-
- inline const GUID_t& guid() const
- {
- return locator_info_.remote_guid();
- }
-
- inline DurabilityKind_t durability_kind() const
- {
- return durability_kind_;
- }
-
- inline bool expects_inline_qos() const
- {
- return expects_inline_qos_;
- }
-
- inline bool is_reliable() const
- {
- return is_reliable_;
- }
- inline bool disable_positive_acks() const
- {
- return disable_positive_acks_;
- }
-
- inline bool is_remote_and_reliable() const
- {
- return !locator_info_.is_local_reader() && is_reliable_;
- }
-
- inline bool is_local_reader()
- {
- return locator_info_.is_local_reader();
- }
-
- inline RTPSReader* local_reader()
- {
- return locator_info_.local_reader();
- }
-
- bool check_and_set_acknack_count(
- uint32_t acknack_count)
- {
- if (last_acknack_count_ < acknack_count)
- {
- last_acknack_count_ = acknack_count;
- return true;
- }
- return false;
- }
-
- bool process_nack_frag(
- const GUID_t& reader_guid,
- uint32_t nack_count,
- const SequenceNumber_t& seq_num,
- const FragmentNumberSet_t& fragments_state);
-
- inline bool rtps_is_relevant(
- CacheChange_t* change)
- {
- (void)change; return true;
- }
-
- SequenceNumber_t changes_low_mark() const
- {
- return changes_low_mark_;
- }
-
- void update_nack_supression_interval(
- const Duration_t& interval);
-
- bool are_there_gaps();
- LocatorSelectorEntry* locator_selector_entry()
- {
- return locator_info_.locator_selector_entry();
- }
- const RTPSMessageSenderInterface& message_sender() const
- {
- return locator_info_;
- }
- private:
-
- bool is_active_;
-
- ReaderLocator locator_info_;
-
- DurabilityKind_t durability_kind_;
-
- bool expects_inline_qos_;
-
- bool is_reliable_;
-
- bool disable_positive_acks_;
-
- StatefulWriter* writer_;
-
- ResourceLimitedVector<ChangeForReader_t, std::true_type> changes_for_reader_;
-
- TimedEvent* nack_supression_event_;
- TimedEvent* initial_heartbeat_event_;
-
- std::atomic_bool timers_enabled_;
-
- uint32_t last_acknack_count_;
-
- uint32_t last_nackfrag_count_;
- SequenceNumber_t changes_low_mark_;
- using ChangeIterator = ResourceLimitedVector<ChangeForReader_t, std::true_type>::iterator;
- using ChangeConstIterator = ResourceLimitedVector<ChangeForReader_t, std::true_type>::const_iterator;
- void disable_timers();
-
- bool convert_status_on_all_changes(
- ChangeForReaderStatus_t previous,
- ChangeForReaderStatus_t next);
-
- bool requested_fragment_set(
- const SequenceNumber_t& seq_num,
- const FragmentNumberSet_t& frag_set);
- void add_change(
- const ChangeForReader_t& change);
-
- ChangeIterator find_change(
- const SequenceNumber_t& seq_num,
- bool exact);
-
- ChangeConstIterator find_change(
- const SequenceNumber_t& seq_num) const;
- };
- }
- }
- }
- #endif
- #endif
|