Device Communication

The oneCCL Communicator defines a group of processes that communicate with each other.

The following example, a simple oneCCL allreduce for GPU, demonstrates the main concepts of communication on device memory buffers.

Example

  1. Create oneCCL communicator objects with user-supplied size, rank <-> SYCL device mapping, SYCL context and key-value store:

    auto ccl_context = ccl::create_context(sycl_context);
    auto ccl_device = ccl::create_device(sycl_device);
    
    auto comms = ccl::create_communicators(
       size,
       vector_class<pair_class<size_t, device>>{ { rank, ccl_device } },
       ccl_context,
       kvs);
    
  2. Create oneCCL stream object from user-supplied sycl::queue object:

    auto stream = ccl::create_stream(sycl_queue);
    
  3. Initialize send_buf by providing the input content. For example, you can create and initialize the send_buf parameter as follows:

    const size_t elem_count = <N>;
    
    /* using SYCL buffer and accessor */
    /* a host accessor is created outside a command group, so no handler is passed */
    auto send_buf_host_acc = send_buf.get_host_access(sycl::write_only);
    for (size_t idx = 0; idx < elem_count; idx++) {
       send_buf_host_acc[idx] = rank;
    }
    
    /* or using SYCL USM */
    for (size_t idx = 0; idx < elem_count; idx++) {
       send_buf[idx] = rank;
    }
    
  4. For demonstration purposes, modify the send_buf on the GPU side:

    /* using SYCL buffer and accessor */
    sycl_queue.submit([&](sycl::handler& h) {
       /* += both reads and writes the element, so read_write access is needed */
       auto send_buf_dev_acc = send_buf.get_access<mode::read_write>(h);
       h.parallel_for(range<1>{elem_count}, [=](item<1> idx) {
             send_buf_dev_acc[idx] += 1;
       });
    });
    
    /* or using SYCL USM */
    for (size_t idx = 0; idx < elem_count; idx++) {
       send_buf[idx] += 1;
    }
    

    The allreduce operation reduces the values from all processes and then distributes the result to every process. Here, each rank r contributes the value r + 1 in every element, so with p processes the result is an array of elem_count elements, each equal to the sum of the arithmetic progression 1 + 2 + ... + p:

    \[p \cdot (p + 1) / 2\]

  5. Execute the allreduce operation:

    std::vector<event> events;
    for (auto& comm : comms) {
       events.push_back(ccl::allreduce(send_buf,
                                        recv_buf,
                                        elem_count,
                                        reduction::sum,
                                        comm,
                                        stream));
    }
    
    for (auto& e : events) {
       e.wait();
    }
    
  6. Verify that the allreduce operation is correct:

    /* using SYCL buffer and accessor */
    
    auto comm_size = comm.size();
    auto expected = comm_size * (comm_size + 1) / 2;
    
    sycl_queue.submit([&](handler& h) {
       /* the kernel reads and conditionally overwrites elements, so read_write access is needed */
       auto recv_buf_dev_acc = recv_buf.get_access<mode::read_write>(h);
       h.parallel_for(range<1>{elem_count}, [=](item<1> idx) {
             if (recv_buf_dev_acc[idx] != expected) {
                recv_buf_dev_acc[idx] = -1;
             }
       });
    });
    
    ...
    
    auto recv_buf_host_acc = recv_buf.get_host_access(sycl::read_only);
    for (size_t idx = 0; idx < elem_count; idx++) {
       if (recv_buf_host_acc[idx] == -1) {
             std::cout << "unexpected value at index " << idx << std::endl;
             break;
       }
    }
    
    /* or using SYCL USM */
    
    auto comm_size = comm.size();
    auto expected = comm_size * (comm_size + 1) / 2;
    
    for (size_t idx = 0; idx < elem_count; idx++) {
       if (recv_buf[idx] != expected) {
             std::cout << "unexpected value at index " << idx << std::endl;
             break;
       }
    }
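
To see where the expected value in the verification step comes from, the following host-only sketch models an allreduce with a sum reduction in plain C++, without SYCL or oneCCL. The helper names (`make_send_bufs`, `simulate_allreduce_sum`, `expected_value`) are hypothetical and introduced only for illustration; they are not part of the oneCCL API, and the sketch says nothing about how oneCCL performs the reduction internally.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical helper: builds one send buffer per rank the same way the steps
// above do: fill with the rank, then add 1 to every element.
std::vector<std::vector<int>> make_send_bufs(std::size_t ranks, std::size_t elem_count) {
    std::vector<std::vector<int>> bufs;
    for (std::size_t rank = 0; rank < ranks; ++rank) {
        std::vector<int> buf(elem_count, static_cast<int>(rank));
        for (auto& v : buf) {
            v += 1;
        }
        bufs.push_back(buf);
    }
    return bufs;
}

// Hypothetical host-side model of allreduce(sum): every rank receives the
// element-wise sum of all ranks' send buffers.
std::vector<std::vector<int>> simulate_allreduce_sum(
    const std::vector<std::vector<int>>& send_bufs) {
    const std::size_t ranks = send_bufs.size();
    const std::size_t elem_count = ranks == 0 ? 0 : send_bufs[0].size();
    std::vector<int> reduced(elem_count, 0);
    for (const auto& buf : send_bufs) {
        assert(buf.size() == elem_count); // all ranks must send the same count
        for (std::size_t i = 0; i < elem_count; ++i) {
            reduced[i] += buf[i];
        }
    }
    // Distribute the same reduced buffer to every rank.
    return std::vector<std::vector<int>>(ranks, reduced);
}

// Closed form used in the verification step: 1 + 2 + ... + p = p * (p + 1) / 2.
int expected_value(std::size_t ranks) {
    return static_cast<int>(ranks * (ranks + 1) / 2);
}
```

For example, with 4 ranks the per-rank contributions are 1, 2, 3, and 4, so every element of every receive buffer returned by `simulate_allreduce_sum(make_send_bufs(4, 8))` equals `expected_value(4)`, which is 10.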
    

If you encounter an error, make sure the oneCCL environment is configured correctly.

Additional Resources