// Copyright 2016-2019 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/**
 * @file ReaderProxy.h
 */
#ifndef _FASTDDS_RTPS_WRITER_READERPROXY_H_
#define _FASTDDS_RTPS_WRITER_READERPROXY_H_

#ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC

#include <algorithm>
#include <mutex>
#include <set>
#include <atomic>

#include <fastdds/rtps/builtin/data/ReaderProxyData.h>
#include <fastdds/rtps/writer/ReaderLocator.h>

#include <fastdds/rtps/common/Types.h>
#include <fastdds/rtps/common/Locator.h>
#include <fastdds/rtps/common/SequenceNumber.h>
#include <fastdds/rtps/common/CacheChange.h>
#include <fastdds/rtps/common/FragmentNumber.h>
#include <fastdds/rtps/attributes/WriterAttributes.h>
#include <fastdds/rtps/attributes/RTPSParticipantAllocationAttributes.hpp>
#include <fastrtps/utils/collections/ResourceLimitedVector.hpp>

namespace eprosima {
namespace fastrtps {
namespace rtps {

class StatefulWriter;
class TimedEvent;
class RTPSReader;

/**
 * ReaderProxy class that helps to keep the state of a specific Reader with respect to the RTPSWriter.
 * @ingroup WRITER_MODULE
 */
class ReaderProxy
{
public:

    ~ReaderProxy();

    /**
     * Constructor.
     * @param times WriterTimes to use in the ReaderProxy.
     * @param loc_alloc Maximum number of remote locators to keep in the ReaderProxy.
     * @param writer Pointer to the StatefulWriter creating the reader proxy.
     */
    ReaderProxy(
            const WriterTimes& times,
            const RemoteLocatorsAllocationAttributes& loc_alloc,
            StatefulWriter* writer);

    /**
     * Activate this proxy associating it to a remote reader.
     * @param reader_attributes ReaderProxyData of the reader for which to keep state.
     */
    void start(
            const ReaderProxyData& reader_attributes);

    /**
     * Update information about the remote reader.
     * @param reader_attributes ReaderProxyData with updated information of the reader.
     * @return true if data was modified, false otherwise.
     */
    bool update(
            const ReaderProxyData& reader_attributes);

    /**
     * Disable this proxy.
     */
    void stop();

    /**
     * Called when a change is added to the writer's history.
     * @param change Information regarding the change added.
     * @param restart_nack_supression Whether nack-supression event should be restarted.
     */
    void add_change(
            const ChangeForReader_t& change,
            bool restart_nack_supression);

    void add_change(
            const ChangeForReader_t& change,
            bool restart_nack_supression,
            const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time);

    /**
     * Check if there are changes pending for this reader.
     * @return true when there are pending changes, false otherwise.
     */
    bool has_changes() const;

    /**
     * Check if a specific change has been already acknowledged for this reader.
     * @param seq_num Sequence number of the change to be checked.
     * @return true when the change is irrelevant or has been already acknowledged, false otherwise.
     */
    bool change_is_acked(
            const SequenceNumber_t& seq_num) const;

    /**
     * Check if a specific change is marked to be sent to this reader.
     * @param[in]  seq_num Sequence number of the change to be checked.
     * @param[out] is_irrelevant Will be forced to true if change is irrelevant for this reader.
     * @return true when the change is marked to be sent, false otherwise.
     */
    bool change_is_unsent(
            const SequenceNumber_t& seq_num,
            bool& is_irrelevant) const;

    /**
     * Mark all changes up to the one indicated by seq_num as Acknowledged.
     * For instance, when seq_num is 30, changes 1-29 are marked as acknowledged.
     * @param seq_num Sequence number of the first change not to be marked as acknowledged.
     */
    void acked_changes_set(
            const SequenceNumber_t& seq_num);

    /**
     * Mark all changes in the vector as requested.
     * @param seq_num_set Bitmap of sequence numbers.
     * @return true if at least one change has been marked as REQUESTED, false otherwise.
     */
    bool requested_changes_set(
            const SequenceNumberSet_t& seq_num_set);

    /**
     * Performs processing of preemptive acknack
     * @return true if a heartbeat should be sent, false otherwise.
     */
    bool process_initial_acknack();

