// C++ source file  |  1060 lines  |  38.44 KB

/*
 * Copyright (C) 2018 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

//#define LOG_NDEBUG 0
#define LOG_TAG "Codec2-Component"
#include <android-base/logging.h>

#include <C2PlatformSupport.h>
#include <codec2/hidl/1.0/Component.h>
#include <codec2/hidl/1.0/ComponentStore.h>
#include <codec2/hidl/1.0/types.h>

#include <hidl/HidlBinderSupport.h>
#include <utils/Timers.h>

#include <C2BqBufferPriv.h>
#include <C2Debug.h>
#include <C2PlatformSupport.h>

#include <chrono>
#include <thread>

namespace hardware {
namespace google {
namespace media {
namespace c2 {
namespace V1_0 {
namespace utils {

using namespace ::android;

namespace /* unnamed */ {

// Implementation of ConfigurableC2Intf based on C2ComponentInterface
// Adapter that exposes a C2ComponentInterface through the ConfigurableC2Intf
// API. Each method logs its own name and forwards to the corresponding
// C2ComponentInterface call on the wrapped interface.
struct CompIntf : public ConfigurableC2Intf {
    // Wraps intf; the name handed to the base class is intf->getName().
    CompIntf(const std::shared_ptr<C2ComponentInterface>& intf)
          : ConfigurableC2Intf(intf->getName()),
            mIntf(intf) {}

    // Forwards to C2ComponentInterface::config_vb().
    c2_status_t config(
            const std::vector<C2Param*>& params,
            c2_blocking_t mayBlock,
            std::vector<std::unique_ptr<C2SettingResult>>* const failures
            ) override {
        ALOGV("config");
        return mIntf->config_vb(params, mayBlock, failures);
    }

    // Forwards to C2ComponentInterface::query_vb(). No stack parameters are
    // supplied (the first argument is an empty list); only the heap
    // parameters selected by indices are requested.
    c2_status_t query(
            const std::vector<C2Param::Index>& indices,
            c2_blocking_t mayBlock,
            std::vector<std::unique_ptr<C2Param>>* const params
            ) const override {
        ALOGV("query");
        return mIntf->query_vb({}, indices, mayBlock, params);
    }

    // Forwards to C2ComponentInterface::querySupportedParams_nb().
    c2_status_t querySupportedParams(
            std::vector<std::shared_ptr<C2ParamDescriptor>>* const params
            ) const override {
        ALOGV("querySupportedParams");
        return mIntf->querySupportedParams_nb(params);
    }

    // Forwards to C2ComponentInterface::querySupportedValues_vb().
    c2_status_t querySupportedValues(
            std::vector<C2FieldSupportedValuesQuery>& fields,
            c2_blocking_t mayBlock) const override {
        ALOGV("querySupportedValues");
        return mIntf->querySupportedValues_vb(fields, mayBlock);
    }

protected:
    // The wrapped component interface.
    std::shared_ptr<C2ComponentInterface> mIntf;
};

} // unnamed namespace

// InputBufferManager
// ==================
//
// InputBufferManager presents a way to track and untrack input buffers in this
// (codec) process and send a notification to a listener, possibly in a
// different process, when a tracked buffer no longer has any references in this
// process. (In fact, this class would work for listeners in the same process
// too, but the optimization discussed below will not be beneficial.)
//
// InputBufferManager holds a collection of records representing tracked buffers
// and their callback listeners. Conceptually, one record is a triple (listener,
// frameIndex, bufferIndex) where
//
// - (frameIndex, bufferIndex) is a pair of indices used to identify the buffer.
// - listener is of type IComponentListener. Its onFramesRendered() function
//   will be called after the associated buffer dies. The argument of
//   onFramesRendered() is a list of RenderedFrame objects, each of which has
//   the following members:
//
//     uint64_t bufferQueueId
//     int32_t  slotId
//     int64_t  timestampNs
//
// When a tracked buffer associated to the triple (listener, frameIndex,
// bufferIndex) goes out of scope, listener->onFramesRendered() will be called
// with a RenderedFrame object whose members are set as follows:
//
//     bufferQueueId = frameIndex
//     slotId        = ~bufferIndex
//     timestampNs   = systemTime() at the time of notification
//
// The reason for the bitwise negation of bufferIndex is that onFramesRendered()
// may be used for a different purpose when slotId is non-negative (which is a
// more general use case).
//
// IPC Optimization
// ----------------
//
// Since onFramesRendered() generally is an IPC call, InputBufferManager tries
// not to call it too often. There is a mechanism to guarantee that any two
// calls to the same listener are at least kNotificationPeriodNs nanoseconds
// apart.
//
struct InputBufferManager {
    // The minimum time period between IPC calls to notify the client about the
    // destruction of input buffers.
    static constexpr nsecs_t kNotificationPeriodNs = 1000000;

    // Track all buffers in a C2FrameData object.
    //
    // input (C2FrameData) has the following two members that are of interest:
    //
    //   C2WorkOrdinal                ordinal
    //   vector<shared_ptr<C2Buffer>> buffers
    //
    // Calling registerFrameData(listener, input) will register multiple
    // triples (listener, frameIndex, bufferIndex) where frameIndex is equal to
    // input.ordinal.frameIndex and bufferIndex runs through the indices of
    // input.buffers such that input.buffers[bufferIndex] is not null.
    //
    // This should be called from queue().
    static void registerFrameData(
            const sp<IComponentListener>& listener,
            const C2FrameData& input);

    // Untrack all buffers in a C2FrameData object.
    //
    // Calling unregisterFrameData(listener, input) will unregister and remove
    // pending notifications for all triples (l, fi, bufferIndex) such that
    // l = listener and fi = input.ordinal.frameIndex.
    //
    // This should be called from onWorkDone() and flush().
    static void unregisterFrameData(
            const wp<IComponentListener>& listener,
            const C2FrameData& input);

