Ray Usage
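Ray makes multiprocessing and distributed execution easy to use: there is no need to manage sockets, thread pools, or message queues by hand. Adding the @ray.remote decorator to a function or class turns it into a distributed task or a distributed object. Under the hood, Ray uses a key-value store to manage this metadata automatically.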
Core abstractions
- task: a remote function call
- actor: a distributed, stateful object, AKA a collection of objects
- object: a distributed variable, AKA a future (automatically async)
tasks
function call

    import ray

    # @ray.remote turns a plain function into a Ray task.
    @ray.remote
    def func_call():
        return 1

    def main():
        ray.init()

        result_handle = []
        for i in range(4):
            # .remote() returns immediately with an object reference (a future).
            result_handle.append(func_call.remote())
        # ray.get() blocks until each result is available.
        print([ray.get(h) for h in result_handle])

    if __name__ == "__main__":
        main()

This is roughly equivalent to:
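
    from concurrent.futures import ThreadPoolExecutor

    def func_call():
        return 1

    def main():
        # A local thread pool stands in for the Ray workers.
        with ThreadPoolExecutor(max_workers=4) as pool:
            result_handle = []
            for i in range(4):
                # submit() returns a Future, much like func_call.remote() returns a reference.
                result_handle.append(pool.submit(func_call))
            # Future.result() blocks until the value is ready, like ray.get().
            print([h.result() for h in result_handle])

    if __name__ == "__main__":
        main()
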
actors
Distributed objects / state

    import ray

    # @ray.remote on a class turns it into an actor: a stateful remote object.
    @ray.remote
    class Counter:
        def __init__(self):
            self.count = 0

        def increment(self):
            self.count += 1
            return self.count

    def main():
        ray.init()

        # Counter.remote() creates the actor and returns a handle to it.
        counter = Counter.remote()
        for i in range(4):
            # Method calls also go through .remote() and return references.
            print(ray.get(counter.increment.remote()))
        ray.shutdown()

    if __name__ == "__main__":
        main()

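The tasks section compares Ray tasks to a thread pool. A similar local analogy for actors, offered only as a rough mental model and not as how Ray implements them, is an object whose methods all run on one dedicated worker thread, so calls are processed one at a time and the state needs no lock. LocalActor and run_counter are hypothetical helpers written for this sketch.

```python
from concurrent.futures import ThreadPoolExecutor

class Counter:
    def __init__(self):
        self.count = 0

    def increment(self):
        self.count += 1
        return self.count

class LocalActor:
    """Hypothetical helper: owns one object and runs its methods one at a time."""

    def __init__(self, cls, *args, **kwargs):
        # A single worker thread serializes all method calls on the wrapped object.
        self._pool = ThreadPoolExecutor(max_workers=1)
        self._obj = cls(*args, **kwargs)

    def call(self, method_name, *args, **kwargs):
        # Returns a Future immediately, loosely like counter.increment.remote().
        method = getattr(self._obj, method_name)
        return self._pool.submit(method, *args, **kwargs)

    def shutdown(self):
        self._pool.shutdown(wait=True)

def run_counter(n=4):
    actor = LocalActor(Counter)
    try:
        # Queue n calls without waiting, then collect the results in order.
        futures = [actor.call("increment") for _ in range(n)]
        return [f.result() for f in futures]
    finally:
        actor.shutdown()

if __name__ == "__main__":
    print(run_counter())  # [1, 2, 3, 4]
```

The analogy stops at a single process: a Ray actor runs in its own worker process, possibly on another machine, which a thread cannot model.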
objects
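Storage for distributed intermediate results
All results live in the object store
- Fetching a result requires ray.get()
- A reference can be passed on without waiting for the computation to finish (async)
- Objects can also be placed in the object store manually with ray.put()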
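
    # Assumes ray.init() has been called and that func_call is a @ray.remote
    # function accepting an optional argument (the zero-argument version above would not).
    handle1 = func_call.remote()
    handle2 = func_call.remote(handle1)  # pass the reference on without waiting for handle1
    custom_data = ray.put([1, 2, 3])     # manually place an object in the object store
    print(ray.get(handle2))              # block until handle2 is ready, then fetch it

    # Returns (ready, not_ready) lists of references; by default it returns once one is ready.
    ray.wait([handle1, handle2])
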