123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378 |
- // Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- /**
- * @file StatefulWriter.h
- */
- #ifndef _FASTDDS_RTPS_STATEFULWRITER_H_
- #define _FASTDDS_RTPS_STATEFULWRITER_H_
- #ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC
- #include <fastdds/rtps/writer/RTPSWriter.h>
- #include <fastrtps/utils/collections/ResourceLimitedVector.hpp>
- #include <condition_variable>
- #include <mutex>
- namespace eprosima {
- namespace fastrtps {
- namespace rtps {
- class ReaderProxy;
- class TimedEvent;
- /**
- * Class StatefulWriter, specialization of RTPSWriter that maintains information of each matched Reader.
- * @ingroup WRITER_MODULE
- */
- class StatefulWriter : public RTPSWriter
- {
- friend class RTPSParticipantImpl;
- friend class ReaderProxy;
- public:
- //!Destructor
- virtual ~StatefulWriter();
- protected:
- //!Constructor
- StatefulWriter(
- RTPSParticipantImpl*,
- const GUID_t& guid,
- const WriterAttributes& att,
- WriterHistory* hist,
- WriterListener* listen = nullptr);
- private:
- //!Timed Event to manage the periodic HB to the Reader.
- TimedEvent* periodic_hb_event_;
- //! Timed Event to manage the Acknack response delay.
- TimedEvent* nack_response_event_;
- //! A timed event to mark samples as acknowledget (used only if disable positive ACKs QoS is enabled)
- TimedEvent* ack_event_;
- //!Count of the sent heartbeats.
- Count_t m_heartbeatCount;
- //!WriterTimes
- WriterTimes m_times;
- //! Vector containing all the active ReaderProxies.
- ResourceLimitedVector<ReaderProxy*> matched_readers_;
- //! Vector containing all the inactive, ready for reuse, ReaderProxies.
- ResourceLimitedVector<ReaderProxy*> matched_readers_pool_;
- using ReaderProxyIterator = ResourceLimitedVector<ReaderProxy*>::iterator;
- using ReaderProxyConstIterator = ResourceLimitedVector<ReaderProxy*>::const_iterator;
- //!To avoid notifying twice of the same sequence number
- SequenceNumber_t next_all_acked_notify_sequence_;
- // TODO Join this mutex when main mutex would not be recursive.
- std::mutex all_acked_mutex_;
- std::condition_variable all_acked_cond_;
- // TODO Also remove when main mutex not recursive.
- bool all_acked_;
- std::condition_variable_any may_remove_change_cond_;
- unsigned int may_remove_change_;
- public:
- /**
- * Add a specific change to all ReaderLocators.
- * @param p Pointer to the change.
- * @param max_blocking_time
- */
- void unsent_change_added_to_history(
- CacheChange_t* p,
- const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time) override;
- /**
- * Indicate the writer that a change has been removed by the history due to some HistoryQos requirement.
- * @param a_change Pointer to the change that is going to be removed.
- * @return True if removed correctly.
- */
- bool change_removed_by_history(
- CacheChange_t* a_change) override;
- /**
- * Method to indicate that there are changes not sent in some of all ReaderProxy.
- */
- void send_any_unsent_changes() override;
- /**
- * Sends a change directly to a intraprocess reader.
- */
- bool intraprocess_delivery(
- CacheChange_t* change,
- ReaderProxy* reader_proxy);
- bool intraprocess_gap(
- ReaderProxy* reader_proxy,
- const SequenceNumber_t& seq_num);
- bool intraprocess_heartbeat(
- ReaderProxy* reader_proxy,
- bool liveliness = false);
- //!Increment the HB count.
- inline void incrementHBCount()
- {
- ++m_heartbeatCount;
- }
- /**
- * Add a matched reader.
- * @param data Pointer to the ReaderProxyData object added.
- * @return True if added.
- */
- bool matched_reader_add(
- const ReaderProxyData& data) override;
- /**
- * Remove a matched reader.
- * @param reader_guid GUID of the reader to remove.
- * @return True if removed.
- */
- bool matched_reader_remove(
- const GUID_t& reader_guid) override;
- /**
- * Tells us if a specific Reader is matched against this writer
- * @param reader_guid GUID of the reader to check.
- * @return True if it was matched.
- */
- bool matched_reader_is_matched(
- const GUID_t& reader_guid) override;
- bool is_acked_by_all(
- const CacheChange_t* a_change) const override;
- bool wait_for_all_acked(
- const Duration_t& max_wait) override;
- bool all_readers_updated();
- /**
- * Remove the change with the minimum SequenceNumber
- * @return True if removed.
- */
- bool try_remove_change(
- const std::chrono::steady_clock::time_point& max_blocking_time_point,
- std::unique_lock<RecursiveTimedMutex>& lock) override;
- /**
- * Update the Attributes of the Writer.
- * @param att New attributes
- */
- void updateAttributes(
- const WriterAttributes& att) override;
- /**
- * Find a Reader Proxy in this writer.
- * @param[in] readerGuid The GUID_t of the reader.
- * @param[out] RP Pointer to pointer to return the ReaderProxy.
- * @return True if correct.
- */
- bool matched_reader_lookup(
- GUID_t& readerGuid,
- ReaderProxy** RP);
- /** Get count of heartbeats
- * @return count of heartbeats
- */
- inline Count_t getHeartbeatCount() const
- {
- return this->m_heartbeatCount;
- }
- /**
- * Get the RTPS participant
- * @return RTPS participant
- */
- inline RTPSParticipantImpl* getRTPSParticipant() const
- {
- return mp_RTPSParticipant;
- }
- /**
- * Get the number of matched readers
- * @return Number of the matched readers
- */
- inline size_t getMatchedReadersSize() const
- {
- std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
- return matched_readers_.size();
- }
- /**
- * @brief Returns true if disable positive ACKs QoS is enabled
- * @return True if positive acks are disabled, false otherwise
- */
- inline bool get_disable_positive_acks() const
- {
- return disable_positive_acks_;
- }
- /**
- * Update the WriterTimes attributes of all associated ReaderProxy.
- * @param times WriterTimes parameter.
- */
- void updateTimes(
- const WriterTimes& times);
- void add_flow_controller(
- std::unique_ptr<FlowController> controller) override;
- SequenceNumber_t next_sequence_number() const;
- /**
- * @brief Sends a periodic heartbeat
- * @param final Final flag
- * @param liveliness Liveliness flag
- * @return True on success
- */
- bool send_periodic_heartbeat(
- bool final = false,
- bool liveliness = false);
- /*!
- * @brief Sends a heartbeat to a remote reader.
- * @remarks This function is non thread-safe.
- */
- void send_heartbeat_to_nts(
- ReaderProxy& remoteReaderProxy,
- bool liveliness = false);
- void perform_nack_response();
- void perform_nack_supression(
- const GUID_t& reader_guid);
- /**
- * Process an incoming ACKNACK submessage.
- * @param[in] writer_guid GUID of the writer the submessage is directed to.
- * @param[in] reader_guid GUID of the reader originating the submessage.
- * @param[in] ack_count Count field of the submessage.
- * @param[in] sn_set Sequence number bitmap field of the submessage.
- * @param[in] final_flag Final flag field of the submessage.
- * @param[out] result true if the writer could process the submessage.
- * Only valid when returned value is true.
- * @return true when the submessage was destinated to this writer, false otherwise.
- */
- bool process_acknack(
- const GUID_t& writer_guid,
- const GUID_t& reader_guid,
- uint32_t ack_count,
- const SequenceNumberSet_t& sn_set,
- bool final_flag,
- bool& result) override;
- /**
- * Process an incoming NACKFRAG submessage.
- * @param[in] writer_guid GUID of the writer the submessage is directed to.
- * @param[in] reader_guid GUID of the reader originating the submessage.
- * @param[in] ack_count Count field of the submessage.
- * @param[in] seq_num Sequence number field of the submessage.
- * @param[in] fragments_state Sequence number field of the submessage.
- * @param[out] result true if the writer could process the submessage.
- * Only valid when returned value is true.
- * @return true when the submessage was destinated to this writer, false otherwise.
- */
- virtual bool process_nack_frag(
- const GUID_t& writer_guid,
- const GUID_t& reader_guid,
- uint32_t ack_count,
- const SequenceNumber_t& seq_num,
- const FragmentNumberSet_t fragments_state,
- bool& result) override;
- private:
- void update_reader_info(
- bool create_sender_resources);
- void send_heartbeat_piggyback_nts_(
- ReaderProxy* reader,
- RTPSMessageGroup& message_group,
- uint32_t& last_bytes_processed);
- void send_heartbeat_nts_(
- size_t number_of_readers,
- RTPSMessageGroup& message_group,
- bool final,
- bool liveliness = false);
- void check_acked_status();
- /**
- * @brief A method called when the ack timer expires
- * @details Only used if disable positive ACKs QoS is enabled
- */
- bool ack_timer_expired();
- void send_heartbeat_to_all_readers();
- void send_changes_separatedly(
- SequenceNumber_t max_sequence,
- bool& activateHeartbeatPeriod);
- void send_all_intraprocess_changes(
- SequenceNumber_t max_sequence);
- void send_all_unsent_changes(
- SequenceNumber_t max_sequence,
- bool& activateHeartbeatPeriod);
- void send_unsent_changes_with_flow_control(
- SequenceNumber_t max_sequence,
- bool& activateHeartbeatPeriod);
- bool send_hole_gaps_to_group(
- RTPSMessageGroup& group);
- //! True to disable piggyback heartbeats
- bool disable_heartbeat_piggyback_;
- //! True to disable positive ACKs
- bool disable_positive_acks_;
- //! Keep duration for disable positive ACKs QoS, in microseconds
- std::chrono::duration<double, std::ratio<1, 1000000> > keep_duration_us_;
- //! Last acknowledged cache change (only used if using disable positive ACKs QoS)
- SequenceNumber_t last_sequence_number_;
- //! Biggest sequence number removed from history
- SequenceNumber_t biggest_removed_sequence_number_;
- const uint32_t sendBufferSize_;
- int32_t currentUsageSendBufferSize_;
- std::vector<std::unique_ptr<FlowController> > m_controllers;
- bool there_are_remote_readers_ = false;
- bool there_are_local_readers_ = false;
- StatefulWriter& operator =(
- const StatefulWriter&) = delete;
- };
- } /* namespace rtps */
- } /* namespace fastrtps */
- } /* namespace eprosima */
- #endif
- #endif /* _FASTDDS_RTPS_STATEFULWRITER_H_ */
|