// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include <stdint.h>
#include <stdio.h>
#include <string.h>

#include <vector>

#include "base/bind.h"
#include "base/files/file_path.h"
#include "base/files/file_util.h"
#include "base/files/scoped_file.h"
#include "base/files/scoped_temp_dir.h"
#include "base/location.h"
#include "base/logging.h"
#include "base/macros.h"
#include "base/message_loop/message_loop.h"
#include "base/test/test_io_thread.h"
#include "base/threading/platform_thread.h"  // For |Sleep()|.
#include "build/build_config.h"              // TODO(vtl): Remove this.
#include "mojo/common/test/test_utils.h"
#include "mojo/embedder/platform_channel_pair.h"
#include "mojo/embedder/platform_shared_buffer.h"
#include "mojo/embedder/scoped_platform_handle.h"
#include "mojo/embedder/simple_platform_support.h"
#include "mojo/system/channel.h"
#include "mojo/system/channel_endpoint.h"
#include "mojo/system/message_pipe.h"
#include "mojo/system/message_pipe_dispatcher.h"
#include "mojo/system/platform_handle_dispatcher.h"
#include "mojo/system/raw_channel.h"
#include "mojo/system/shared_buffer_dispatcher.h"
#include "mojo/system/test_utils.h"
#include "mojo/system/waiter.h"
#include "testing/gtest/include/gtest/gtest.h"

namespace mojo {
namespace system {
namespace {

class RemoteMessagePipeTest : public testing::Test {
 public:
  RemoteMessagePipeTest() : io_thread_(base::TestIOThread::kAutoStart) {}
  virtual ~RemoteMessagePipeTest() {}

  virtual void SetUp() OVERRIDE {
    io_thread_.PostTaskAndWait(
        FROM_HERE,
        base::Bind(&RemoteMessagePipeTest::SetUpOnIOThread,
                   base::Unretained(this)));
  }

  virtual void TearDown() OVERRIDE {
    io_thread_.PostTaskAndWait(
        FROM_HERE,
        base::Bind(&RemoteMessagePipeTest::TearDownOnIOThread,
                   base::Unretained(this)));
  }

 protected:
  // This connects the two given |ChannelEndpoint|s.
  void ConnectChannelEndpoints(scoped_refptr<ChannelEndpoint> ep0,
                               scoped_refptr<ChannelEndpoint> ep1) {
    io_thread_.PostTaskAndWait(
        FROM_HERE,
        base::Bind(&RemoteMessagePipeTest::ConnectChannelEndpointsOnIOThread,
                   base::Unretained(this),
                   ep0,
                   ep1));
  }

  // This bootstraps |ep| on |channels_[channel_index]|. It assumes/requires
  // that this is the bootstrap case, i.e., that the endpoint IDs are both/will
  // both be |Channel::kBootstrapEndpointId|. This returns *without* waiting for
  // it to finish connecting.
  void BootstrapChannelEndpointNoWait(unsigned channel_index,
                                      scoped_refptr<ChannelEndpoint> ep) {
    io_thread_.PostTask(
        FROM_HERE,
        base::Bind(&RemoteMessagePipeTest::BootstrapChannelEndpointOnIOThread,
                   base::Unretained(this),
                   channel_index,
                   ep));
  }

  void RestoreInitialState() {
    io_thread_.PostTaskAndWait(
        FROM_HERE,
        base::Bind(&RemoteMessagePipeTest::RestoreInitialStateOnIOThread,
                   base::Unretained(this)));
  }

  embedder::PlatformSupport* platform_support() { return &platform_support_; }
  base::TestIOThread* io_thread() { return &io_thread_; }

 private:
  void SetUpOnIOThread() {
    CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());

    embedder::PlatformChannelPair channel_pair;
    platform_handles_[0] = channel_pair.PassServerHandle();
    platform_handles_[1] = channel_pair.PassClientHandle();
  }

  void TearDownOnIOThread() {
    CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());

    if (channels_[0].get()) {
      channels_[0]->Shutdown();
      channels_[0] = nullptr;
    }
    if (channels_[1].get()) {
      channels_[1]->Shutdown();
      channels_[1] = nullptr;
    }
  }

  void CreateAndInitChannel(unsigned channel_index) {
    CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
    CHECK(channel_index == 0 || channel_index == 1);
    CHECK(!channels_[channel_index].get());

    channels_[channel_index] = new Channel(&platform_support_);
    CHECK(channels_[channel_index]->Init(
        RawChannel::Create(platform_handles_[channel_index].Pass())));
  }

  void ConnectChannelEndpointsOnIOThread(scoped_refptr<ChannelEndpoint> ep0,
                                         scoped_refptr<ChannelEndpoint> ep1) {
    CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());

    if (!channels_[0].get())
      CreateAndInitChannel(0);
    if (!channels_[1].get())
      CreateAndInitChannel(1);

    MessageInTransit::EndpointId local_id0 = channels_[0]->AttachEndpoint(ep0);
    MessageInTransit::EndpointId local_id1 = channels_[1]->AttachEndpoint(ep1);

    CHECK(channels_[0]->RunMessagePipeEndpoint(local_id0, local_id1));
    CHECK(channels_[1]->RunMessagePipeEndpoint(local_id1, local_id0));
  }

  void BootstrapChannelEndpointOnIOThread(unsigned channel_index,
                                          scoped_refptr<ChannelEndpoint> ep) {
    CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());
    CHECK(channel_index == 0 || channel_index == 1);

    CreateAndInitChannel(channel_index);
    MessageInTransit::EndpointId endpoint_id =
        channels_[channel_index]->AttachEndpoint(ep);
    if (endpoint_id == MessageInTransit::kInvalidEndpointId)
      return;