    // Untrack all buffers associated to a given listener.
    //
    // Calling unregisterFrameData(listener) will unregister and remove
    // pending notifications for all triples (l, frameIndex, bufferIndex) such
    // that l = listener.
    //
    // This should be called when the component cleans up all input buffers,
    // i.e., when reset(), release(), stop() or ~Component() is called.
    static void unregisterFrameData(
            const wp<IComponentListener>& listener);

private:
    // Instance-level implementations of the public static functions above.
    // The static functions forward to these on the object returned by
    // getInstance().
    void _registerFrameData(
            const sp<IComponentListener>& listener,
            const C2FrameData& input);
    void _unregisterFrameData(
            const wp<IComponentListener>& listener,
            const C2FrameData& input);
    void _unregisterFrameData(
            const wp<IComponentListener>& listener);

    // The callback function tied to C2Buffer objects.
    //
    // Note: This function assumes that sInstance is the only instance of this
    //       class.
    static void onBufferDestroyed(const C2Buffer* buf, void* arg);
    void _onBufferDestroyed(const C2Buffer* buf, void* arg);

    // Comparison operator for weak pointers. Orders two weak pointers by the
    // value returned from get_refs().
    struct CompareWeakComponentListener {
        constexpr bool operator()(
                const wp<IComponentListener>& x,
                const wp<IComponentListener>& y) const {
            return x.get_refs() < y.get_refs();
        }
    };

    // Persistent data to be passed as "arg" in onBufferDestroyed().
    // This is essentially the triple (listener, frameIndex, bufferIndex) plus a
    // weak pointer to the C2Buffer object.
    //
    // Note that the "key" is bufferIndex according to operator<(). This is
    // designed to work with TrackedBuffersMap defined below.
    struct TrackedBuffer {
        wp<IComponentListener> listener;
        uint64_t frameIndex;
        size_t bufferIndex;
        std::weak_ptr<C2Buffer> buffer;
        TrackedBuffer(const wp<IComponentListener>& listener,
                      uint64_t frameIndex,
                      size_t bufferIndex,
                      const std::shared_ptr<C2Buffer>& buffer)
              : listener(listener),
                frameIndex(frameIndex),
                bufferIndex(bufferIndex),
                buffer(buffer) {}
        TrackedBuffer(const TrackedBuffer&) = default;
        // Only bufferIndex takes part in the ordering; listener and
        // frameIndex are ignored.
        bool operator<(const TrackedBuffer& other) const {
            return bufferIndex < other.bufferIndex;
        }
    };

    // Map: listener -> frameIndex -> set<TrackedBuffer>.
    // Essentially, this is used to store triples (listener, frameIndex,
    // bufferIndex) that are searchable by listener and (listener, frameIndex).
    // However, the value of the innermost map is TrackedBuffer, which also
    // contains an extra copy of listener and frameIndex. This is needed
    // because onBufferDestroyed() needs to know listener and frameIndex too.
    typedef std::map<wp<IComponentListener>,
                     std::map<uint64_t,
                              std::set<TrackedBuffer>>,
                     CompareWeakComponentListener> TrackedBuffersMap;

    // Storage for pending (unsent) death notifications for one listener.
    // Each pair in the member named "indices" is (frameIndex, bufferIndex) from
    // the (listener, frameIndex, bufferIndex) triple.
    struct DeathNotifications {

        // The number of pending notifications for this listener.
        // count may be 0, in which case the DeathNotifications object will
        // remain valid for only a small period (kNotificationPeriodNs
        // nanoseconds).
        size_t count;

        // The timestamp of the most recent callback on this listener. This is
        // used to guarantee that callbacks do not occur too frequently, and
        // also to trigger expiration of a DeathNotifications object that has
        // count = 0.
        nsecs_t lastSentNs;

        // Map: frameIndex -> vector of bufferIndices
        // This is essentially a collection of (frameIndex, bufferIndex).
        std::map<uint64_t, std::vector<size_t>> indices;

        // lastSentNs starts one full notification period in the past, so a
        // freshly created object is not throttled on its first notification.
        DeathNotifications()
              : count(0),
                lastSentNs(systemTime() - kNotificationPeriodNs),
                indices() {}
    };

    // Mutex for the management of all input buffers.
    std::mutex mMutex;

    // Tracked input buffers.
    TrackedBuffersMap mTrackedBuffersMap;

    // Death notifications to be sent.
    //
    // A DeathNotifications object is associated to each listener. An entry in
    // this map will be removed if its associated DeathNotifications has count =
    // 0 and lastSentNs < systemTime() - kNotificationPeriodNs.
    //
    // NOTE(review): unlike TrackedBuffersMap, this map relies on the default
    // ordering of wp<> rather than CompareWeakComponentListener -- confirm
    // that this difference is intended.
    std::map<wp<IComponentListener>, DeathNotifications> mDeathNotifications;

    // Condition variable signaled when an entry is added to mDeathNotifications.
    std::condition_variable mOnBufferDestroyed;

    // Notify the clients about buffer destructions.
    // Return false if all destructions have been notified.
    // Return true and set *timeToRetryNs to the duration to wait for before
    // retrying if some destructions have not been notified.
    bool processNotifications(nsecs_t* timeToRetryNs);

    // Main function for the input buffer manager thread.
    void main();

    // The thread that manages notifications.
    //
    // Note: This variable is declared last so its initialization will happen
    // after all other member variables have been initialized.
    std::thread mMainThread;

    // Private constructor.
    InputBufferManager();

    // The only instance of this class.
    static InputBufferManager& getInstance();

};

// ComponentInterface
// Builds the HIDL-facing wrapper around a C2ComponentInterface.
//
// The Configurable base receives a CachedConfigurable that talks to intf
// through a CompIntf adapter. The result of init() against the given store is
// kept in mInit and can be read back through status().
ComponentInterface::ComponentInterface(
        const std::shared_ptr<C2ComponentInterface>& intf,
        const sp<ComponentStore>& store)
      : Configurable(
                new CachedConfigurable(std::make_unique<CompIntf>(intf))),
        mInterface(intf) {
    ComponentStore* const rawStore = store.get();
    mInit = init(rawStore);
}

