- Java锁的逻辑(结合对象头和ObjectMonitor)
- 还在用饼状图?来瞧瞧这些炫酷的百分比可视化新图形(附代码实现)⛵
- 自动注册实体类到EntityFrameworkCore上下文,并适配ABP及ABPVNext
- 基于Sklearn机器学习代码实战
ray是开源分布式计算框架,为并行处理提供计算层,用于扩展AI与Python应用程序,是ML工作负载统一工具包 。
ML应用程序库集 。
通用分布式计算库 。
- Task -- Ray允许任意Python函数在单独的Python worker上运行,这些异步Python函数称为任务
- Actor -- 从函数扩展到类,是一个有状态的工作者,当一个Actor被创建,一个新的worker被创建,并且actor的方法被安排到那个特定的worker上,并且可以访问和修改那个worker的状态
- Object -- Task与Actor在对象上创建与计算,被称为远程对象,被存储在ray的分布式共享内存对象存储上,通过对象引用来引用远程对象。集群中每个节点都有一个对象存储,远程对象存储在何处(一个或多个节点上)与远程对象引用的持有者无关
- Placement Groups -- 允许用户跨多个节点原子性的保留资源组,以供后续Task与Actor使用
- Environment Dependencies -- 当Ray在远程机器上执行Task或Actor时,它们的依赖环境项(Python包、本地文件、环境变量)必须可供代码运行。解决环境依赖的方式有两种,一种是在集群启动前准备好对集群的依赖,另一种是在ray的运行时环境动态安装
一组连接到公共 Ray 头节点的工作节点,通过 kubeRay operator管理运行在k8s上的ray集群 。
ray版本:2.3.0 。
1主3从集群 。
# 配置文件 -- 一主两从(默认单主),文件名:k8s-3nodes.yaml
kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
nodes:
- role: control-plane
- role: worker
- role: worker
创建k8s集群 。
kind create cluster --config k8s-3nodes.yaml
# helm方式安装
# 添加Charts仓库
helm repo add kuberay https://ray-project.github.io/kuberay-helm/
# 安装default名称空间
# 安装 kubeRay operator
# 下载离线的chart包: helm pull kuberay/kuberay-operator --version 0.5.0
# 本地安装: helm install kuberay-operator
helm install kuberay-operator kuberay/kuberay-operator --version 0.5.0
# 创建ray示例集群,若通过sdk管理则跳过
# 下载离线的ray集群自定义资源:helm pull kuberay/ray-cluster --version 0.5.0
helm install raycluster kuberay/ray-cluster --version 0.5.0
# 获取ray集群对应的CR
kubectl get raycluster
# 查询pod的状态
kubectl get pods
# 转发svc 8265端口到本地8265端口
kubectl port-forward --address 0.0.0.0 svc/raycluster-kuberay-head-svc 8265:8265
# 登录ray head节点,并执行一个job
kubectl exec -it ${RAYCLUSTER_HEAD_POD} -- bash
python -c "import ray; ray.init(); print(ray.cluster_resources())" # (in Ray head Pod)
# 删除ray集群
helm uninstall raycluster
# 删除kubeRay
helm uninstall kuberay-operator
# 查询helm管理的资源
helm ls --all-namespaces
前置要求:
- 安装 KubeRay
- 安装 k8s sdk: pip install kubernetes
- 将python_client拷贝到PYTHONPATH路径下或者直接安装python_client, 该库路径为: https://github.com/ray-project/kuberay/tree/master/clients/python-client/python_client
from python_client import kuberay_cluster_api
from python_client.utils import kuberay_cluster_utils, kuberay_cluster_builder
def main():
# ray集群管理的api 获取集群列表、创建集群、更新集群、删除集群
kuberay_api = kuberay_cluster_api.RayClusterApi()
# CR 构建器,构建ray集群对应的字典格式的CR
cr_builder = kuberay_cluster_builder.ClusterBuilder()
# CR资源对象操作工具,更新cr资源
cluster_utils = kuberay_cluster_utils.ClusterUtils()
# 构建集群的CR,是一个字典对象,可以修改、删除、添加额外的属性
# 可以指定包含特定环境依赖的人ray镜像
cluster = (
cr_builder.build_meta(name="new-cluster1", labels={"demo-cluster": "yes"}) # 输入ray群名称、名称空间、资源标签、ray版本信息
.build_head(cpu_requests="0", memory_requests="0") # ray集群head信息: ray镜像名称、对应service类型、cpu memory的requests与limits、ray head启动参数
.build_worker(group_name="workers", cpu_requests="0", memory_requests="0") # ray集群worker信息: worker组名称、 ray镜像名称、ray启动命令、cpu memory的requests与limits、默认副本个数、最大与最小副本个数
.get_cluster()
)
# 检查CR是否构建成功
if not cr_builder.succeeded:
print("error building the cluster, aborting...")
return
# 创建ray集群
kuberay_api.create_ray_cluster(body=cluster)
# 更新ray集群CR中的worker副本集合
cluster_to_patch, succeeded = cluster_utils.update_worker_group_replicas(
cluster, group_name="workers", max_replicas=4, min_replicas=1, replicas=2
)
if succeeded:
# 更新ray集群
kuberay_api.patch_ray_cluster(
name=cluster_to_patch["metadata"]["name"], ray_patch=cluster_to_patch
)
# 在原来的集群的CR中的工作组添加新的工作组
cluster_to_patch, succeeded = cluster_utils.duplicate_worker_group(
cluster, group_name="workers", new_group_name="duplicate-workers"
)
if succeeded:
kuberay_api.patch_ray_cluster(
name=cluster_to_patch["metadata"]["name"], ray_patch=cluster_to_patch
)
# 列出所有创建的集群
kube_ray_list = kuberay_api.list_ray_clusters(k8s_namespace="default", label_selector='demo-cluster=yes')
if "items" in kube_ray_list:
for cluster in kube_ray_list["items"]:
print(cluster["metadata"]["name"], cluster["metadata"]["namespace"])
# 删除集群
if "items" in kube_ray_list:
for cluster in kube_ray_list["items"]:
print("deleting raycluster = {}".format(cluster["metadata"]["name"]))
# 通过指定名称删除ray集群
kuberay_api.delete_ray_cluster(
name=cluster["metadata"]["name"],
k8s_namespace=cluster["metadata"]["namespace"],
)
if __name__ == "__main__":
main()
前置: pip install -U "ray[default]" 。
# 文件名称: test_job.py
# python 标准库
import json
import ray
import sys
# 已经在ray节点安装的库
import redis
# 通过job提交时传递的模块依赖 runtime_env 配置 py_modules,通过 py_nodules传递过来就可以直接在job中导入
from test_module import test_1
import stk12
# 创建一个连接redeis对象,通过redis作为中转向job传递输入并获取job的输出
redis_cli = redis.Redis(host='192.168.6.205', port=6379, decode_responses=True)
# 通过redis获取传入过来的参数
input_params_value = None
if len(sys.argv) > 1:
input_params_key = sys.argv[1]
input_params_value = json.loads(redis_cli.get(input_params_key))
# 执行远程任务
@ray.remote
def hello_world(value):
return [v + 100 for v in value]
ray.init()
# 输出传递过来的参数
print("input_params_value:", input_params_value, type(input_params_key))
# 执行远程函数
result = ray.get(hello_world.remote(input_params_value))
# 获取输出key
output_key = input_params_key.split(":")[0] + ":output"
# 将输出结果放入redis
redis_cli.set(output_key, json.dumps(result))
# 测试传递过来的Python依赖库是否能正常导入
print(test_1.test_1())
print(stk12.__dir__())
# 模块路径: test_module/test_1.py
def test_1():
return "test_1"
import json
from ray.job_submission import JobSubmissionClient, JobStatus
import time
import uuid
import redis
# 上传un到ray集群供job使用的模块
import test_module
from agi import stk12
# 创建一个连接redeis对象
redis_cli = redis.Redis(host='192.168.6.205', port=6379, decode_responses=True)
# 创建一个client,指定远程ray集群的head地址
client = JobSubmissionClient("http://127.0.0.1:8265")
# 创建任务的ID
id = uuid.uuid4().hex
input_params_key = f"{id}:input"
input_params_value = [1, 2, 3, 4, 5]
# 将输入参数存入redis,供远程函数job使用
redis_cli.set(input_params_key, json.dumps(input_params_value))
# 提交一个ray job 是一个独立的ray应用程序
job_id = client.submit_job(
# 执行该job的入口脚本
entrypoint=f"python test_job.py {input_params_key}",
# 将本地文件上传到ray集群
runtime_env={
"working_dir": "./",
"py_modules": [test_module, stk12],
"env_vars": {"testenv": "test-1"}
},
# 自定义任务ID
submission_id=f"{id}"
)
# 输出job ID
print("job_id:", job_id)
def wait_until_status(job_id, status_to_wait_for, timeout_seconds=5):
"""轮询获取Job的状态,当完成时获取任务的的日志输出"""
start = time.time()
while time.time() - start <= timeout_seconds:
# 获取任务的状态
status = client.get_job_status(job_id)
print(f"status: {status}")
# 检查任务的状态
if status in status_to_wait_for:
break
time.sleep(1)
wait_until_status(job_id, {JobStatus.SUCCEEDED, JobStatus.STOPPED, JobStatus.FAILED})
# 输出job日志
logs = client.get_job_logs(job_id)
print(logs)
# 输出从job中获取的任务
output_key = job_id + ":output"
output_value = redis_cli.get(output_key)
print("output:", output_value)
from ray.job_submission import JobSubmissionClient, JobDetails, JobInfo, JobType, JobStatus
# 创建一个job提交客户端,如果管理多个ray集群的Job则切换或者创建多个连接ray head节点的客户端
job_cli = JobSubmissionClient("http://127.0.0.1:8265")
# Job信息,对应Job中submission_id属性
job_id = "b9ad6ff9ada445a29fb54307f1394594"
job_info = job_cli.get_job_info(job_id)
# 获取提交的所有job
jobs = job_cli.list_jobs()
for job in jobs:
# 获取job的状态
job_status = job_cli.get_job_status(job.submission_id)
print(f"job_id: {job.submission_id}, job_status: {job_status}")
# 输出job的json格式详情
print("job:", job.json())
# 停止Job
job_cli.stop_job(job_id)
# 删除 job
# job_cli.delete_job(job_id)
# 提交 Job
# job_cli.submit_job()
# 获取版本信息
print("version:", job_cli.get_version())
镜像文件打包下载、文件同步、运维脚本、数据导出与同步、镜像同步、服务启停、TATC卫星项目中算法任务的执行、批量同类型任务的计算(如卫星项目中卫星轨迹的计算)、备份任务 。
最后此篇关于ray-分布式计算框架-集群与异步Job管理的文章就讲到这里了,如果你想了解更多关于ray-分布式计算框架-集群与异步Job管理的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
由于 PostgreSQL 人员选择的术语不当,这会让一些人感到困惑,但请耐心等待... 我们需要能够支持多个 PostgreSQL (PG) 集群,并将它们集群在多个服务器上,例如使用repmgr.
Hadoop会跑很多jobs,从Hbase读取数据,写数据到数据库。假设我有 100 个节点,那么有两种方法可以构建我的 Hadoop/Hbase集群: 100节点hadoop&hbase集群(1个b
在kafka中,我们能创建多种类型的集群,一般如下: 单节点——单个broker集群 单节点——多broker集群 多结点——多broker集群 kafka集群中主要有五个组件: Topic:主题主要
我想在两台机器上运行我的 MPI 程序,一台是 ubuntu 18.04,另一台是 Windows 10。是否可以使用不同的操作系统组成一个集群? (我正在使用 MPICH) 如果可能,怎么做?我在网
简介: 更新负载平衡集群中的节点的最佳实践是什么? 我们在 ha 代理负载均衡器后面使用 RabbitMQ 集群来支持我们的客户端轻松集群,as suggested in the RabbitMQ d
我正在尝试创建一个图表,我明确指定了许多节点的等级并指定了集群。以下代码生成我想要的图形类型: digraph { rankdir=LR subgraph cluster0 {
我正在尝试在 RABBITMQ 上进行集群。我添加了 2 个节点,但无法添加第 3 个节点。我已经聚集了 rabbit@node1 和 rabbit@node2。现在我正在尝试将 rabbit@nod
我在 MS Azure 的生产环境中启用了一个双集群多区域 HA。 我被要求重用同一个集群来使用微服务管理几个新项目。 这里的最佳做法是什么?我应该为每个应用程序创建一个集群吗?将不同集群中的每个项目
我正在尝试 flex 搜索,并且很难在具有3个 flex 搜索实例的单台计算机上创建集群。我对实例进行了以下更改: 在所有实例上更改了群集名称{cluster.name:es-stack} 在所有实例
我想创建一个redis集群,仅将特定的redis主机作为主服务器和从服务器。另外,我想自己指定哪个主机应该是哪个主机的副本。 目前,cluster-create command当在任何一个 Redis
我计划在具有负载平衡的集群中设置 Magento,那么是否可以让两个 Magento 安装指向同一个数据库? 如果可能的话 - 我应该如何配置这些服务器的基本 URL?服务器应该具有相同的名称吗? 最
我目前正在计划一个 Web 应用程序,我想计划它最终在集群上运行。 集群将由一个 php web 集群和一个 mysql 集群以及一个独立的存储单元组成(也许是一个集群,我真的不知道它是如何工作的:s
我已经安装了 elasticsearch 2.2.3 并在 2 个节点的集群中配置 节点 1 (elasticsearch.yml) cluster.name: my-cluster node.nam
我正在寻找现有的中间件解决方案,以解决服务集群/分布方面的问题,以实现负载平衡和可用性。我正在考虑基于消息传递系统(更具体地说,JMS)为此构建自己的基础设施。但是,如果可能的话,我宁愿使用已经存在的
在 Hadoop 集群中,数据是自动跨数据节点复制还是必须编程? 如果必须编程,那我该怎么做呢? 最佳答案 dfs.replication 的默认值为 3。这存在于您的 hdfs.site.xml 中
我已经设置了一个具有 1 个主节点和 2 个从节点的 redis 集群,哨兵在所有 3 个节点上运行。 在此设置之前,我的应用程序指向运行 Redis 实例的单个节点。 集群搭建完成后,我的应用应该指
所以,我正在设计一个具有多个 redis 实例的分布式系统来分解大量的流式写入,但发现很难清楚地了解事情是如何工作的。 从我读到的内容来看,正确配置的集群似乎会自动对“错误实例”上的请求进行分片和重定
我有一个关于redis集群架构的问题。 我正在设置一个 Redis 集群,并遵循基本建议:3 硕士3个奴隶 有什么方法可以在 Amazon LB、HAProxy、Nginx 等负载均衡器后面配置此集群
那么集群背后的想法是什么? 您有多台机器具有相同的数据库副本,您在其中传播读/写?这是正确的吗? 这个想法如何运作?当我进行选择查询时,集群会分析哪个服务器的读/写较少并将我的查询指向该服务器? 当您
目录 一.系统环境 二.前言 三.Kubernetes 3.1 概述 3.2 Kube
我是一名优秀的程序员,十分优秀!