    CHECK_EQ(endpoint_id, Channel::kBootstrapEndpointId);
    CHECK(channels_[channel_index]->RunMessagePipeEndpoint(
        Channel::kBootstrapEndpointId, Channel::kBootstrapEndpointId));
  }

  void RestoreInitialStateOnIOThread() {
    CHECK_EQ(base::MessageLoop::current(), io_thread()->message_loop());

    TearDownOnIOThread();
    SetUpOnIOThread();
  }

  embedder::SimplePlatformSupport platform_support_;
  base::TestIOThread io_thread_;
  embedder::ScopedPlatformHandle platform_handles_[2];
  scoped_refptr<Channel> channels_[2];

  DISALLOW_COPY_AND_ASSIGN(RemoteMessagePipeTest);
};

TEST_F(RemoteMessagePipeTest, Basic) {
  static const char kHello[] = "hello";
  static const char kWorld[] = "world!!!1!!!1!";
  char buffer[100] = {0};
  uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer));
  Waiter waiter;
  HandleSignalsState hss;
  uint32_t context = 0;

  // Connect message pipes. MP 0, port 1 will be attached to channel 0 and
  // connected to MP 1, port 0, which will be attached to channel 1. This leaves
  // MP 0, port 0 and MP 1, port 1 as the "user-facing" endpoints.

  scoped_refptr<ChannelEndpoint> ep0;
  scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0));
  scoped_refptr<ChannelEndpoint> ep1;
  scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1));
  ConnectChannelEndpoints(ep0, ep1);

  // Write in one direction: MP 0, port 0 -> ... -> MP 1, port 1.

  // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
  // it later, it might already be readable.)
  waiter.Init();
  ASSERT_EQ(
      MOJO_RESULT_OK,
      mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, nullptr));

  // Write to MP 0, port 0.
  EXPECT_EQ(MOJO_RESULT_OK,
            mp0->WriteMessage(0,
                              UserPointer<const void>(kHello),
                              sizeof(kHello),
                              nullptr,
                              MOJO_WRITE_MESSAGE_FLAG_NONE));

  // Wait.
  EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
  EXPECT_EQ(123u, context);
  hss = HandleSignalsState();
  mp1->RemoveWaiter(1, &waiter, &hss);
  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
            hss.satisfied_signals);
  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
            hss.satisfiable_signals);

  // Read from MP 1, port 1.
  EXPECT_EQ(MOJO_RESULT_OK,
            mp1->ReadMessage(1,
                             UserPointer<void>(buffer),
                             MakeUserPointer(&buffer_size),
                             nullptr,
                             nullptr,
                             MOJO_READ_MESSAGE_FLAG_NONE));
  EXPECT_EQ(sizeof(kHello), static_cast<size_t>(buffer_size));
  EXPECT_STREQ(kHello, buffer);

  // Write in the other direction: MP 1, port 1 -> ... -> MP 0, port 0.

  waiter.Init();
  ASSERT_EQ(
      MOJO_RESULT_OK,
      mp0->AddWaiter(0, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 456, nullptr));

  EXPECT_EQ(MOJO_RESULT_OK,
            mp1->WriteMessage(1,
                              UserPointer<const void>(kWorld),
                              sizeof(kWorld),
                              nullptr,
                              MOJO_WRITE_MESSAGE_FLAG_NONE));

  EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
  EXPECT_EQ(456u, context);
  hss = HandleSignalsState();
  mp0->RemoveWaiter(0, &waiter, &hss);
  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
            hss.satisfied_signals);
  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
            hss.satisfiable_signals);

  buffer_size = static_cast<uint32_t>(sizeof(buffer));
  EXPECT_EQ(MOJO_RESULT_OK,
            mp0->ReadMessage(0,
                             UserPointer<void>(buffer),
                             MakeUserPointer(&buffer_size),
                             nullptr,
                             nullptr,
                             MOJO_READ_MESSAGE_FLAG_NONE));
  EXPECT_EQ(sizeof(kWorld), static_cast<size_t>(buffer_size));
  EXPECT_STREQ(kWorld, buffer);

  // Close MP 0, port 0.
  mp0->Close(0);

  // Try to wait for MP 1, port 1 to become readable. This will eventually fail
  // when it realizes that MP 0, port 0 has been closed. (It may also fail
  // immediately.)
  waiter.Init();
  hss = HandleSignalsState();
  MojoResult result =
      mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 789, &hss);
  if (result == MOJO_RESULT_OK) {
    EXPECT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
              waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
    EXPECT_EQ(789u, context);
    hss = HandleSignalsState();
    mp1->RemoveWaiter(1, &waiter, &hss);
  }
  EXPECT_EQ(0u, hss.satisfied_signals);
  EXPECT_EQ(0u, hss.satisfiable_signals);

  // And MP 1, port 1.
  mp1->Close(1);
}