// Returns the status recorded in mInit by the constructor's call to init().
c2_status_t ComponentInterface::status() const {
    return mInit;
}

// ComponentListener wrapper
// Bridges callbacks coming from the wrapped C2Component to the HIDL
// IComponentListener owned by a Component.
//
// Both the Component and the HIDL listener are held weakly. Each callback
// promotes the listener first and silently does nothing if it is gone.
struct Component::Listener : public C2Component::Listener {

    // Records weak references to component and to component->mListener.
    Listener(const sp<Component>& component) :
        mComponent(component),
        mListener(component->mListener) {
    }

    // Relays an error code to the HIDL listener. A failed transaction is only
    // logged.
    void onError_nb(
            std::weak_ptr<C2Component> /* c2component */,
            uint32_t errorCode) override {
        ALOGV("onError");
        sp<IComponentListener> listener = mListener.promote();
        if (listener) {
            Return<void> transStatus = listener->onError(Status::OK, errorCode);
            if (!transStatus.isOk()) {
                ALOGE("onError -- transaction failed.");
            }
        }
    }

    // Converts the tripped setting results to their HIDL form and relays them
    // to the HIDL listener.
    //
    // Null entries are skipped. Conversion stops at the first entry that
    // objcpy() fails on; only the entries converted successfully before it
    // are sent.
    void onTripped_nb(
            std::weak_ptr<C2Component> /* c2component */,
            std::vector<std::shared_ptr<C2SettingResult>> c2settingResult
            ) override {
        ALOGV("onTripped");
        sp<IComponentListener> listener = mListener.promote();
        if (listener) {
            hidl_vec<SettingResult> settingResults(c2settingResult.size());
            size_t ix = 0;
            for (const std::shared_ptr<C2SettingResult> &c2result :
                    c2settingResult) {
                if (!c2result) {
                    continue;
                }
                if (objcpy(&settingResults[ix], *c2result) != Status::OK) {
                    break;
                }
                // Advance only after a successful conversion so that a slot
                // whose conversion failed is dropped by the resize() below
                // instead of being sent to the client.
                ++ix;
            }
            settingResults.resize(ix);
            Return<void> transStatus = listener->onTripped(settingResults);
            if (!transStatus.isOk()) {
                ALOGE("onTripped -- transaction failed.");
            }
        }
    }

    // Handles finished work items.
    //
    // First, the input buffers of every work item that is not flagged
    // FLAG_INCOMPLETE on its last worklet are unregistered from
    // InputBufferManager. Then the work items are converted into a WorkBundle
    // and sent to the HIDL listener. yieldBufferQueueBlocks() is called only
    // after the transaction succeeds.
    void onWorkDone_nb(
            std::weak_ptr<C2Component> /* c2component */,
            std::list<std::unique_ptr<C2Work>> c2workItems) override {
        ALOGV("onWorkDone");
        for (const std::unique_ptr<C2Work>& work : c2workItems) {
            if (work) {
                if (work->worklets.empty()
                        || !work->worklets.back()
                        || (work->worklets.back()->output.flags &
                            C2FrameData::FLAG_INCOMPLETE) == 0) {
                    InputBufferManager::
                            unregisterFrameData(mListener, work->input);
                }
            }
        }

        sp<IComponentListener> listener = mListener.promote();
        if (listener) {
            WorkBundle workBundle;

            // The buffer pool sender lives in the Component; pass nullptr if
            // the Component is already gone.
            sp<Component> strongComponent = mComponent.promote();
            if (objcpy(&workBundle, c2workItems, strongComponent ?
                    &strongComponent->mBufferPoolSender : nullptr)
                    != Status::OK) {
                ALOGE("onWorkDone() received corrupted work items.");
                return;
            }
            Return<void> transStatus = listener->onWorkDone(workBundle);
            if (!transStatus.isOk()) {
                ALOGE("onWorkDone -- transaction failed.");
                return;
            }
            yieldBufferQueueBlocks(c2workItems, true);
        }
    }

protected:
    // Weak reference to the owning Component.
    wp<Component> mComponent;
    // Weak reference to the HIDL listener that receives the callbacks.
    wp<IComponentListener> mListener;
};

// Component
// Builds the HIDL-facing wrapper around a C2Component.
//
// The Configurable base receives a CachedConfigurable that talks to
// component->intf() through a CompIntf adapter. The result of init() against
// the given store is kept in mInit and can be read back through status().
Component::Component(
        const std::shared_ptr<C2Component>& component,
        const sp<IComponentListener>& listener,
        const sp<ComponentStore>& store,
        const sp<::android::hardware::media::bufferpool::V1_0::
        IClientManager>& clientPoolManager) :
    Configurable(new CachedConfigurable(
            std::make_unique<CompIntf>(component->intf()))),
    mComponent(component),
    mInterface(component->intf()),
    mListener(listener),
    mStore(store),
    mBufferPoolSender(clientPoolManager) {
    // Retrieve supported parameters from store
    // TODO: We could cache this per component/interface type
    mInit = init(store.get());
}

// Returns the status stored in mInit. It is set by the constructor's call to
// init() and may be overwritten by initListener() if setting the listener
// fails.
c2_status_t Component::status() const {
    return mInit;
}

// Methods from ::android::hardware::media::c2::V1_0::IComponent
// Methods from ::android::hardware::media::c2::V1_0::IComponent

// Converts workBundle into C2Work items, registers the input buffers of every
// non-null item with InputBufferManager, and queues the items on the wrapped
// component.
//
// Returns Status::CORRUPTED if the conversion fails; otherwise returns the
// result of queue_nb() cast to Status.
Return<Status> Component::queue(const WorkBundle& workBundle) {
    ALOGV("queue -- converting input");
    std::list<std::unique_ptr<C2Work>> workItems;

    const bool converted = (objcpy(&workItems, workBundle) == C2_OK);
    if (!converted) {
        ALOGV("queue -- corrupted");
        return Status::CORRUPTED;
    }

    // Start tracking the input buffers before handing the work over.
    for (const auto& item : workItems) {
        if (!item) {
            continue;
        }
        InputBufferManager::registerFrameData(mListener, item->input);
    }

    ALOGV("queue -- calling");
    const c2_status_t queued = mComponent->queue_nb(&workItems);
    return static_cast<Status>(queued);
}

