123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473 |
- #ifndef _FASTDDS_RTPS_WRITER_READERPROXY_H_
- #define _FASTDDS_RTPS_WRITER_READERPROXY_H_
- #ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC
- #include <algorithm>
- #include <mutex>
- #include <set>
- #include <atomic>
- #include <fastdds/rtps/builtin/data/ReaderProxyData.h>
- #include <fastdds/rtps/writer/ReaderLocator.h>
- #include <fastdds/rtps/common/Types.h>
- #include <fastdds/rtps/common/Locator.h>
- #include <fastdds/rtps/common/SequenceNumber.h>
- #include <fastdds/rtps/common/CacheChange.h>
- #include <fastdds/rtps/common/FragmentNumber.h>
- #include <fastdds/rtps/attributes/WriterAttributes.h>
- #include <fastdds/rtps/attributes/RTPSParticipantAllocationAttributes.hpp>
- #include <fastrtps/utils/collections/ResourceLimitedVector.hpp>
- namespace eprosima {
- namespace fastrtps {
- namespace rtps {
- class StatefulWriter;
- class TimedEvent;
- class RTPSReader;
- class ReaderProxy
- {
- public:
- ~ReaderProxy();
-
- ReaderProxy(
- const WriterTimes& times,
- const RemoteLocatorsAllocationAttributes& loc_alloc,
- StatefulWriter* writer);
-
- void start(
- const ReaderProxyData& reader_attributes);
-
- bool update(
- const ReaderProxyData& reader_attributes);
-
- void stop();
-
- void add_change(
- const ChangeForReader_t& change,
- bool restart_nack_supression);
- void add_change(
- const ChangeForReader_t& change,
- bool restart_nack_supression,
- const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time);
-
- bool has_changes() const;
-
- bool change_is_acked(
- const SequenceNumber_t& seq_num) const;
-
- bool change_is_unsent(
- const SequenceNumber_t& seq_num,
- bool& is_irrelevant) const;
-
- void acked_changes_set(
- const SequenceNumber_t& seq_num);
-
- bool requested_changes_set(
- const SequenceNumberSet_t& seq_num_set);
-
- bool process_initial_acknack();
-
- template <class BinaryFunction>
- void for_each_unsent_change(
- const SequenceNumber_t& max_seq,
- BinaryFunction f) const
- {
- if (!changes_for_reader_.empty())
- {
- SequenceNumber_t current_seq = changes_low_mark_ + 1;
- ChangeConstIterator it = changes_for_reader_.begin();
- while (it != changes_for_reader_.end())
- {
-
- SequenceNumber_t change_seq = it->getSequenceNumber();
- for (; current_seq < change_seq; ++current_seq)
- {
- f(current_seq, nullptr);
- }
-
- if (it->getStatus() == UNSENT)
- {
- f(current_seq, &(*it));
- }
- ++current_seq;
- ++it;
- }
-
- for (; current_seq < max_seq; ++current_seq)
- {
- f(current_seq, nullptr);
- }
- }
- else
- {
-
- for (SequenceNumber_t seq = changes_low_mark_ + 1; seq < max_seq; ++seq)
- {
- f(seq, nullptr);
- }
- }
- }
-
- bool set_change_to_status(
- const SequenceNumber_t& seq_num,
- ChangeForReaderStatus_t status,
- bool restart_nack_supression);
-
- bool mark_fragment_as_sent_for_change(
- const SequenceNumber_t& seq_num,
- FragmentNumber_t frag_num,
- bool& was_last_fragment);
-
- bool perform_nack_supression();
-
- bool perform_acknack_response();
-
- void change_has_been_removed(
- const SequenceNumber_t& seq_num);
-
- bool has_unacknowledged() const;
-
- inline const GUID_t& guid() const
- {
- return locator_info_.remote_guid();
- }
-
- inline DurabilityKind_t durability_kind() const
- {
- return durability_kind_;
- }
-
- inline bool expects_inline_qos() const
- {
- return expects_inline_qos_;
- }
-
- inline bool is_reliable() const
- {
- return is_reliable_;
- }
- inline bool disable_positive_acks() const
- {
- return disable_positive_acks_;
- }
-
- inline bool is_remote_and_reliable() const
- {
- return !locator_info_.is_local_reader() && is_reliable_;
- }
-
- inline bool is_local_reader()
- {
- return locator_info_.is_local_reader();
- }
-
- inline RTPSReader* local_reader()
- {
- return locator_info_.local_reader();
- }
-
- bool check_and_set_acknack_count(
- uint32_t acknack_count)
- {
- if (last_acknack_count_ < acknack_count)
- {
- last_acknack_count_ = acknack_count;
- return true;
- }
- return false;
- }
-
- bool process_nack_frag(
- const GUID_t& reader_guid,
- uint32_t nack_count,
- const SequenceNumber_t& seq_num,
- const FragmentNumberSet_t& fragments_state);
-
- inline bool rtps_is_relevant(
- CacheChange_t* change)
- {
- (void)change; return true;
- }
-
- SequenceNumber_t changes_low_mark() const
- {
- return changes_low_mark_;
- }
-
- void update_nack_supression_interval(
- const Duration_t& interval);
-
- bool are_there_gaps();
- LocatorSelectorEntry* locator_selector_entry()
- {
- return locator_info_.locator_selector_entry();
- }
- const RTPSMessageSenderInterface& message_sender() const
- {
- return locator_info_;
- }
- private:
-
- bool is_active_;
-
- ReaderLocator locator_info_;
-
- DurabilityKind_t durability_kind_;
-
- bool expects_inline_qos_;
-
- bool is_reliable_;
-
- bool disable_positive_acks_;
-
- StatefulWriter* writer_;
-
- ResourceLimitedVector<ChangeForReader_t, std::true_type> changes_for_reader_;
-
- TimedEvent* nack_supression_event_;
- TimedEvent* initial_heartbeat_event_;
-
- std::atomic_bool timers_enabled_;
-
- uint32_t last_acknack_count_;
-
- uint32_t last_nackfrag_count_;
- SequenceNumber_t changes_low_mark_;
- using ChangeIterator = ResourceLimitedVector<ChangeForReader_t, std::true_type>::iterator;
- using ChangeConstIterator = ResourceLimitedVector<ChangeForReader_t, std::true_type>::const_iterator;
- void disable_timers();
-
- bool convert_status_on_all_changes(
- ChangeForReaderStatus_t previous,
- ChangeForReaderStatus_t next);
-
- bool requested_fragment_set(
- const SequenceNumber_t& seq_num,
- const FragmentNumberSet_t& frag_set);
- void add_change(
- const ChangeForReader_t& change);
-
- ChangeIterator find_change(
- const SequenceNumber_t& seq_num,
- bool exact);
-
- ChangeConstIterator find_change(
- const SequenceNumber_t& seq_num) const;
- };
- }
- }
- }
- #endif
- #endif
|