TEST_F(RemoteMessagePipeTest, Multiplex) {
  static const char kHello[] = "hello";
  static const char kWorld[] = "world!!!1!!!1!";
  char buffer[100] = {0};
  uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer));
  Waiter waiter;
  HandleSignalsState hss;
  uint32_t context = 0;

  // Connect message pipes as in the |Basic| test.

  scoped_refptr<ChannelEndpoint> ep0;
  scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0));
  scoped_refptr<ChannelEndpoint> ep1;
  scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1));
  ConnectChannelEndpoints(ep0, ep1);

  // Now put another message pipe on the channel.

  scoped_refptr<ChannelEndpoint> ep2;
  scoped_refptr<MessagePipe> mp2(MessagePipe::CreateLocalProxy(&ep2));
  scoped_refptr<ChannelEndpoint> ep3;
  scoped_refptr<MessagePipe> mp3(MessagePipe::CreateProxyLocal(&ep3));
  ConnectChannelEndpoints(ep2, ep3);

  // Write: MP 2, port 0 -> MP 3, port 1.

  waiter.Init();
  ASSERT_EQ(
      MOJO_RESULT_OK,
      mp3->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 789, nullptr));

  EXPECT_EQ(MOJO_RESULT_OK,
            mp2->WriteMessage(0,
                              UserPointer<const void>(kHello),
                              sizeof(kHello),
                              nullptr,
                              MOJO_WRITE_MESSAGE_FLAG_NONE));

  EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
  EXPECT_EQ(789u, context);
  hss = HandleSignalsState();
  mp3->RemoveWaiter(1, &waiter, &hss);
  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
            hss.satisfied_signals);
  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
            hss.satisfiable_signals);

  // Make sure there's nothing on MP 0, port 0 or MP 1, port 1 or MP 2, port 0.
  buffer_size = static_cast<uint32_t>(sizeof(buffer));
  EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
            mp0->ReadMessage(0,
                             UserPointer<void>(buffer),
                             MakeUserPointer(&buffer_size),
                             nullptr,
                             nullptr,
                             MOJO_READ_MESSAGE_FLAG_NONE));
  buffer_size = static_cast<uint32_t>(sizeof(buffer));
  EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
            mp1->ReadMessage(1,
                             UserPointer<void>(buffer),
                             MakeUserPointer(&buffer_size),
                             nullptr,
                             nullptr,
                             MOJO_READ_MESSAGE_FLAG_NONE));
  buffer_size = static_cast<uint32_t>(sizeof(buffer));
  EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
            mp2->ReadMessage(0,
                             UserPointer<void>(buffer),
                             MakeUserPointer(&buffer_size),
                             nullptr,
                             nullptr,
                             MOJO_READ_MESSAGE_FLAG_NONE));

  // Read from MP 3, port 1.
  buffer_size = static_cast<uint32_t>(sizeof(buffer));
  EXPECT_EQ(MOJO_RESULT_OK,
            mp3->ReadMessage(1,
                             UserPointer<void>(buffer),
                             MakeUserPointer(&buffer_size),
                             nullptr,
                             nullptr,
                             MOJO_READ_MESSAGE_FLAG_NONE));
  EXPECT_EQ(sizeof(kHello), static_cast<size_t>(buffer_size));
  EXPECT_STREQ(kHello, buffer);

  // Write: MP 0, port 0 -> MP 1, port 1 again.

  waiter.Init();
  ASSERT_EQ(
      MOJO_RESULT_OK,
      mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, nullptr));

  EXPECT_EQ(MOJO_RESULT_OK,
            mp0->WriteMessage(0,
                              UserPointer<const void>(kWorld),
                              sizeof(kWorld),
                              nullptr,
                              MOJO_WRITE_MESSAGE_FLAG_NONE));

  EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
  EXPECT_EQ(123u, context);
  hss = HandleSignalsState();
  mp1->RemoveWaiter(1, &waiter, &hss);
  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
            hss.satisfied_signals);
  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
            hss.satisfiable_signals);

  // Make sure there's nothing on the other ports.
  buffer_size = static_cast<uint32_t>(sizeof(buffer));
  EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
            mp0->ReadMessage(0,
                             UserPointer<void>(buffer),
                             MakeUserPointer(&buffer_size),
                             nullptr,
                             nullptr,
                             MOJO_READ_MESSAGE_FLAG_NONE));
  buffer_size = static_cast<uint32_t>(sizeof(buffer));
  EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
            mp2->ReadMessage(0,
                             UserPointer<void>(buffer),
                             MakeUserPointer(&buffer_size),
                             nullptr,
                             nullptr,
                             MOJO_READ_MESSAGE_FLAG_NONE));
  buffer_size = static_cast<uint32_t>(sizeof(buffer));
  EXPECT_EQ(MOJO_RESULT_SHOULD_WAIT,
            mp3->ReadMessage(1,
                             UserPointer<void>(buffer),
                             MakeUserPointer(&buffer_size),
                             nullptr,
                             nullptr,
                             MOJO_READ_MESSAGE_FLAG_NONE));

  buffer_size = static_cast<uint32_t>(sizeof(buffer));
  EXPECT_EQ(MOJO_RESULT_OK,
            mp1->ReadMessage(1,
                             UserPointer<void>(buffer),
                             MakeUserPointer(&buffer_size),
                             nullptr,
                             nullptr,
                             MOJO_READ_MESSAGE_FLAG_NONE));
  EXPECT_EQ(sizeof(kWorld), static_cast<size_t>(buffer_size));
  EXPECT_STREQ(kWorld, buffer);

  mp0->Close(0);
  mp1->Close(1);
  mp2->Close(0);
  mp3->Close(1);
}