// Flushes the wrapped component and reports the flushed work through
// _hidl_cb.
//
// Input buffers of every flushed work item that is not flagged
// FLAG_INCOMPLETE on its last worklet are unregistered from
// InputBufferManager. The flushed items are converted to a WorkBundle only
// when flush_sm() succeeded; otherwise an empty bundle is passed together
// with the failure status. yieldBufferQueueBlocks() runs after the callback
// in either case.
Return<void> Component::flush(flush_cb _hidl_cb) {
    std::list<std::unique_ptr<C2Work>> flushedItems;
    ALOGV("flush -- calling");
    const c2_status_t c2res = mComponent->flush_sm(
            C2Component::FLUSH_COMPONENT, &flushedItems);

    // Stop tracking the input buffers of the completed items.
    for (const auto& item : flushedItems) {
        if (!item) {
            continue;
        }
        const bool incomplete =
                !item->worklets.empty()
                && item->worklets.back()
                && (item->worklets.back()->output.flags &
                    C2FrameData::FLAG_INCOMPLETE) != 0;
        if (!incomplete) {
            InputBufferManager::unregisterFrameData(mListener, item->input);
        }
    }

    WorkBundle bundle;
    Status res = static_cast<Status>(c2res);
    if (c2res == C2_OK) {
        ALOGV("flush -- converting output");
        res = objcpy(&bundle, flushedItems, &mBufferPoolSender);
    }
    _hidl_cb(res, bundle);
    yieldBufferQueueBlocks(flushedItems, true);
    return Void();
}

// Drains the wrapped component. withEos selects DRAIN_COMPONENT_WITH_EOS
// over DRAIN_COMPONENT_NO_EOS. Returns the result of drain_nb() cast to
// Status.
Return<Status> Component::drain(bool withEos) {
    ALOGV("drain");
    const auto mode = withEos
            ? C2Component::DRAIN_COMPONENT_WITH_EOS
            : C2Component::DRAIN_COMPONENT_NO_EOS;
    return static_cast<Status>(mComponent->drain_nb(mode));
}

// Attaches an output surface to the block pool identified by blockPoolId.
//
// Nothing happens unless the pool exists and its allocator is BUFFERQUEUE.
// In that case a render callback is installed that reports each rendered
// frame to the component listener through onFramesRendered(), and the pool's
// producer is set to surface.
//
// Always returns Status::OK, including when the pool is not found or is not
// a buffer queue pool.
Return<Status> Component::setOutputSurface(
        uint64_t blockPoolId,
        const sp<HGraphicBufferProducer>& surface) {
    std::shared_ptr<C2BlockPool> pool;
    GetCodec2BlockPool(blockPoolId, mComponent, &pool);
    if (pool && pool->getAllocatorId() == C2PlatformAllocatorStore::BUFFERQUEUE) {
        std::shared_ptr<C2BufferQueueBlockPool> bqPool =
                std::static_pointer_cast<C2BufferQueueBlockPool>(pool);
        // The callback is stored inside the block pool, whose lifetime is not
        // tied to this Component. Capture a weak reference to the listener
        // instead of `this`, so the callback never touches a destroyed
        // Component and does not keep the listener alive by itself.
        // NOTE(review): this binds the listener that is current when the
        // surface is set -- confirm mListener is never reassigned afterwards.
        wp<IComponentListener> weakListener(mListener);
        C2BufferQueueBlockPool::OnRenderCallback cb =
            [weakListener](uint64_t producer, int32_t slot, int64_t nsecs) {
                sp<IComponentListener> listener = weakListener.promote();
                if (!listener) {
                    return;
                }
                // TODO: batch this
                hidl_vec<IComponentListener::RenderedFrame> rendered;
                rendered.resize(1);
                rendered[0] = { producer, slot, nsecs };
                (void)listener->onFramesRendered(rendered).isOk();
        };
        if (bqPool) {
            bqPool->setRenderCallback(cb);
            bqPool->configureProducer(surface);
        }
    }
    return Status::OK;
}

// Not implemented yet: both arguments are ignored and Status::OMITTED is
// returned.
Return<Status> Component::connectToOmxInputSurface(
        const sp<HGraphicBufferProducer>& /* producer */,
        const sp<::android::hardware::media::omx::V1_0::
        IGraphicBufferSource>& /* source */) {
    // TODO implement
    return Status::OMITTED;
}

// Not implemented yet: performs no work and reports Status::OK.
Return<Status> Component::disconnectFromInputSurface() {
    // TODO implement
    return Status::OK;
}

namespace /* unnamed */ {

// Placeholder ConfigurableC2Intf for a C2BlockPool.
//
// The name is "C2BlockPool:" followed by the pool's local id, or by "null"
// when no pool is given. Every operation ignores its arguments and reports
// C2_OK without touching the pool.
struct BlockPoolIntf : public ConfigurableC2Intf {
    BlockPoolIntf(const std::shared_ptr<C2BlockPool>& pool)
          : ConfigurableC2Intf("C2BlockPool:" +
                               (pool ? std::to_string(pool->getLocalId()) :
                               "null")),
            mPool(pool) {}

    // No-op: nothing is configured and failures is left untouched.
    c2_status_t config(
            const std::vector<C2Param*>& /* params */,
            c2_blocking_t /* mayBlock */,
            std::vector<std::unique_ptr<C2SettingResult>>* const /* failures */
            ) override {
        return C2_OK;
    }

    // No-op: params is left untouched.
    c2_status_t query(
            const std::vector<C2Param::Index>& /* indices */,
            c2_blocking_t /* mayBlock */,
            std::vector<std::unique_ptr<C2Param>>* const /* params */
            ) const override {
        return C2_OK;
    }

