#pragma once #include #include #include namespace c10::xpu { /* * Note [Stream Management] * * An XPUStream is an abstraction of an actual SYCL queue in which SYCL kernel * can execute. Currently, there are several pools per device to manage SYCL * queue, and a device's pool is lazily created. * * There are two pools per device. The first pool contains "normal priority" * queues. The second pool is the "high priority" queues. There are 32 queues in * per pool per device, and when a queue is requested one of these queues is * returned round-robin. That is, the first queue requested is at index 0, the * second at index 1... to index 31, then index 0 again. * * This means that if 33 queues are requested, the first and last queues * requested are actually the same queue (under the covers) and kernels enqueued * on them cannot run concurrently. * * It is safe to enqueue a kernel on the same queue from two different * threads as the SYCL specification described. */ static constexpr int max_compile_time_stream_priorities = 2; /* * This serves as a wrapper around c10::Stream and acts as a representation for * a SYCL queue, which allows asynchronous execution of XPU tasks. */ class C10_XPU_API XPUStream { public: enum Unchecked { UNCHECKED }; /// Construct a XPUStream from a Stream. This construction is checked, and /// will raise an error if the Stream is not, in fact, a XPU stream. explicit XPUStream(Stream stream) : stream_(stream) { TORCH_CHECK(stream_.device_type() == DeviceType::XPU); } /// Construct a XPUStream from a Stream with no error checking. explicit XPUStream(Unchecked, Stream stream) : stream_(stream) {} bool operator==(const XPUStream& other) const noexcept { return unwrap() == other.unwrap(); } bool operator!=(const XPUStream& other) const noexcept { return unwrap() != other.unwrap(); } /// Implicit conversion to sycl::queue&. operator sycl::queue&() const { return queue(); } /// Implicit conversion to Stream (a.k.a., forget that the stream is a /// XPU stream). operator Stream() const { return unwrap(); } /// Get the XPU device type that this stream is associated with. DeviceType device_type() const { return DeviceType::XPU; } /// Get the XPU device index that this stream is associated with. DeviceIndex device_index() const { return stream_.device_index(); } /// Get the full Device that this stream is associated with. The Device is /// guaranteed to be a XPU device. Device device() const { return Device(DeviceType::XPU, device_index()); } /// Return the stream ID corresponding to this particular stream. StreamId is /// a int64_t representation generated by its type and index. StreamId id() const { return stream_.id(); } /// Return true if all enqueued tasks in this stream have been completed, /// otherwise return false. bool query() const { return queue().ext_oneapi_empty(); } /// Performs a blocking wait for the completion of all enqueued tasks in this /// stream. void synchronize() const { queue().wait_and_throw(); const c10::impl::PyInterpreter* interp = c10::impl::GPUTrace::get_trace(); if (C10_UNLIKELY(interp)) { (*interp)->trace_gpu_stream_synchronization( c10::kXPU, reinterpret_cast(&queue())); } } /// Return the priority that this stream is associated with. Lower numbers /// represent higher priority. int priority() const; /// Explicit conversion to sycl::queue&. sycl::queue& queue() const; /// Explicit conversion to Stream. Stream unwrap() const { return stream_; } /// Reversibly pack a XPUStream into a struct representation. The XPUStream /// can be unpacked using unpack3(). struct c10::StreamData3 pack3() const { return stream_.pack3(); } /// Unpack a XPUStream from the 3 fields generated by pack3(). static XPUStream unpack3( StreamId stream_id, DeviceIndex device_index, DeviceType device_type) { return XPUStream(Stream::unpack3(stream_id, device_index, device_type)); } /// Return the range of priority **supported by PyTorch**. static std::tuple priority_range() { return std::make_tuple(0, -max_compile_time_stream_priorities + 1); } private: Stream stream_; }; /** * Get a stream from the pool in a round-robin fashion. * * You can request a stream from the highest priority pool by setting * isHighPriority to true for a specific device. */ C10_XPU_API XPUStream getStreamFromPool(const bool isHighPriority = false, DeviceIndex device = -1); /** * Get a stream from the pool in a round-robin fashion. * * You can request a stream by setting a priority value for a specific device. * The priority number lower, the priority higher. */ C10_XPU_API XPUStream getStreamFromPool(const int priority, DeviceIndex device = -1); /** * Get the current XPU stream, for the passed XPU device, or for the current * device if no device index is passed. */ C10_XPU_API XPUStream getCurrentXPUStream(DeviceIndex device = -1); /** * Set the current stream on the device of the passed in stream to be the passed * in stream. */ C10_XPU_API void setCurrentXPUStream(XPUStream stream); C10_XPU_API std::ostream& operator<<(std::ostream& stream, const XPUStream& s); /** * Block all reserved SYCL queues in the stream pools on the device, and wait * for their synchronizations. */ C10_XPU_API void syncStreamsOnDevice(DeviceIndex device = -1); } // namespace c10::xpu namespace std { template <> struct hash { size_t operator()(c10::xpu::XPUStream s) const noexcept { return std::hash{}(s.unwrap()); } }; } // namespace std