TEST_F(RemoteMessagePipeTest, CloseBeforeConnect) {
  static const char kHello[] = "hello";
  char buffer[100] = {0};
  uint32_t buffer_size = static_cast<uint32_t>(sizeof(buffer));
  Waiter waiter;
  HandleSignalsState hss;
  uint32_t context = 0;

  // Connect message pipes. MP 0, port 1 will be attached to channel 0 and
  // connected to MP 1, port 0, which will be attached to channel 1. This leaves
  // MP 0, port 0 and MP 1, port 1 as the "user-facing" endpoints.

  scoped_refptr<ChannelEndpoint> ep0;
  scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0));

  // Write to MP 0, port 0.
  EXPECT_EQ(MOJO_RESULT_OK,
            mp0->WriteMessage(0,
                              UserPointer<const void>(kHello),
                              sizeof(kHello),
                              nullptr,
                              MOJO_WRITE_MESSAGE_FLAG_NONE));

  BootstrapChannelEndpointNoWait(0, ep0);

  // Close MP 0, port 0 before channel 1 is even connected.
  mp0->Close(0);

  scoped_refptr<ChannelEndpoint> ep1;
  scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1));

  // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
  // it later, it might already be readable.)
  waiter.Init();
  ASSERT_EQ(
      MOJO_RESULT_OK,
      mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, nullptr));

  BootstrapChannelEndpointNoWait(1, ep1);

  // Wait.
  EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
  EXPECT_EQ(123u, context);
  hss = HandleSignalsState();
  // Note: MP 1, port 1 should definitely should be readable, but it may or may
  // not appear as writable (there's a race, and it may not have noticed that
  // the other side was closed yet -- e.g., inserting a sleep here would make it
  // much more likely to notice that it's no longer writable).
  mp1->RemoveWaiter(1, &waiter, &hss);
  EXPECT_TRUE((hss.satisfied_signals & MOJO_HANDLE_SIGNAL_READABLE));
  EXPECT_TRUE((hss.satisfiable_signals & MOJO_HANDLE_SIGNAL_READABLE));

  // Read from MP 1, port 1.
  EXPECT_EQ(MOJO_RESULT_OK,
            mp1->ReadMessage(1,
                             UserPointer<void>(buffer),
                             MakeUserPointer(&buffer_size),
                             nullptr,
                             nullptr,
                             MOJO_READ_MESSAGE_FLAG_NONE));
  EXPECT_EQ(sizeof(kHello), static_cast<size_t>(buffer_size));
  EXPECT_STREQ(kHello, buffer);

  // And MP 1, port 1.
  mp1->Close(1);
}

TEST_F(RemoteMessagePipeTest, HandlePassing) {
  static const char kHello[] = "hello";
  Waiter waiter;
  HandleSignalsState hss;
  uint32_t context = 0;

  scoped_refptr<ChannelEndpoint> ep0;
  scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0));
  scoped_refptr<ChannelEndpoint> ep1;
  scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1));
  ConnectChannelEndpoints(ep0, ep1);

  // We'll try to pass this dispatcher.
  scoped_refptr<MessagePipeDispatcher> dispatcher(
      new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions));
  scoped_refptr<MessagePipe> local_mp(MessagePipe::CreateLocalLocal());
  dispatcher->Init(local_mp, 0);

  // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
  // it later, it might already be readable.)
  waiter.Init();
  ASSERT_EQ(
      MOJO_RESULT_OK,
      mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, nullptr));

  // Write to MP 0, port 0.
  {
    DispatcherTransport transport(
        test::DispatcherTryStartTransport(dispatcher.get()));
    EXPECT_TRUE(transport.is_valid());

    std::vector<DispatcherTransport> transports;
    transports.push_back(transport);
    EXPECT_EQ(MOJO_RESULT_OK,
              mp0->WriteMessage(0,
                                UserPointer<const void>(kHello),
                                sizeof(kHello),
                                &transports,
                                MOJO_WRITE_MESSAGE_FLAG_NONE));
    transport.End();

    // |dispatcher| should have been closed. This is |DCHECK()|ed when the
    // |dispatcher| is destroyed.
    EXPECT_TRUE(dispatcher->HasOneRef());
    dispatcher = nullptr;
  }

  // Wait.
  EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
  EXPECT_EQ(123u, context);
  hss = HandleSignalsState();
  mp1->RemoveWaiter(1, &waiter, &hss);
  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
            hss.satisfied_signals);
  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
            hss.satisfiable_signals);

  // Read from MP 1, port 1.
  char read_buffer[100] = {0};
  uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
  DispatcherVector read_dispatchers;
  uint32_t read_num_dispatchers = 10;  // Maximum to get.
  EXPECT_EQ(MOJO_RESULT_OK,
            mp1->ReadMessage(1,
                             UserPointer<void>(read_buffer),
                             MakeUserPointer(&read_buffer_size),
                             &read_dispatchers,
                             &read_num_dispatchers,
                             MOJO_READ_MESSAGE_FLAG_NONE));
  EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
  EXPECT_STREQ(kHello, read_buffer);
  EXPECT_EQ(1u, read_dispatchers.size());
  EXPECT_EQ(1u, read_num_dispatchers);
  ASSERT_TRUE(read_dispatchers[0].get());
  EXPECT_TRUE(read_dispatchers[0]->HasOneRef());

  EXPECT_EQ(Dispatcher::kTypeMessagePipe, read_dispatchers[0]->GetType());
  dispatcher = static_cast<MessagePipeDispatcher*>(read_dispatchers[0].get());

  // Add the waiter now, before it becomes readable to avoid a race.
  waiter.Init();
  ASSERT_EQ(MOJO_RESULT_OK,
            dispatcher->AddWaiter(
                &waiter, MOJO_HANDLE_SIGNAL_READABLE, 456, nullptr));

  // Write to "local_mp", port 1.
  EXPECT_EQ(MOJO_RESULT_OK,
            local_mp->WriteMessage(1,
                                   UserPointer<const void>(kHello),
                                   sizeof(kHello),
                                   nullptr,
                                   MOJO_WRITE_MESSAGE_FLAG_NONE));

  // TODO(vtl): FIXME -- We (racily) crash if I close |dispatcher| immediately
  // here. (We don't crash if I sleep and then close.)

  // Wait for the dispatcher to become readable.
  EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
  EXPECT_EQ(456u, context);
  hss = HandleSignalsState();
  dispatcher->RemoveWaiter(&waiter, &hss);
  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
            hss.satisfied_signals);
  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
            hss.satisfiable_signals);

  // Read from the dispatcher.
  memset(read_buffer, 0, sizeof(read_buffer));
  read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
  EXPECT_EQ(MOJO_RESULT_OK,
            dispatcher->ReadMessage(UserPointer<void>(read_buffer),
                                    MakeUserPointer(&read_buffer_size),
                                    0,
                                    nullptr,
                                    MOJO_READ_MESSAGE_FLAG_NONE));
  EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
  EXPECT_STREQ(kHello, read_buffer);

  // Prepare to wait on "local_mp", port 1.
  waiter.Init();
  ASSERT_EQ(MOJO_RESULT_OK,
            local_mp->AddWaiter(
                1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 789, nullptr));

  // Write to the dispatcher.
  EXPECT_EQ(MOJO_RESULT_OK,
            dispatcher->WriteMessage(UserPointer<const void>(kHello),
                                     sizeof(kHello),
                                     nullptr,
                                     MOJO_WRITE_MESSAGE_FLAG_NONE));

  // Wait.
  EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
  EXPECT_EQ(789u, context);
  hss = HandleSignalsState();
  local_mp->RemoveWaiter(1, &waiter, &hss);
  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
            hss.satisfied_signals);
  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
            hss.satisfiable_signals);

  // Read from "local_mp", port 1.
  memset(read_buffer, 0, sizeof(read_buffer));
  read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
  EXPECT_EQ(MOJO_RESULT_OK,
            local_mp->ReadMessage(1,
                                  UserPointer<void>(read_buffer),
                                  MakeUserPointer(&read_buffer_size),
                                  nullptr,
                                  nullptr,
                                  MOJO_READ_MESSAGE_FLAG_NONE));
  EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
  EXPECT_STREQ(kHello, read_buffer);

  // TODO(vtl): Also test that messages queued up before the handle was sent are
  // delivered properly.

  // Close everything that belongs to us.
  mp0->Close(0);
  mp1->Close(1);
  EXPECT_EQ(MOJO_RESULT_OK, dispatcher->Close());
  // Note that |local_mp|'s port 0 belong to |dispatcher|, which was closed.
  local_mp->Close(1);
}