    // No-op: params is left untouched.
    c2_status_t querySupportedParams(
            std::vector<std::shared_ptr<C2ParamDescriptor>>* const /* params */
            ) const override {
        return C2_OK;
    }

    // No-op: fields is left untouched.
    c2_status_t querySupportedValues(
            std::vector<C2FieldSupportedValuesQuery>& /* fields */,
            c2_blocking_t /* mayBlock */) const override {
        return C2_OK;
    }

protected:
    // The block pool this object stands for; may be null.
    std::shared_ptr<C2BlockPool> mPool;
};

} // unnamed namespace

// Creates a block pool backed by the given allocator and reports the outcome
// through _hidl_cb.
//
// On success the pool is stored in mBlockPools under its local id. If
// CreateCodec2BlockPool() reports C2_OK but yields no pool, the status is
// changed to C2_CORRUPTED. The callback receives the status, the pool's local
// id (0 when there is no pool), and a CachedConfigurable wrapping a
// BlockPoolIntf for the (possibly null) pool.
Return<void> Component::createBlockPool(
        uint32_t allocatorId,
        createBlockPool_cb _hidl_cb) {
    std::shared_ptr<C2BlockPool> blockPool;
    c2_status_t status = CreateCodec2BlockPool(
            static_cast<C2PlatformAllocatorStore::id_t>(allocatorId),
            mComponent,
            &blockPool);
    if (status != C2_OK) {
        blockPool = nullptr;
    }
    if (blockPool) {
        // Scoped lock: released even if emplace() throws, and consistent with
        // the other users of mBlockPoolsMutex in this file.
        std::lock_guard<std::mutex> lock(mBlockPoolsMutex);
        mBlockPools.emplace(blockPool->getLocalId(), blockPool);
    } else if (status == C2_OK) {
        status = C2_CORRUPTED;
    }

    _hidl_cb(static_cast<Status>(status),
            blockPool ? blockPool->getLocalId() : 0,
            new CachedConfigurable(
            std::make_unique<BlockPoolIntf>(blockPool)));
    return Void();
}

// Removes the block pool with the given id from mBlockPools.
// Returns Status::OK if exactly one entry was erased and Status::CORRUPTED
// otherwise.
Return<Status> Component::destroyBlockPool(uint64_t blockPoolId) {
    std::lock_guard<std::mutex> lock(mBlockPoolsMutex);
    const auto erasedCount = mBlockPools.erase(blockPoolId);
    if (erasedCount == 1) {
        return Status::OK;
    }
    return Status::CORRUPTED;
}

// Starts the wrapped component and returns its result cast to Status.
Return<Status> Component::start() {
    ALOGV("start");
    return static_cast<Status>(mComponent->start());
}

// Stops the wrapped component and returns its result cast to Status.
// All input buffers tracked for mListener are unregistered from
// InputBufferManager before the component is stopped.
Return<Status> Component::stop() {
    ALOGV("stop");
    InputBufferManager::unregisterFrameData(mListener);
    return static_cast<Status>(mComponent->stop());
}

// Resets the wrapped component, then drops every block pool held in
// mBlockPools and unregisters all input buffers tracked for mListener.
// Returns the result of the component's reset() cast to Status.
Return<Status> Component::reset() {
    ALOGV("reset");
    const c2_status_t c2res = mComponent->reset();
    {
        // Keep the lock scope limited to the container update.
        std::lock_guard<std::mutex> guard(mBlockPoolsMutex);
        mBlockPools.clear();
    }
    InputBufferManager::unregisterFrameData(mListener);
    return static_cast<Status>(c2res);
}

// Releases the wrapped component, then drops every block pool held in
// mBlockPools and unregisters all input buffers tracked for mListener.
// Returns the result of the component's release() cast to Status.
Return<Status> Component::release() {
    ALOGV("release");
    const c2_status_t c2res = mComponent->release();
    {
        // Keep the lock scope limited to the container update.
        std::lock_guard<std::mutex> guard(mBlockPoolsMutex);
        mBlockPools.clear();
    }
    InputBufferManager::unregisterFrameData(mListener);
    return static_cast<Status>(c2res);
}

// Stores the local id of this component. The destructor passes it to
// mStore->reportComponentDeath().
void Component::setLocalId(const Component::LocalId& localId) {
    mLocalId = localId;
}

void Component::initListener(const sp<Component>& self) {
    std::shared_ptr<C2Component::Listener> c2listener =
            std::make_shared<Listener>(self);
    c2_status_t res = mComponent->setListener_vb(c2listener, C2_DONT_BLOCK);
    if (res != C2_OK) {
        mInit = res;
    }
}

// Unregisters all input buffers tracked for mListener and tells the store
// that the component with id mLocalId has died.
// NOTE(review): mStore is dereferenced without a null check -- confirm that a
// Component is never constructed with a null store.
Component::~Component() {
    InputBufferManager::unregisterFrameData(mListener);
    mStore->reportComponentDeath(mLocalId);
}

// Builds a key for an IComponent. For a remote component the key holds the
// binder obtained from toBinder(); for a local one it holds the component
// itself.
Component::InterfaceKey::InterfaceKey(const sp<IComponent>& component) {
    isRemote = component->isRemote();
    if (isRemote) {
        remote = ::android::hardware::toBinder(component);
    } else {
        local = component;
    }
}

// InputBufferManager implementation

// Out-of-class definition of the static constexpr member that is declared and
// initialized inside InputBufferManager.
constexpr nsecs_t InputBufferManager::kNotificationPeriodNs;

// Static entry point: forwards to _registerFrameData() on the object returned
// by getInstance().
void InputBufferManager::registerFrameData(
        const sp<IComponentListener>& listener,
        const C2FrameData& input) {
    getInstance()._registerFrameData(listener, input);
}

// Static entry point: forwards to the two-argument _unregisterFrameData() on
// the object returned by getInstance().
void InputBufferManager::unregisterFrameData(
        const wp<IComponentListener>& listener,
        const C2FrameData& input) {
    getInstance()._unregisterFrameData(listener, input);
}