    /**
     * Applies the given function object to every unsent change.
     * @param max_seq Maximum sequence number to be considered without including it.
     * @param f Function to apply.
     *          Will receive a SequenceNumber_t and a ChangeForReader_t*.
     *          The second argument may be nullptr for irrelevant changes.
     */
    template <class BinaryFunction>
    void for_each_unsent_change(
            const SequenceNumber_t& max_seq,
            BinaryFunction f) const
    {
        if (!changes_for_reader_.empty())
        {
            SequenceNumber_t current_seq = changes_low_mark_ + 1;
            ChangeConstIterator it = changes_for_reader_.begin();
            while (it != changes_for_reader_.end())
            {
                // Holes before this change are informed as irrelevant.
                SequenceNumber_t change_seq = it->getSequenceNumber();
                for (; current_seq < change_seq; ++current_seq)
                {
                    f(current_seq, nullptr);
                }

                // We then inform of this change if it is unsent, and go to the next one.
                if (it->getStatus() == UNSENT)
                {
                    f(current_seq, &(*it));
                }
                ++current_seq;
                ++it;
            }

            // After the last change has been checked, there may be a hole at the end.
            for (; current_seq < max_seq; ++current_seq)
            {
                f(current_seq, nullptr);
            }
        }
        else
        {
            // This may be entered if all changes where removed before being acknowledged.
            for (SequenceNumber_t seq = changes_low_mark_ + 1; seq < max_seq; ++seq)
            {
                f(seq, nullptr);
            }
        }
    }

    /*!
     * @brief Sets a change to a particular status (if present in the ReaderProxy)
     * @param seq_num Sequence number of the change to update.
     * @param status Status to apply.
     * @param restart_nack_supression Whether nack supression event should be restarted or not.
     * @return true when a status has changed, false otherwise.
     */
    bool set_change_to_status(
            const SequenceNumber_t& seq_num,
            ChangeForReaderStatus_t status,
            bool restart_nack_supression);

    /**
     * @brief Mark a particular fragment as sent.
     * @param[in]  seq_num Sequence number of the change to update.
     * @param[in]  frag_num Fragment number to mark as sent.
     * @param[out] was_last_fragment Indicates if the fragment was the last one pending.
     * @return true when the change was found, false otherwise.
     */
    bool mark_fragment_as_sent_for_change(
            const SequenceNumber_t& seq_num,
            FragmentNumber_t frag_num,
            bool& was_last_fragment);

    /**
     * Turns all UNDERWAY changes into UNACKNOWLEDGED.
     * @return true if at least one change changed its status, false otherwise.
     */
    bool perform_nack_supression();

    /**
     * Turns all REQUESTED changes into UNSENT.
     * @return true if at least one change changed its status, false otherwise.
     */
    bool perform_acknack_response();

    /**
     * Call this to inform a change was removed from history.
     * @param seq_num Sequence number of the removed change.
     */
    void change_has_been_removed(
            const SequenceNumber_t& seq_num);

    /*!
     * @brief Returns there is some UNACKNOWLEDGED change.
     * @return There is some UNACKNOWLEDGED change.
     */
    bool has_unacknowledged() const;

    /**
     * Get the GUID of the reader represented by this proxy.
     * @return the GUID of the reader represented by this proxy.
     */
    inline const GUID_t& guid() const
    {
        return locator_info_.remote_guid();
    }

    /**
     * Get the durability of the reader represented by this proxy.
     * @return the durability of the reader represented by this proxy.
     */
    inline DurabilityKind_t durability_kind() const
    {
        return durability_kind_;
    }

    /**
     * Check if the reader represented by this proxy expexts inline QOS to be received.
     * @return true if the reader represented by this proxy expexts inline QOS to be received.
     */
    inline bool expects_inline_qos() const
    {
        return expects_inline_qos_;
    }

    /**
     * Check if the reader represented by this proxy is reliable.
     * @return true if the reader represented by this proxy is reliable.
     */
    inline bool is_reliable() const
    {
        return is_reliable_;
    }

    inline bool disable_positive_acks() const
    {
        return disable_positive_acks_;
    }

    /**
     * Check if the reader represented by this proxy is remote and reliable.
     * @return true if the reader represented by this proxy is remote and reliable.
     */
    inline bool is_remote_and_reliable() const
    {
        return !locator_info_.is_local_reader() && is_reliable_;
    }

    /**
     * Check if the reader is on the same process.
     * @return true if the reader is no the same process.
     */
    inline bool is_local_reader()
    {
        return locator_info_.is_local_reader();
    }

    /**
     * Get the local reader on the same process (if any).
     * @return The local reader on the same process.
     */
    inline RTPSReader* local_reader()
    {
        return locator_info_.local_reader();
    }

    /**
     * Called when an ACKNACK is received to set a new value for the count of the last received ACKNACK.
     * @param acknack_count The count of the received ACKNACK.
     * @return true if internal count changed (i.e. new ACKNACK is accepted)
     */
    bool check_and_set_acknack_count(
            uint32_t acknack_count)
    {
        if (last_acknack_count_ < acknack_count)
        {
            last_acknack_count_ = acknack_count;
            return true;
        }

        return false;
    }