#if defined(OS_POSIX)
#define MAYBE_SharedBufferPassing SharedBufferPassing
#else
// Not yet implemented (on Windows).
#define MAYBE_SharedBufferPassing DISABLED_SharedBufferPassing
#endif
TEST_F(RemoteMessagePipeTest, MAYBE_SharedBufferPassing) {
  static const char kHello[] = "hello";
  Waiter waiter;
  HandleSignalsState hss;
  uint32_t context = 0;

  scoped_refptr<ChannelEndpoint> ep0;
  scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0));
  scoped_refptr<ChannelEndpoint> ep1;
  scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1));
  ConnectChannelEndpoints(ep0, ep1);

  // We'll try to pass this dispatcher.
  scoped_refptr<SharedBufferDispatcher> dispatcher;
  EXPECT_EQ(MOJO_RESULT_OK,
            SharedBufferDispatcher::Create(
                platform_support(),
                SharedBufferDispatcher::kDefaultCreateOptions,
                100,
                &dispatcher));
  ASSERT_TRUE(dispatcher.get());

  // Make a mapping.
  scoped_ptr<embedder::PlatformSharedBufferMapping> mapping0;
  EXPECT_EQ(
      MOJO_RESULT_OK,
      dispatcher->MapBuffer(0, 100, MOJO_MAP_BUFFER_FLAG_NONE, &mapping0));
  ASSERT_TRUE(mapping0);
  ASSERT_TRUE(mapping0->GetBase());
  ASSERT_EQ(100u, mapping0->GetLength());
  static_cast<char*>(mapping0->GetBase())[0] = 'A';
  static_cast<char*>(mapping0->GetBase())[50] = 'B';
  static_cast<char*>(mapping0->GetBase())[99] = 'C';

  // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
  // it later, it might already be readable.)
  waiter.Init();
  ASSERT_EQ(
      MOJO_RESULT_OK,
      mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, nullptr));

  // Write to MP 0, port 0.
  {
    DispatcherTransport transport(
        test::DispatcherTryStartTransport(dispatcher.get()));
    EXPECT_TRUE(transport.is_valid());

    std::vector<DispatcherTransport> transports;
    transports.push_back(transport);
    EXPECT_EQ(MOJO_RESULT_OK,
              mp0->WriteMessage(0,
                                UserPointer<const void>(kHello),
                                sizeof(kHello),
                                &transports,
                                MOJO_WRITE_MESSAGE_FLAG_NONE));
    transport.End();

    // |dispatcher| should have been closed. This is |DCHECK()|ed when the
    // |dispatcher| is destroyed.
    EXPECT_TRUE(dispatcher->HasOneRef());
    dispatcher = nullptr;
  }

  // Wait.
  EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
  EXPECT_EQ(123u, context);
  hss = HandleSignalsState();
  mp1->RemoveWaiter(1, &waiter, &hss);
  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
            hss.satisfied_signals);
  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
            hss.satisfiable_signals);

  // Read from MP 1, port 1.
  char read_buffer[100] = {0};
  uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
  DispatcherVector read_dispatchers;
  uint32_t read_num_dispatchers = 10;  // Maximum to get.
  EXPECT_EQ(MOJO_RESULT_OK,
            mp1->ReadMessage(1,
                             UserPointer<void>(read_buffer),
                             MakeUserPointer(&read_buffer_size),
                             &read_dispatchers,
                             &read_num_dispatchers,
                             MOJO_READ_MESSAGE_FLAG_NONE));
  EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
  EXPECT_STREQ(kHello, read_buffer);
  EXPECT_EQ(1u, read_dispatchers.size());
  EXPECT_EQ(1u, read_num_dispatchers);
  ASSERT_TRUE(read_dispatchers[0].get());
  EXPECT_TRUE(read_dispatchers[0]->HasOneRef());

  EXPECT_EQ(Dispatcher::kTypeSharedBuffer, read_dispatchers[0]->GetType());
  dispatcher = static_cast<SharedBufferDispatcher*>(read_dispatchers[0].get());

  // Make another mapping.
  scoped_ptr<embedder::PlatformSharedBufferMapping> mapping1;
  EXPECT_EQ(
      MOJO_RESULT_OK,
      dispatcher->MapBuffer(0, 100, MOJO_MAP_BUFFER_FLAG_NONE, &mapping1));
  ASSERT_TRUE(mapping1);
  ASSERT_TRUE(mapping1->GetBase());
  ASSERT_EQ(100u, mapping1->GetLength());
  EXPECT_NE(mapping1->GetBase(), mapping0->GetBase());
  EXPECT_EQ('A', static_cast<char*>(mapping1->GetBase())[0]);
  EXPECT_EQ('B', static_cast<char*>(mapping1->GetBase())[50]);
  EXPECT_EQ('C', static_cast<char*>(mapping1->GetBase())[99]);

  // Write stuff either way.
  static_cast<char*>(mapping1->GetBase())[1] = 'x';
  EXPECT_EQ('x', static_cast<char*>(mapping0->GetBase())[1]);
  static_cast<char*>(mapping0->GetBase())[2] = 'y';
  EXPECT_EQ('y', static_cast<char*>(mapping1->GetBase())[2]);

  // Kill the first mapping; the second should still be valid.
  mapping0.reset();
  EXPECT_EQ('A', static_cast<char*>(mapping1->GetBase())[0]);

  // Close everything that belongs to us.
  mp0->Close(0);
  mp1->Close(1);
  EXPECT_EQ(MOJO_RESULT_OK, dispatcher->Close());

  // The second mapping should still be good.
  EXPECT_EQ('x', static_cast<char*>(mapping1->GetBase())[1]);
}