// Static entry point: forwards to the one-argument _unregisterFrameData() on
// the object returned by getInstance().
void InputBufferManager::unregisterFrameData(
        const wp<IComponentListener>& listener) {
    getInstance()._unregisterFrameData(listener);
}

void InputBufferManager::_registerFrameData(
        const sp<IComponentListener>& listener,
        const C2FrameData& input) {
    uint64_t frameIndex = input.ordinal.frameIndex.peeku();
    ALOGV("InputBufferManager::_registerFrameData called "
          "(listener @ %p, frameIndex = %llu)",
          listener.get(),
          static_cast<long long unsigned>(frameIndex));
    std::lock_guard<std::mutex> lock(mMutex);

    std::set<TrackedBuffer> &bufferIds =
            mTrackedBuffersMap[listener][frameIndex];

    for (size_t i = 0; i < input.buffers.size(); ++i) {
        if (!input.buffers[i]) {
            ALOGV("InputBufferManager::_registerFrameData: "
                  "Input buffer at index %zu is null", i);
            continue;
        }
        const TrackedBuffer &bufferId =
                *bufferIds.emplace(listener, frameIndex, i, input.buffers[i]).
                first;

        c2_status_t status = input.buffers[i]->registerOnDestroyNotify(
                onBufferDestroyed,
                const_cast<void*>(reinterpret_cast<const void*>(&bufferId)));
        if (status != C2_OK) {
            ALOGD("InputBufferManager: registerOnDestroyNotify failed "
                  "(listener @ %p, frameIndex = %llu, bufferIndex = %zu) "
                  "=> %s (%d)",
                  listener.get(),
                  static_cast<unsigned long long>(frameIndex),
                  i,
                  asString(status), static_cast<int>(status));
        }
    }

    mDeathNotifications.emplace(listener, DeathNotifications());
}

// Remove a pair (listener, frameIndex) from mTrackedBuffersMap and
// mDeathNotifications. This implies all bufferIndices are removed.
//
// This is called from onWorkDone() and flush().
void InputBufferManager::_unregisterFrameData(
        const wp<IComponentListener>& listener,
        const C2FrameData& input) {
    uint64_t frameIndex = input.ordinal.frameIndex.peeku();
    ALOGV("InputBufferManager::_unregisterFrameData called "
          "(listener @ %p, frameIndex = %llu)",
          listener.unsafe_get(),
          static_cast<long long unsigned>(frameIndex));
    std::lock_guard<std::mutex> lock(mMutex);

    auto findListener = mTrackedBuffersMap.find(listener);
    if (findListener != mTrackedBuffersMap.end()) {
        std::map<uint64_t, std::set<TrackedBuffer>> &frameIndex2BufferIds
                = findListener->second;
        auto findFrameIndex = frameIndex2BufferIds.find(frameIndex);
        if (findFrameIndex != frameIndex2BufferIds.end()) {
            std::set<TrackedBuffer> &bufferIds = findFrameIndex->second;
            for (const TrackedBuffer& bufferId : bufferIds) {
                std::shared_ptr<C2Buffer> buffer = bufferId.buffer.lock();
                if (buffer) {
                    c2_status_t status = buffer->unregisterOnDestroyNotify(
                            onBufferDestroyed,
                            const_cast<void*>(
                            reinterpret_cast<const void*>(&bufferId)));
                    if (status != C2_OK) {
                        ALOGD("InputBufferManager: "
                              "unregisterOnDestroyNotify failed "
                              "(listener @ %p, "
                              "frameIndex = %llu, "
                              "bufferIndex = %zu) "
                              "=> %s (%d)",
                              bufferId.listener.unsafe_get(),
                              static_cast<unsigned long long>(
                                  bufferId.frameIndex),
                              bufferId.bufferIndex,
                              asString(status), static_cast<int>(status));
                    }
                }
            }

            frameIndex2BufferIds.erase(findFrameIndex);
            if (frameIndex2BufferIds.empty()) {
                mTrackedBuffersMap.erase(findListener);
            }
        }
    }

    auto findListenerD = mDeathNotifications.find(listener);
    if (findListenerD != mDeathNotifications.end()) {
        DeathNotifications &deathNotifications = findListenerD->second;
        auto findFrameIndex = deathNotifications.indices.find(frameIndex);
        if (findFrameIndex != deathNotifications.indices.end()) {
            std::vector<size_t> &bufferIndices = findFrameIndex->second;
            deathNotifications.count -= bufferIndices.size();
            deathNotifications.indices.erase(findFrameIndex);
        }
    }
}

// Drop listener from both mTrackedBuffersMap and mDeathNotifications, which
// removes every frameIndex and bufferIndex recorded for it.
//
// Buffers that are still alive get their onBufferDestroyed() callback
// unregistered first; a failure to do so is only logged.
//
// Called when the component cleans up all input buffers, i.e., from reset(),
// release(), stop() or ~Component().
void InputBufferManager::_unregisterFrameData(
        const wp<IComponentListener>& listener) {
    ALOGV("InputBufferManager::_unregisterFrameData called (listener @ %p)",
            listener.unsafe_get());
    std::lock_guard<std::mutex> lock(mMutex);

    auto trackedIt = mTrackedBuffersMap.find(listener);
    if (trackedIt != mTrackedBuffersMap.end()) {
        for (const auto& frameEntry : trackedIt->second) {
            for (const TrackedBuffer& tracked : frameEntry.second) {
                const std::shared_ptr<C2Buffer> liveBuffer =
                        tracked.buffer.lock();
                if (!liveBuffer) {
                    continue;
                }
                void* const callbackArg = const_cast<void*>(
                        static_cast<const void*>(&tracked));
                const c2_status_t status =
                        liveBuffer->unregisterOnDestroyNotify(
                                onBufferDestroyed, callbackArg);
                if (status == C2_OK) {
                    continue;
                }
                ALOGD("InputBufferManager: "
                      "unregisterOnDestroyNotify failed "
                      "(listener @ %p, "
                      "frameIndex = %llu, "
                      "bufferIndex = %zu) "
                      "=> %s (%d)",
                      tracked.listener.unsafe_get(),
                      static_cast<unsigned long long>(tracked.frameIndex),
                      tracked.bufferIndex,
                      asString(status), static_cast<int>(status));
            }
        }
        mTrackedBuffersMap.erase(trackedIt);
    }

    mDeathNotifications.erase(listener);
}