    /**
     * Process an incoming NACKFRAG submessage.
     * @param reader_guid Destination guid of the submessage.
     * @param nack_count Counter field of the submessage.
     * @param seq_num Sequence number field of the submessage.
     * @param fragments_state Bitmap indicating the requested fragments.
     * @return true if a change was modified, false otherwise.
     */
    bool process_nack_frag(
            const GUID_t& reader_guid,
            uint32_t nack_count,
            const SequenceNumber_t& seq_num,
            const FragmentNumberSet_t& fragments_state);

    /**
     * Filter a CacheChange_t, in this version always returns true.
     * @param change
     * @return true if the change is relevant, false otherwise.
     */
    inline bool rtps_is_relevant(
            CacheChange_t* change)
    {
        (void)change; return true;
    }

    /**
     * Get the highest fully acknowledged sequence number.
     * @return the highest fully acknowledged sequence number.
     */
    SequenceNumber_t changes_low_mark() const
    {
        return changes_low_mark_;
    }

    /**
     * Change the interval of nack-supression event.
     * @param interval Time from data sending to acknack processing.
     */
    void update_nack_supression_interval(
            const Duration_t& interval);

    /**
     * Check if there are gaps in the list of ChangeForReader_t.
     * @return True if there are gaps, else false.
     */
    bool are_there_gaps();

    LocatorSelectorEntry* locator_selector_entry()
    {
        return locator_info_.locator_selector_entry();
    }

    const RTPSMessageSenderInterface& message_sender() const
    {
        return locator_info_;
    }

private:

    //!Is this proxy active? I.e. does it have a remote reader associated?
    bool is_active_;
    //!Reader locator information
    ReaderLocator locator_info_;
    //!Taken from QoS
    DurabilityKind_t durability_kind_;
    //!Taken from QoS
    bool expects_inline_qos_;
    //!Taken from QoS
    bool is_reliable_;
    //!Taken from QoS
    bool disable_positive_acks_;
    //!Pointer to the associated StatefulWriter.
    StatefulWriter* writer_;
    //!Set of the changes and its state.
    ResourceLimitedVector<ChangeForReader_t, std::true_type> changes_for_reader_;
    //! Timed Event to manage the delay to mark a change as UNACKED after sending it.
    TimedEvent* nack_supression_event_;
    TimedEvent* initial_heartbeat_event_;
    //! Are timed events enabled?
    std::atomic_bool timers_enabled_;
    //! Last ack/nack count
    uint32_t last_acknack_count_;
    //! Last  NACKFRAG count.
    uint32_t last_nackfrag_count_;

    SequenceNumber_t changes_low_mark_;

    using ChangeIterator = ResourceLimitedVector<ChangeForReader_t, std::true_type>::iterator;
    using ChangeConstIterator = ResourceLimitedVector<ChangeForReader_t, std::true_type>::const_iterator;

    void disable_timers();

    /*
     * Converts all changes with a given status to a different status.
     * @param previous Status to change.
     * @param next Status to adopt.
     * @return true when at least one change has been modified, false otherwise.
     */
    bool convert_status_on_all_changes(
            ChangeForReaderStatus_t previous,
            ChangeForReaderStatus_t next);

    /*!
     * @brief Adds requested fragments. These fragments will be sent in next NackResponseDelay.
     * @param[in] seq_num Sequence number to be paired with the requested fragments.
     * @param[in] frag_set set containing the requested fragments to be sent.
     * @return True if there is at least one requested fragment. False in other case.
     */
    bool requested_fragment_set(
            const SequenceNumber_t& seq_num,
            const FragmentNumberSet_t& frag_set);

    void add_change(
            const ChangeForReader_t& change);

    /**
     * @brief Find a change with the specified sequence number.
     * @param seq_num Sequence number to find.
     * @param exact When false, the first change with a sequence number not less than seq_num will be returned.
     * When true, the change with a sequence number value of seq_num will be returned.
     * @return Iterator pointing to the change, changes_for_reader_.end() if not found.
     */
    ChangeIterator find_change(
            const SequenceNumber_t& seq_num,
            bool exact);

    /**
     * @brief Find a change with the specified sequence number.
     * @param seq_num Sequence number to find.
     * @return Iterator pointing to the change, changes_for_reader_.end() if not found.
     */
    ChangeConstIterator find_change(
            const SequenceNumber_t& seq_num) const;
};

} /* namespace rtps */
} /* namespace fastrtps */
} /* namespace eprosima */

#endif
#endif /* _FASTDDS_RTPS_WRITER_READERPROXY_H_ */