#if defined(OS_POSIX)
#define MAYBE_PlatformHandlePassing PlatformHandlePassing
#else
// Not yet implemented (on Windows).
#define MAYBE_PlatformHandlePassing DISABLED_PlatformHandlePassing
#endif
TEST_F(RemoteMessagePipeTest, MAYBE_PlatformHandlePassing) {
  base::ScopedTempDir temp_dir;
  ASSERT_TRUE(temp_dir.CreateUniqueTempDir());

  static const char kHello[] = "hello";
  static const char kWorld[] = "world";
  Waiter waiter;
  uint32_t context = 0;
  HandleSignalsState hss;

  scoped_refptr<ChannelEndpoint> ep0;
  scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0));
  scoped_refptr<ChannelEndpoint> ep1;
  scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1));
  ConnectChannelEndpoints(ep0, ep1);

  base::FilePath unused;
  base::ScopedFILE fp(
      CreateAndOpenTemporaryFileInDir(temp_dir.path(), &unused));
  EXPECT_EQ(sizeof(kHello), fwrite(kHello, 1, sizeof(kHello), fp.get()));
  // We'll try to pass this dispatcher, which will cause a |PlatformHandle| to
  // be passed.
  scoped_refptr<PlatformHandleDispatcher> dispatcher(
      new PlatformHandleDispatcher(
          mojo::test::PlatformHandleFromFILE(fp.Pass())));

  // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
  // it later, it might already be readable.)
  waiter.Init();
  ASSERT_EQ(
      MOJO_RESULT_OK,
      mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, nullptr));

  // Write to MP 0, port 0.
  {
    DispatcherTransport transport(
        test::DispatcherTryStartTransport(dispatcher.get()));
    EXPECT_TRUE(transport.is_valid());

    std::vector<DispatcherTransport> transports;
    transports.push_back(transport);
    EXPECT_EQ(MOJO_RESULT_OK,
              mp0->WriteMessage(0,
                                UserPointer<const void>(kWorld),
                                sizeof(kWorld),
                                &transports,
                                MOJO_WRITE_MESSAGE_FLAG_NONE));
    transport.End();

    // |dispatcher| should have been closed. This is |DCHECK()|ed when the
    // |dispatcher| is destroyed.
    EXPECT_TRUE(dispatcher->HasOneRef());
    dispatcher = nullptr;
  }

  // Wait.
  EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
  EXPECT_EQ(123u, context);
  hss = HandleSignalsState();
  mp1->RemoveWaiter(1, &waiter, &hss);
  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
            hss.satisfied_signals);
  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
            hss.satisfiable_signals);

  // Read from MP 1, port 1.
  char read_buffer[100] = {0};
  uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
  DispatcherVector read_dispatchers;
  uint32_t read_num_dispatchers = 10;  // Maximum to get.
  EXPECT_EQ(MOJO_RESULT_OK,
            mp1->ReadMessage(1,
                             UserPointer<void>(read_buffer),
                             MakeUserPointer(&read_buffer_size),
                             &read_dispatchers,
                             &read_num_dispatchers,
                             MOJO_READ_MESSAGE_FLAG_NONE));
  EXPECT_EQ(sizeof(kWorld), static_cast<size_t>(read_buffer_size));
  EXPECT_STREQ(kWorld, read_buffer);
  EXPECT_EQ(1u, read_dispatchers.size());
  EXPECT_EQ(1u, read_num_dispatchers);
  ASSERT_TRUE(read_dispatchers[0].get());
  EXPECT_TRUE(read_dispatchers[0]->HasOneRef());

  EXPECT_EQ(Dispatcher::kTypePlatformHandle, read_dispatchers[0]->GetType());
  dispatcher =
      static_cast<PlatformHandleDispatcher*>(read_dispatchers[0].get());

  embedder::ScopedPlatformHandle h = dispatcher->PassPlatformHandle().Pass();
  EXPECT_TRUE(h.is_valid());

  fp = mojo::test::FILEFromPlatformHandle(h.Pass(), "rb").Pass();
  EXPECT_FALSE(h.is_valid());
  EXPECT_TRUE(fp);

  rewind(fp.get());
  memset(read_buffer, 0, sizeof(read_buffer));
  EXPECT_EQ(sizeof(kHello),
            fread(read_buffer, 1, sizeof(read_buffer), fp.get()));
  EXPECT_STREQ(kHello, read_buffer);

  // Close everything that belongs to us.
  mp0->Close(0);
  mp1->Close(1);
  EXPECT_EQ(MOJO_RESULT_OK, dispatcher->Close());
}