// Move a buffer from mTrackedBuffersMap to mDeathNotifications.
// This is called when a registered C2Buffer object is destroyed.
//
// Static trampoline: arg is the pointer that was passed to
// registerOnDestroyNotify() in _registerFrameData() (the address of a
// TrackedBuffer). The call is forwarded to _onBufferDestroyed() on the object
// returned by getInstance().
void InputBufferManager::onBufferDestroyed(const C2Buffer* buf, void* arg) {
    getInstance()._onBufferDestroyed(buf, arg);
}

// Handle the destruction of a tracked input buffer.
//
// The record identified by arg is removed from mTrackedBuffersMap (pruning the
// per-frame set and the per-listener map when they become empty), a pending
// death notification is appended to mDeathNotifications for the owning
// listener, and the notification thread is woken through mOnBufferDestroyed.
//
// buf: the buffer being destroyed; only checked for null and logged.
// arg: pointer to the TrackedBuffer describing the buffer.
//
// If either argument is null, or the (listener, frameIndex, buffer) triple is
// not currently tracked, the call is logged and ignored.
void InputBufferManager::_onBufferDestroyed(const C2Buffer* buf, void* arg) {
    if (!buf || !arg) {
        ALOGW("InputBufferManager::_onBufferDestroyed called "
              "with null argument(s) (buf @ %p, arg @ %p)",
              buf, arg);
        return;
    }
    // Work on a copy: arg presumably points at the element stored inside
    // mTrackedBuffersMap (that is the address used when unregistering), and
    // that element is erased below while id is still needed afterwards.
    // NOTE(review): the copy is taken before mMutex is acquired -- confirm
    // that the pointee cannot be erased concurrently.
    TrackedBuffer id(*reinterpret_cast<TrackedBuffer*>(arg));
    ALOGV("InputBufferManager::_onBufferDestroyed called "
          "(listener @ %p, frameIndex = %llu, bufferIndex = %zu)",
          id.listener.unsafe_get(),
          static_cast<unsigned long long>(id.frameIndex),
          id.bufferIndex);

    std::lock_guard<std::mutex> lock(mMutex);

    // Level 1: listener -> (frameIndex -> set of buffers).
    auto findListener = mTrackedBuffersMap.find(id.listener);
    if (findListener == mTrackedBuffersMap.end()) {
        ALOGD("InputBufferManager::_onBufferDestroyed received "
              "invalid listener "
              "(listener @ %p, frameIndex = %llu, bufferIndex = %zu)",
              id.listener.unsafe_get(),
              static_cast<unsigned long long>(id.frameIndex),
              id.bufferIndex);
        return;
    }

    // Level 2: frameIndex -> set of buffers.
    std::map<uint64_t, std::set<TrackedBuffer>> &frameIndex2BufferIds
            = findListener->second;
    auto findFrameIndex = frameIndex2BufferIds.find(id.frameIndex);
    if (findFrameIndex == frameIndex2BufferIds.end()) {
        ALOGD("InputBufferManager::_onBufferDestroyed received "
              "invalid frame index "
              "(listener @ %p, frameIndex = %llu, bufferIndex = %zu)",
              id.listener.unsafe_get(),
              static_cast<unsigned long long>(id.frameIndex),
              id.bufferIndex);
        return;
    }

    // Level 3: the buffer itself.
    std::set<TrackedBuffer> &bufferIds = findFrameIndex->second;
    auto findBufferId = bufferIds.find(id);
    if (findBufferId == bufferIds.end()) {
        ALOGD("InputBufferManager::_onBufferDestroyed received "
              "invalid buffer index: "
              "(listener @ %p, frameIndex = %llu, bufferIndex = %zu)",
              id.listener.unsafe_get(),
              static_cast<unsigned long long>(id.frameIndex),
              id.bufferIndex);
        // Bail out like the other "invalid" cases: erasing the end iterator
        // is undefined behavior, and an untracked buffer must not produce a
        // death notification.
        return;
    }

    // Drop the record and prune containers that became empty so that
    // mTrackedBuffersMap never keeps empty sets or maps around.
    bufferIds.erase(findBufferId);
    if (bufferIds.empty()) {
        frameIndex2BufferIds.erase(findFrameIndex);
        if (frameIndex2BufferIds.empty()) {
            mTrackedBuffersMap.erase(findListener);
        }
    }

    // Queue the notification (operator[] creates the listener's entry on
    // first use) and wake the thread waiting on mOnBufferDestroyed.
    DeathNotifications &deathNotifications = mDeathNotifications[id.listener];
    deathNotifications.indices[id.frameIndex].emplace_back(id.bufferIndex);
    ++deathNotifications.count;
    mOnBufferDestroyed.notify_one();
}

