123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473 |
- // Copyright 2016-2019 Proyectos y Sistemas de Mantenimiento SL (eProsima).
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- /**
- * @file ReaderProxy.h
- */
- #ifndef _FASTDDS_RTPS_WRITER_READERPROXY_H_
- #define _FASTDDS_RTPS_WRITER_READERPROXY_H_
- #ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC
- #include <algorithm>
- #include <mutex>
- #include <set>
- #include <atomic>
- #include <fastdds/rtps/builtin/data/ReaderProxyData.h>
- #include <fastdds/rtps/writer/ReaderLocator.h>
- #include <fastdds/rtps/common/Types.h>
- #include <fastdds/rtps/common/Locator.h>
- #include <fastdds/rtps/common/SequenceNumber.h>
- #include <fastdds/rtps/common/CacheChange.h>
- #include <fastdds/rtps/common/FragmentNumber.h>
- #include <fastdds/rtps/attributes/WriterAttributes.h>
- #include <fastdds/rtps/attributes/RTPSParticipantAllocationAttributes.hpp>
- #include <fastrtps/utils/collections/ResourceLimitedVector.hpp>
- namespace eprosima {
- namespace fastrtps {
- namespace rtps {
- class StatefulWriter;
- class TimedEvent;
- class RTPSReader;
- /**
- * ReaderProxy class that helps to keep the state of a specific Reader with respect to the RTPSWriter.
- * @ingroup WRITER_MODULE
- */
- class ReaderProxy
- {
- public:
- ~ReaderProxy();
- /**
- * Constructor.
- * @param times WriterTimes to use in the ReaderProxy.
- * @param loc_alloc Maximum number of remote locators to keep in the ReaderProxy.
- * @param writer Pointer to the StatefulWriter creating the reader proxy.
- */
- ReaderProxy(
- const WriterTimes& times,
- const RemoteLocatorsAllocationAttributes& loc_alloc,
- StatefulWriter* writer);
- /**
- * Activate this proxy associating it to a remote reader.
- * @param reader_attributes ReaderProxyData of the reader for which to keep state.
- */
- void start(
- const ReaderProxyData& reader_attributes);
- /**
- * Update information about the remote reader.
- * @param reader_attributes ReaderProxyData with updated information of the reader.
- * @return true if data was modified, false otherwise.
- */
- bool update(
- const ReaderProxyData& reader_attributes);
- /**
- * Disable this proxy.
- */
- void stop();
- /**
- * Called when a change is added to the writer's history.
- * @param change Information regarding the change added.
- * @param restart_nack_supression Whether nack-supression event should be restarted.
- */
- void add_change(
- const ChangeForReader_t& change,
- bool restart_nack_supression);
- void add_change(
- const ChangeForReader_t& change,
- bool restart_nack_supression,
- const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time);
- /**
- * Check if there are changes pending for this reader.
- * @return true when there are pending changes, false otherwise.
- */
- bool has_changes() const;
- /**
- * Check if a specific change has been already acknowledged for this reader.
- * @param seq_num Sequence number of the change to be checked.
- * @return true when the change is irrelevant or has been already acknowledged, false otherwise.
- */
- bool change_is_acked(
- const SequenceNumber_t& seq_num) const;
- /**
- * Check if a specific change is marked to be sent to this reader.
- * @param[in] seq_num Sequence number of the change to be checked.
- * @param[out] is_irrelevant Will be forced to true if change is irrelevant for this reader.
- * @return true when the change is marked to be sent, false otherwise.
- */
- bool change_is_unsent(
- const SequenceNumber_t& seq_num,
- bool& is_irrelevant) const;
- /**
- * Mark all changes up to the one indicated by seq_num as Acknowledged.
- * For instance, when seq_num is 30, changes 1-29 are marked as acknowledged.
- * @param seq_num Sequence number of the first change not to be marked as acknowledged.
- */
- void acked_changes_set(
- const SequenceNumber_t& seq_num);
- /**
- * Mark all changes in the vector as requested.
- * @param seq_num_set Bitmap of sequence numbers.
- * @return true if at least one change has been marked as REQUESTED, false otherwise.
- */
- bool requested_changes_set(
- const SequenceNumberSet_t& seq_num_set);
- /**
- * Performs processing of preemptive acknack
- * @return true if a heartbeat should be sent, false otherwise.
- */
- bool process_initial_acknack();
- /**
- * Applies the given function object to every unsent change.
- * @param max_seq Maximum sequence number to be considered without including it.
- * @param f Function to apply.
- * Will receive a SequenceNumber_t and a ChangeForReader_t*.
- * The second argument may be nullptr for irrelevant changes.
- */
- template <class BinaryFunction>
- void for_each_unsent_change(
- const SequenceNumber_t& max_seq,
- BinaryFunction f) const
- {
- if (!changes_for_reader_.empty())
- {
- SequenceNumber_t current_seq = changes_low_mark_ + 1;
- ChangeConstIterator it = changes_for_reader_.begin();
- while (it != changes_for_reader_.end())
- {
- // Holes before this change are informed as irrelevant.
- SequenceNumber_t change_seq = it->getSequenceNumber();
- for (; current_seq < change_seq; ++current_seq)
- {
- f(current_seq, nullptr);
- }
- // We then inform of this change if it is unsent, and go to the next one.
- if (it->getStatus() == UNSENT)
- {
- f(current_seq, &(*it));
- }
- ++current_seq;
- ++it;
- }
- // After the last change has been checked, there may be a hole at the end.
- for (; current_seq < max_seq; ++current_seq)
- {
- f(current_seq, nullptr);
- }
- }
- else
- {
- // This may be entered if all changes where removed before being acknowledged.
- for (SequenceNumber_t seq = changes_low_mark_ + 1; seq < max_seq; ++seq)
- {
- f(seq, nullptr);
- }
- }
- }
- /*!
- * @brief Sets a change to a particular status (if present in the ReaderProxy)
- * @param seq_num Sequence number of the change to update.
- * @param status Status to apply.
- * @param restart_nack_supression Whether nack supression event should be restarted or not.
- * @return true when a status has changed, false otherwise.
- */
- bool set_change_to_status(
- const SequenceNumber_t& seq_num,
- ChangeForReaderStatus_t status,
- bool restart_nack_supression);
- /**
- * @brief Mark a particular fragment as sent.
- * @param[in] seq_num Sequence number of the change to update.
- * @param[in] frag_num Fragment number to mark as sent.
- * @param[out] was_last_fragment Indicates if the fragment was the last one pending.
- * @return true when the change was found, false otherwise.
- */
- bool mark_fragment_as_sent_for_change(
- const SequenceNumber_t& seq_num,
- FragmentNumber_t frag_num,
- bool& was_last_fragment);
- /**
- * Turns all UNDERWAY changes into UNACKNOWLEDGED.
- * @return true if at least one change changed its status, false otherwise.
- */
- bool perform_nack_supression();
- /**
- * Turns all REQUESTED changes into UNSENT.
- * @return true if at least one change changed its status, false otherwise.
- */
- bool perform_acknack_response();
- /**
- * Call this to inform a change was removed from history.
- * @param seq_num Sequence number of the removed change.
- */
- void change_has_been_removed(
- const SequenceNumber_t& seq_num);
- /*!
- * @brief Returns there is some UNACKNOWLEDGED change.
- * @return There is some UNACKNOWLEDGED change.
- */
- bool has_unacknowledged() const;
- /**
- * Get the GUID of the reader represented by this proxy.
- * @return the GUID of the reader represented by this proxy.
- */
- inline const GUID_t& guid() const
- {
- return locator_info_.remote_guid();
- }
- /**
- * Get the durability of the reader represented by this proxy.
- * @return the durability of the reader represented by this proxy.
- */
- inline DurabilityKind_t durability_kind() const
- {
- return durability_kind_;
- }
- /**
- * Check if the reader represented by this proxy expexts inline QOS to be received.
- * @return true if the reader represented by this proxy expexts inline QOS to be received.
- */
- inline bool expects_inline_qos() const
- {
- return expects_inline_qos_;
- }
- /**
- * Check if the reader represented by this proxy is reliable.
- * @return true if the reader represented by this proxy is reliable.
- */
- inline bool is_reliable() const
- {
- return is_reliable_;
- }
- inline bool disable_positive_acks() const
- {
- return disable_positive_acks_;
- }
- /**
- * Check if the reader represented by this proxy is remote and reliable.
- * @return true if the reader represented by this proxy is remote and reliable.
- */
- inline bool is_remote_and_reliable() const
- {
- return !locator_info_.is_local_reader() && is_reliable_;
- }
- /**
- * Check if the reader is on the same process.
- * @return true if the reader is no the same process.
- */
- inline bool is_local_reader()
- {
- return locator_info_.is_local_reader();
- }
- /**
- * Get the local reader on the same process (if any).
- * @return The local reader on the same process.
- */
- inline RTPSReader* local_reader()
- {
- return locator_info_.local_reader();
- }
- /**
- * Called when an ACKNACK is received to set a new value for the count of the last received ACKNACK.
- * @param acknack_count The count of the received ACKNACK.
- * @return true if internal count changed (i.e. new ACKNACK is accepted)
- */
- bool check_and_set_acknack_count(
- uint32_t acknack_count)
- {
- if (last_acknack_count_ < acknack_count)
- {
- last_acknack_count_ = acknack_count;
- return true;
- }
- return false;
- }
- /**
- * Process an incoming NACKFRAG submessage.
- * @param reader_guid Destination guid of the submessage.
- * @param nack_count Counter field of the submessage.
- * @param seq_num Sequence number field of the submessage.
- * @param fragments_state Bitmap indicating the requested fragments.
- * @return true if a change was modified, false otherwise.
- */
- bool process_nack_frag(
- const GUID_t& reader_guid,
- uint32_t nack_count,
- const SequenceNumber_t& seq_num,
- const FragmentNumberSet_t& fragments_state);
- /**
- * Filter a CacheChange_t, in this version always returns true.
- * @param change
- * @return true if the change is relevant, false otherwise.
- */
- inline bool rtps_is_relevant(
- CacheChange_t* change)
- {
- (void)change; return true;
- }
- /**
- * Get the highest fully acknowledged sequence number.
- * @return the highest fully acknowledged sequence number.
- */
- SequenceNumber_t changes_low_mark() const
- {
- return changes_low_mark_;
- }
- /**
- * Change the interval of nack-supression event.
- * @param interval Time from data sending to acknack processing.
- */
- void update_nack_supression_interval(
- const Duration_t& interval);
- /**
- * Check if there are gaps in the list of ChangeForReader_t.
- * @return True if there are gaps, else false.
- */
- bool are_there_gaps();
- LocatorSelectorEntry* locator_selector_entry()
- {
- return locator_info_.locator_selector_entry();
- }
- const RTPSMessageSenderInterface& message_sender() const
- {
- return locator_info_;
- }
- private:
- //!Is this proxy active? I.e. does it have a remote reader associated?
- bool is_active_;
- //!Reader locator information
- ReaderLocator locator_info_;
- //!Taken from QoS
- DurabilityKind_t durability_kind_;
- //!Taken from QoS
- bool expects_inline_qos_;
- //!Taken from QoS
- bool is_reliable_;
- //!Taken from QoS
- bool disable_positive_acks_;
- //!Pointer to the associated StatefulWriter.
- StatefulWriter* writer_;
- //!Set of the changes and its state.
- ResourceLimitedVector<ChangeForReader_t, std::true_type> changes_for_reader_;
- //! Timed Event to manage the delay to mark a change as UNACKED after sending it.
- TimedEvent* nack_supression_event_;
- TimedEvent* initial_heartbeat_event_;
- //! Are timed events enabled?
- std::atomic_bool timers_enabled_;
- //! Last ack/nack count
- uint32_t last_acknack_count_;
- //! Last NACKFRAG count.
- uint32_t last_nackfrag_count_;
- SequenceNumber_t changes_low_mark_;
- using ChangeIterator = ResourceLimitedVector<ChangeForReader_t, std::true_type>::iterator;
- using ChangeConstIterator = ResourceLimitedVector<ChangeForReader_t, std::true_type>::const_iterator;
- void disable_timers();
- /*
- * Converts all changes with a given status to a different status.
- * @param previous Status to change.
- * @param next Status to adopt.
- * @return true when at least one change has been modified, false otherwise.
- */
- bool convert_status_on_all_changes(
- ChangeForReaderStatus_t previous,
- ChangeForReaderStatus_t next);
- /*!
- * @brief Adds requested fragments. These fragments will be sent in next NackResponseDelay.
- * @param[in] seq_num Sequence number to be paired with the requested fragments.
- * @param[in] frag_set set containing the requested fragments to be sent.
- * @return True if there is at least one requested fragment. False in other case.
- */
- bool requested_fragment_set(
- const SequenceNumber_t& seq_num,
- const FragmentNumberSet_t& frag_set);
- void add_change(
- const ChangeForReader_t& change);
- /**
- * @brief Find a change with the specified sequence number.
- * @param seq_num Sequence number to find.
- * @param exact When false, the first change with a sequence number not less than seq_num will be returned.
- * When true, the change with a sequence number value of seq_num will be returned.
- * @return Iterator pointing to the change, changes_for_reader_.end() if not found.
- */
- ChangeIterator find_change(
- const SequenceNumber_t& seq_num,
- bool exact);
- /**
- * @brief Find a change with the specified sequence number.
- * @param seq_num Sequence number to find.
- * @return Iterator pointing to the change, changes_for_reader_.end() if not found.
- */
- ChangeConstIterator find_change(
- const SequenceNumber_t& seq_num) const;
- };
- } /* namespace rtps */
- } /* namespace fastrtps */
- } /* namespace eprosima */
- #endif
- #endif /* _FASTDDS_RTPS_WRITER_READERPROXY_H_ */
|