// Test racing closes (on each end).
// Note: A flaky failure would almost certainly indicate a problem in the code
// itself (not in the test). Also, any logged warnings/errors would also
// probably be indicative of bugs.
TEST_F(RemoteMessagePipeTest, RacingClosesStress) {
  base::TimeDelta delay = base::TimeDelta::FromMilliseconds(5);

  for (unsigned i = 0; i < 256; i++) {
    DVLOG(2) << "---------------------------------------- " << i;
    scoped_refptr<ChannelEndpoint> ep0;
    scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0));
    BootstrapChannelEndpointNoWait(0, ep0);

    scoped_refptr<ChannelEndpoint> ep1;
    scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1));
    BootstrapChannelEndpointNoWait(1, ep1);

    if (i & 1u) {
      io_thread()->task_runner()->PostTask(
          FROM_HERE, base::Bind(&base::PlatformThread::Sleep, delay));
    }
    if (i & 2u)
      base::PlatformThread::Sleep(delay);

    mp0->Close(0);

    if (i & 4u) {
      io_thread()->task_runner()->PostTask(
          FROM_HERE, base::Bind(&base::PlatformThread::Sleep, delay));
    }
    if (i & 8u)
      base::PlatformThread::Sleep(delay);

    mp1->Close(1);

    RestoreInitialState();
  }
}