// Notify the clients about buffer destructions.
//
// Under mMutex, every entry of mDeathNotifications is examined:
//   - entries whose listener can no longer be promoted are erased;
//   - entries notified less than kNotificationPeriodNs ago are postponed;
//   - entries with nothing pending (and old enough) are erased;
//   - all other entries are turned into one onFramesRendered() call each and
//     then reset.
// The collected callbacks are issued after mMutex has been released.
//
// timeToRetryNs: output, must not be null. Set to the number of nanoseconds
//     the caller should wait before calling this function again; it is at
//     most kNotificationPeriodNs. Only meaningful when true is returned.
//
// Return true if the function should be called again (some notifications were
// postponed, or some were just sent and their entries still have to be
// expired later). Return false if there is nothing left to do.
bool InputBufferManager::processNotifications(nsecs_t* timeToRetryNs) {

    // One callback to issue: the target listener plus the argument vector,
    // which is pre-sized to the number of pending buffer deaths.
    struct Notification {
        sp<IComponentListener> listener;
        hidl_vec<IComponentListener::RenderedFrame> renderedFrames;
        Notification(const sp<IComponentListener>& l, size_t s)
              : listener(l), renderedFrames(s) {}
    };
    std::list<Notification> notifications;

    bool retry = false;
    {
        std::lock_guard<std::mutex> lock(mMutex);
        *timeToRetryNs = kNotificationPeriodNs;
        nsecs_t timeNowNs = systemTime();
        for (auto it = mDeathNotifications.begin();
                it != mDeathNotifications.end(); ) {
            sp<IComponentListener> listener = it->first.promote();
            if (!listener) {
                // The listener is gone, so its notifications can never be
                // delivered. Drop the entry: leaving it behind would keep
                // mDeathNotifications non-empty while this function returns
                // false, which makes main() skip its wait and spin.
                it = mDeathNotifications.erase(it);
                continue;
            }
            DeathNotifications &deathNotifications = it->second;

            nsecs_t timeSinceLastNotifiedNs =
                    timeNowNs - deathNotifications.lastSentNs;
            // If not enough time has passed since the last callback, leave the
            // notifications for this listener untouched for now and retry
            // later.
            if (timeSinceLastNotifiedNs < kNotificationPeriodNs) {
                retry = true;
                *timeToRetryNs = std::min(*timeToRetryNs,
                        kNotificationPeriodNs - timeSinceLastNotifiedNs);
                ALOGV("InputBufferManager: Notifications for "
                      "listener @ %p will be postponed.",
                      listener.get());
                ++it;
                continue;
            }

            // If enough time has passed since the last notification to this
            // listener but there are currently no pending notifications, the
            // listener can be removed from mDeathNotifications---there is no
            // need to keep track of the last notification time anymore.
            if (deathNotifications.count == 0) {
                it = mDeathNotifications.erase(it);
                continue;
            }

            // Create the argument for the callback.
            notifications.emplace_back(listener, deathNotifications.count);
            hidl_vec<IComponentListener::RenderedFrame>& renderedFrames =
                    notifications.back().renderedFrames;
            size_t i = 0;
            for (const auto& p : deathNotifications.indices) {
                uint64_t frameIndex = p.first;
                const std::vector<size_t> &bufferIndices = p.second;
                for (const size_t& bufferIndex : bufferIndices) {
                    IComponentListener::RenderedFrame &renderedFrame
                            = renderedFrames[i++];
                    // The RenderedFrame fields are reused to carry the buffer
                    // identity: slotId holds the bitwise complement of the
                    // buffer index and bufferQueueId holds the frame index.
                    // NOTE(review): presumably the complement lets the client
                    // tell these entries apart from real slot ids -- confirm
                    // against the listener implementation.
                    renderedFrame.slotId = ~bufferIndex;
                    renderedFrame.bufferQueueId = frameIndex;
                    renderedFrame.timestampNs = timeNowNs;
                    ALOGV("InputBufferManager: "
                          "Sending death notification (listener @ %p, "
                          "frameIndex = %llu, bufferIndex = %zu)",
                          listener.get(),
                          static_cast<long long unsigned>(frameIndex),
                          bufferIndex);
                }
            }

            // Clear deathNotifications for this listener and set retry to true
            // so processNotifications will be called again. This will
            // guarantee that a listener with no pending notifications will
            // eventually be removed from mDeathNotifications after
            // kNotificationPeriodNs nanoseconds has passed.
            retry = true;
            deathNotifications.indices.clear();
            deathNotifications.count = 0;
            deathNotifications.lastSentNs = timeNowNs;
            ++it;
        }
    }

    // Call onFramesRendered outside the lock to avoid deadlock.
    for (const Notification& notification : notifications) {
        if (!notification.listener->onFramesRendered(
                notification.renderedFrames).isOk()) {
            // This may trigger if the client has died.
            ALOGD("InputBufferManager: onFramesRendered transaction failed "
                  "(listener @ %p)",
                  notification.listener.get());
        }
    }
    if (retry) {
        ALOGV("InputBufferManager: Pending death notifications "
              "will be sent in %lldns.",
              static_cast<long long>(*timeToRetryNs));
    }
    return retry;
}

void InputBufferManager::main() {
    ALOGV("InputBufferManager: Starting main thread");
    nsecs_t timeToRetryNs;
    while (true) {
        std::unique_lock<std::mutex> lock(mMutex);
        while (mDeathNotifications.empty()) {
            ALOGV("InputBufferManager: Waiting for buffer deaths");
            mOnBufferDestroyed.wait(lock);
        }
        lock.unlock();
        ALOGV("InputBufferManager: Sending buffer death notifications");
        while (processNotifications(&timeToRetryNs)) {
            std::this_thread::sleep_for(
                    std::chrono::nanoseconds(timeToRetryNs));
            ALOGV("InputBufferManager: Sending pending death notifications");
        }
        ALOGV("InputBufferManager: No pending death notifications");
    }
}

// Default constructor: initializes mMainThread with the member function
// main() bound to this object (mMainThread is presumably a std::thread, in
// which case the notification loop starts running right here).
// NOTE(review): main() locks mMutex and reads mDeathNotifications as soon as
// it runs, so this presumably relies on those members being declared before
// mMainThread in the class so they are constructed first -- confirm in the
// header.
InputBufferManager::InputBufferManager()
      : mMainThread(&InputBufferManager::main, this) {
}

// Return the single shared InputBufferManager. The object is a function-local
// static, so it is constructed the first time this function is called and the
// same reference is returned on every call.
InputBufferManager& InputBufferManager::getInstance() {
    static InputBufferManager sInstance;
    return sInstance;
}

}  // namespace utils
}  // namespace V1_0
}  // namespace c2
}  // namespace media
}  // namespace google
}  // namespace hardware