// Tests passing an end of a message pipe over a remote message pipe, and then
// passing that end back.
// TODO(vtl): Also test passing a message pipe across two remote message pipes.
TEST_F(RemoteMessagePipeTest, PassMessagePipeHandleAcrossAndBack) {
  static const char kHello[] = "hello";
  static const char kWorld[] = "world";
  Waiter waiter;
  HandleSignalsState hss;
  uint32_t context = 0;

  scoped_refptr<ChannelEndpoint> ep0;
  scoped_refptr<MessagePipe> mp0(MessagePipe::CreateLocalProxy(&ep0));
  scoped_refptr<ChannelEndpoint> ep1;
  scoped_refptr<MessagePipe> mp1(MessagePipe::CreateProxyLocal(&ep1));
  ConnectChannelEndpoints(ep0, ep1);

  // We'll try to pass this dispatcher.
  scoped_refptr<MessagePipeDispatcher> dispatcher(
      new MessagePipeDispatcher(MessagePipeDispatcher::kDefaultCreateOptions));
  scoped_refptr<MessagePipe> local_mp(MessagePipe::CreateLocalLocal());
  dispatcher->Init(local_mp, 0);

  // Prepare to wait on MP 1, port 1. (Add the waiter now. Otherwise, if we do
  // it later, it might already be readable.)
  waiter.Init();
  ASSERT_EQ(
      MOJO_RESULT_OK,
      mp1->AddWaiter(1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 123, nullptr));

  // Write to MP 0, port 0.
  {
    DispatcherTransport transport(
        test::DispatcherTryStartTransport(dispatcher.get()));
    EXPECT_TRUE(transport.is_valid());

    std::vector<DispatcherTransport> transports;
    transports.push_back(transport);
    EXPECT_EQ(MOJO_RESULT_OK,
              mp0->WriteMessage(0,
                                UserPointer<const void>(kHello),
                                sizeof(kHello),
                                &transports,
                                MOJO_WRITE_MESSAGE_FLAG_NONE));
    transport.End();

    // |dispatcher| should have been closed. This is |DCHECK()|ed when the
    // |dispatcher| is destroyed.
    EXPECT_TRUE(dispatcher->HasOneRef());
    dispatcher = nullptr;
  }

  // Wait.
  EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
  EXPECT_EQ(123u, context);
  hss = HandleSignalsState();
  mp1->RemoveWaiter(1, &waiter, &hss);
  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
            hss.satisfied_signals);
  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
            hss.satisfiable_signals);

  // Read from MP 1, port 1.
  char read_buffer[100] = {0};
  uint32_t read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
  DispatcherVector read_dispatchers;
  uint32_t read_num_dispatchers = 10;  // Maximum to get.
  EXPECT_EQ(MOJO_RESULT_OK,
            mp1->ReadMessage(1,
                             UserPointer<void>(read_buffer),
                             MakeUserPointer(&read_buffer_size),
                             &read_dispatchers,
                             &read_num_dispatchers,
                             MOJO_READ_MESSAGE_FLAG_NONE));
  EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
  EXPECT_STREQ(kHello, read_buffer);
  EXPECT_EQ(1u, read_dispatchers.size());
  EXPECT_EQ(1u, read_num_dispatchers);
  ASSERT_TRUE(read_dispatchers[0].get());
  EXPECT_TRUE(read_dispatchers[0]->HasOneRef());

  EXPECT_EQ(Dispatcher::kTypeMessagePipe, read_dispatchers[0]->GetType());
  dispatcher = static_cast<MessagePipeDispatcher*>(read_dispatchers[0].get());
  read_dispatchers.clear();

  // Now pass it back.

  // Prepare to wait on MP 0, port 0. (Add the waiter now. Otherwise, if we do
  // it later, it might already be readable.)
  waiter.Init();
  ASSERT_EQ(
      MOJO_RESULT_OK,
      mp0->AddWaiter(0, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 456, nullptr));

  // Write to MP 1, port 1.
  {
    DispatcherTransport transport(
        test::DispatcherTryStartTransport(dispatcher.get()));
    EXPECT_TRUE(transport.is_valid());

    std::vector<DispatcherTransport> transports;
    transports.push_back(transport);
    EXPECT_EQ(MOJO_RESULT_OK,
              mp1->WriteMessage(1,
                                UserPointer<const void>(kWorld),
                                sizeof(kWorld),
                                &transports,
                                MOJO_WRITE_MESSAGE_FLAG_NONE));
    transport.End();

    // |dispatcher| should have been closed. This is |DCHECK()|ed when the
    // |dispatcher| is destroyed.
    EXPECT_TRUE(dispatcher->HasOneRef());
    dispatcher = nullptr;
  }

  // Wait.
  EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
  EXPECT_EQ(456u, context);
  hss = HandleSignalsState();
  mp0->RemoveWaiter(0, &waiter, &hss);
  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
            hss.satisfied_signals);
  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
            hss.satisfiable_signals);

  // Read from MP 0, port 0.
  read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
  read_num_dispatchers = 10;  // Maximum to get.
  EXPECT_EQ(MOJO_RESULT_OK,
            mp0->ReadMessage(0,
                             UserPointer<void>(read_buffer),
                             MakeUserPointer(&read_buffer_size),
                             &read_dispatchers,
                             &read_num_dispatchers,
                             MOJO_READ_MESSAGE_FLAG_NONE));
  EXPECT_EQ(sizeof(kWorld), static_cast<size_t>(read_buffer_size));
  EXPECT_STREQ(kWorld, read_buffer);
  EXPECT_EQ(1u, read_dispatchers.size());
  EXPECT_EQ(1u, read_num_dispatchers);
  ASSERT_TRUE(read_dispatchers[0].get());
  EXPECT_TRUE(read_dispatchers[0]->HasOneRef());

  EXPECT_EQ(Dispatcher::kTypeMessagePipe, read_dispatchers[0]->GetType());
  dispatcher = static_cast<MessagePipeDispatcher*>(read_dispatchers[0].get());
  read_dispatchers.clear();

  // Add the waiter now, before it becomes readable to avoid a race.
  waiter.Init();
  ASSERT_EQ(MOJO_RESULT_OK,
            dispatcher->AddWaiter(
                &waiter, MOJO_HANDLE_SIGNAL_READABLE, 789, nullptr));

  // Write to "local_mp", port 1.
  EXPECT_EQ(MOJO_RESULT_OK,
            local_mp->WriteMessage(1,
                                   UserPointer<const void>(kHello),
                                   sizeof(kHello),
                                   nullptr,
                                   MOJO_WRITE_MESSAGE_FLAG_NONE));

  // Wait for the dispatcher to become readable.
  EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
  EXPECT_EQ(789u, context);
  hss = HandleSignalsState();
  dispatcher->RemoveWaiter(&waiter, &hss);
  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
            hss.satisfied_signals);
  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
            hss.satisfiable_signals);

  // Read from the dispatcher.
  memset(read_buffer, 0, sizeof(read_buffer));
  read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
  EXPECT_EQ(MOJO_RESULT_OK,
            dispatcher->ReadMessage(UserPointer<void>(read_buffer),
                                    MakeUserPointer(&read_buffer_size),
                                    0,
                                    nullptr,
                                    MOJO_READ_MESSAGE_FLAG_NONE));
  EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
  EXPECT_STREQ(kHello, read_buffer);

  // Prepare to wait on "local_mp", port 1.
  waiter.Init();
  ASSERT_EQ(MOJO_RESULT_OK,
            local_mp->AddWaiter(
                1, &waiter, MOJO_HANDLE_SIGNAL_READABLE, 789, nullptr));

  // Write to the dispatcher.
  EXPECT_EQ(MOJO_RESULT_OK,
            dispatcher->WriteMessage(UserPointer<const void>(kHello),
                                     sizeof(kHello),
                                     nullptr,
                                     MOJO_WRITE_MESSAGE_FLAG_NONE));

  // Wait.
  EXPECT_EQ(MOJO_RESULT_OK, waiter.Wait(MOJO_DEADLINE_INDEFINITE, &context));
  EXPECT_EQ(789u, context);
  hss = HandleSignalsState();
  local_mp->RemoveWaiter(1, &waiter, &hss);
  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
            hss.satisfied_signals);
  EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE,
            hss.satisfiable_signals);

  // Read from "local_mp", port 1.
  memset(read_buffer, 0, sizeof(read_buffer));
  read_buffer_size = static_cast<uint32_t>(sizeof(read_buffer));
  EXPECT_EQ(MOJO_RESULT_OK,
            local_mp->ReadMessage(1,
                                  UserPointer<void>(read_buffer),
                                  MakeUserPointer(&read_buffer_size),
                                  nullptr,
                                  nullptr,
                                  MOJO_READ_MESSAGE_FLAG_NONE));
  EXPECT_EQ(sizeof(kHello), static_cast<size_t>(read_buffer_size));
  EXPECT_STREQ(kHello, read_buffer);

  // TODO(vtl): Also test the cases where messages are written and read (at
  // various points) on the message pipe being passed around.

  // Close everything that belongs to us.
  mp0->Close(0);
  mp1->Close(1);
  EXPECT_EQ(MOJO_RESULT_OK, dispatcher->Close());
  // Note that |local_mp|'s port 0 belong to |dispatcher|, which was closed.
  local_mp->Close(1);
}

}  // namespace
}  // namespace system
}  // namespace mojo